redis_stream/lib.rs
//! Simple API for producing and consuming Redis streams.
2//!
3//! # Basic usage:
4//!
5//! ```
6//! use redis_stream::consumer::{Consumer, ConsumerOpts, Message};
7//!
8//! let redis_url =
9//! std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
10//!
11//! let mut redis = redis::Client::open(redis_url)
12//! .expect("client")
13//! .get_connection()
14//! .expect("connection");
15//!
16//! // Message handler
17//! let handler = |_id: &str, message: &Message| {
18//! // do something
19//! Ok(())
20//! };
21//!
22//! // Consumer config
23//! let opts = ConsumerOpts::default();
24//! let mut consumer = Consumer::init(&mut redis, "my-stream", handler, opts).expect("consumer");
25//!
26//! // Consume some messages through handler.
27//! consumer.consume().expect("consume messages");
28//!
29//! // Clean up redis
30//! use redis::Commands;
31//! redis.del::<&str, bool>("my-stream").expect("del");
32//! ```
33//!
34//! # Consumer groups usage:
35//!
36//! ```
37//! use redis_stream::consumer::{Consumer, ConsumerOpts, Message};
38//!
39//! let redis_url =
40//! std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
41//!
42//! let mut redis = redis::Client::open(redis_url)
43//! .expect("client")
44//! .get_connection()
45//! .expect("connection");
46//!
47//! // Message handler
48//! let handler = |_id: &str, message: &Message| {
49//! // do something
50//! Ok(())
51//! };
52//!
53//! // Consumer config
54//! let opts = ConsumerOpts::default().group("my-group", "worker.1");
55//! let mut consumer = Consumer::init(&mut redis, "my-stream-2", handler, opts).unwrap();
56//!
57//! // Consume some messages through handler.
58//! consumer.consume().expect("consume messages");
59//!
60//! // Clean up redis
61//! use redis::Commands;
62//! redis.xgroup_destroy::<&str, &str, bool>("my-stream-2", "my-group").expect("xgroup destroy");
63//! redis.del::<&str, bool>("my-stream-2").expect("del");
64//! ```
65//!
//! See also:
67//!
68//! - [`ConsumerOpts`](types/struct.ConsumerOpts.html)
69//! - [`Consumer::init`](consumer/struct.Consumer.html#method.init)
70//! - [`Consumer::consume`](consumer/struct.Consumer.html#method.consume)
71//! - [`produce`](fn.produce.html)
72use anyhow::{Context, Result};
73use redis::{Commands, Connection};
74
75pub mod consumer;
76pub mod types;
77
78/// Produces a new message into a Redis stream.
79pub fn produce(
80 redis: &mut Connection,
81 stream: &str,
82 key_values: &[(&str, &str)],
83) -> Result<String> {
84 let id = redis
85 .xadd::<&str, &str, &str, &str, String>(stream, "*", key_values)
86 .context(format!(
87 "failed to run redis command:\n\
88 XADD {} * {}",
89 stream,
90 key_values
91 .iter()
92 .map(|(k, v)| format!("{} {}", k, v))
93 .collect::<Vec<String>>()
94 .join(" ")
95 ))?;
96 Ok(id)
97}
98
#[cfg(test)]
pub mod test_helpers {
    //! Shared utilities for tests that talk to a real Redis server.

    use rand::distributions::Alphanumeric;
    use rand::{thread_rng, Rng};
    use redis::{Commands, Connection, RedisResult};

    /// Deletes the key `stream` using a freshly opened connection.
    ///
    /// Panics if the `DEL` command fails.
    pub fn delete_stream(stream: &str) {
        let mut conn = redis_connection();
        let _deleted: bool = conn.del(stream).unwrap();
    }

    /// Reports whether `key` exists, panicking if the query itself fails.
    pub fn key_exists(redis: &mut Connection, key: &str) -> bool {
        let outcome: RedisResult<bool> = redis.exists(key);
        match outcome {
            Ok(found) => found,
            Err(err) => panic!("called `Result::unwrap()` on an `Err` value: {:?}", err),
        }
    }

    /// Opens a connection to the server named by `REDIS_URL`, falling back to
    /// `redis://127.0.0.1:6379` when the variable cannot be read.
    ///
    /// Panics if the client cannot be created or the connection fails.
    pub fn redis_connection() -> Connection {
        let url = match std::env::var("REDIS_URL") {
            Ok(value) => value,
            Err(_) => "redis://127.0.0.1:6379".to_string(),
        };
        let client = redis::Client::open(url).expect("failed to open redis client");
        client
            .get_connection()
            .expect("failed to get redis connection")
    }

    /// Returns a string of `n` random alphanumeric characters.
    pub fn random_string(n: usize) -> String {
        let samples = thread_rng().sample_iter(&Alphanumeric);
        samples.take(n).map(char::from).collect()
    }
}
131
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_helpers::*;
    use regex::Regex;

    /// Producing an entry into a fresh, randomly named stream must return an
    /// ID of the form `<digits>-<digits>`; the stream is deleted afterwards.
    #[test]
    fn test_produce() -> Result<()> {
        let mut conn = redis_connection();

        let fields = [("temperature", "31")];
        let stream_name = format!("test-stream-{}", random_string(25));

        let entry_id = produce(&mut conn, &stream_name, &fields)
            .context("failed to produce entry to stream")?;

        let id_pattern = Regex::new(r"^\d+-\d+$").unwrap();
        assert!(
            id_pattern.is_match(&entry_id),
            "{:?} doesn't match Regex: {:?}",
            entry_id,
            id_pattern
        );

        delete_stream(&stream_name);
        Ok(())
    }
}
152}