redis_streams/
types.rs

1use redis::{from_redis_value, FromRedisValue, RedisResult, RedisWrite, ToRedisArgs, Value};
2
3use std::collections::HashMap;
4use std::io::{Error, ErrorKind};
5
6// Stream Maxlen Enum
7
/// Length cap handed to `StreamCommands` as `MAXLEN [= or ~] [COUNT]`.
///
/// Each variant carries the count; the variant itself selects which of
/// the two comparison symbols is written in front of it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamMaxlen {
    /// Serialized as `MAXLEN = <count>`.
    Equals(usize),
    /// Serialized as `MAXLEN ~ <count>`.
    Aprrox(usize),
}
16
17impl ToRedisArgs for StreamMaxlen {
18    fn write_redis_args<W>(&self, out: &mut W)
19    where
20        W: ?Sized + RedisWrite,
21    {
22        let (ch, val) = match *self {
23            StreamMaxlen::Equals(v) => ("=", v),
24            StreamMaxlen::Aprrox(v) => ("~", v),
25        };
26        out.write_arg("MAXLEN".as_bytes());
27        out.write_arg(ch.as_bytes());
28        val.write_redis_args(out);
29    }
30}
31
/// Optional arguments for the [`xclaim_options`] command, assembled
/// builder-style starting from `StreamClaimOptions::default()`.
///
/// [`xclaim_options`]: ./trait.StreamCommands.html#method.xclaim_options
///
#[derive(Debug, Default)]
pub struct StreamClaimOptions {
    /// Value for the `IDLE <milliseconds>` cmd arg, when set.
    idle: Option<usize>,
    /// Value for the `TIME <mstime>` cmd arg, when set.
    time: Option<usize>,
    /// Value for the `RETRYCOUNT <count>` cmd arg, when set.
    retry: Option<usize>,
    /// Whether the `FORCE` cmd arg is sent.
    force: bool,
    /// Whether the `JUSTID` cmd arg is sent. Be advised: the response
    /// type changes with this option.
    justid: bool,
}

impl StreamClaimOptions {
    /// Stores `ms` as the `IDLE` value and returns the updated options.
    pub fn idle(self, ms: usize) -> Self {
        Self {
            idle: Some(ms),
            ..self
        }
    }

    /// Stores `ms_time` as the `TIME` value and returns the updated options.
    pub fn time(self, ms_time: usize) -> Self {
        Self {
            time: Some(ms_time),
            ..self
        }
    }

    /// Stores `count` as the `RETRYCOUNT` value and returns the updated options.
    pub fn retry(self, count: usize) -> Self {
        Self {
            retry: Some(count),
            ..self
        }
    }

    /// Turns the `FORCE` flag on and returns the updated options.
    pub fn with_force(self) -> Self {
        Self {
            force: true,
            ..self
        }
    }

