redis_stream/
lib.rs

1//! Simple API for producing and consuming redis streams.
2//!
3//! # Basic usage:
4//!
5//! ```
6//! use redis_stream::consumer::{Consumer, ConsumerOpts, Message};
7//!
8//! let redis_url =
9//!   std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
10//!
11//! let mut redis = redis::Client::open(redis_url)
12//!   .expect("client")
13//!   .get_connection()
14//!   .expect("connection");
15//!
16//! // Message handler
17//! let handler = |_id: &str, message: &Message| {
18//!   // do something
19//!   Ok(())
20//! };
21//!
22//! // Consumer config
23//! let opts = ConsumerOpts::default();
24//! let mut consumer = Consumer::init(&mut redis, "my-stream", handler, opts).expect("consumer");
25//!
26//! // Consume some messages through handler.
27//! consumer.consume().expect("consume messages");
28//!
29//! // Clean up redis
30//! use redis::Commands;
31//! redis.del::<&str, bool>("my-stream").expect("del");
32//! ```
33//!
34//! # Consumer groups usage:
35//!
36//! ```
37//! use redis_stream::consumer::{Consumer, ConsumerOpts, Message};
38//!
39//! let redis_url =
40//!   std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
41//!
42//! let mut redis = redis::Client::open(redis_url)
43//!   .expect("client")
44//!   .get_connection()
45//!   .expect("connection");
46//!
47//! // Message handler
48//! let handler = |_id: &str, message: &Message| {
49//!   // do something
50//!   Ok(())
51//! };
52//!
53//! // Consumer config
54//! let opts = ConsumerOpts::default().group("my-group", "worker.1");
55//! let mut consumer = Consumer::init(&mut redis, "my-stream-2", handler, opts).unwrap();
56//!
57//! // Consume some messages through handler.
58//! consumer.consume().expect("consume messages");
59//!
60//! // Clean up redis
61//! use redis::Commands;
62//! redis.xgroup_destroy::<&str, &str, bool>("my-stream-2", "my-group").expect("xgroup destroy");
63//! redis.del::<&str, bool>("my-stream-2").expect("del");
64//! ```
65//!
66//! see:
67//!
68//! - [`ConsumerOpts`](types/struct.ConsumerOpts.html)
69//! - [`Consumer::init`](consumer/struct.Consumer.html#method.init)
70//! - [`Consumer::consume`](consumer/struct.Consumer.html#method.consume)
71//! - [`produce`](fn.produce.html)
72use anyhow::{Context, Result};
73use redis::{Commands, Connection};
74
75pub mod consumer;
76pub mod types;
77
78/// Produces a new message into a Redis stream.
79pub fn produce(
80  redis: &mut Connection,
81  stream: &str,
82  key_values: &[(&str, &str)],
83) -> Result<String> {
84  let id = redis
85    .xadd::<&str, &str, &str, &str, String>(stream, "*", key_values)
86    .context(format!(
87      "failed to run redis command:\n\
88       XADD {} * {}",
89      stream,
90      key_values
91        .iter()
92        .map(|(k, v)| format!("{} {}", k, v))
93        .collect::<Vec<String>>()
94        .join(" ")
95    ))?;
96  Ok(id)
97}
98
99#[cfg(test)]
100pub mod test_helpers {
101  use rand::distributions::Alphanumeric;
102  use rand::{thread_rng, Rng};
103  use redis::{Commands, Connection, RedisResult};
104
105  pub fn delete_stream(stream: &str) {
106    redis_connection().del::<&str, bool>(stream).unwrap();
107  }
108
109  pub fn key_exists(redis: &mut Connection, key: &str) -> bool {
110    let exists: RedisResult<bool> = redis.exists(key);
111    exists.unwrap()
112  }
113
114  pub fn redis_connection() -> Connection {
115    let redis_url =
116      std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
117    redis::Client::open(redis_url)
118      .expect("failed to open redis client")
119      .get_connection()
120      .expect("failed to get redis connection")
121  }
122
123  pub fn random_string(n: usize) -> String {
124    thread_rng()
125      .sample_iter(&Alphanumeric)
126      .take(n)
127      .map(char::from)
128      .collect()
129  }
130}
131
132#[cfg(test)]
133mod tests {
134  use super::*;
135  use crate::test_helpers::*;
136  use regex::Regex;
137
138  #[test]
139  fn test_produce() -> Result<()> {
140    let mut redis = redis_connection();
141
142    let key_values = &[("temperature", "31")];
143    let stream = &format!("test-stream-{}", random_string(25));
144    let id =
145      produce(&mut redis, stream, key_values).context("failed to produce entry to stream")?;
146    let re = Regex::new(r"^\d+-\d+$").unwrap();
147    assert!(re.is_match(&id), "{:?} doesn't match Regex: {:?}", id, re);
148    delete_stream(stream);
149
150    Ok(())
151  }
152}