redis_event/cmd/
streams.rs

/*!
Definitions and parsers for the stream-related commands.

Every command handled here is described in the [Redis Command Reference].

[Redis Command Reference]: https://redis.io/commands#stream
*/
8
9use core::slice::Iter;
10
11use crate::cmd::hashes::Field;
12
/// Parsed arguments of an `XACK` command.
#[derive(Debug)]
pub struct XACK<'a> {
    /// First argument: the key.
    pub key: &'a [u8],
    /// Second argument: the group.
    pub group: &'a [u8],
    /// Every remaining argument, in the order it was supplied.
    pub ids: Vec<&'a Vec<u8>>,
}

/// Builds an [`XACK`] from the command's arguments.
///
/// `iter` is expected to start at the key (i.e. the command name has already been consumed by
/// the caller). The first two items become `key` and `group`; everything after that is
/// collected into `ids`. No check is made that at least one ID is present.
///
/// # Panics
///
/// Panics if the key or the group argument is missing.
pub(crate) fn parse_xack(mut iter: Iter<Vec<u8>>) -> XACK {
    let key = iter.next().expect("XACK: missing key");
    let group = iter.next().expect("XACK: missing group");
    // Whatever is left in the iterator is the list of IDs.
    let ids = iter.collect();
    XACK { key, group, ids }
}
29
30#[derive(Debug)]
31pub struct XADD<'a> {
32    pub key: &'a [u8],
33    pub id: &'a [u8],
34    pub fields: Vec<Field<'a>>,
35}
36
37pub(crate) fn parse_xadd(mut iter: Iter<Vec<u8>>) -> XADD {
38    let key = iter.next().unwrap();
39    let id = iter.next().unwrap();
40    let mut fields = Vec::new();
41    loop {
42        if let Some(field) = iter.next() {
43            if let Some(value) = iter.next() {
44                let field = Field { name: field, value };
45                fields.push(field);
46            } else {
47                panic!("XADD缺失field value");
48            }
49        } else {
50            break;
51        }
52    }
53    XADD { key, id, fields }
54}
55
/// Parsed arguments of an `XCLAIM` command.
#[derive(Debug)]
pub struct XCLAIM<'a> {
    /// First argument: the key.
    pub key: &'a [u8],
    /// Second argument: the group.
    pub group: &'a [u8],
    /// Third argument: the consumer.
    pub consumer: &'a [u8],
    /// Fourth argument: the minimum idle time, kept as the raw bytes that were received.
    pub min_idle_time: &'a [u8],
    /// The claimed IDs; always holds at least the mandatory first ID.
    pub ids: Vec<&'a Vec<u8>>,
    /// Raw value following the `IDLE` keyword, if present.
    pub idle: Option<&'a Vec<u8>>,
    /// Raw value following the `TIME` keyword, if present.
    pub time: Option<&'a Vec<u8>>,
    /// Raw value following the `RETRYCOUNT` keyword, if present.
    pub retry_count: Option<&'a Vec<u8>>,
    /// `Some(true)` when `FORCE` was given, `None` otherwise.
    pub force: Option<bool>,
    /// `Some(true)` when `JUSTID` was given, `None` otherwise.
    pub just_id: Option<bool>,
}

