redis_event/cmd/
streams.rs1use core::slice::Iter;
10
11use crate::cmd::hashes::Field;
12
13#[derive(Debug)]
14pub struct XACK<'a> {
15 pub key: &'a [u8],
16 pub group: &'a [u8],
17 pub ids: Vec<&'a Vec<u8>>,
18}
19
20pub(crate) fn parse_xack(mut iter: Iter<Vec<u8>>) -> XACK {
21 let key = iter.next().unwrap();
22 let group = iter.next().unwrap();
23 let mut ids = Vec::new();
24 for id in iter {
25 ids.push(id);
26 }
27 XACK { key, group, ids }
28}
29
30#[derive(Debug)]
31pub struct XADD<'a> {
32 pub key: &'a [u8],
33 pub id: &'a [u8],
34 pub fields: Vec<Field<'a>>,
35}
36
37pub(crate) fn parse_xadd(mut iter: Iter<Vec<u8>>) -> XADD {
38 let key = iter.next().unwrap();
39 let id = iter.next().unwrap();
40 let mut fields = Vec::new();
41 loop {
42 if let Some(field) = iter.next() {
43 if let Some(value) = iter.next() {
44 let field = Field { name: field, value };
45 fields.push(field);
46 } else {
47 panic!("XADD缺失field value");
48 }
49 } else {
50 break;
51 }
52 }
53 XADD { key, id, fields }
54}
55
56#[derive(Debug)]
57pub struct XCLAIM<'a> {
58 pub key: &'a [u8],
59 pub group: &'a [u8],
60 pub consumer: &'a [u8],
61 pub min_idle_time: &'a [u8],
62 pub ids: Vec<&'a Vec<u8>>,
63 pub idle: Option<&'a Vec<u8>>,
64 pub time: Option<&'a Vec<u8>>,
65 pub retry_count: Option<&'a Vec<u8>>,
66 pub force: Option<bool>,
67 pub just_id: Option<bool>,
68}
69
70pub(crate) fn parse_xclaim(mut iter: Iter<Vec<u8>>) -> XCLAIM {
71 let key = iter.next().unwrap();
72 let group = iter.next().unwrap();
73 let consumer = iter.next().unwrap();
74 let min_idle_time = iter.next().unwrap();
75 let mut ids = Vec::new();
76 let id = iter.next().unwrap();
77 ids.push(id);
78 let mut idle = None;
79 let mut time = None;
80 let mut retry_count = None;
81 let mut force = None;
82 let mut just_id = None;
83 for arg in iter.next() {
84 let arg_string = String::from_utf8_lossy(arg);
85 let p_arg = &arg_string.to_uppercase();
86 if p_arg == "IDLE" {
87 let _idle = iter.next().unwrap();
88 idle = Some(_idle);
89 } else if p_arg == "TIME" {
90 let _time = iter.next().unwrap();
91 time = Some(_time);
92 } else if p_arg == "RETRYCOUNT" {
93 let _retry_count = iter.next().unwrap();
94 retry_count = Some(_retry_count);
95 } else if p_arg == "FORCE" {
96 force = Some(true);
97 } else if p_arg == "JUSTID" {
98 just_id = Some(true);
99 } else {
100 ids.push(arg);
101 }
102 }
103 XCLAIM {
104 key,
105 group,
106 consumer,
107 min_idle_time,
108 ids,
109 idle,
110 time,
111 retry_count,
112 force,
113 just_id,
114 }
115}
116
117#[derive(Debug)]
118pub struct XDEL<'a> {
119 pub key: &'a [u8],
120 pub ids: Vec<&'a Vec<u8>>,
121}
122
123pub(crate) fn parse_xdel(mut iter: Iter<Vec<u8>>) -> XDEL {
124 let key = iter.next().unwrap();
125 let mut ids = Vec::new();
126 for id in iter {
127 ids.push(id);
128 }
129 XDEL { key, ids }
130}
131
132#[derive(Debug)]
133pub struct XGROUP<'a> {
134 pub create: Option<Create<'a>>,
135 pub set_id: Option<SetID<'a>>,
136 pub destroy: Option<Destroy<'a>>,
137 pub del_consumer: Option<DelConsumer<'a>>,
138}
139
140#[derive(Debug)]
141pub struct Create<'a> {
142 pub key: &'a [u8],
143 pub group_name: &'a [u8],
144 pub id: &'a [u8],
145}
146
147#[derive(Debug)]
148pub struct SetID<'a> {
149 pub key: &'a [u8],
150 pub group_name: &'a [u8],
151 pub id: &'a [u8],
152}
153
154#[derive(Debug)]
155pub struct Destroy<'a> {
156 pub key: &'a [u8],
157 pub group_name: &'a [u8],
158}
159
160#[derive(Debug)]
161pub struct DelConsumer<'a> {
162 pub key: &'a [u8],
163 pub group_name: &'a [u8],
164 pub consumer_name: &'a [u8],
165}
166
167pub(crate) fn parse_xgroup(mut iter: Iter<Vec<u8>>) -> XGROUP {
168 let mut create = None;
169 let mut set_id = None;
170 let mut destroy = None;
171 let mut del_consumer = None;
172 for arg in iter.next() {
173 let arg_string = String::from_utf8_lossy(arg);
174 let p_arg = &arg_string.to_uppercase();
175 if p_arg == "CREATE" {
176 let key = iter.next().unwrap();
177 let group_name = iter.next().unwrap();
178 let id = iter.next().unwrap();
179 create = Some(Create { key, group_name, id })
180 } else if p_arg == "SETID" {
181 let key = iter.next().unwrap();
182 let group_name = iter.next().unwrap();
183 let id = iter.next().unwrap();
184 set_id = Some(SetID { key, group_name, id })
185 } else if p_arg == "DESTROY" {
186 let key = iter.next().unwrap();
187 let group_name = iter.next().unwrap();
188 destroy = Some(Destroy { key, group_name })
189 } else if p_arg == "DELCONSUMER" {
190 let key = iter.next().unwrap();
191 let group_name = iter.next().unwrap();
192 let consumer_name = iter.next().unwrap();
193 del_consumer = Some(DelConsumer {
194 key,
195 group_name,
196 consumer_name,
197 })
198 }
199 }
200 XGROUP {
201 create,
202 set_id,
203 destroy,
204 del_consumer,
205 }
206}
207
208#[derive(Debug)]
209pub struct XTRIM<'a> {
210 pub key: &'a [u8],
211 pub approximation: bool,
212 pub count: u64,
213}
214
215pub(crate) fn parse_xtrim(mut iter: Iter<Vec<u8>>) -> XTRIM {
216 let key = iter.next().unwrap();
217 iter.next().unwrap();
218 let third = iter.next().unwrap();
219 let third = String::from_utf8_lossy(third);
220 let approximation;
221 let count;
222 if "~" == third {
223 approximation = true;
224 let arg = String::from_utf8_lossy(iter.next().unwrap());
225 count = arg.parse::<u64>().unwrap();
226 } else {
227 approximation = false;
228 count = third.parse::<u64>().unwrap();
229 }
230 XTRIM {
231 key,
232 approximation,
233 count,
234 }
235}