Crate redis_stream

Source
Expand description

Simple API for producing and consuming redis streams.

§Basic usage:

use redis_stream::consumer::{Consumer, ConsumerOpts, Message};

let redis_url =
  std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());

let mut redis = redis::Client::open(redis_url)
  .expect("client")
  .get_connection()
  .expect("connection");

// Message handler
let handler = |_id: &str, message: &Message| {
  // do something
  Ok(())
};

// Consumer config
let opts = ConsumerOpts::default();
let mut consumer = Consumer::init(&mut redis, "my-stream", handler, opts).expect("consumer");

// Consume some messages through handler.
consumer.consume().expect("consume messages");

// Clean up redis
use redis::Commands;
redis.del::<&str, bool>("my-stream").expect("del");

§Consumer groups usage:

use redis_stream::consumer::{Consumer, ConsumerOpts, Message};

let redis_url =
  std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());

let mut redis = redis::Client::open(redis_url)
  .expect("client")
  .get_connection()
  .expect("connection");

// Message handler
let handler = |_id: &str, message: &Message| {
  // do something
  Ok(())
};

// Consumer config
let opts = ConsumerOpts::default().group("my-group", "worker.1");
let mut consumer = Consumer::init(&mut redis, "my-stream-2", handler, opts).unwrap();

// Consume some messages through handler.
consumer.consume().expect("consume messages");

// Clean up redis
use redis::Commands;
redis.xgroup_destroy::<&str, &str, bool>("my-stream-2", "my-group").expect("xgroup destroy");
redis.del::<&str, bool>("my-stream-2").expect("del");

see:

Modules§

Functions§

  • Produces a new message into a Redis stream.