/// Builds an [`XCLAIM`] from the command's arguments.
///
/// `iter` is expected to start at the key. After the four leading arguments and the mandatory
/// first ID, every remaining argument is examined in turn: the keywords `IDLE`, `TIME`,
/// `RETRYCOUNT` (each followed by a value), `FORCE` and `JUSTID` are matched
/// ASCII-case-insensitively; any other argument is recorded as an additional ID.
///
/// # Panics
///
/// Panics if any of the five mandatory arguments is missing, or if `IDLE`, `TIME` or
/// `RETRYCOUNT` is the last argument and therefore has no value.
pub(crate) fn parse_xclaim(mut iter: Iter<Vec<u8>>) -> XCLAIM {
    let key = iter.next().expect("XCLAIM: missing key");
    let group = iter.next().expect("XCLAIM: missing group");
    let consumer = iter.next().expect("XCLAIM: missing consumer");
    let min_idle_time = iter.next().expect("XCLAIM: missing min-idle-time");
    let mut ids = vec![iter.next().expect("XCLAIM: missing id")];

    let mut idle = None;
    let mut time = None;
    let mut retry_count = None;
    let mut force = None;
    let mut just_id = None;

    // `while let` (not `for _ in iter.next()`, which would run at most once) so that every
    // trailing argument is visited; the body still pulls option values from the same iterator.
    while let Some(arg) = iter.next() {
        if arg.eq_ignore_ascii_case(b"IDLE") {
            idle = Some(iter.next().expect("XCLAIM: missing value after IDLE"));
        } else if arg.eq_ignore_ascii_case(b"TIME") {
            time = Some(iter.next().expect("XCLAIM: missing value after TIME"));
        } else if arg.eq_ignore_ascii_case(b"RETRYCOUNT") {
            retry_count = Some(iter.next().expect("XCLAIM: missing value after RETRYCOUNT"));
        } else if arg.eq_ignore_ascii_case(b"FORCE") {
            force = Some(true);
        } else if arg.eq_ignore_ascii_case(b"JUSTID") {
            just_id = Some(true);
        } else {
            // Anything that is not a known keyword is treated as another ID.
            ids.push(arg);
        }
    }

    XCLAIM {
        key,
        group,
        consumer,
        min_idle_time,
        ids,
        idle,
        time,
        retry_count,
        force,
        just_id,
    }
}
116
/// Parsed arguments of an `XDEL` command.
#[derive(Debug)]
pub struct XDEL<'a> {
    /// First argument: the key.
    pub key: &'a [u8],
    /// Every remaining argument, in the order it was supplied.
    pub ids: Vec<&'a Vec<u8>>,
}

/// Builds an [`XDEL`] from the command's arguments.
///
/// `iter` is expected to start at the key. The first item becomes `key`; everything after it
/// is collected into `ids`. No check is made that at least one ID is present.
///
/// # Panics
///
/// Panics if the key argument is missing.
pub(crate) fn parse_xdel(mut iter: Iter<Vec<u8>>) -> XDEL {
    let key = iter.next().expect("XDEL: missing key");
    // Whatever is left in the iterator is the list of IDs.
    let ids = iter.collect();
    XDEL { key, ids }
}
131
/// Parsed arguments of an `XGROUP` command.
///
/// At most one of the fields is `Some`, matching the subcommand that was recognised; all four
/// are `None` when the subcommand is absent or not one of those handled here.
#[derive(Debug)]
pub struct XGROUP<'a> {
    /// Set for the `CREATE` subcommand.
    pub create: Option<Create<'a>>,
    /// Set for the `SETID` subcommand.
    pub set_id: Option<SetID<'a>>,
    /// Set for the `DESTROY` subcommand.
    pub destroy: Option<Destroy<'a>>,
    /// Set for the `DELCONSUMER` subcommand.
    pub del_consumer: Option<DelConsumer<'a>>,
}

/// Arguments of `XGROUP CREATE`: the three items that follow the subcommand keyword.
#[derive(Debug)]
pub struct Create<'a> {
    pub key: &'a [u8],
    pub group_name: &'a [u8],
    pub id: &'a [u8],
}

/// Arguments of `XGROUP SETID`: the three items that follow the subcommand keyword.
#[derive(Debug)]
pub struct SetID<'a> {
    pub key: &'a [u8],
    pub group_name: &'a [u8],
    pub id: &'a [u8],
}

/// Arguments of `XGROUP DESTROY`: the two items that follow the subcommand keyword.
#[derive(Debug)]
pub struct Destroy<'a> {
    pub key: &'a [u8],
    pub group_name: &'a [u8],
}

/// Arguments of `XGROUP DELCONSUMER`: the three items that follow the subcommand keyword.
#[derive(Debug)]
pub struct DelConsumer<'a> {
    pub key: &'a [u8],
    pub group_name: &'a [u8],
    pub consumer_name: &'a [u8],
}