    /// Turns the `JUSTID` flag on and returns the updated options.
    pub fn with_justid(self) -> Self {
        Self {
            justid: true,
            ..self
        }
    }
}
77
78impl ToRedisArgs for StreamClaimOptions {
79    fn write_redis_args<W>(&self, out: &mut W)
80    where
81        W: ?Sized + RedisWrite,
82    {
83        if let Some(ref ms) = self.idle {
84            out.write_arg("IDLE".as_bytes());
85            out.write_arg(format!("{}", ms).as_bytes());
86        }
87        if let Some(ref ms_time) = self.time {
88            out.write_arg("TIME".as_bytes());
89            out.write_arg(format!("{}", ms_time).as_bytes());
90        }
91        if let Some(ref count) = self.retry {
92            out.write_arg("RETRYCOUNT".as_bytes());
93            out.write_arg(format!("{}", count).as_bytes());
94        }
95        if self.force {
96            out.write_arg("FORCE".as_bytes());
97        }
98        if self.justid {
99            out.write_arg("JUSTID".as_bytes());
100        }
101    }
102}
103
/// Optional arguments for the [`xread_options`] command, assembled
/// builder-style starting from `StreamReadOptions::default()`.
///
/// [`xread_options`]: ./trait.StreamCommands.html#method.xread_options
///
#[derive(Debug, Default)]
pub struct StreamReadOptions {
    /// Value for the `BLOCK <milliseconds>` cmd arg, when set.
    block: Option<usize>,
    /// Value for the `COUNT <count>` cmd arg, when set.
    count: Option<usize>,
    /// Requests the `NOACK` cmd arg; it is only written when a group is set.
    noack: Option<bool>,
    /// Serialized `GROUP <groupname> <consumername>` cmd arg
    /// (group name parts first, consumer name parts second).
    /// This option will toggle the cmd from XREAD to XREADGROUP.
    group: Option<(Vec<Vec<u8>>, Vec<Vec<u8>>)>,
}
120
121impl StreamReadOptions {
122    pub fn read_only(&self) -> bool {
123        self.group.is_none()
124    }
125
126    pub fn noack(mut self) -> Self {
127        self.noack = Some(true);
128        self
129    }
130
131    pub fn block(mut self, ms: usize) -> Self {
132        self.block = Some(ms);
133        self
134    }
135
136    pub fn count(mut self, n: usize) -> Self {
137        self.count = Some(n);
138        self
139    }
140
141    pub fn group<GN: ToRedisArgs, CN: ToRedisArgs>(
142        mut self,
143        group_name: GN,
144        consumer_name: CN,
145    ) -> Self {
146        self.group = Some((
147            ToRedisArgs::to_redis_args(&group_name),
148            ToRedisArgs::to_redis_args(&consumer_name),
149        ));
150        self
151    }
152}
153
154impl ToRedisArgs for StreamReadOptions {
155    fn write_redis_args<W>(&self, out: &mut W)
156    where
157        W: ?Sized + RedisWrite,
158    {
159        if let Some(ref ms) = self.block {
160            out.write_arg("BLOCK".as_bytes());
161            out.write_arg(format!("{}", ms).as_bytes());
162        }
163
164        if let Some(ref n) = self.count {
165            out.write_arg("COUNT".as_bytes());
166            out.write_arg(format!("{}", n).as_bytes());
167        }
168
169        if let Some(ref group) = self.group {
170            // noack is only available w/ xreadgroup
171            if let Some(true) = self.noack {
172                out.write_arg("NOACK".as_bytes());
173            }
174
175            out.write_arg("GROUP".as_bytes());
176            for i in &group.0 {
177                out.write_arg(i);
178            }
179            for i in &group.1 {
180                out.write_arg(i);
181            }
182        }
183    }
184}
185
186/// Reply type used with [`xread`] or [`xread_options`] commands.
187///
188/// [`xread`]: ./trait.StreamCommands.html#method.xread
189/// [`xread_options`]: ./trait.StreamCommands.html#method.xread_options
190///
191#[derive(Default, Debug, Clone)]
192pub struct StreamReadReply {
193    pub keys: Vec<StreamKey>,
194}
195
196/// Reply type used with [`xrange`], [`xrange_count`], [`xrange_all`], [`xrevrange`], [`xrevrange_count`], [`xrevrange_all`] commands.
197///
198/// [`xrange`]: ./trait.StreamCommands.html#method.xrange
199/// [`xrange_count`]: ./trait.StreamCommands.html#method.xrange_count
200/// [`xrange_all`]: ./trait.StreamCommands.html#method.xrange_all
201/// [`xrevrange`]: ./trait.StreamCommands.html#method.xrevrange
202/// [`xrevrange_count`]: ./trait.StreamCommands.html#method.xrevrange_count
203/// [`xrevrange_all`]: ./trait.StreamCommands.html#method.xrevrange_all
204///
205#[derive(Default, Debug, Clone)]
206pub struct StreamRangeReply {
207    pub ids: Vec<StreamId>,
208}
209
210/// Reply type used with [`xclaim`] command.
211///
212/// [`xclaim`]: ./trait.StreamCommands.html#method.xclaim
213///
214#[derive(Default, Debug, Clone)]
215pub struct StreamClaimReply {
216    pub ids: Vec<StreamId>,
217}
218
219/// Reply type used with [`xpending`] command.
220///
221/// [`xpending`]: ./trait.StreamCommands.html#method.xpending
222///
223#[derive(Debug, Clone)]
224pub enum StreamPendingReply {
225    Empty,
226    Data(StreamPendingData),
227}
228
229impl Default for StreamPendingReply {
230    fn default() -> StreamPendingReply {
231        StreamPendingReply::Empty
232    }
233}
234
235impl StreamPendingReply {
236    pub fn count(&self) -> usize {
237        match self {
238            StreamPendingReply::Empty => 0,
239            StreamPendingReply::Data(x) => x.count,
240        }
241    }
242}
243
244/// Inner reply type when an [`xpending`] command has data.
245#[derive(Default, Debug, Clone)]
246pub struct StreamPendingData {
247    pub count: usize,
248    pub start_id: String,
249    pub end_id: String,
250    pub consumers: Vec<StreamInfoConsumer>,
251}
252
253/// Reply type used with [`xpending_count`] and
254/// [`xpending_consumer_count`] commands.
255///
256/// [`xpending_count`]: ./trait.StreamCommands.html#method.xpending_count
257/// [`xpending_consumer_count`]: ./trait.StreamCommands.html#method.xpending_consumer_count
258///
259#[derive(Default, Debug, Clone)]
260pub struct StreamPendingCountReply {
261    pub ids: Vec<StreamPendingId>,
262}
263
264/// Reply type used with [`xinfo_stream`] command.
265///
266/// [`xinfo_stream`]: ./trait.StreamCommands.html#method.xinfo_stream
267///
268#[derive(Default, Debug, Clone)]
269pub struct StreamInfoStreamReply {
270    pub last_generated_id: String,
271    pub radix_tree_keys: usize,
272    pub groups: usize,
273    pub length: usize,
274    pub first_entry: StreamId,
275    pub last_entry: StreamId,
276}
277
278/// Reply type used with [`xinfo_consumer`] command.
279///
280/// [`xinfo_consumer`]: ./trait.StreamCommands.html#method.xinfo_consumer
281///
282#[derive(Default, Debug, Clone)]
283pub struct StreamInfoConsumersReply {
284    pub consumers: Vec<StreamInfoConsumer>,
285}
286
287/// Reply type used with [`xinfo_groups`] command.
288///
289/// [`xinfo_groups`]: ./trait.StreamCommands.html#method.xinfo_groups
290///
291#[derive(Default, Debug, Clone)]
292pub struct StreamInfoGroupsReply {
293    pub groups: Vec<StreamInfoGroup>,
294}
295
/// One consumer as parsed from the [`xinfo_consumers`] command.
///
/// [`xinfo_consumers`]: ./trait.StreamCommands.html#method.xinfo_consumers
///
#[derive(Debug, Default, Clone)]
pub struct StreamInfoConsumer {
    /// Value of the `name` entry.
    pub name: String,
    /// Value of the `pending` entry.
    pub pending: usize,
    /// Value of the `idle` entry (presumably milliseconds; confirm
    /// against the Redis documentation).
    pub idle: usize,
}
306
/// One consumer group as parsed from the [`xinfo_groups`] command.
///
/// [`xinfo_groups`]: ./trait.StreamCommands.html#method.xinfo_groups
///
#[derive(Debug, Default, Clone)]
pub struct StreamInfoGroup {
    /// Value of the `name` entry.
    pub name: String,
    /// Value of the `consumers` entry.
    pub consumers: usize,
    /// Value of the `pending` entry.
    pub pending: usize,
    /// Value of the `last-delivered-id` entry.
    pub last_delivered_id: String,
}
318
/// One pending message as parsed from the `xpending` methods.
#[derive(Debug, Default, Clone)]
pub struct StreamPendingId {
    /// First element of the row: the message id.
    pub id: String,
    /// Second element of the row: the consumer name.
    pub consumer: String,
    /// Third element of the row.
    pub last_delivered_ms: usize,
    /// Fourth element of the row.
    pub times_delivered: usize,
}
327
328/// Represents a stream `key` and its `id`'s parsed from `xread` methods.
329#[derive(Default, Debug, Clone)]
330pub struct StreamKey {
331    pub key: String,
332    pub ids: Vec<StreamId>,
333}
334
335impl StreamKey {
336    pub fn just_ids(&self) -> Vec<&String> {
337        self.ids.iter().map(|msg| &msg.id).collect::<Vec<&String>>()
338    }
339}
340
341/// Represents a stream `id` and its field/values as a `HashMap`
342#[derive(Default, Debug, Clone)]
343pub struct StreamId {
344    pub id: String,
345    pub map: HashMap<String, Value>,
346}
347
348impl StreamId {
349    pub fn from_bulk_value(v: &Value) -> RedisResult<Self> {
350        let mut stream_id = StreamId::default();
351        match *v {
352            Value::Bulk(ref values) => {
353                if let Some(v) = values.get(0) {
354                    stream_id.id = from_redis_value(&v)?;
355                }
356                if let Some(v) = values.get(1) {
357                    stream_id.map = from_redis_value(&v)?;
358                }
359            }
360            _ => {}
361        }
362
363        Ok(stream_id)
364    }
365
366    pub fn get<T: FromRedisValue>(&self, key: &str) -> Option<T> {
367        match self.find(&key) {
368            Some(ref x) => from_redis_value(*x).ok(),
369            None => None,
370        }
371    }
372
373    pub fn find(&self, key: &&str) -> Option<&Value> {
374        self.map.get(*key)
375    }
376
377    pub fn contains_key(&self, key: &&str) -> bool {
378        self.find(key).is_some()
379    }
380
381    pub fn len(&self) -> usize {
382        self.map.len()
383    }
384}
385
386impl FromRedisValue for StreamReadReply {
387    fn from_redis_value(v: &Value) -> RedisResult<Self> {
388        let rows: Vec<HashMap<String, Vec<HashMap<String, HashMap<String, Value>>>>> =
389            from_redis_value(v)?;
390        let mut reply = StreamReadReply::default();
391        for row in &rows {
392            for (key, entry) in row.iter() {
393                let mut k = StreamKey::default();
394                k.key = key.to_owned();
395                for id_row in entry {
396                    let mut i = StreamId::default();
397                    for (id, map) in id_row.iter() {
398                        i.id = id.to_owned();
399                        i.map = map.to_owned();
400                    }
401                    k.ids.push(i);
402                }
403                reply.keys.push(k);
404            }
405        }
406        Ok(reply)
407    }
408}
409
410impl FromRedisValue for StreamRangeReply {
411    fn from_redis_value(v: &Value) -> RedisResult<Self> {
412        let rows: Vec<HashMap<String, HashMap<String, Value>>> = from_redis_value(v)?;
413        let mut reply = StreamRangeReply::default();
414        for row in &rows {
415            let mut i = StreamId::default();
416            for (id, map) in row.iter() {
417                i.id = id.to_owned();
418                i.map = map.to_owned();
419            }
420            reply.ids.push(i);
421        }
422        Ok(reply)
423    }
424}
425
426impl FromRedisValue for StreamClaimReply {
427    fn from_redis_value(v: &Value) -> RedisResult<Self> {
428        let rows: Vec<HashMap<String, HashMap<String, Value>>> = from_redis_value(v)?;
429        let mut reply = StreamClaimReply::default();
430        for row in &rows {
431            let mut i = StreamId::default();
432            for (id, map) in row.iter() {
433                i.id = id.to_owned();
434                i.map = map.to_owned();
435            }
436            reply.ids.push(i);
437        }
438        Ok(reply)
439    }
440}
441
442impl FromRedisValue for StreamPendingReply {
443    fn from_redis_value(v: &Value) -> RedisResult<Self> {
444        let parts: (usize, Option<String>, Option<String>, Vec<Vec<String>>) = from_redis_value(v)?;
445        let count = parts.0.to_owned() as usize;
446
447        if count == 0 {
448            Ok(StreamPendingReply::Empty)
449        } else {
450            let mut result = StreamPendingData::default();
451
452            let start_id = match parts.1.to_owned() {
453                Some(start) => Ok(start),
454                None => Err(Error::new(
455                    ErrorKind::Other,
456                    "IllegalState: Non-zero pending expects start id",
457                )),
458            }?;
459
460            let end_id = match parts.2.to_owned() {
461                Some(end) => Ok(end),
462                None => Err(Error::new(
463                    ErrorKind::Other,
464                    "IllegalState: Non-zero pending expects end id",
465                )),
466            }?;
467
468            result.count = count;
469            result.start_id = start_id;
470            result.end_id = end_id;
471
472            for consumer in &parts.3 {
473                let mut info = StreamInfoConsumer::default();
474                info.name = consumer[0].to_owned();
475                if let Ok(v) = consumer[1].to_owned().parse::<usize>() {
476                    info.pending = v;
477                }
478                result.consumers.push(info);
479            }
480
481            Ok(StreamPendingReply::Data(result))
482        }
483    }
484}
485
486impl FromRedisValue for StreamPendingCountReply {
487    fn from_redis_value(v: &Value) -> RedisResult<Self> {
488        let parts: Vec<Vec<(String, String, usize, usize)>> = from_redis_value(v)?;
489        let mut reply = StreamPendingCountReply::default();
490        for row in &parts {
491            let mut p = StreamPendingId::default();
492            p.id = row[0].0.to_owned();
493            p.consumer = row[0].1.to_owned();
494            p.last_delivered_ms = row[0].2.to_owned();
495            p.times_delivered = row[0].3.to_owned();
496            reply.ids.push(p);
497        }
498        Ok(reply)
499    }
500}
501
502impl FromRedisValue for StreamInfoStreamReply {
503    fn from_redis_value(v: &Value) -> RedisResult<Self> {
504        let map: HashMap<String, Value> = from_redis_value(v)?;
505        let mut reply = StreamInfoStreamReply::default();
506        if let Some(v) = &map.get("last-generated-id") {
507            reply.last_generated_id = from_redis_value(v)?;
508        }
509        if let Some(v) = &map.get("radix-tree-nodes") {
510            reply.radix_tree_keys = from_redis_value(v)?;
511        }
512        if let Some(v) = &map.get("groups") {
513            reply.groups = from_redis_value(v)?;
514        }
515        if let Some(v) = &map.get("length") {
516            reply.length = from_redis_value(v)?;
517        }
518        if let Some(v) = &map.get("first-entry") {
519            reply.first_entry = StreamId::from_bulk_value(v)?;
520        }
521        if let Some(v) = &map.get("last-entry") {
522            reply.last_entry = StreamId::from_bulk_value(v)?;
523        }
524        Ok(reply)
525    }
526}
527
528impl FromRedisValue for StreamInfoConsumersReply {
529    fn from_redis_value(v: &Value) -> RedisResult<Self> {
530        let consumers: Vec<HashMap<String, Value>> = from_redis_value(v)?;
531        let mut reply = StreamInfoConsumersReply::default();
532        for map in consumers {
533            let mut c = StreamInfoConsumer::default();
534            if let Some(v) = &map.get("name") {
535                c.name = from_redis_value(v)?;
536            }
537            if let Some(v) = &map.get("pending") {
538                c.pending = from_redis_value(v)?;
539            }
540            if let Some(v) = &map.get("idle") {
541                c.idle = from_redis_value(v)?;
542            }
543            reply.consumers.push(c);
544        }
545
546        Ok(reply)
547    }
548}
549
550impl FromRedisValue for StreamInfoGroupsReply {
551    fn from_redis_value(v: &Value) -> RedisResult<Self> {
552        let groups: Vec<HashMap<String, Value>> = from_redis_value(v)?;
553        let mut reply = StreamInfoGroupsReply::default();
554        for map in groups {
555            let mut g = StreamInfoGroup::default();
556            if let Some(v) = &map.get("name") {
557                g.name = from_redis_value(v)?;
558            }
559            if let Some(v) = &map.get("pending") {
560                g.pending = from_redis_value(v)?;
561            }
562            if let Some(v) = &map.get("consumers") {
563                g.consumers = from_redis_value(v)?;
564            }
565            if let Some(v) = &map.get("last-delivered-id") {
566                g.last_delivered_id = from_redis_value(v)?;
567            }
568            reply.groups.push(g);
569        }
570        Ok(reply)
571    }
572}