// redis_asio/stream/manage.rs
use super::EntryId;
2use crate::{RedisCommand, RedisResult, command};
3
4
5#[derive(Clone)]
7pub struct PendingOptions {
8 pub(crate) streams: Vec<(String, EntryId)>,
10 pub(crate) group: String,
12 pub(crate) consumer: String,
14 pub(crate) count: Option<u16>,
16}
17
/// Parameters for the `XGROUP CREATE` request assembled by
/// `touch_group_command`.
#[derive(Clone)]
pub struct TouchGroupOptions {
    /// Key of the stream the group belongs to.
    pub(crate) stream: String,
    /// Name of the consumer group.
    pub(crate) group: String,
}
24
25#[derive(Clone)]
27pub struct AckOptions {
28 pub(crate) stream: String,
29 pub(crate) group: String,
30 pub(crate) entry_id: EntryId,
31}
32
/// Outcome of an acknowledge request, derived from the number of entries the
/// server reports as acknowledged (see `AckResponse::new`).
#[derive(Clone, Debug, PartialEq)]
pub enum AckResponse {
    /// A non-zero number of entries was acknowledged.
    Ok,
    /// Zero entries were acknowledged.
    NotExists,
}
39
40pub(crate) fn ack_entry_command(options: AckOptions) -> RedisCommand {
41 command("XACK")
42 .arg(options.stream)
43 .arg(options.group)
44 .arg(options.entry_id.to_string())
45}
46
47pub(crate) fn pending_list_command(options: PendingOptions) -> RedisCommand {
48 let mut cmd = command("XREADGROUP")
49 .arg("GROUP")
50 .arg(options.group)
51 .arg(options.consumer);
52 if let Some(count) = options.count {
53 cmd.arg_mut(count);
54 }
55
56 cmd.arg_mut("STREAMS");
57 let mut ids_cmd = RedisCommand::new();
58 for (stream, start_id) in options.streams {
59 cmd.arg_mut(stream);
60 ids_cmd.arg_mut(start_id.to_string());
61 }
62
63 cmd.append(ids_cmd);
64 cmd
65}
66
67pub(crate) fn touch_group_command(options: TouchGroupOptions) -> RedisCommand {
68 command("XGROUP")
69 .arg("CREATE")
70 .arg(options.stream)
71 .arg(options.group)
72 .arg("$")
73 .arg("MKSTREAM") }
75
76impl AckResponse {
77 pub(crate) fn new(count_acknowledged: i64) -> Self {
78 match count_acknowledged {
79 0 => AckResponse::NotExists,
80 _ => AckResponse::Ok,
81 }
82 }
83}
84
85impl PendingOptions {
86 pub fn new(stream: String, group: String, consumer: String, start_id: EntryId)
87 -> RedisResult<Self> {
88 let streams = vec![(stream, start_id)];
89 let count: Option<u16> = None;
90 Ok(PendingOptions { streams, group, consumer, count })
91 }
92
93 pub fn with_count(stream: String, group: String, consumer: String, start_id: EntryId, count: u16)
94 -> RedisResult<Self> {
95 let streams = vec![(stream, start_id)];
96 let count = Some(count);
97 Ok(PendingOptions { streams, group, consumer, count })
98 }
99
100 pub fn add_stream(&mut self, stream: String, start_id: EntryId) {
101 self.streams.push((stream, start_id))
102 }
103}
104
105impl TouchGroupOptions {
106 pub fn new(stream: String, group: String) -> Self {
107 TouchGroupOptions { stream, group }
108 }
109}
110
111impl AckOptions {
112 pub fn new(stream: String, group: String, entry_id: EntryId) -> Self {
113 AckOptions { stream, group, entry_id }
114 }
115}