Simple API for producing and consuming Redis streams.
§Basic usage:
use redis_stream::consumer::{Consumer, ConsumerOpts, Message};
let redis_url =
std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
let mut redis = redis::Client::open(redis_url)
.expect("client")
.get_connection()
.expect("connection");
// Message handler
let handler = |_id: &str, message: &Message| {
// do something
Ok(())
};
// Consumer config
let opts = ConsumerOpts::default();
let mut consumer = Consumer::init(&mut redis, "my-stream", handler, opts).expect("consumer");
// Consume some messages through handler.
consumer.consume().expect("consume messages");
// Clean up redis
use redis::Commands;
redis.del::<&str, bool>("my-stream").expect("del");
§Consumer groups usage:
use redis_stream::consumer::{Consumer, ConsumerOpts, Message};
let redis_url =
std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
let mut redis = redis::Client::open(redis_url)
.expect("client")
.get_connection()
.expect("connection");
// Message handler
let handler = |_id: &str, message: &Message| {
// do something
Ok(())
};
// Consumer config
let opts = ConsumerOpts::default().group("my-group", "worker.1");
let mut consumer = Consumer::init(&mut redis, "my-stream-2", handler, opts).unwrap();
// Consume some messages through handler.
consumer.consume().expect("consume messages");
// Clean up redis
use redis::Commands;
redis.xgroup_destroy::<&str, &str, bool>("my-stream-2", "my-group").expect("xgroup destroy");
redis.del::<&str, bool>("my-stream-2").expect("del");
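See also: the official Redis Streams documentation for the underlying stream and consumer-group commands.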
Modules§
- consumer: Defines types to use with the consumer commands.
Functions§
- produce: Produces a new message into a Redis stream.