use anyhow::{Context, Result};
use redis::streams::{StreamReadOptions, StreamReadReply};
use redis::{Commands, Connection, RedisResult, Value};
use std::collections::HashMap;
pub use super::types::{ConsumerOpts, StartPosition};
/// Field/value pairs of a single stream entry, as handed to a consumer's handler.
pub type Message = HashMap<String, Value>;
/// Reads entries from one Redis stream and passes each of them to a handler callback.
///
/// When `group` is set the consumer reads as a member of a consumer group and
/// acknowledges every entry whose handler returned `Ok`.
pub struct Consumer<'a, F>
where
F: FnMut(&str, &Message) -> Result<()>,
{
/// Copied from `ConsumerOpts::count` by `init`; presumably a per-read entry limit (confirm).
pub count: Option<usize>,
/// `(group name, consumer name)` when reading as part of a consumer group.
pub group: Option<(String, String)>,
/// Number of entries for which the handler returned `Ok`.
pub handled_messages: u32,
/// Callback invoked with the entry id and the entry's fields.
pub handler: F,
/// Id passed to the next read as the start position.
pub next_pos: String,
/// While `true` (and a group is set), a read that returns the stream without
/// any entries makes `consume` switch `next_pos` to `">"` and read again.
pub process_pending: bool,
/// Borrowed connection used for reading and acknowledging.
pub redis: &'a mut Connection,
/// Key of the stream being consumed.
pub stream: String,
/// Value handed to `StreamReadOptions::block` on every read.
pub timeout: usize,
}
impl<'a, F> Consumer<'a, F>
where
    F: FnMut(&str, &Message) -> Result<()>,
{
    /// Creates a consumer for `stream` that feeds every entry to `handler`.
    ///
    /// When `opts.group` is set, the consumer group (and, if
    /// `opts.create_stream_if_not_exists` is set, the stream itself) is created
    /// first; an already existing group is not an error.
    ///
    /// # Errors
    ///
    /// Returns an error when the group cannot be created, e.g. because the
    /// stream does not exist and `create_stream_if_not_exists` is `false`.
    pub fn init(
        redis: &'a mut Connection,
        stream: &str,
        handler: F,
        opts: ConsumerOpts,
    ) -> Result<Self> {
        let count = opts.count;
        let timeout = opts.timeout;
        let group = opts.group;
        let create_stream_if_not_exists = opts.create_stream_if_not_exists;
        let process_pending = opts.process_pending;
        let (group_create_pos, consumer_start_pos) =
            positions(&group, process_pending, opts.start_pos);

        if let Some((group_name, _)) = &group {
            // `positions` yields a creation position whenever a group is
            // configured; report a violation as an error instead of panicking.
            let create_pos = group_create_pos
                .as_deref()
                .context("no creation position was computed for the consumer group")?;
            ensure_stream_and_group(
                redis,
                stream,
                group_name,
                create_pos,
                create_stream_if_not_exists,
            )?;
        }

        Ok(Consumer {
            count,
            group,
            handled_messages: 0,
            handler,
            next_pos: consumer_start_pos,
            process_pending,
            redis,
            stream: stream.to_string(),
            timeout,
        })
    }

    /// Performs one read from the stream and hands every returned entry to the handler.
    ///
    /// The read blocks for at most `timeout` and, when `count` is set, returns
    /// at most that many entries. A group consumer that is still processing
    /// pending entries switches to new entries (`">"`) as soon as a read comes
    /// back without any entry, and then reads once more.
    ///
    /// `next_pos` is advanced to an entry's id *before* its handler runs, so an
    /// entry whose handler fails is not read again by this consumer instance.
    ///
    /// # Errors
    ///
    /// Returns the first error raised by the read, the handler, or the
    /// acknowledgement; entries after the failing one are not processed.
    pub fn consume(&mut self) -> Result<()> {
        let mut opts = StreamReadOptions::default().block(self.timeout);
        if let Some(count) = self.count {
            opts = opts.count(count);
        }
        if let Some((group_name, consumer_name)) = &self.group {
            opts = opts.group(group_name, consumer_name);
        }

        let reply: StreamReadReply =
            self.redis
                .xread_options(&[&self.stream], &[&self.next_pos], opts)?;

        // Only one stream is requested, so only the first key is relevant.
        let stream = match reply.keys.first() {
            Some(stream) => stream,
            None => return Ok(()),
        };

        if self.group.is_some() && self.process_pending && stream.ids.is_empty() {
            // No pending entries are left: continue with never-delivered ones.
            self.process_pending = false;
            self.next_pos = String::from(">");
            return self.consume();
        }

        for message in &stream.ids {
            // `">"` is a fixed marker and must not be replaced by a concrete id.
            if self.next_pos != ">" {
                self.next_pos = message.id.to_string();
            }
            self.process_message(&message.id, &message.map)?;
        }
        Ok(())
    }

    /// Runs the handler for one entry and, for group consumers, acknowledges it.
    ///
    /// The entry is only counted and acknowledged when the handler returns `Ok`.
    ///
    /// # Errors
    ///
    /// Returns the handler's error, or the Redis error of a failed `XACK`.
    fn process_message(&mut self, id: &str, message: &Message) -> Result<()> {
        (self.handler)(id, message)?;
        self.handled_messages += 1;

        if let Some((group_name, _)) = &self.group {
            let stream = &self.stream;
            let _ack_count: i32 = self
                .redis
                .xack(stream, group_name, &[id])
                .with_context(|| {
                    format!(
                        "failed to run redis command:\nXACK {} {} {}",
                        stream, group_name, id
                    )
                })?;
        }
        Ok(())
    }
}
fn ensure_stream_and_group(
redis: &mut Connection,
stream: &str,
group_name: &str,
create_pos: &str,
create_stream_if_not_exists: bool,
) -> Result<()> {
let mut result: RedisResult<String> = if create_stream_if_not_exists {
redis.xgroup_create_mkstream(stream, group_name, create_pos)
} else {
redis.xgroup_create(stream, group_name, create_pos)
};
if let Err(err) = &result {
if err.to_string() == "BUSYGROUP: Consumer Group name already exists" {
result = Ok("OK".to_string());
}
}
result.context(format!(
"failed to run redis command:\n\
XGROUP CREATE {} {} {}{}",
stream,
group_name,
create_pos,
if create_stream_if_not_exists {
" MKSTREAM"
} else {
""
}
))?;
Ok(())
}
/// Derives the stream positions a consumer starts with.
///
/// Returns `(group creation position, first read position)`:
///
/// * without a group there is no creation position and reading starts at
///   `start_pos` (`"0"`, `"$"` or the given id);
/// * with a group the group is created at `start_pos`, and reading starts at
///   `"0"` (this consumer's pending entries) when `process_pending` is set,
///   otherwise at `">"` (entries never delivered to the group).
///
/// A group consumer that skips pending entries must always read from `">"`:
/// an explicit id would make `XREADGROUP` return pending entries only, and
/// new entries would never be delivered.
fn positions(
    group_name: &Option<(String, String)>,
    process_pending: bool,
    start_pos: StartPosition,
) -> (Option<String>, String) {
    use StartPosition::*;
    match (group_name, process_pending, start_pos) {
        (None, _, StartOfStream) => (None, String::from("0")),
        (None, _, EndOfStream) => (None, String::from("$")),
        (None, _, Other(id)) => (None, id),
        (Some(_), true, StartOfStream) => str_to_positions("0", "0"),
        (Some(_), true, EndOfStream) => str_to_positions("$", "0"),
        (Some(_), true, Other(id)) => (Some(id), String::from("0")),
        (Some(_), false, StartOfStream) => str_to_positions("0", ">"),
        (Some(_), false, EndOfStream) => str_to_positions("$", ">"),
        (Some(_), false, Other(id)) => (Some(id), String::from(">")),
    }
}
/// Turns a pair of position literals into the owned
/// `(group creation position, read position)` form.
#[inline]
fn str_to_positions(a: &str, b: &str) -> (Option<String>, String) {
    let group_create_position = Some(String::from(a));
    let consumer_start_position = String::from(b);
    (group_create_position, consumer_start_position)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_helpers::*;
use anyhow::bail;
use redis::FromRedisValue;
/// Destroys consumer group `group` of `stream` on a fresh connection.
///
/// Panics when the command fails.
fn delete_group(stream: &str, group: &str) {
    let mut connection = redis_connection();
    let _destroyed: bool = connection.xgroup_destroy(stream, group).unwrap();
}
/// Handler that prints every field of an entry as `field: value`.
///
/// # Errors
///
/// Returns an error when a value cannot be converted to a `String`.
fn print_message(_id: &str, message: &Message) -> Result<()> {
    for (field, value) in message {
        // Report a failed conversion through the handler's `Result` instead
        // of panicking in the middle of a consume call.
        let text = String::from_redis_value(value)?;
        println!("{}: {}", field, text);
    }
    Ok(())
}
/// Checks how `positions` maps the init options (group, `process_pending`,
/// start position) to the group creation position and the first read position.
#[test]
fn test_init_options() {
    let id = || "5-0".to_string();
    let group = Some(("group".to_string(), "consumer".to_string()));

    // Without a group there is nothing to create and `process_pending` is
    // irrelevant: reading starts at the requested position.
    assert_eq!(
        positions(&None, false, StartPosition::StartOfStream),
        (None, "0".to_string())
    );
    assert_eq!(
        positions(&None, true, StartPosition::EndOfStream),
        (None, "$".to_string())
    );
    assert_eq!(
        positions(&None, false, StartPosition::Other(id())),
        (None, id())
    );

    // With a group and pending processing: the group is created at the
    // requested position and reading starts with pending entries ("0").
    assert_eq!(
        positions(&group, true, StartPosition::StartOfStream),
        (Some("0".to_string()), "0".to_string())
    );
    assert_eq!(
        positions(&group, true, StartPosition::EndOfStream),
        (Some("$".to_string()), "0".to_string())
    );
    assert_eq!(
        positions(&group, true, StartPosition::Other(id())),
        (Some(id()), "0".to_string())
    );

    // With a group and no pending processing: reading starts with entries
    // that were never delivered (">").
    assert_eq!(
        positions(&group, false, StartPosition::StartOfStream),
        (Some("0".to_string()), ">".to_string())
    );
    assert_eq!(
        positions(&group, false, StartPosition::EndOfStream),
        (Some("$".to_string()), ">".to_string())
    );
}
// Verifies that `Consumer::init` creates the stream together with the group
// when asked to, and fails without creating anything when it is not.
#[test]
fn test_init() {
// `redis` is used for the assertions, `redis_c` is lent to the consumer.
let mut redis = redis_connection();
let mut redis_c = redis_connection();
// Random names keep this test independent of other tests and earlier runs.
let stream = &format!("test-stream-{}", random_string(25));
let group_name = &format!("test-group-{}", random_string(25));
let consumer_name = &format!("test-consumer-{}", random_string(25));
assert!(!key_exists(&mut redis, stream));
// Case 1: the stream may be created, so init succeeds and leaves an empty stream.
let opts = ConsumerOpts::default()
.create_stream_if_not_exists(true)
.group(group_name, consumer_name);
Consumer::init(&mut redis_c, &stream, print_message, opts).unwrap();
assert!(key_exists(&mut redis, stream));
let len: usize = redis.xlen(stream).unwrap();
assert_eq!(len, 0);
// Reset to a state without group and stream.
delete_group(stream, group_name);
delete_stream(stream);
assert!(!key_exists(&mut redis, stream));
// Case 2: the stream must not be created, so init fails and the stream stays absent.
let opts = ConsumerOpts::default()
.create_stream_if_not_exists(false)
.group(group_name, consumer_name);
assert!(Consumer::init(&mut redis_c, stream, print_message, opts).is_err());
assert!(!key_exists(&mut redis, stream));
}
// End-to-end scenario for `Consumer::consume`, first without and then with a
// consumer group. The steps build on each other (entries value_1..value_4 are
// produced along the way), so their order matters.
#[test]
fn test_consume() {
use std::thread;
use std::time::Duration;
let group_name = &format!("test-group-{}", random_string(25));
let consumer_name = &format!("test-consumer-{}", random_string(25));
let stream = &format!("test-stream-{}", random_string(25));
let mut redis = redis_connection();
let mut redis_c = redis_connection();
// The stream starts out with a single entry.
crate::produce(&mut redis, stream, &[("key", "value_1")]).unwrap();
// Part 1: consumers without a group.
{
// Reading from the start of the stream yields the existing entry.
{
let mut messages = vec![];
let handler = |_id: &str, message: &Message| {
messages.push(message.clone());
Ok(())
};
let opts = ConsumerOpts::default().start_pos(StartPosition::StartOfStream);
let mut consumer = Consumer::init(&mut redis_c, stream, handler, opts).unwrap();
consumer.consume().unwrap();
let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
assert_eq!(value, "value_1".to_string());
}
// Reading from the end of the stream only yields an entry produced after
// the read started; a helper thread produces value_2 500 ms later.
// NOTE(review): relies on the default `timeout` exceeding 500 ms - confirm.
{
let messages = &mut vec![];
let handler = |_id: &str, message: &Message| {
messages.push(message.clone());
Ok(())
};
let opts = ConsumerOpts::default().start_pos(StartPosition::EndOfStream);
let mut consumer = Consumer::init(&mut redis_c, stream, handler, opts).unwrap();
let stream_name = stream.clone();
let child = thread::spawn(move || {
thread::sleep(Duration::from_millis(500));
let mut redis = redis_connection();
crate::produce(&mut redis, &stream_name, &[("key", "value_2")]).unwrap();
});
consumer.consume().unwrap();
child.join().unwrap();
let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
assert_eq!(value, "value_2".to_string());
}
}
// Part 2: consumers that are members of a consumer group.
{
// The group is created at the end of the stream. The handler records the
// entry and then fails, so the entry is never acknowledged; the resulting
// error from `consume` is deliberately ignored. The last recorded entry is
// expected to be value_3, produced by the helper thread.
{
let mut messages = vec![];
let handler = |_id: &str, message: &Message| {
messages.push(message.clone());
bail!("I don't ack message");
};
let opts = ConsumerOpts::default()
.group(group_name, consumer_name)
.start_pos(StartPosition::EndOfStream)
.process_pending(true);
let mut consumer = Consumer::init(&mut redis_c, stream, handler, opts).unwrap();
let stream_name = stream.clone();
let child = thread::spawn(move || {
thread::sleep(Duration::from_millis(500));
let mut redis = redis_connection();
crate::produce(&mut redis, &stream_name, &[("key", "value_3")]).unwrap();
crate::produce(&mut redis, &stream_name, &[("key", "value_4")]).unwrap();
});
consumer.consume().unwrap_or(());
child.join().unwrap();
let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
assert_eq!(value, "value_3".to_string());
}
// A new consumer instance with pending processing receives the still
// unacknowledged value_3 again; its handler fails once more.
{
let mut messages = vec![];
let handler = |_id: &str, message: &Message| {
messages.push(message.clone());
bail!("I don't ack message");
};
let opts = ConsumerOpts::default()
.group(group_name, consumer_name)
.start_pos(StartPosition::EndOfStream)
.process_pending(true);
let mut consumer = Consumer::init(&mut redis_c, stream, handler, opts).unwrap();
consumer.consume().unwrap_or(());
let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
assert_eq!(value, "value_3".to_string());
}
// Without pending processing the consumer skips value_3 and receives the
// next undelivered entry, value_4, which is handled successfully.
{
let mut messages = vec![];
let handler = |_id: &str, message: &Message| {
messages.push(message.clone());
Ok(())
};
let opts = ConsumerOpts::default()
.group(group_name, consumer_name)
.start_pos(StartPosition::EndOfStream)
.process_pending(false);
let mut consumer = Consumer::init(&mut redis_c, stream, handler, opts).unwrap();
consumer.consume().unwrap();
let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
assert_eq!(value, "value_4".to_string());
}
// With pending processing and a succeeding handler, value_3 is finally
// handled. A further consumer then records nothing: no entry is pending
// and no new entry is produced.
{
let mut messages = vec![];
let handler = |_id: &str, message: &Message| {
messages.push(message.clone());
Ok(())
};
let opts = ConsumerOpts::default()
.group(group_name, consumer_name)
.start_pos(StartPosition::EndOfStream)
.process_pending(true);
let mut consumer = Consumer::init(&mut redis_c, stream, handler, opts).unwrap();
consumer.consume().unwrap();
let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
assert_eq!(value, "value_3".to_string());
let mut messages = vec![];
let handler = |_id: &str, message: &Message| {
messages.push(message.clone());
Ok(())
};
let opts = ConsumerOpts::default()
.group(group_name, consumer_name)
.start_pos(StartPosition::EndOfStream)
.process_pending(true);
let mut consumer = Consumer::init(&mut redis_c, stream, handler, opts).unwrap();
consumer.consume().unwrap();
assert!(messages.is_empty());
}
// Drop the group so that the next step re-creates it from scratch.
delete_group(stream, group_name);
// A group created at the start of the stream delivers all four entries in
// one consume call, with and without pending processing. `pop` returns
// them in reverse order of arrival.
{
let mut messages = vec![];
let handler = |_id: &str, message: &Message| {
messages.push(message.clone());
Ok(())
};
let opts = ConsumerOpts::default()
.group(group_name, consumer_name)
.start_pos(StartPosition::StartOfStream)
.process_pending(false);
let mut consumer = Consumer::init(&mut redis_c, stream, handler, opts).unwrap();
consumer.consume().unwrap();
let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
assert_eq!(value, "value_4".to_string());
let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
assert_eq!(value, "value_3".to_string());
let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
assert_eq!(value, "value_2".to_string());
let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
assert_eq!(value, "value_1".to_string());
// Same expectation for a freshly created group with pending processing.
delete_group(stream, group_name);
let mut messages = vec![];
let handler = |_id: &str, message: &Message| {
messages.push(message.clone());
Ok(())
};
let opts = ConsumerOpts::default()
.group(group_name, consumer_name)
.start_pos(StartPosition::StartOfStream)
.process_pending(true);
let mut consumer = Consumer::init(&mut redis_c, stream, handler, opts).unwrap();
consumer.consume().unwrap();
let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
assert_eq!(value, "value_4".to_string());
let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
assert_eq!(value, "value_3".to_string());
let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
assert_eq!(value, "value_2".to_string());
let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
assert_eq!(value, "value_1".to_string());
}
delete_group(stream, group_name);
// A group created at the end of the stream delivers none of the existing
// entries, with and without pending processing.
{
let mut messages = vec![];
let handler = |_id: &str, message: &Message| {
messages.push(message.clone());
Ok(())
};
let opts = ConsumerOpts::default()
.group(group_name, consumer_name)
.start_pos(StartPosition::EndOfStream)
.process_pending(false);
let mut consumer = Consumer::init(&mut redis_c, stream, handler, opts).unwrap();
consumer.consume().unwrap();
assert!(messages.is_empty());
delete_group(stream, group_name);
let mut messages = vec![];
let handler = |_id: &str, message: &Message| {
messages.push(message.clone());
Ok(())
};
let opts = ConsumerOpts::default()
.group(group_name, consumer_name)
.start_pos(StartPosition::EndOfStream)
.process_pending(true);
let mut consumer = Consumer::init(&mut redis_c, stream, handler, opts).unwrap();
consumer.consume().unwrap();
assert!(messages.is_empty());
}
}
// Clean up everything this test created.
delete_group(stream, group_name);
delete_stream(stream);
}
/// Checks that `ensure_stream_and_group` creates a missing stream and group,
/// and that calling it again for the existing group still succeeds.
#[test]
fn test_ensure_stream_and_group() -> Result<()> {
    let mut redis = redis_connection();
    // Start from a state in which the stream does not exist.
    delete_stream("test-stream");
    ensure_stream_and_group(&mut redis, "test-stream", "test-group", "0", true)
        .context("failed to create stream and group")?;
    // The group exists now; the second call must not report an error for it.
    ensure_stream_and_group(&mut redis, "test-stream", "test-group", "0", true)
        .context("failed to ensure an already existing group")?;
    Ok(())
}
}