redis_queue/
types.rs

1//!Various data types to supplement queue interface
2
3use core::{fmt, time, cmp};
4use core::str::FromStr;
5use core::convert::TryInto;
6
7use redis::{FromRedisValue, ErrorKind, Value, RedisResult, RedisError, RedisWrite, ToRedisArgs};
8
9use crate::queue::QueueConfig;
10
11pub(crate) mod idents {
12    macro_rules! define_term {
13        ($($ident:ident),+) => {
14            $(
15                pub const $ident: &'static str = stringify!($ident);
16            )+
17        };
18    }
19
20    define_term!(TYPE, XGROUP, CREATE, MKSTREAM, BUSYGROUP, TIME, XLEN, XADD, XREADGROUP, XPENDING, XACK, XDEL);
21    define_term!(XINFO, GROUPS, XTRIM);
22    define_term!(GROUP, COUNT, BLOCK, STREAMS, IDLE, MAXLEN, MINID);
23}
24
25fn parse_redis_key(value: &redis::Value) -> Result<&str, RedisError> {
26    match value {
27        redis::Value::Data(ref data) => match core::str::from_utf8(data) {
28            Ok(key) => Ok(key),
29            Err(_) => Err((redis::ErrorKind::TypeError, "Non-UTF8 stream field's name").into()),
30        },
31        _ => Err((redis::ErrorKind::TypeError, "Invalid stream field's name").into()),
32    }
33}
34
35macro_rules! assign_field_if {
36    ($field:ident = $value:ident IF $key:ident == $expected:expr) => {
37        if $field.is_none() && $key.eq_ignore_ascii_case($expected) {
38            $field = Some(FromRedisValue::from_redis_value($value)?);
39            continue;
40        }
41    };
42}
43
44#[cold]
45#[inline(never)]
46fn unlikely_redis_error(kind: redis::ErrorKind, text: &'static str) -> RedisError {
47    (kind, text).into()
48}
49
50macro_rules! unwrap_required_field {
51    ($field:ident) => {
52        match $field {
53            Some(field) => field,
54            None => {
55                return Err(unlikely_redis_error(
56                    redis::ErrorKind::TypeError,
57                    concat!("'", stringify!($field), "' is missing"),
58                ))
59            }
60        }
61    };
62}
63
64#[derive(Copy, Clone, Debug, PartialEq, Eq)]
65///Possible types returned by `TYPE` command
66pub(crate) enum RedisType {
67    ///String
68    String,
69    ///List
70    List,
71    ///Set
72    Set,
73    ///Zset
74    ZSet,
75    ///Hash
76    Hash,
77    ///Stream
78    Stream,
79    ///None, which means key doesn't exist
80    None,
81}
82
83impl RedisType {
84    #[inline(always)]
85    pub fn parse(value: &str) -> Option<Self> {
86        if value.eq_ignore_ascii_case("string") {
87            Some(Self::String)
88        } else if value.eq_ignore_ascii_case("list") {
89            Some(Self::List)
90        } else if value.eq_ignore_ascii_case("set") {
91            Some(Self::Set)
92        } else if value.eq_ignore_ascii_case("zset") {
93            Some(Self::ZSet)
94        } else if value.eq_ignore_ascii_case("hash") {
95            Some(Self::Hash)
96        } else if value.eq_ignore_ascii_case("stream") {
97            Some(Self::Stream)
98        } else if value.eq_ignore_ascii_case("none") {
99            Some(Self::None)
100        } else {
101            None
102        }
103    }
104}
105
106impl FromRedisValue for RedisType {
107    fn from_redis_value(value: &Value) -> RedisResult<Self> {
108        match value {
109            Value::Bulk(_) => Err((ErrorKind::TypeError, "Not a single value").into()),
110            Value::Data(value) => match core::str::from_utf8(value) {
111                Ok(value) => match Self::parse(value) {
112                    Some(result) => Ok(result),
113                    None => Err((ErrorKind::TypeError, "Not a type").into()),
114                },
115                Err(_) => Err((ErrorKind::TypeError, "Not a string").into()),
116            },
117            Value::Nil => Err((ErrorKind::TypeError, "unexpected null").into()),
118            Value::Int(_) => Err((ErrorKind::TypeError, "unexpected Integer").into()),
119            Value::Okay => Err((ErrorKind::TypeError, "unexpected OK").into()),
120            Value::Status(response) => match Self::parse(response) {
121                Some(result) => Ok(result),
122                None => Err((ErrorKind::TypeError, "Not a type").into()),
123            },
124        }
125    }
126
127    fn from_byte_vec(vec: &[u8]) -> Option<Vec<Self>> {
128        match core::str::from_utf8(vec) {
129            Ok(value) => Self::parse(value).map(|val| vec![val]),
130            Err(_) => None,
131        }
132    }
133}
134
135#[derive(Debug, Copy, Clone)]
136///Trimming method
137pub enum TrimMethod {
138    ///Request to clean exceeding specified number of elements.
139    MaxLen(u64),
140    ///Request to clean entries below specified id.
141    ///
142    ///Supported since Redis 6.2
143    MinId(StreamId),
144}
145
146impl ToRedisArgs for TrimMethod {
147    #[inline(always)]
148    fn write_redis_args<W: ?Sized + redis::RedisWrite>(&self, out: &mut W) {
149        match self {
150            Self::MaxLen(threshold) => {
151                idents::MAXLEN.write_redis_args(out);
152                threshold.write_redis_args(out);
153            }
154            Self::MinId(id) => {
155                idents::MINID.write_redis_args(out);
156                id.write_redis_args(out);
157            }
158        }
159    }
160
161    #[inline(always)]
162    fn is_single_arg(&self) -> bool {
163        false
164    }
165}
166
167#[derive(Debug)]
168///Possible errors parsing Redis Stream ID
169pub enum StreamIdParseError {
170    ///Not compatible type (non-string or empty)
171    InvalidType,
172    ///Timestamp is not valid zero or positive integer
173    InvalidTimestamp,
174    ///Sequence part of Stream's ID is missing
175    MissingSequence,
176    ///Sequence is not valid zero or positive integer
177    InvalidSequence,
178}
179
180impl StreamIdParseError {
181    #[inline(always)]
182    const fn as_str(&self) -> &'static str {
183        match self {
184            Self::InvalidType => "Not a valid stream id",
185            Self::InvalidTimestamp => "Invalid timestamp",
186            Self::MissingSequence => "Missing sequence part",
187            Self::InvalidSequence => "Invalid sequence number",
188        }
189    }
190}
191
192impl fmt::Display for StreamIdParseError {
193    #[inline(always)]
194    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
195        fmt.write_str(self.as_str())
196    }
197}
198
199impl std::error::Error for StreamIdParseError {}
200
201#[derive(Copy, Clone, PartialEq, Eq)]
202#[repr(transparent)]
203///Timestamp component of `StreamId`
204///
205///When used as argument, written as `{timestamp}`
206///
207///Second part of id is automatically generated by Queue
208pub struct TimestampId {
209    ///Timestamp (in milliseconds)
210    timestamp: u64,
211}
212
213impl TimestampId {
214    #[inline]
215    ///Creates new id from duration.
216    ///
217    ///timestamp is limited to `u64::max_value()`
218    pub fn new(timestamp: time::Duration) -> Self {
219        Self {
220            //Limit timestamp to u64::max_value() in milliseconds.
221            timestamp: match timestamp.as_millis().try_into() {
222                Ok(res) => res,
223                Err(_) => u64::max_value(),
224            },
225        }
226    }
227}
228
229impl fmt::Debug for TimestampId {
230    #[inline(always)]
231    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
232        let Self { timestamp } = self;
233        fmt::Debug::fmt(timestamp, fmt)
234    }
235}
236
237impl fmt::Display for TimestampId {
238    #[inline(always)]
239    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
240        let Self { timestamp } = self;
241        fmt.write_fmt(format_args!("{timestamp}-*"))
242    }
243}
244
245impl ToRedisArgs for TimestampId {
246    #[inline(always)]
247    fn write_redis_args<W: ?Sized + RedisWrite>(&self, out: &mut W) {
248        self.timestamp.write_redis_args(out)
249    }
250
251    #[inline(always)]
252    fn is_single_arg(&self) -> bool {
253        true
254    }
255}
256
257#[derive(Copy, Clone, PartialEq, Eq, Hash)]
258///Auto-generated stream key
259pub struct StreamId {
260    ///Timestamp (in milliseconds)
261    timestamp: u64,
262    ///Sequence number within `timestamp,in case of multiple items being placed at the same time
263    seq: u64,
264}
265
266impl StreamId {
267    #[inline(always)]
268    ///Default value in case of NULL value.
269    pub const fn nil() -> Self {
270        Self { timestamp: 0, seq: 0 }
271    }
272
273    #[inline(always)]
274    ///Checks whether id is nil.
275    ///
276    ///Both this type and redis itself uses `0-0` to indicate initial null id
277    pub const fn is_nil(&self) -> bool {
278        self.timestamp == 0 && self.seq == 0
279    }
280
281    #[inline(always)]
282    ///Returns UNIX timestamp
283    pub const fn as_timestamp(&self) -> time::Duration {
284        time::Duration::from_millis(self.timestamp)
285    }
286
287    ///Returns next id after `self`
288    pub const fn next(&self) -> Self {
289        if self.timestamp == u64::max_value() {
290            Self {
291                timestamp: self.timestamp,
292                seq: self.seq.saturating_add(1),
293            }
294        } else if self.seq == u64::max_value() {
295            Self {
296                timestamp: self.timestamp.saturating_add(1),
297                seq: 0,
298            }
299        } else {
300            Self {
301                timestamp: self.timestamp,
302                seq: self.seq + 1,
303            }
304        }
305    }
306
307    ///Returns next id after `self`
308    pub const fn prev(&self) -> Self {
309        if self.timestamp == 0 {
310            Self {
311                timestamp: self.timestamp,
312                seq: self.seq.saturating_sub(1),
313            }
314        } else if self.seq == 0 {
315            Self {
316                timestamp: self.timestamp.saturating_sub(1),
317                seq: 0,
318            }
319        } else {
320            Self {
321                timestamp: self.timestamp,
322                seq: self.seq - 1,
323            }
324        }
325    }
326}
327
328impl fmt::Debug for StreamId {
329    #[inline(always)]
330    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
331        let Self { timestamp, seq } = self;
332        fmt::Debug::fmt(&(timestamp, seq), fmt)
333    }
334}
335
336impl PartialOrd for StreamId {
337    #[allow(clippy::non_canonical_partial_ord_impl)]
338    #[inline(always)]
339    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
340        PartialOrd::partial_cmp(&(self.timestamp, self.seq), &(other.timestamp, other.seq))
341    }
342}
343
344impl Ord for StreamId {
345    #[inline(always)]
346    fn cmp(&self, other: &Self) -> cmp::Ordering {
347        Ord::cmp(&(self.timestamp, self.seq), &(other.timestamp, other.seq))
348    }
349}
350
351impl FromStr for StreamId {
352    type Err = StreamIdParseError;
353
354    fn from_str(data: &str) -> Result<Self, Self::Err> {
355        let mut split = data.split('-');
356        let timestamp = match split.next() {
357            Some(timestamp) => match timestamp.parse() {
358                Ok(timestamp) => timestamp,
359                Err(_) => {
360                    return Err(StreamIdParseError::InvalidTimestamp);
361                }
362            },
363            None => return Err(StreamIdParseError::InvalidType),
364        };
365
366        let seq = match split.next() {
367            Some(seq) => match seq.parse() {
368                Ok(seq) => seq,
369                Err(_) => return Err(StreamIdParseError::InvalidSequence),
370            },
371            None => return Err(StreamIdParseError::MissingSequence),
372        };
373
374        Ok(Self { timestamp, seq })
375    }
376}
377
378impl fmt::Display for StreamId {
379    #[inline(always)]
380    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
381        let Self { timestamp, seq } = self;
382        fmt.write_fmt(format_args!("{timestamp}-{seq}"))
383    }
384}
385
386impl ToRedisArgs for StreamId {
387    #[inline(always)]
388    fn write_redis_args<W: ?Sized + RedisWrite>(&self, out: &mut W) {
389        const STREAM_MAX_SIZE: usize = 20 + 1 + 20;
390        let mut buf = str_buf::StrBuf::<STREAM_MAX_SIZE>::new();
391        let _ = fmt::Write::write_fmt(&mut buf, format_args!("{self}"));
392
393        out.write_arg_fmt(buf.as_str())
394    }
395
396    #[inline(always)]
397    fn is_single_arg(&self) -> bool {
398        true
399    }
400}
401
402impl FromRedisValue for StreamId {
403    fn from_redis_value(value: &Value) -> RedisResult<Self> {
404        match value {
405            Value::Data(data) => match core::str::from_utf8(data) {
406                Ok(data) => match data.parse() {
407                    Ok(result) => Ok(result),
408                    Err(error) => Err((redis::ErrorKind::InvalidClientConfig, error.as_str()).into()),
409                },
410                Err(_) => Err((redis::ErrorKind::TypeError, "Not a string").into()),
411            },
412            Value::Bulk(_) => Err((redis::ErrorKind::TypeError, "Not bulk instead of stream id").into()),
413            Value::Nil => Ok(StreamId::nil()),
414            Value::Int(_) => Err((redis::ErrorKind::TypeError, "unexpected Integer").into()),
415            Value::Okay => Err((redis::ErrorKind::TypeError, "unexpected OK").into()),
416            Value::Status(_) => Err((redis::ErrorKind::TypeError, "unexpected status").into()),
417        }
418    }
419}
420
421#[derive(Debug)]
422///Group information
423pub struct GroupInfo {
424    ///Group name
425    pub name: String,
426    ///Number of consumers in the group
427    pub consumers: u64,
428    ///Number of messages that are read but yet to be acknowledged.
429    pub pending: u64,
430    ///ID of last message delivered to group's consumers
431    pub last_delivered_id: StreamId,
432    //Since redis 7.0
433    /////Some sort of read counter for last entry delivered to group's consumers
434    //pub entries_read: u64,
435    /////Number of entries in the stream that are still waiting to be delivered
436    //pub lag: u64,
437}
438
439impl GroupInfo {
440    const USER_FIELD_NAME: &'static str = "name";
441    const USER_FIELD_CONSUMERS: &'static str = "consumers";
442    const USER_FIELD_PENDING: &'static str = "pending";
443    const USER_FIELD_LAST_DELIVERED_ID: &'static str = "last-delivered-id";
444}
445
446impl FromRedisValue for GroupInfo {
447    fn from_redis_value(value: &Value) -> RedisResult<Self> {
448        match value {
449            //Always array
450            Value::Bulk(values) => {
451                let mut name = None;
452                let mut consumers = None;
453                let mut pending = None;
454                let mut last_delivered_id = None;
455
456                if values.len() < 8 {
457                    return Err((
458                        redis::ErrorKind::TypeError,
459                        "Insufficient number of values returned. Need at least 8",
460                    )
461                        .into());
462                }
463
464                for pair in values.chunks(2) {
465                    let key = parse_redis_key(&pair[0])?;
466                    let value = &pair[1];
467
468                    assign_field_if!(name = value IF key == Self::USER_FIELD_NAME);
469                    assign_field_if!(consumers = value IF key == Self::USER_FIELD_CONSUMERS);
470                    assign_field_if!(pending = value IF key == Self::USER_FIELD_PENDING);
471                    assign_field_if!(last_delivered_id = value IF key == Self::USER_FIELD_LAST_DELIVERED_ID);
472                }
473
474                let name = unwrap_required_field!(name);
475                let consumers = unwrap_required_field!(consumers);
476                let pending = unwrap_required_field!(pending);
477                let last_delivered_id = unwrap_required_field!(last_delivered_id);
478                Ok(Self {
479                    name,
480                    consumers,
481                    pending,
482                    last_delivered_id,
483                })
484            }
485            Value::Data(_) => Err((redis::ErrorKind::TypeError, "Not a pending field").into()),
486            Value::Nil => Err((redis::ErrorKind::TypeError, "unexpected null").into()),
487            Value::Int(_) => Err((redis::ErrorKind::TypeError, "unexpected Integer").into()),
488            Value::Okay => Err((redis::ErrorKind::TypeError, "unexpected OK").into()),
489            Value::Status(_) => Err((redis::ErrorKind::TypeError, "unexpected status").into()),
490        }
491    }
492}
493
494///Consumer information in `PendingStats`
495pub struct PendingConsumerStat {
496    ///Consumer name
497    pub name: String,
498    ///Number of pending messages to this user.
499    pub no_ack_num: u64,
500}
501
502impl FromRedisValue for PendingConsumerStat {
503    fn from_redis_value(value: &Value) -> Result<Self, RedisError> {
504        match value {
505            Value::Bulk(values) => {
506                if values.len() == 2 {
507                    Ok(Self {
508                        name: FromRedisValue::from_redis_value(&values[0])?,
509                        no_ack_num: FromRedisValue::from_redis_value(&values[1])?,
510                    })
511                } else {
512                    Err((
513                        redis::ErrorKind::TypeError,
514                        "PendingConsumerStat array requires 2 elements",
515                    )
516                        .into())
517                }
518            }
519            Value::Data(_) => Err((redis::ErrorKind::TypeError, "Not a PendingConsumerStat array").into()),
520            Value::Nil => Err((redis::ErrorKind::TypeError, "unexpected null").into()),
521            Value::Int(_) => Err((redis::ErrorKind::TypeError, "unexpected Integer").into()),
522            Value::Okay => Err((redis::ErrorKind::TypeError, "unexpected OK").into()),
523            Value::Status(_) => Err((redis::ErrorKind::TypeError, "unexpected status").into()),
524        }
525    }
526}
527
528///Brief output of XPENDING
529///
530///Provided when no ID is specified in arguments
531pub struct PendingStats {
532    ///Number of pending messages within group
533    ///
534    ///If len is 0, rest of fields are using default values.
535    pub len: u64,
536    ///Smallest ID among pending messages
537    pub lowest_id: StreamId,
538    ///Highest ID among pending messages
539    pub highest_id: StreamId,
540    ///Stats on every consumer within group
541    pub consumers: Vec<PendingConsumerStat>,
542}
543
544impl FromRedisValue for PendingStats {
545    fn from_redis_value(value: &Value) -> Result<Self, RedisError> {
546        match value {
547            Value::Bulk(values) => {
548                if values.len() == 4 {
549                    Ok(Self {
550                        len: FromRedisValue::from_redis_value(&values[0])?,
551                        lowest_id: FromRedisValue::from_redis_value(&values[1])?,
552                        highest_id: FromRedisValue::from_redis_value(&values[2])?,
553                        consumers: FromRedisValue::from_redis_value(&values[3])?,
554                    })
555                } else {
556                    Err((redis::ErrorKind::TypeError, "PendingStats array requires 4 elements").into())
557                }
558            }
559            Value::Data(_) => Err((redis::ErrorKind::TypeError, "Not a PendingStats array").into()),
560            //If there are no messages pending, redis returns NIL, so handle it in a more casual
561            //manner by just setting length to 0
562            Value::Nil => Ok(Self {
563                len: 0,
564                lowest_id: StreamId::nil(),
565                highest_id: StreamId::nil(),
566                consumers: Vec::new(),
567            }),
568            Value::Int(_) => Err((redis::ErrorKind::TypeError, "unexpected Integer").into()),
569            Value::Okay => Err((redis::ErrorKind::TypeError, "unexpected OK").into()),
570            Value::Status(_) => Err((redis::ErrorKind::TypeError, "unexpected status").into()),
571        }
572    }
573}
574
575#[derive(Debug, PartialEq, Eq, Clone)]
576///Entry's content
577pub struct EntryValue<T> {
578    ///Identifier, hopefully unique
579    ///
580    ///Internally encoded in LE byte order
581    pub id: uuid::Uuid,
582    ///User supplied data
583    pub payload: T,
584}
585
586impl<T> EntryValue<T> {
587    const USER_FIELD_ID: &'static str = "id";
588    const USER_FIELD_DATA: &'static str = "payload";
589}
590
591impl<T: FromRedisValue> FromRedisValue for EntryValue<T> {
592    fn from_redis_value(value: &Value) -> RedisResult<Self> {
593        match value {
594            //Stream's entry values are always map
595            Value::Bulk(values) => {
596                let mut id = None;
597                let mut payload = None;
598
599                for pair in values.chunks(2) {
600                    let key = parse_redis_key(&pair[0])?;
601                    let value = &pair[1];
602
603                    if id.is_none() && key.eq_ignore_ascii_case(Self::USER_FIELD_ID) {
604                        id = Some(value);
605                    }
606                    assign_field_if!(payload = value IF key == Self::USER_FIELD_DATA);
607                }
608
609                let id = match id {
610                    Some(id) => match id {
611                        Value::Data(data) => {
612                            let data: [u8; 16] = match data.as_slice().try_into() {
613                                Ok(data) => data,
614                                Err(_) => return Err((redis::ErrorKind::TypeError, "id field is not 16 bytes").into()),
615                            };
616                            let data = u128::from_le_bytes(data);
617                            uuid::Uuid::from_u128(data)
618                        }
619                        _ => return Err((redis::ErrorKind::TypeError, "id field is not bytes").into()),
620                    },
621                    None => return Err((redis::ErrorKind::TypeError, "Missing id field").into()),
622                };
623                let payload = unwrap_required_field!(payload);
624                Ok(Self { id, payload })
625            }
626            Value::Data(_) => Err((redis::ErrorKind::TypeError, "Not a stream values").into()),
627            Value::Nil => Err((redis::ErrorKind::TypeError, "unexpected null").into()),
628            Value::Int(_) => Err((redis::ErrorKind::TypeError, "unexpected Integer").into()),
629            Value::Okay => Err((redis::ErrorKind::TypeError, "unexpected OK").into()),
630            Value::Status(_) => Err((redis::ErrorKind::TypeError, "unexpected status").into()),
631        }
632    }
633}
634
635impl<T: ToRedisArgs> ToRedisArgs for EntryValue<T> {
636    #[inline(always)]
637    fn write_redis_args<W: ?Sized + RedisWrite>(&self, out: &mut W) {
638        Self::USER_FIELD_ID.write_redis_args(out);
639        out.write_arg(&self.id.as_u128().to_le_bytes());
640
641        Self::USER_FIELD_DATA.write_redis_args(out);
642        self.payload.write_redis_args(out);
643    }
644
645    #[inline(always)]
646    //We serialize map, so never single arg.
647    fn is_single_arg(&self) -> bool {
648        false
649    }
650}
651
652#[derive(Copy, Clone)]
653///Position within redis stream
654pub enum RangeIdx {
655    ///Special type, meaning open range.
656    Any,
657    ///Any message with specified timestamp of id.
658    Timestamp(TimestampId),
659    ///Concrete message id.
660    Id(StreamId),
661    ///Excluding variant of `Id`.
662    ///
663    ///I.e. start from `id` but do not include `id` itself.
664    ExcludeId(StreamId),
665}
666
667impl fmt::Debug for RangeIdx {
668    #[inline(always)]
669    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
670        match self {
671            Self::Any => fmt.write_str("Any"),
672            Self::Timestamp(time) => fmt::Debug::fmt(time, fmt),
673            Self::Id(id) => fmt::Debug::fmt(id, fmt),
674            Self::ExcludeId(id) => {
675                fmt.write_str("Exclude(")?;
676                fmt::Debug::fmt(id, fmt)?;
677                fmt.write_str(")")
678            }
679        }
680    }
681}
682
683#[derive(Debug, Copy, Clone)]
684///Inclusive range within stream
685pub struct Range {
686    ///Starting position
687    pub start: RangeIdx,
688    ///Ending position
689    pub end: RangeIdx,
690}
691
692impl ToRedisArgs for Range {
693    #[inline(always)]
694    fn write_redis_args<W: ?Sized + redis::RedisWrite>(&self, out: &mut W) {
695        let Self { start, end } = self;
696
697        match start {
698            RangeIdx::Any => "-".write_redis_args(out),
699            RangeIdx::Timestamp(id) => id.write_redis_args(out),
700            RangeIdx::Id(id) => id.write_redis_args(out),
701            RangeIdx::ExcludeId(id) => id.next().write_redis_args(out),
702        }
703
704        match end {
705            RangeIdx::Any => "+".write_redis_args(out),
706            RangeIdx::Timestamp(id) => id.write_redis_args(out),
707            RangeIdx::Id(id) => id.write_redis_args(out),
708            RangeIdx::ExcludeId(id) => id.prev().write_redis_args(out),
709        }
710    }
711
712    #[inline(always)]
713    fn is_single_arg(&self) -> bool {
714        false
715    }
716}
717
718///Parameters to fetch pending messages (fetched, but not consumed).
719pub struct PendingParams<'a> {
720    ///Group name.
721    ///
722    ///This is used as identifier of group of pending messages.
723    ///
724    ///When message is successfully read, it is moved inside this group.
725    ///
726    ///At any point user can `XACK` to confirm message is consumed and delete it.
727    ///
728    ///Otherwise fetch it again (in case of crash or something similar)
729    pub group: &'a str,
730    ///Range parameter
731    pub range: Range,
732    ///IDLE time filter.
733    ///
734    ///When used, filters out messages whose idle time LESS THAN specified duration.
735    pub idle: Option<time::Duration>,
736    ///Optional filter by consumer name
737    pub consumer: Option<&'a str>,
738    ///Number of messages to pick.
739    pub count: usize,
740}
741
742pub(crate) struct PendingParamsConfig<'a> {
743    pub config: &'a QueueConfig,
744    pub params: &'a PendingParams<'a>,
745}
746
747impl<'a> ToRedisArgs for PendingParamsConfig<'a> {
748    #[inline(always)]
749    fn write_redis_args<W: ?Sized + redis::RedisWrite>(&self, out: &mut W) {
750        let Self { config, params } = self;
751
752        config.stream.as_ref().write_redis_args(out);
753        params.group.write_redis_args(out);
754
755        if let Some(idle) = params.idle {
756            let idle: u64 = match idle.as_millis().try_into() {
757                Ok(idle) => idle,
758                Err(_) => u64::max_value(),
759            };
760
761            idents::IDLE.as_bytes().write_redis_args(out);
762            idle.write_redis_args(out);
763        }
764
765        params.range.write_redis_args(out);
766        params.count.write_redis_args(out);
767
768        if let Some(consumer) = &params.consumer {
769            consumer.write_redis_args(out)
770        }
771    }
772
773    #[inline(always)]
774    fn is_single_arg(&self) -> bool {
775        false
776    }
777}
778
779#[derive(Debug)]
780///Information about pending message
781pub struct PendingEntry {
782    ///Entry's id
783    pub id: StreamId,
784    ///Consumer name
785    pub consumer: String,
786    ///Duration since message was last delivered to the consumer.
787    pub last_delivered: time::Duration,
788    ///Number of times message has been delivered
789    pub count: u64,
790}
791
792impl FromRedisValue for PendingEntry {
793    fn from_redis_value(value: &Value) -> RedisResult<Self> {
794        match value {
795            //Always array
796            Value::Bulk(values) => {
797                if values.len() == 4 {
798                    Ok(Self {
799                        id: StreamId::from_redis_value(&values[0])?,
800                        consumer: String::from_redis_value(&values[1])?,
801                        last_delivered: time::Duration::from_millis(u64::from_redis_value(&values[2])?),
802                        count: u64::from_redis_value(&values[3])?,
803                    })
804                } else {
805                    Err((
806                        redis::ErrorKind::TypeError,
807                        "Invalid number of values in PendingEntry, should be 4",
808                    )
809                        .into())
810                }
811            }
812            Value::Data(_) => Err((redis::ErrorKind::TypeError, "Not a pending field").into()),
813            Value::Nil => Err((redis::ErrorKind::TypeError, "unexpected null").into()),
814            Value::Int(_) => Err((redis::ErrorKind::TypeError, "unexpected Integer").into()),
815            Value::Okay => Err((redis::ErrorKind::TypeError, "unexpected OK").into()),
816            Value::Status(_) => Err((redis::ErrorKind::TypeError, "unexpected status").into()),
817        }
818    }
819
820    #[inline]
821    fn from_byte_vec(_vec: &[u8]) -> Option<Vec<Self>> {
822        None
823    }
824}
825
826///Possible ways to read items from queue
827pub enum FetchType {
828    ///Requests new messages.
829    ///
830    ///New means not yet read by anyone.
831    New,
832    ///Attempts to fetch only pending message.
833    ///
834    ///If returns empty result, it means all messages were successfully consumed.
835    Pending,
836    ///Fetch all pending messages after specified id.
837    After(StreamId),
838}
839
840impl ToRedisArgs for FetchType {
841    #[inline(always)]
842    fn write_redis_args<W: ?Sized + RedisWrite>(&self, out: &mut W) {
843        match self {
844            Self::New => out.write_arg(b">"),
845            Self::Pending => out.write_arg(b"0"),
846            Self::After(id) => id.write_redis_args(out),
847        }
848    }
849
850    #[inline(always)]
851    fn is_single_arg(&self) -> bool {
852        true
853    }
854}
855
856///Parameters for fetch request.
857pub struct FetchParams<'a> {
858    ///Group name.
859    ///
860    ///This is used as identifier of group of pending messages.
861    ///
862    ///When message is successfully read, it is moved inside this group.
863    ///
864    ///At any point user can `XACK` to confirm message is consumed and delete it.
865    ///
866    ///Otherwise fetch it again (in case of crash or something similar)
867    pub group: &'a str,
868    ///Consumer name
869    ///
870    ///Used to identifier client reading messages
871    ///If message is successfully read from Queue, then message is moved to pending list, belonging
872    ///to this consumer.
873    ///
874    ///From now on, this message can be accessed only by this consumer.
875    pub consumer: &'a str,
876    ///Fetch type
877    pub typ: FetchType,
878    ///Number of messages to fetch maximum.
879    ///
880    ///If 0, fetches all available
881    pub count: usize,
882    ///Requests to block for specified duration in milliseconds.
883    pub timeout: Option<time::Duration>,
884}
885
886pub(crate) struct FetchParamsConfig<'a> {
887    pub config: &'a QueueConfig,
888    pub params: &'a FetchParams<'a>,
889}
890
891impl<'a> ToRedisArgs for FetchParamsConfig<'a> {
892    #[inline(always)]
893    fn write_redis_args<W: ?Sized + RedisWrite>(&self, out: &mut W) {
894        let Self { config, params } = self;
895
896        out.write_arg(idents::GROUP.as_bytes());
897        params.group.write_redis_args(out);
898        params.consumer.write_redis_args(out);
899        if params.count > 0 {
900            out.write_arg(idents::COUNT.as_bytes());
901            params.count.write_redis_args(out);
902        }
903        if let Some(timeout) = params.timeout {
904            out.write_arg(idents::BLOCK.as_bytes());
905            (timeout.as_millis() as u64).write_redis_args(out);
906        }
907
908        out.write_arg(idents::STREAMS.as_bytes());
909        config.stream.as_ref().write_redis_args(out);
910
911        params.typ.write_redis_args(out);
912    }
913
914    #[inline(always)]
915    fn is_single_arg(&self) -> bool {
916        false
917    }
918}
919
920#[derive(Debug, PartialEq, Eq, Clone)]
921///Queue's entry
922pub struct Entry<T> {
923    ///Entry's id
924    pub id: StreamId,
925    ///Entry's value
926    pub value: EntryValue<T>,
927}
928
929impl<T: FromRedisValue> FromRedisValue for Entry<T> {
930    fn from_redis_value(value: &Value) -> RedisResult<Self> {
931        match value {
932            //Map of values is always encoded as sequence of items
933            Value::Bulk(values) => {
934                //Stream is always consist of 2 parts:
935                //1. Identifier
936                //2. Values
937                if values.len() == 2 {
938                    Ok(Self {
939                        id: StreamId::from_redis_value(&values[0])?,
940                        value: EntryValue::<T>::from_redis_value(&values[1])?,
941                    })
942                } else {
943                    Err((
944                        redis::ErrorKind::TypeError,
945                        "Invalid number of values in entry, should be 2",
946                    )
947                        .into())
948                }
949            }
950            Value::Data(_) => Err((redis::ErrorKind::TypeError, "Not a stream entry").into()),
951            Value::Nil => Err((redis::ErrorKind::TypeError, "unexpected null").into()),
952            Value::Int(_) => Err((redis::ErrorKind::TypeError, "unexpected Integer").into()),
953            Value::Okay => Err((redis::ErrorKind::TypeError, "unexpected OK").into()),
954            Value::Status(_) => Err((redis::ErrorKind::TypeError, "unexpected status").into()),
955        }
956    }
957
958    fn from_byte_vec(_vec: &[u8]) -> Option<Vec<Self>> {
959        None
960    }
961}
962
963///Result of fetch operation.
964pub struct FetchResult<T> {
965    ///Name of stream from where message comes
966    pub stream: String,
967    ///Stream's content
968    pub entries: Vec<Entry<T>>,
969}
970
971impl<T: FromRedisValue> FromRedisValue for FetchResult<T> {
972    fn from_redis_value(value: &Value) -> RedisResult<Self> {
973        match value {
974            //Map of values is always encoded as sequence of items
975            Value::Bulk(values) => {
976                //Stream is always consist of 2 parts:
977                //1. Identifier
978                //2. Values
979                if values.len() == 2 {
980                    Ok(Self {
981                        stream: String::from_redis_value(&values[0])?,
982                        entries: Vec::<Entry<T>>::from_redis_value(&values[1])?,
983                    })
984                } else {
985                    Err((redis::ErrorKind::TypeError, "Invalid number of values in entry, should be 2").into())
986                }
987            }
988            Value::Data(_) => Err((redis::ErrorKind::TypeError, "Not a stream entry").into()),
989            Value::Nil => Err((redis::ErrorKind::TypeError, "unexpected null").into()),
990            Value::Int(_) => Err((redis::ErrorKind::TypeError, "unexpected Integer").into()),
991            Value::Okay => Err((redis::ErrorKind::TypeError, "unexpected OK").into()),
992            Value::Status(_) => Err((redis::ErrorKind::TypeError, "unexpected status").into()),
993        }
994    }
995
996    fn from_byte_vec(_vec: &[u8]) -> Option<Vec<Self>> {
997        None
998    }
999}
1000
1001///Result of fetch operation.
1002pub struct FetchEntries<T> {
1003    ///Stream's content
1004    pub entries: Vec<Entry<T>>,
1005}
1006
1007impl<T: FromRedisValue> FromRedisValue for FetchEntries<T> {
1008    fn from_redis_value(value: &Value) -> RedisResult<Self> {
1009        match value {
1010            //Map of values is always encoded as sequence of items
1011            Value::Bulk(values) => {
1012                //Stream is always consist of 2 parts:
1013                //1. Identifier
1014                //2. Values
1015                if values.len() == 2 {
1016                    Ok(Self {
1017                        entries: Vec::<Entry<T>>::from_redis_value(&values[1])?,
1018                    })
1019                } else {
1020                    Err((redis::ErrorKind::TypeError, "Invalid number of values in entry, should be 2").into())
1021                }
1022            }
1023            Value::Data(_) => Err((redis::ErrorKind::TypeError, "Not a stream entry").into()),
1024            Value::Nil => Err((redis::ErrorKind::TypeError, "unexpected null").into()),
1025            Value::Int(_) => Err((redis::ErrorKind::TypeError, "unexpected Integer").into()),
1026            Value::Okay => Err((redis::ErrorKind::TypeError, "unexpected OK").into()),
1027            Value::Status(_) => Err((redis::ErrorKind::TypeError, "unexpected status").into()),
1028        }
1029    }
1030
1031    fn from_byte_vec(_vec: &[u8]) -> Option<Vec<Self>> {
1032        None
1033    }
1034}