redis_asio/stream/
manage.rs

1use super::EntryId;
2use crate::{RedisCommand, RedisResult, command};
3
4
5/// Set of options that are required by `RedisStream::pending_entries()`
6#[derive(Clone)]
7pub struct PendingOptions {
8    /// Get pending entries from the following streams with ID greater than the corresponding entry IDs
9    pub(crate) streams: Vec<(String, EntryId)>,
10    /// Group name.
11    pub(crate) group: String,
12    /// Consumer name.
13    pub(crate) consumer: String,
14    /// Max count of entries. All pending entries will be requested ff the values is None.
15    pub(crate) count: Option<u16>,
16}
17
18/// Set of options that are required by `RedisStream::touch_group()`
19#[derive(Clone)]
20pub struct TouchGroupOptions {
21    pub(crate) stream: String,
22    pub(crate) group: String,
23}
24
25/// Set of options that are required by `RedisStream::ack_entry()`
26#[derive(Clone)]
27pub struct AckOptions {
28    pub(crate) stream: String,
29    pub(crate) group: String,
30    pub(crate) entry_id: EntryId,
31}
32
33/// Structure that wraps a response on XACK request.
34#[derive(PartialEq, Debug, Clone)]
35pub enum AckResponse {
36    Ok,
37    NotExists,
38}
39
40pub(crate) fn ack_entry_command(options: AckOptions) -> RedisCommand {
41    command("XACK")
42        .arg(options.stream)
43        .arg(options.group)
44        .arg(options.entry_id.to_string())
45}
46
47pub(crate) fn pending_list_command(options: PendingOptions) -> RedisCommand {
48    let mut cmd = command("XREADGROUP")
49        .arg("GROUP")
50        .arg(options.group)
51        .arg(options.consumer);
52    if let Some(count) = options.count {
53        cmd.arg_mut(count);
54    }
55
56    cmd.arg_mut("STREAMS");
57    let mut ids_cmd = RedisCommand::new();
58    for (stream, start_id) in options.streams {
59        cmd.arg_mut(stream);
60        ids_cmd.arg_mut(start_id.to_string());
61    }
62
63    cmd.append(ids_cmd);
64    cmd
65}
66
67pub(crate) fn touch_group_command(options: TouchGroupOptions) -> RedisCommand {
68    command("XGROUP")
69        .arg("CREATE")
70        .arg(options.stream)
71        .arg(options.group)
72        .arg("$")
73        .arg("MKSTREAM") // make an empty stream if there is no such one yet
74}
75
76impl AckResponse {
77    pub(crate) fn new(count_acknowledged: i64) -> Self {
78        match count_acknowledged {
79            0 => AckResponse::NotExists,
80            _ => AckResponse::Ok,
81        }
82    }
83}
84
85impl PendingOptions {
86    pub fn new(stream: String, group: String, consumer: String, start_id: EntryId)
87               -> RedisResult<Self> {
88        let streams = vec![(stream, start_id)];
89        let count: Option<u16> = None;
90        Ok(PendingOptions { streams, group, consumer, count })
91    }
92
93    pub fn with_count(stream: String, group: String, consumer: String, start_id: EntryId, count: u16)
94                      -> RedisResult<Self> {
95        let streams = vec![(stream, start_id)];
96        let count = Some(count);
97        Ok(PendingOptions { streams, group, consumer, count })
98    }
99
100    pub fn add_stream(&mut self, stream: String, start_id: EntryId) {
101        self.streams.push((stream, start_id))
102    }
103}
104
105impl TouchGroupOptions {
106    pub fn new(stream: String, group: String) -> Self {
107        TouchGroupOptions { stream, group }
108    }
109}
110
111impl AckOptions {
112    pub fn new(stream: String, group: String, entry_id: EntryId) -> Self {
113        AckOptions { stream, group, entry_id }
114    }
115}