/// Builds an [`XGROUP`] from the command's arguments.
///
/// `iter` is expected to start at the subcommand keyword, which is matched
/// ASCII-case-insensitively against `CREATE`, `SETID`, `DESTROY` and `DELCONSUMER`. The
/// positional arguments of the recognised subcommand are then read; anything after them is
/// ignored. An empty iterator or an unrecognised subcommand yields an [`XGROUP`] whose fields
/// are all `None`.
///
/// # Panics
///
/// Panics if a recognised subcommand is missing one of its positional arguments.
pub(crate) fn parse_xgroup(mut iter: Iter<Vec<u8>>) -> XGROUP {
    let mut parsed = XGROUP {
        create: None,
        set_id: None,
        destroy: None,
        del_consumer: None,
    };

    // Only the first argument selects the subcommand, so a single `if let` is enough.
    if let Some(subcommand) = iter.next() {
        if subcommand.eq_ignore_ascii_case(b"CREATE") {
            let key = iter.next().expect("XGROUP CREATE: missing key");
            let group_name = iter.next().expect("XGROUP CREATE: missing group name");
            let id = iter.next().expect("XGROUP CREATE: missing id");
            parsed.create = Some(Create { key, group_name, id });
        } else if subcommand.eq_ignore_ascii_case(b"SETID") {
            let key = iter.next().expect("XGROUP SETID: missing key");
            let group_name = iter.next().expect("XGROUP SETID: missing group name");
            let id = iter.next().expect("XGROUP SETID: missing id");
            parsed.set_id = Some(SetID { key, group_name, id });
        } else if subcommand.eq_ignore_ascii_case(b"DESTROY") {
            let key = iter.next().expect("XGROUP DESTROY: missing key");
            let group_name = iter.next().expect("XGROUP DESTROY: missing group name");
            parsed.destroy = Some(Destroy { key, group_name });
        } else if subcommand.eq_ignore_ascii_case(b"DELCONSUMER") {
            let key = iter.next().expect("XGROUP DELCONSUMER: missing key");
            let group_name = iter.next().expect("XGROUP DELCONSUMER: missing group name");
            let consumer_name = iter.next().expect("XGROUP DELCONSUMER: missing consumer name");
            parsed.del_consumer = Some(DelConsumer {
                key,
                group_name,
                consumer_name,
            });
        }
    }

    parsed
}
207
/// Parsed arguments of an `XTRIM` command.
#[derive(Debug)]
pub struct XTRIM<'a> {
    /// First argument: the key.
    pub key: &'a [u8],
    /// `true` when the `~` operator preceded the count, `false` otherwise.
    pub approximation: bool,
    /// The numeric threshold that follows the strategy keyword (and optional operator).
    pub count: u64,
}

/// Builds an [`XTRIM`] from the command's arguments.
///
/// `iter` is expected to start at the key. The second argument (the trimming strategy
/// keyword) is consumed without being inspected. The third argument is either an operator,
/// `~` for approximate or `=` for exact trimming, followed by the count, or the count itself.
/// Arguments after the count are ignored.
///
/// # Panics
///
/// Panics if the key, the strategy or the count is missing, or if the count is not a decimal
/// `u64`. Because `count` is a `u64`, a non-numeric threshold cannot be represented and ends
/// up in this panic as well.
pub(crate) fn parse_xtrim(mut iter: Iter<Vec<u8>>) -> XTRIM {
    let key = iter.next().expect("XTRIM: missing key");
    // The strategy keyword carries no information that `XTRIM` stores, so it is only skipped.
    iter.next().expect("XTRIM: missing trimming strategy");
    let third = iter.next().expect("XTRIM: missing count");

    // An optional operator may sit between the strategy and the count.
    let (approximation, raw_count) = match third.as_slice() {
        b"~" => (true, iter.next().expect("XTRIM: missing count after `~`")),
        b"=" => (false, iter.next().expect("XTRIM: missing count after `=`")),
        _ => (false, third),
    };

    let count = String::from_utf8_lossy(raw_count)
        .parse::<u64>()
        .expect("XTRIM: count is not a valid u64");

    XTRIM {
        key,
        approximation,
        count,
    }
}
235}