redis_stream/
consumer.rs

1use anyhow::{Context, Result};
2use redis::streams::{StreamReadOptions, StreamReadReply};
3use redis::{Commands, Connection, RedisResult, Value};
4use std::collections::HashMap;
5
6pub use super::types::{ConsumerOpts, StartPosition};
7
8pub type Message = HashMap<String, Value>;
9// pub type MessageHandler = Fn(&mut Connection, &str, &Message) -> Result<()>;
10
11// A Consumer or Group Consumer handling connection to Redis and able to consume
12// messages.
13pub struct Consumer<'a, F>
14where
15  F: FnMut(&str, &Message) -> Result<()>,
16{
17  pub count: Option<usize>,
18  pub group: Option<(String, String)>,
19  pub handled_messages: u32,
20  pub handler: F,
21  pub next_pos: String,
22  pub process_pending: bool,
23  pub redis: &'a mut Connection,
24  pub stream: String,
25  pub timeout: usize,
26}
27
28impl<'a, F> Consumer<'a, F>
29where
30  F: FnMut(&str, &Message) -> Result<()>,
31{
32  /// Initializes a new `stream::Consumer`.
33  pub fn init(
34    redis: &'a mut Connection,
35    stream: &str,
36    handler: F,
37    opts: ConsumerOpts,
38  ) -> Result<Self> {
39    let count = opts.count;
40    let timeout = opts.timeout;
41    let group = opts.group;
42    let create_stream_if_not_exists = opts.create_stream_if_not_exists;
43    let process_pending = opts.process_pending;
44    let start_pos = opts.start_pos;
45
46    let (group_create_pos, consumer_start_pos) = positions(&group, process_pending, start_pos);
47
48    if let Some((group_name, _)) = &group {
49      ensure_stream_and_group(
50        redis,
51        &stream,
52        group_name.as_ref(),
53        &group_create_pos.unwrap(),
54        create_stream_if_not_exists,
55      )?;
56    }
57
58    Ok(Consumer {
59      count,
60      group,
61      handled_messages: 0,
62      handler,
63      next_pos: consumer_start_pos,
64      process_pending,
65      redis,
66      stream: stream.to_string(),
67      timeout,
68    })
69  }
70
71  /// Handle new messages from the stream, and dispatch them to the registered
72  /// handler.
73  pub fn consume(&mut self) -> Result<()> {
74    // Prepare options for XREAD
75    let opts = if let Some((group_name, consumer_name)) = &self.group {
76      // We have a consumer group
77      // XREADGROUP GROUP <group_name> <consumer_name> BLOCK <timeout> STREAMS <stream> <start_pos>
78      StreamReadOptions::default()
79        .group(group_name, consumer_name)
80        .block(self.timeout)
81    } else {
82      // We have a simple consumer
83      // XREAD BLOCK <timeout> STREAMS <stream> <start_pos>
84      StreamReadOptions::default().block(self.timeout)
85    };
86
87    let stream_results: StreamReadReply =
88      self
89        .redis
90        .xread_options(&[&self.stream], &[&self.next_pos], opts)?;
91
92    if !stream_results.keys.is_empty() {
93      let stream = &stream_results.keys[0];
94
95      if self.group.is_some() && self.process_pending && stream.ids.is_empty() {
96        // We ran out of pending results, let's switch to processing most
97        // recent.
98        self.process_pending = false;
99        self.next_pos = String::from(">");
100        return self.consume();
101      } else {
102        // Process the results and set the next position to consume from
103        for message in &stream.ids {
104          // Keep next_post if we are in a consumer-group and it's already `>`
105          if self.next_pos != ">" {
106            // or take the last id
107            self.next_pos = message.id.to_string();
108          }
109          let items = &message.map;
110
111          self.process_message(&message.id, items)?;
112        }
113      }
114    }
115
116    Ok(())
117  }
118
119  /// Process a message by calling the handler and acknowledging the message-id
120  /// to Redis if necessary.
121  fn process_message(&mut self, id: &str, message: &Message) -> Result<()> {
122    // Call handler
123    (self.handler)(id, message)?;
124    self.handled_messages += 1;
125    // XACK if needed
126    if let Some((group_name, _)) = &self.group {
127      let _ack_count: i32 = self.redis.xack(&self.stream, group_name, &[id]).unwrap();
128    }
129    Ok(())
130  }
131}
132
133// Helpers
134
135/// Create Stream and Consumer-Group if required.
136fn ensure_stream_and_group(
137  redis: &mut Connection,
138  stream: &str,
139  group_name: &str,
140  create_pos: &str,
141  create_stream_if_not_exists: bool,
142) -> Result<()> {
143  let mut result: RedisResult<String> = if create_stream_if_not_exists {
144    redis.xgroup_create_mkstream(stream, group_name, create_pos)
145  } else {
146    redis.xgroup_create(stream, group_name, create_pos)
147  };
148
149  // Ignore BUSYGROUP errors, it means the group already exists, which is fine.
150  if let Err(err) = &result {
151    if err.to_string() == "BUSYGROUP: Consumer Group name already exists" {
152      result = Ok("OK".to_string());
153    }
154  }
155
156  result.context(format!(
157    "failed to run redis command:\n\
158     XGROUP CREATE {} {} {}{}",
159    stream,
160    group_name,
161    create_pos,
162    if create_stream_if_not_exists {
163      " MKSTREAM"
164    } else {
165      ""
166    }
167  ))?;
168
169  Ok(())
170}
171
172/// Returns the tuple (`group_create_position`, `consumer_start_position`)
173/// containing position args for `XGROUP CREATE`, `XREADGROUP` or `XREAD`:
174/// - `group_create_pos`: Position to start in the stream if group is upserted
175///   - for consumer-group:
176///     - `0` for the beginning of the stream
177///     - `$` for the end of the stream (new messages only)
178///     - `<id>` for a specific id
179/// - `start_pos`: Position to start in the stream for consumer
180///   - for groups-consumers:
181///     - `0` for pending messages
182///     - `>` for new messages
183///     - `<id>` for a specific id
184///   - for stream-consumers:
185///     - `0` for the beginning of the stream
186///     - `$` for the end of the stream
187///     - `<id>` for a specific id
188fn positions(
189  group_name: &Option<(String, String)>,
190  process_pending: bool,
191  start_pos: StartPosition,
192) -> (Option<String>, String) {
193  use StartPosition::*;
194  let (group_create_position, consumer_start_position) =
195    match (group_name, process_pending, start_pos) {
196      // no group name: we'll simply XREAD starting from beginning or end
197      (None, _, StartOfStream) => (None, String::from("0")),
198      (None, _, EndOfStream) => (None, String::from("$")),
199      (None, _, Other(id)) => (None, id),
200      // group name and process pending:
201      (_, true, StartOfStream) => str_to_positions("0", "0"),
202      (_, true, EndOfStream) => str_to_positions("$", "0"),
203      (_, true, Other(id)) => (Some(id), String::from("0")),
204      // group name and don't process pending
205      (_, false, StartOfStream) => str_to_positions("0", ">"),
206      (_, false, EndOfStream) => str_to_positions("$", ">"),
207      (_, false, Other(id)) => (Some(id.clone()), id),
208    };
209
210  (group_create_position, consumer_start_position)
211}
212
213// mainly converts &str to Strings...
214#[inline]
215fn str_to_positions(a: &str, b: &str) -> (Option<String>, String) {
216  (Some(a.to_string()), b.to_string())
217}
218
219#[cfg(test)]
220mod tests {
221  use super::*;
222  use crate::test_helpers::*;
223  use anyhow::bail;
224  use redis::FromRedisValue;
225
226  fn delete_group(stream: &str, group: &str) {
227    redis_connection()
228      .xgroup_destroy::<&str, &str, bool>(stream, group)
229      .unwrap();
230  }
231
232  #[allow(clippy::unnecessary_wraps)]
233  fn print_message(_id: &str, message: &Message) -> Result<()> {
234    for (k, v) in message {
235      println!("{}: {}", k, String::from_redis_value(&v).unwrap());
236    }
237    Ok(())
238  }
239
240  #[test]
241  fn test_init_options() {
242    // TODO
243    // - Test with default values
244    // - Test with custom values
245    // - Test guards (group_name or consumer_name)
246  }
247
248  #[test]
249  fn test_init() {
250    let mut redis = redis_connection();
251    let mut redis_c = redis_connection();
252    let stream = &format!("test-stream-{}", random_string(25));
253    let group_name = &format!("test-group-{}", random_string(25));
254    let consumer_name = &format!("test-consumer-{}", random_string(25));
255
256    // it creates an empty stream (if opt)
257    assert!(!key_exists(&mut redis, stream));
258
259    let opts = ConsumerOpts::default()
260      .create_stream_if_not_exists(true)
261      .group(group_name, consumer_name);
262    Consumer::init(&mut redis_c, &stream, print_message, opts).unwrap();
263    assert!(key_exists(&mut redis, stream));
264    // with length = 0
265    let len: usize = redis.xlen(stream).unwrap();
266    assert_eq!(len, 0);
267
268    delete_group(stream, group_name);
269    delete_stream(stream);
270
271    // it doesn't create an empty stream (if !opt)
272    assert!(!key_exists(&mut redis, stream));
273    let opts = ConsumerOpts::default()
274      .create_stream_if_not_exists(false)
275      .group(group_name, consumer_name);
276    assert!(Consumer::init(&mut redis_c, stream, print_message, opts).is_err());
277    assert!(!key_exists(&mut redis, stream));
278  }
279
280  #[test]
281  fn test_consume() {
282    use std::thread;
283    use std::time::Duration;
284
285    let group_name = &format!("test-group-{}", random_string(25));
286    let consumer_name = &format!("test-consumer-{}", random_string(25));
287    let stream = &format!("test-stream-{}", random_string(25));
288    let mut redis = redis_connection();
289    let mut redis_c = redis_connection();
290
291    crate::produce(&mut redis, stream, &[("key", "value_1")]).unwrap();
292
293    // simple consumers
294    {
295      // it processes old messages if StartOfStream
296      {
297        let mut messages = vec![];
298        let handler = |_id: &str, message: &Message| {
299          messages.push(message.clone());
300          Ok(())
301        };
302        let opts = ConsumerOpts::default().start_pos(StartPosition::StartOfStream);
303        let mut consumer = Consumer::init(&mut redis_c, stream, handler, opts).unwrap();
304
305        consumer.consume().unwrap();
306        let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
307        assert_eq!(value, "value_1".to_string());
308      }
309
310      // it skips old messages if EndOfStream
311      {
312        let messages = &mut vec![];
313        let handler = |_id: &str, message: &Message| {
314          messages.push(message.clone());
315          Ok(())
316        };
317        let opts = ConsumerOpts::default().start_pos(StartPosition::EndOfStream);
318        let mut consumer = Consumer::init(&mut redis_c, stream, handler, opts).unwrap();
319        let stream_name = stream.clone();
320        let child = thread::spawn(move || {
321          // allow consumer time to call consume
322          thread::sleep(Duration::from_millis(500));
323          let mut redis = redis_connection();
324          crate::produce(&mut redis, &stream_name, &[("key", "value_2")]).unwrap();
325        });
326
327        consumer.consume().unwrap();
328        child.join().unwrap();
329        let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
330        assert_eq!(value, "value_2".to_string());
331      }
332    }
333
334    // consumer groups
335    {
336      // it skips old messages if EndOfStream
337      {
338        let mut messages = vec![];
339        let handler = |_id: &str, message: &Message| {
340          messages.push(message.clone());
341          bail!("I don't ack message");
342        };
343        let opts = ConsumerOpts::default()
344          .group(group_name, consumer_name)
345          .start_pos(StartPosition::EndOfStream)
346          .process_pending(true);
347        let mut consumer = Consumer::init(&mut redis_c, stream, handler, opts).unwrap();
348        let stream_name = stream.clone();
349        let child = thread::spawn(move || {
350          // allow consumer time to call consume
351          thread::sleep(Duration::from_millis(500));
352          let mut redis = redis_connection();
353          crate::produce(&mut redis, &stream_name, &[("key", "value_3")]).unwrap();
354          crate::produce(&mut redis, &stream_name, &[("key", "value_4")]).unwrap();
355        });
356
357        // skip the error so we can check for pending messages in next test
358        consumer.consume().unwrap_or(());
359        child.join().unwrap();
360        let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
361        assert_eq!(value, "value_3".to_string());
362      }
363
364      // it processes pending messages if process pending is true
365      {
366        let mut messages = vec![];
367        let handler = |_id: &str, message: &Message| {
368          messages.push(message.clone());
369          bail!("I don't ack message");
370        };
371        let opts = ConsumerOpts::default()
372          .group(group_name, consumer_name)
373          .start_pos(StartPosition::EndOfStream)
374          .process_pending(true);
375        let mut consumer = Consumer::init(&mut redis_c, stream, handler, opts).unwrap();
376        // skip the error so we can check pending messages are skipped in next test
377        consumer.consume().unwrap_or(());
378        let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
379        assert_eq!(value, "value_3".to_string());
380      }
381
382      // it skips pending messages if process_pending is false
383      {
384        let mut messages = vec![];
385        let handler = |_id: &str, message: &Message| {
386          messages.push(message.clone());
387          Ok(())
388        };
389        let opts = ConsumerOpts::default()
390          .group(group_name, consumer_name)
391          .start_pos(StartPosition::EndOfStream)
392          .process_pending(false);
393        let mut consumer = Consumer::init(&mut redis_c, stream, handler, opts).unwrap();
394        consumer.consume().unwrap();
395        let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
396        assert_eq!(value, "value_4".to_string());
397      }
398
399      // it ack messages
400      {
401        let mut messages = vec![];
402        let handler = |_id: &str, message: &Message| {
403          messages.push(message.clone());
404          Ok(())
405        };
406        let opts = ConsumerOpts::default()
407          .group(group_name, consumer_name)
408          .start_pos(StartPosition::EndOfStream)
409          .process_pending(true);
410        let mut consumer = Consumer::init(&mut redis_c, stream, handler, opts).unwrap();
411        consumer.consume().unwrap();
412        let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
413        assert_eq!(value, "value_3".to_string());
414
415        let mut messages = vec![];
416        let handler = |_id: &str, message: &Message| {
417          messages.push(message.clone());
418          Ok(())
419        };
420        let opts = ConsumerOpts::default()
421          .group(group_name, consumer_name)
422          .start_pos(StartPosition::EndOfStream)
423          .process_pending(true);
424        let mut consumer = Consumer::init(&mut redis_c, stream, handler, opts).unwrap();
425        consumer.consume().unwrap();
426        assert!(messages.is_empty());
427      }
428
429      delete_group(stream, group_name);
430
431      // it processes old messages if StartOfStream
432      {
433        // when process_pending is false
434        let mut messages = vec![];
435        let handler = |_id: &str, message: &Message| {
436          messages.push(message.clone());
437          Ok(())
438        };
439        let opts = ConsumerOpts::default()
440          .group(group_name, consumer_name)
441          .start_pos(StartPosition::StartOfStream)
442          .process_pending(false);
443        let mut consumer = Consumer::init(&mut redis_c, stream, handler, opts).unwrap();
444        consumer.consume().unwrap();
445        let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
446        assert_eq!(value, "value_4".to_string());
447        let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
448        assert_eq!(value, "value_3".to_string());
449        let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
450        assert_eq!(value, "value_2".to_string());
451        let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
452        assert_eq!(value, "value_1".to_string());
453
454        delete_group(stream, group_name);
455
456        // when process_pending is true
457        let mut messages = vec![];
458        let handler = |_id: &str, message: &Message| {
459          messages.push(message.clone());
460          Ok(())
461        };
462        let opts = ConsumerOpts::default()
463          .group(group_name, consumer_name)
464          .start_pos(StartPosition::StartOfStream)
465          .process_pending(true);
466        let mut consumer = Consumer::init(&mut redis_c, stream, handler, opts).unwrap();
467        consumer.consume().unwrap();
468        let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
469        assert_eq!(value, "value_4".to_string());
470        let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
471        assert_eq!(value, "value_3".to_string());
472        let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
473        assert_eq!(value, "value_2".to_string());
474        let value = String::from_redis_value(messages.pop().unwrap().get("key").unwrap()).unwrap();
475        assert_eq!(value, "value_1".to_string());
476      }
477
478      delete_group(stream, group_name);
479
480      // it skip old messages if EndOfStream
481      {
482        // when process_pending is false
483        let mut messages = vec![];
484        let handler = |_id: &str, message: &Message| {
485          messages.push(message.clone());
486          Ok(())
487        };
488        let opts = ConsumerOpts::default()
489          .group(group_name, consumer_name)
490          .start_pos(StartPosition::EndOfStream)
491          .process_pending(false);
492        let mut consumer = Consumer::init(&mut redis_c, stream, handler, opts).unwrap();
493        consumer.consume().unwrap();
494        assert!(messages.is_empty());
495
496        delete_group(stream, group_name);
497
498        // when process_pending is true
499        let mut messages = vec![];
500        let handler = |_id: &str, message: &Message| {
501          messages.push(message.clone());
502          Ok(())
503        };
504        let opts = ConsumerOpts::default()
505          .group(group_name, consumer_name)
506          .start_pos(StartPosition::EndOfStream)
507          .process_pending(true);
508        let mut consumer = Consumer::init(&mut redis_c, stream, handler, opts).unwrap();
509        consumer.consume().unwrap();
510        assert!(messages.is_empty());
511      }
512    }
513
514    delete_group(stream, group_name);
515    delete_stream(stream);
516  }
517
518  // note: `test_process_messages` is already tested by `test_consume`
519
520  // note: `test_positions` is already tested by `test_consume` (but adding more
521  // tests wouldn't hurt)
522
523  #[test]
524  fn test_ensure_stream_and_group() -> Result<()> {
525    let mut redis = redis_connection();
526
527    delete_stream("test-stream");
528    ensure_stream_and_group(&mut redis, "test-stream", "test-group", "0", true)
529      .context("failed to produce entry to stream")?;
530    ensure_stream_and_group(&mut redis, "test-stream", "test-group", "0", true)
531      .context("failed to produce entry to stream")?;
532
533    Ok(())
534  }
535}