1use anyhow::{Context, Result};
2use redis::streams::{StreamReadOptions, StreamReadReply};
3use redis::{Commands, Connection, RedisResult, Value};
4use std::collections::HashMap;
5
6pub use super::types::{ConsumerOpts, StartPosition};
7
/// A single stream entry as handed to the consumer's handler: field names
/// mapped to their raw Redis values.
pub type Message = HashMap<String, Value>;
9pub struct Consumer<'a, F>
14where
15 F: FnMut(&str, &Message) -> Result<()>,
16{
17 pub count: Option<usize>,
18 pub group: Option<(String, String)>,
19 pub handled_messages: u32,
20 pub handler: F,
21 pub next_pos: String,
22 pub process_pending: bool,
23 pub redis: &'a mut Connection,
24 pub stream: String,
25 pub timeout: usize,
26}
27
28impl<'a, F> Consumer<'a, F>
29where
30 F: FnMut(&str, &Message) -> Result<()>,
31{
32 pub fn init(
34 redis: &'a mut Connection,
35 stream: &str,
36 handler: F,
37 opts: ConsumerOpts,
38 ) -> Result<Self> {
39 let count = opts.count;
40 let timeout = opts.timeout;
41 let group = opts.group;
42 let create_stream_if_not_exists = opts.create_stream_if_not_exists;
43 let process_pending = opts.process_pending;
44 let start_pos = opts.start_pos;
45
46 let (group_create_pos, consumer_start_pos) = positions(&group, process_pending, start_pos);
47
48 if let Some((group_name, _)) = &group {
49 ensure_stream_and_group(
50 redis,
51 &stream,
52 group_name.as_ref(),
53 &group_create_pos.unwrap(),
54 create_stream_if_not_exists,
55 )?;
56 }
57
58 Ok(Consumer {
59 count,
60 group,
61 handled_messages: 0,
62 handler,
63 next_pos: consumer_start_pos,
64 process_pending,
65 redis,
66 stream: stream.to_string(),
67 timeout,
68 })
69 }
70
71 pub fn consume(&mut self) -> Result<()> {
74 let opts = if let Some((group_name, consumer_name)) = &self.group {
76 StreamReadOptions::default()
79 .group(group_name, consumer_name)
80 .block(self.timeout)
81 } else {
82 StreamReadOptions::default().block(self.timeout)
85 };
86
87 let stream_results: StreamReadReply =
88 self
89 .redis
90 .xread_options(&[&self.stream], &[&self.next_pos], opts)?;
91
92 if !stream_results.keys.is_empty() {
93 let stream = &stream_results.keys[0];
94
95 if self.group.is_some() && self.process_pending && stream.ids.is_empty() {
96 self.process_pending = false;
99 self.next_pos = String::from(">");
100 return self.consume();
101 } else {
102 for message in &stream.ids {
104 if self.next_pos != ">" {
106 self.next_pos = message.id.to_string();
108 }
109 let items = &message.map;
110
111 self.process_message(&message.id, items)?;
112 }
113 }
114 }
115
116 Ok(())
117 }
118
119 fn process_message(&mut self, id: &str, message: &Message) -> Result<()> {
122 (self.handler)(id, message)?;
124 self.handled_messages += 1;
125 if let Some((group_name, _)) = &self.group {
127 let _ack_count: i32 = self.redis.xack(&self.stream, group_name, &[id]).unwrap();
128 }
129 Ok(())
130 }
131}
132
133fn ensure_stream_and_group(
137 redis: &mut Connection,
138 stream: &str,
139 group_name: &str,
140 create_pos: &str,
141 create_stream_if_not_exists: bool,
142) -> Result<()> {
143 let mut result: RedisResult<String> = if create_stream_if_not_exists {
144 redis.xgroup_create_mkstream(stream, group_name, create_pos)
145 } else {
146 redis.xgroup_create(stream, group_name, create_pos)
147 };
148
149 if let Err(err) = &result {
151 if err.to_string() == "BUSYGROUP: Consumer Group name already exists" {
152 result = Ok("OK".to_string());
153 }
154 }
155
156 result.context(format!(
157 "failed to run redis command:\n\
158 XGROUP CREATE {} {} {}{}",
159 stream,
160 group_name,
161 create_pos,
162 if create_stream_if_not_exists {
163 " MKSTREAM"
164 } else {
165 ""
166 }
167 ))?;
168
169 Ok(())
170}
171
172fn positions(
189 group_name: &Option<(String, String)>,
190 process_pending: bool,
191 start_pos: StartPosition,
192) -> (Option<String>, String) {
193 use StartPosition::*;
194 let (group_create_position, consumer_start_position) =
195 match (group_name, process_pending, start_pos) {
196 (None, _, StartOfStream) => (None, String::from("0")),
198 (None, _, EndOfStream) => (None, String::from("$")),
199 (None, _, Other(id)) => (None, id),
200 (_, true, StartOfStream) => str_to_positions("0", "0"),
202 (_, true, EndOfStream) => str_to_positions("$", "0"),
203 (_, true, Other(id)) => (Some(id), String::from("0")),
204 (_, false, StartOfStream) => str_to_positions("0", ">"),
206 (_, false, EndOfStream) => str_to_positions("$", ">"),
207 (_, false, Other(id)) => (Some(id.clone()), id),
208 };
209
210 (group_create_position, consumer_start_position)
211}
212
/// Builds an owned `(Some(a), b)` position pair from two string slices.
#[inline]
fn str_to_positions(a: &str, b: &str) -> (Option<String>, String) {
    let group_create_position = Some(String::from(a));
    let consumer_start_position = String::from(b);
    (group_create_position, consumer_start_position)
}
218
219#[cfg(test)]
220mod tests {
221 use super::*;
222 use crate::test_helpers::*;
223 use anyhow::bail;
224 use redis::FromRedisValue;
225
226 fn delete_group(stream: &str, group: &str) {
227 redis_connection()
228 .xgroup_destroy::<&str, &str, bool>(stream, group)
229 .unwrap();
230 }
231
232 #[allow(clippy::unnecessary_wraps)]
233 fn print_message(_id: &str, message: &Message) -> Result<()> {
234 for (k, v) in message {
235 println!("{}: {}", k, String::from_redis_value(&v).unwrap());
236 }
237 Ok(())
238 }
239
240 #[test]
241 fn test_init_options() {
242 }
247
248 #[test]
249 fn test_init() {
250 let mut redis = redis_connection();
251 let mut redis_c = redis_connection();
252 let stream = &format!("test-stream-{}", random_string(25));
253 let group_name = &format!("test-group-{}", random_string(25));
254 let consumer_name = &format!("test-consumer-{}", random_string(25));
255
256 assert!(!key_exists(&mut redis, stream));
258
259 let opts = ConsumerOpts::default()
260 .create_stream_if_not_exists(true)
261 .group(group_name, consumer_name);
262 Consumer::init(&mut redis_c, &stream, print_message, opts).unwrap();
263 assert!(key_exists(&mut redis, stream));
264 let len: usize = redis.xlen(stream).unwrap();
266 assert_eq!(len, 0);
267
268 delete_group(stream, group_name);
269 delete_stream(stream);
270
271 assert!(!key_exists(&mut redis, stream));
273 let opts = ConsumerOpts::default()
274 .create_stream_if_not_exists(false)
275 .group(group_name, consumer_name);
276 assert!(Consumer::init(&mut redis_c, stream, print_message, opts).is_err());
277 assert!(!key_exists(&mut redis, stream));
278 }
279
280 #[test]
281 fn test_consume() {
282 use std::thread;
283 use std::time::Duration;
284
285 let group_name = &format!("test-group-{}", random_string(25));
286 let consumer_name = &format!("test-consumer-{}", random_string(25));
287 let stream = &format!("test-stream-{}", random_string(25));
288 let mut redis = redis_connection();
289 let mut redis_c = redis_connection();
290
291 crate::produce(&mut redis, stream, &[("key", "value_1")]).unwrap();
292
293 {
295 {
297 let mut messages = vec![];
298 let handler = |_id: &str, message: &Message| {
299 messages.push(message.clone());
300 Ok(())
301 };
302 let opts = ConsumerOpts::default().start_pos(StartPosition::StartOfStream);
303 let mut consumer = Consumer::init(&mut redis_c, stream, handler, opts).unwrap();
304
305 consumer.consume().unwrap();
306 let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
307 assert_eq!(value, "value_1".to_string());
308 }
309
310 {
312 let messages = &mut vec![];
313 let handler = |_id: &str, message: &Message| {
314 messages.push(message.clone());
315 Ok(())
316 };
317 let opts = ConsumerOpts::default().start_pos(StartPosition::EndOfStream);
318 let mut consumer = Consumer::init(&mut redis_c, stream, handler, opts).unwrap();
319 let stream_name = stream.clone();
320 let child = thread::spawn(move || {
321 thread::sleep(Duration::from_millis(500));
323 let mut redis = redis_connection();
324 crate::produce(&mut redis, &stream_name, &[("key", "value_2")]).unwrap();
325 });
326
327 consumer.consume().unwrap();
328 child.join().unwrap();
329 let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
330 assert_eq!(value, "value_2".to_string());
331 }
332 }
333
334 {
336 {
338 let mut messages = vec![];
339 let handler = |_id: &str, message: &Message| {
340 messages.push(message.clone());
341 bail!("I don't ack message");
342 };
343 let opts = ConsumerOpts::default()
344 .group(group_name, consumer_name)
345 .start_pos(StartPosition::EndOfStream)
346 .process_pending(true);
347 let mut consumer = Consumer::init(&mut redis_c, stream, handler, opts).unwrap();
348 let stream_name = stream.clone();
349 let child = thread::spawn(move || {
350 thread::sleep(Duration::from_millis(500));
352 let mut redis = redis_connection();
353 crate::produce(&mut redis, &stream_name, &[("key", "value_3")]).unwrap();
354 crate::produce(&mut redis, &stream_name, &[("key", "value_4")]).unwrap();
355 });
356
357 consumer.consume().unwrap_or(());
359 child.join().unwrap();
360 let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
361 assert_eq!(value, "value_3".to_string());
362 }
363
364 {
366 let mut messages = vec![];
367 let handler = |_id: &str, message: &Message| {
368 messages.push(message.clone());
369 bail!("I don't ack message");
370 };
371 let opts = ConsumerOpts::default()
372 .group(group_name, consumer_name)
373 .start_pos(StartPosition::EndOfStream)
374 .process_pending(true);
375 let mut consumer = Consumer::init(&mut redis_c, stream, handler, opts).unwrap();
376 consumer.consume().unwrap_or(());
378 let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
379 assert_eq!(value, "value_3".to_string());
380 }
381
382 {
384 let mut messages = vec![];
385 let handler = |_id: &str, message: &Message| {
386 messages.push(message.clone());
387 Ok(())
388 };
389 let opts = ConsumerOpts::default()
390 .group(group_name, consumer_name)
391 .start_pos(StartPosition::EndOfStream)
392 .process_pending(false);
393 let mut consumer = Consumer::init(&mut redis_c, stream, handler, opts).unwrap();
394 consumer.consume().unwrap();
395 let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
396 assert_eq!(value, "value_4".to_string());
397 }
398
399 {
401 let mut messages = vec![];
402 let handler = |_id: &str, message: &Message| {
403 messages.push(message.clone());
404 Ok(())
405 };
406 let opts = ConsumerOpts::default()
407 .group(group_name, consumer_name)
408 .start_pos(StartPosition::EndOfStream)
409 .process_pending(true);
410 let mut consumer = Consumer::init(&mut redis_c, stream, handler, opts).unwrap();
411 consumer.consume().unwrap();
412 let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
413 assert_eq!(value, "value_3".to_string());
414
415 let mut messages = vec![];
416 let handler = |_id: &str, message: &Message| {
417 messages.push(message.clone());
418 Ok(())
419 };
420 let opts = ConsumerOpts::default()
421 .group(group_name, consumer_name)
422 .start_pos(StartPosition::EndOfStream)
423 .process_pending(true);
424 let mut consumer = Consumer::init(&mut redis_c, stream, handler, opts).unwrap();
425 consumer.consume().unwrap();
426 assert!(messages.is_empty());
427 }
428
429 delete_group(stream, group_name);
430
431 {
433 let mut messages = vec![];
435 let handler = |_id: &str, message: &Message| {
436 messages.push(message.clone());
437 Ok(())
438 };
439 let opts = ConsumerOpts::default()
440 .group(group_name, consumer_name)
441 .start_pos(StartPosition::StartOfStream)
442 .process_pending(false);
443 let mut consumer = Consumer::init(&mut redis_c, stream, handler, opts).unwrap();
444 consumer.consume().unwrap();
445 let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
446 assert_eq!(value, "value_4".to_string());
447 let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
448 assert_eq!(value, "value_3".to_string());
449 let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
450 assert_eq!(value, "value_2".to_string());
451 let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
452 assert_eq!(value, "value_1".to_string());
453
454 delete_group(stream, group_name);
455
456 let mut messages = vec![];
458 let handler = |_id: &str, message: &Message| {
459 messages.push(message.clone());
460 Ok(())
461 };
462 let opts = ConsumerOpts::default()
463 .group(group_name, consumer_name)
464 .start_pos(StartPosition::StartOfStream)
465 .process_pending(true);
466 let mut consumer = Consumer::init(&mut redis_c, stream, handler, opts).unwrap();
467 consumer.consume().unwrap();
468 let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
469 assert_eq!(value, "value_4".to_string());
470 let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
471 assert_eq!(value, "value_3".to_string());
472 let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
473 assert_eq!(value, "value_2".to_string());
474 let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
475 assert_eq!(value, "value_1".to_string());
476 }
477
478 delete_group(stream, group_name);
479
480 {
482 let mut messages = vec![];
484 let handler = |_id: &str, message: &Message| {
485 messages.push(message.clone());
486 Ok(())
487 };
488 let opts = ConsumerOpts::default()
489 .group(group_name, consumer_name)
490 .start_pos(StartPosition::EndOfStream)
491 .process_pending(false);
492 let mut consumer = Consumer::init(&mut redis_c, stream, handler, opts).unwrap();
493 consumer.consume().unwrap();
494 assert!(messages.is_empty());
495
496 delete_group(stream, group_name);
497
498 let mut messages = vec![];
500 let handler = |_id: &str, message: &Message| {
501 messages.push(message.clone());
502 Ok(())
503 };
504 let opts = ConsumerOpts::default()
505 .group(group_name, consumer_name)
506 .start_pos(StartPosition::EndOfStream)
507 .process_pending(true);
508 let mut consumer = Consumer::init(&mut redis_c, stream, handler, opts).unwrap();
509 consumer.consume().unwrap();
510 assert!(messages.is_empty());
511 }
512 }
513
514 delete_group(stream, group_name);
515 delete_stream(stream);
516 }
517
518 #[test]
524 fn test_ensure_stream_and_group() -> Result<()> {
525 let mut redis = redis_connection();
526
527 delete_stream("test-stream");
528 ensure_stream_and_group(&mut redis, "test-stream", "test-group", "0", true)
529 .context("failed to produce entry to stream")?;
530 ensure_stream_and_group(&mut redis, "test-stream", "test-group", "0", true)
531 .context("failed to produce entry to stream")?;
532
533 Ok(())
534 }
535}