redis_driver/commands/
stream_commands.rs

1use std::collections::HashMap;
2
3use crate::{
4    prepare_command,
5    resp::{
6        cmd, CommandArg, CommandArgs, FromKeyValueValueArray, FromValue, HashMapExt, IntoArgs,
7        KeyValueArgOrCollection, SingleArgOrCollection, Value,
8    },
9    PreparedCommand, Result,
10};
11
12/// A group of Redis commands related to [`Streams`](https://redis.io/docs/data-types/streams/)
13/// # See Also
14/// [Redis Generic Commands](https://redis.io/commands/?group=stream)
15/// [Streams tutorial](https://redis.io/docs/data-types/streams-tutorial/)
16pub trait StreamCommands {
17    /// The XACK command removes one or multiple messages
18    /// from the Pending Entries List (PEL) of a stream consumer group
19    ///
20    /// # Return
21    /// The command returns the number of messages successfully acknowledged.
22    /// Certain message IDs may no longer be part of the PEL (for example because they have already been acknowledged),
23    /// and XACK will not count them as successfully acknowledged.
24    ///
25    /// # See Also
26    /// [<https://redis.io/commands/xack/>](https://redis.io/commands/xack/)
27    fn xack<K, G, I, II>(&mut self, key: K, group: G, ids: II) -> PreparedCommand<Self, usize>
28    where
29        Self: Sized,
30        K: Into<CommandArg>,
31        G: Into<CommandArg>,
32        I: Into<CommandArg>,
33        II: SingleArgOrCollection<I>,
34    {
35        prepare_command(self, cmd("XACK").arg(key).arg(group).arg(ids))
36    }
37
38    /// Appends the specified stream entry to the stream at the specified key.
39    ///
40    /// # Return
41    /// the ID of the added entry.
42    ///
43    /// The ID is the one auto-generated if * is passed as ID argument,
44    /// otherwise the command just returns the same ID specified by the user during insertion.
45    ///
46    /// The command returns a Null reply when used with create_stream=false and the key doesn't exist.
47    ///
48    /// # See Also
49    /// [<https://redis.io/commands/xadd/>](https://redis.io/commands/xadd/)
50    fn xadd<K, I, F, V, FFVV, R>(
51        &mut self,
52        key: K,
53        stream_id: I,
54        items: FFVV,
55        options: XAddOptions,
56    ) -> PreparedCommand<Self, R>
57    where
58        Self: Sized,
59        K: Into<CommandArg>,
60        I: Into<CommandArg>,
61        F: Into<CommandArg>,
62        V: Into<CommandArg>,
63        FFVV: KeyValueArgOrCollection<F, V>,
64        R: FromValue,
65    {
66        prepare_command(
67            self,
68            cmd("XADD").arg(key).arg(options).arg(stream_id).arg(items),
69        )
70    }
71
72    /// This command transfers ownership of pending stream entries that match the specified criteria.
73    ///
74    /// # Return
75    /// An instance of StreamAutoClaimResult
76    ///
77    /// # See Also
78    /// [<https://redis.io/commands/xautoclaim/>](https://redis.io/commands/xautoclaim/)
79    fn xautoclaim<K, G, C, I, V>(
80        &mut self,
81        key: K,
82        group: G,
83        consumer: C,
84        min_idle_time: u64,
85        start: I,
86        options: XAutoClaimOptions,
87    ) -> PreparedCommand<Self, XAutoClaimResult<V>>
88    where
89        Self: Sized,
90        K: Into<CommandArg>,
91        G: Into<CommandArg>,
92        C: Into<CommandArg>,
93        I: Into<CommandArg>,
94        V: FromValue,
95    {
96        prepare_command(
97            self,
98            cmd("XAUTOCLAIM")
99                .arg(key)
100                .arg(group)
101                .arg(consumer)
102                .arg(min_idle_time)
103                .arg(start)
104                .arg(options),
105        )
106    }
107
108    /// In the context of a stream consumer group, this command changes the ownership of a pending message,
109    /// so that the new owner is the consumer specified as the command argument.
110    ///
111    /// # Return
112    /// The ID of the added entry.
113    ///
114    /// The ID is the one auto-generated if * is passed as ID argument,
115    /// otherwise the command just returns the same ID specified by the user during insertion.
116    ///
117    /// The command returns a Null reply when used with create_stream=false and the key doesn't exist.
118    ///
119    /// # See Also
120    /// [<https://redis.io/commands/xclaim/>](https://redis.io/commands/xclaim/)
121    fn xclaim<K, G, C, I, II, V>(
122        &mut self,
123        key: K,
124        group: G,
125        consumer: C,
126        min_idle_time: u64,
127        ids: II,
128        options: XClaimOptions,
129    ) -> PreparedCommand<Self, Vec<StreamEntry<V>>>
130    where
131        Self: Sized,
132        K: Into<CommandArg>,
133        G: Into<CommandArg>,
134        C: Into<CommandArg>,
135        I: Into<CommandArg>,
136        II: SingleArgOrCollection<I>,
137        V: FromValue,
138    {
139        prepare_command(
140            self,
141            cmd("XCLAIM")
142                .arg(key)
143                .arg(group)
144                .arg(consumer)
145                .arg(min_idle_time)
146                .arg(ids)
147                .arg(options),
148        )
149    }
150
151    /// Removes the specified entries from a stream, and returns the number of entries deleted.
152    ///
153    /// # Return
154    /// The number of entries actually deleted.
155    ///
156    /// # See Also
157    /// [<https://redis.io/commands/xdel/>](https://redis.io/commands/xdel/)
158    fn xdel<K, I, II>(&mut self, key: K, ids: II) -> PreparedCommand<Self, usize>
159    where
160        Self: Sized,
161        K: Into<CommandArg>,
162        I: Into<CommandArg>,
163        II: SingleArgOrCollection<I>,
164    {
165        prepare_command(self, cmd("XDEL").arg(key).arg(ids))
166    }
167
168    /// This command creates a new consumer group uniquely identified by <groupname> for the stream stored at <key>.
169    ///
170    /// # Return
171    /// * `true` success
172    /// * `false`failure
173    ///
174    /// # See Also
175    /// [<https://redis.io/commands/xgroup-create/>](https://redis.io/commands/xgroup-create/)
176    fn xgroup_create<K, G, I>(
177        &mut self,
178        key: K,
179        groupname: G,
180        id: I,
181        options: XGroupCreateOptions,
182    ) -> PreparedCommand<Self, bool>
183    where
184        Self: Sized,
185        K: Into<CommandArg>,
186        G: Into<CommandArg>,
187        I: Into<CommandArg>,
188    {
189        prepare_command(
190            self,
191            cmd("XGROUP")
192                .arg("CREATE")
193                .arg(key)
194                .arg(groupname)
195                .arg(id)
196                .arg(options),
197        )
198    }
199
200    /// Create a consumer named `consumername` in the consumer group `groupname``
201    /// of the stream that's stored at `key.
202    ///
203    /// # Return
204    /// * `true` success
205    /// * `false`failure
206    ///
207    /// # See Also
208    /// [<https://redis.io/commands/xgroup-createconsumer/>](https://redis.io/commands/xgroup-createconsumer/)
209    fn xgroup_createconsumer<K, G, C>(
210        &mut self,
211        key: K,
212        groupname: G,
213        consumername: C,
214    ) -> PreparedCommand<Self, bool>
215    where
216        Self: Sized,
217        K: Into<CommandArg>,
218        G: Into<CommandArg>,
219        C: Into<CommandArg>,
220    {
221        prepare_command(
222            self,
223            cmd("XGROUP")
224                .arg("CREATECONSUMER")
225                .arg(key)
226                .arg(groupname)
227                .arg(consumername),
228        )
229    }
230
231    /// The XGROUP DELCONSUMER command deletes a consumer from the consumer group.
232    ///
233    /// # Return
234    /// The number of pending messages that the consumer had before it was deleted
235    ///
236    /// # See Also
237    /// [<https://redis.io/commands/xgroup-delconsumer/>](https://redis.io/commands/xgroup-delconsumer/)
238    fn xgroup_delconsumer<K, G, C>(
239        &mut self,
240        key: K,
241        groupname: G,
242        consumername: C,
243    ) -> PreparedCommand<Self, usize>
244    where
245        Self: Sized,
246        K: Into<CommandArg>,
247        G: Into<CommandArg>,
248        C: Into<CommandArg>,
249    {
250        prepare_command(
251            self,
252            cmd("XGROUP")
253                .arg("DELCONSUMER")
254                .arg(key)
255                .arg(groupname)
256                .arg(consumername),
257        )
258    }
259
260    /// The XGROUP DESTROY command completely destroys a consumer group.
261    ///
262    /// # Return
263    /// * `true` success
264    /// * `false`failure
265    ///
266    /// # See Also
267    /// [<https://redis.io/commands/xgroup-destroy/>](https://redis.io/commands/xgroup-destroy/)
268    fn xgroup_destroy<K, G>(&mut self, key: K, groupname: G) -> PreparedCommand<Self, bool>
269    where
270        Self: Sized,
271        K: Into<CommandArg>,
272        G: Into<CommandArg>,
273    {
274        prepare_command(self, cmd("XGROUP").arg("DESTROY").arg(key).arg(groupname))
275    }
276
277    /// Set the last delivered ID for a consumer group.
278    ///
279    /// # See Also
280    /// [<https://redis.io/commands/xgroup-setid/>](https://redis.io/commands/xgroup-setid/)
281    fn xgroup_setid<K, G, I>(
282        &mut self,
283        key: K,
284        groupname: G,
285        id: I,
286        entries_read: Option<usize>,
287    ) -> PreparedCommand<Self, ()>
288    where
289        Self: Sized,
290        K: Into<CommandArg>,
291        G: Into<CommandArg>,
292        I: Into<CommandArg>,
293    {
294        prepare_command(
295            self,
296            cmd("XGROUP")
297                .arg("SETID")
298                .arg(key)
299                .arg(groupname)
300                .arg(id)
301                .arg(entries_read.map(|e| ("ENTRIESREAD", e))),
302        )
303    }
304
305    /// This command returns the list of consumers that belong to the `groupname` consumer group of the stream stored at `key`.
306    ///
307    /// # Return
308    /// A collection of XConsumerInfo.
309    ///
310    /// # See Also
311    /// [<https://redis.io/commands/xinfo-consumers/>](https://redis.io/commands/xinfo-consumers/)
312    fn xinfo_consumers<K, G>(
313        &mut self,
314        key: K,
315        groupname: G,
316    ) -> PreparedCommand<Self, Vec<XConsumerInfo>>
317    where
318        Self: Sized,
319        K: Into<CommandArg>,
320        G: Into<CommandArg>,
321    {
322        prepare_command(self, cmd("XINFO").arg("CONSUMERS").arg(key).arg(groupname))
323    }
324
325    /// This command returns the list of consumers that belong
326    /// to the `groupname` consumer group of the stream stored at `key`.
327    ///
328    /// # Return
329    /// A collection of XGroupInfo.
330    ///
331    /// # See Also
332    /// [<https://redis.io/commands/xinfo-groups/>](https://redis.io/commands/xinfo-groups/)
333    fn xinfo_groups<K>(&mut self, key: K) -> PreparedCommand<Self, Vec<XGroupInfo>>
334    where
335        Self: Sized,
336        K: Into<CommandArg>,
337    {
338        prepare_command(self, cmd("XINFO").arg("GROUPS").arg(key))
339    }
340
341    /// This command returns information about the stream stored at `key`.
342    ///
343    /// # Return
344    /// A collection of XGroupInfo.
345    ///
346    /// # See Also
347    /// [<https://redis.io/commands/xinfo-stream/>](https://redis.io/commands/xinfo-stream/)
348    fn xinfo_stream<K>(
349        &mut self,
350        key: K,
351        options: XInfoStreamOptions,
352    ) -> PreparedCommand<Self, XStreamInfo>
353    where
354        Self: Sized,
355        K: Into<CommandArg>,
356    {
357        prepare_command(self, cmd("XINFO").arg("STREAM").arg(key).arg(options))
358    }
359
360    /// Returns the number of entries inside a stream.
361    ///
362    /// # Return
363    /// The number of entries of the stream at `key`.
364    ///
365    /// # See Also
366    /// [<https://redis.io/commands/xrange/>](https://redis.io/commands/xrange/)
367    fn xlen<K>(&mut self, key: K) -> PreparedCommand<Self, usize>
368    where
369        Self: Sized,
370        K: Into<CommandArg>,
371    {
372        prepare_command(self, cmd("XLEN").arg(key))
373    }
374
375    /// The XPENDING command is the interface to inspect the list of pending messages.
376    ///
377    /// # See Also
378    /// [<https://redis.io/commands/xpending/>](https://redis.io/commands/xpending/)
379    fn xpending<K, G>(&mut self, key: K, group: G) -> PreparedCommand<Self, XPendingResult>
380    where
381        Self: Sized,
382        K: Into<CommandArg>,
383        G: Into<CommandArg>,
384    {
385        prepare_command(self, cmd("XPENDING").arg(key).arg(group))
386    }
387
388    /// The XPENDING command is the interface to inspect the list of pending messages.
389    ///
390    /// # See Also
391    /// [<https://redis.io/commands/xpending/>](https://redis.io/commands/xpending/)
392    fn xpending_with_options<K, G>(
393        &mut self,
394        key: K,
395        group: G,
396        options: XPendingOptions,
397    ) -> PreparedCommand<Self, Vec<XPendingMessageResult>>
398    where
399        Self: Sized,
400        K: Into<CommandArg>,
401        G: Into<CommandArg>,
402    {
403        prepare_command(self, cmd("XPENDING").arg(key).arg(group).arg(options))
404    }
405
406    /// The command returns the stream entries matching a given range of IDs.
407    ///
408    /// # Return
409    /// A collection of StreamEntry
410    ///
411    /// The command returns the entries with IDs matching the specified range.
412    /// The returned entries are complete, that means that the ID and all the fields they are composed are returned.
413    /// Moreover, the entries are returned with their fields and values in the exact same order as XADD added them.
414    ///
415    /// # See Also
416    /// [<https://redis.io/commands/xrange/>](https://redis.io/commands/xrange/)
417    fn xrange<K, S, E, V>(
418        &mut self,
419        key: K,
420        start: S,
421        end: E,
422        count: Option<usize>,
423    ) -> PreparedCommand<Self, Vec<StreamEntry<V>>>
424    where
425        Self: Sized,
426        K: Into<CommandArg>,
427        S: Into<CommandArg>,
428        E: Into<CommandArg>,
429        V: FromValue,
430    {
431        prepare_command(
432            self,
433            cmd("XRANGE")
434                .arg(key)
435                .arg(start)
436                .arg(end)
437                .arg(count.map(|c| ("COUNT", c))),
438        )
439    }
440
441    /// Read data from one or multiple streams,
442    /// only returning entries with an ID greater than the last received ID reported by the caller.
443    ///
444    /// # Return
445    /// A collection of XReadStreamResult
446    ///
447    /// # See Also
448    /// [<https://redis.io/commands/xread/>](https://redis.io/commands/xread/)
449    fn xread<K, KK, I, II, V, R>(
450        &mut self,
451        options: XReadOptions,
452        keys: KK,
453        ids: II,
454    ) -> PreparedCommand<Self, R>
455    where
456        Self: Sized,
457        K: Into<CommandArg>,
458        KK: SingleArgOrCollection<K>,
459        I: Into<CommandArg>,
460        II: SingleArgOrCollection<I>,
461        V: FromValue,
462        R: FromKeyValueValueArray<String, Vec<StreamEntry<V>>>,
463    {
464        prepare_command(
465            self,
466            cmd("XREAD").arg(options).arg("STREAMS").arg(keys).arg(ids),
467        )
468    }
469
470    /// The XREADGROUP command is a special version of the [`xread`](crate::StreamCommands::xread)
471    /// command with support for consumer groups.
472    ///
473    /// # Return
474    /// A collection of XReadStreamResult
475    ///
476    /// # See Also
477    /// [<https://redis.io/commands/xreadgroup/>](https://redis.io/commands/xreadgroup/)
478    fn xreadgroup<G, C, K, KK, I, II, V, R>(
479        &mut self,
480        group: G,
481        consumer: C,
482        options: XReadGroupOptions,
483        keys: KK,
484        ids: II,
485    ) -> PreparedCommand<Self, R>
486    where
487        Self: Sized,
488        G: Into<CommandArg>,
489        C: Into<CommandArg>,
490        K: Into<CommandArg>,
491        KK: SingleArgOrCollection<K>,
492        I: Into<CommandArg>,
493        II: SingleArgOrCollection<I>,
494        V: FromValue,
495        R: FromKeyValueValueArray<String, Vec<StreamEntry<V>>>,
496    {
497        prepare_command(
498            self,
499            cmd("XREADGROUP")
500                .arg("GROUP")
501                .arg(group)
502                .arg(consumer)
503                .arg(options)
504                .arg("STREAMS")
505                .arg(keys)
506                .arg(ids),
507        )
508    }
509
510    /// This command is exactly like [`xrange`](crate::StreamCommands::xrange),
511    /// but with the notable difference of returning the entries in reverse order,
512    /// and also taking the start-end range in reverse order
513    ///
514    /// # Return
515    /// A collection of StreamEntry
516    ///
517    /// # See Also
518    /// [<https://redis.io/commands/xrevrange/>](https://redis.io/commands/xrevrange/)
519    fn xrevrange<K, E, S, V>(
520        &mut self,
521        key: K,
522        end: E,
523        start: S,
524        count: Option<usize>,
525    ) -> PreparedCommand<Self, Vec<StreamEntry<V>>>
526    where
527        Self: Sized,
528        K: Into<CommandArg>,
529        E: Into<CommandArg>,
530        S: Into<CommandArg>,
531        V: FromValue,
532    {
533        prepare_command(
534            self,
535            cmd("XREVRANGE")
536                .arg(key)
537                .arg(end)
538                .arg(start)
539                .arg(count.map(|c| ("COUNT", c))),
540        )
541    }
542
543    /// XTRIM trims the stream by evicting older entries (entries with lower IDs) if needed.
544    ///
545    /// # Return
546    /// The number of entries deleted from the stream.
547    ///
548    /// # See Also
549    /// [<https://redis.io/commands/xtrim/>](https://redis.io/commands/xtrim/)
550    fn xtrim<K>(&mut self, key: K, options: XTrimOptions) -> PreparedCommand<Self, usize>
551    where
552        Self: Sized,
553        K: Into<CommandArg>,
554    {
555        prepare_command(self, cmd("XTRIM").arg(key).arg(options))
556    }
557}
558
/// Stream Add options for the [`xadd`](crate::StreamCommands::xadd) command.
///
/// Obtain an instance through `Default` and chain the builder methods;
/// each of them appends its tokens to the wrapped argument list.
#[derive(Default)]
pub struct XAddOptions {
    /// Raw command arguments collected by the builder methods.
    command_args: CommandArgs,
}
564
565impl XAddOptions {
566    #[must_use]
567    pub fn no_mk_stream(self) -> Self {
568        Self {
569            command_args: self.command_args.arg("NOMKSTREAM"),
570        }
571    }
572
573    #[must_use]
574    pub fn trim_options(self, trim_options: XTrimOptions) -> Self {
575        Self {
576            command_args: self.command_args.arg(trim_options),
577        }
578    }
579}
580
581impl IntoArgs for XAddOptions {
582    fn into_args(self, args: CommandArgs) -> CommandArgs {
583        args.arg(self.command_args)
584    }
585}
586
/// Stream Trim operator for the [`xadd`](crate::StreamCommands::xadd)
/// and [`xtrim`](crate::StreamCommands::xtrim) commands
pub enum XTrimOperator {
    /// No operator: nothing is added to the command arguments
    None,
    /// `=` operator
    Equal,
    /// `~` operator
    Approximately,
}
596
597impl IntoArgs for XTrimOperator {
598    fn into_args(self, args: CommandArgs) -> CommandArgs {
599        match self {
600            XTrimOperator::None => args,
601            XTrimOperator::Equal => args.arg(CommandArg::Str("=")),
602            XTrimOperator::Approximately => args.arg(CommandArg::Str("~")),
603        }
604    }
605}
606
607impl Default for XTrimOperator {
608    fn default() -> Self {
609        XTrimOperator::None
610    }
611}
612
/// Stream Trim options for the [`xadd`](crate::StreamCommands::xadd)
/// and [`xtrim`](crate::StreamCommands::xtrim) commands
///
/// Start from [`max_len`](XTrimOptions::max_len) or [`min_id`](XTrimOptions::min_id),
/// then optionally chain [`limit`](XTrimOptions::limit).
#[derive(Default)]
pub struct XTrimOptions {
    /// Raw command arguments collected by the constructors and builder methods.
    command_args: CommandArgs,
}
619
620impl XTrimOptions {
621    #[must_use]
622    pub fn max_len(operator: XTrimOperator, threshold: i64) -> Self {
623        Self {
624            command_args: CommandArgs::default()
625                .arg("MAXLEN")
626                .arg(operator)
627                .arg(threshold),
628        }
629    }
630
631    #[must_use]
632    pub fn min_id<I: Into<CommandArg>>(operator: XTrimOperator, threshold_id: I) -> Self {
633        Self {
634            command_args: CommandArgs::default()
635                .arg("MINID")
636                .arg(operator)
637                .arg(threshold_id),
638        }
639    }
640
641    #[must_use]
642    pub fn limit(self, count: usize) -> Self {
643        Self {
644            command_args: self.command_args.arg("LIMIT").arg(count),
645        }
646    }
647}
648
649impl IntoArgs for XTrimOptions {
650    fn into_args(self, args: CommandArgs) -> CommandArgs {
651        args.arg(self.command_args)
652    }
653}
654
/// Options for the [`xautoclaim`](crate::StreamCommands::xautoclaim) command
///
/// Obtain an instance through `Default` and chain the builder methods.
#[derive(Default)]
pub struct XAutoClaimOptions {
    /// Raw command arguments collected by the builder methods.
    command_args: CommandArgs,
}
660
661impl XAutoClaimOptions {
662    #[must_use]
663    pub fn count(self, count: usize) -> Self {
664        Self {
665            command_args: self.command_args.arg("COUNT").arg(count),
666        }
667    }
668
669    #[must_use]
670    pub fn just_id(self) -> Self {
671        Self {
672            command_args: self.command_args.arg("JUSTID"),
673        }
674    }
675}
676
677impl IntoArgs for XAutoClaimOptions {
678    fn into_args(self, args: CommandArgs) -> CommandArgs {
679        args.arg(self.command_args)
680    }
681}
682
683pub struct StreamEntry<V>
684where
685    V: FromValue,
686{
687    pub stream_id: String,
688    pub items: HashMap<String, V>,
689}
690
691impl<V> FromValue for StreamEntry<V>
692where
693    V: FromValue,
694{
695    fn from_value(value: Value) -> Result<Self> {
696        let (stream_id, items): (String, HashMap<String, V>) = value.into()?;
697        Ok(Self { stream_id, items })
698    }
699}
700
701pub struct XAutoClaimResult<V>
702where
703    V: FromValue,
704{
705    pub start_stream_id: String,
706    pub entries: Vec<StreamEntry<V>>,
707    pub deleted_id: Vec<String>,
708}
709
710impl<V> FromValue for XAutoClaimResult<V>
711where
712    V: FromValue,
713{
714    fn from_value(value: Value) -> Result<Self> {
715        let (start_stream_id, entries, deleted_id): (String, Vec<StreamEntry<V>>, Vec<String>) =
716            value.into()?;
717        Ok(Self {
718            start_stream_id,
719            entries,
720            deleted_id,
721        })
722    }
723}
724
/// Options for the [`xclaim`](crate::StreamCommands::xclaim) command
///
/// Obtain an instance through `Default` and chain the builder methods.
#[derive(Default)]
pub struct XClaimOptions {
    /// Raw command arguments collected by the builder methods.
    command_args: CommandArgs,
}
730
731impl XClaimOptions {
732    /// Set the idle time (last time it was delivered) of the message.
733    #[must_use]
734    pub fn idle_time(self, idle_time_millis: u64) -> Self {
735        Self {
736            command_args: self.command_args.arg("IDLE").arg(idle_time_millis),
737        }
738    }
739
740    ///  This is the same as `idle_time` but instead of a relative amount of milliseconds,
741    /// it sets the idle time to a specific Unix time (in milliseconds).
742    #[must_use]
743    pub fn time(self, unix_time_milliseconds: u64) -> Self {
744        Self {
745            command_args: self.command_args.arg("TIME").arg(unix_time_milliseconds),
746        }
747    }
748
749    /// Set the retry counter to the specified value.
750    #[must_use]
751    pub fn retry_count(self, count: usize) -> Self {
752        Self {
753            command_args: self.command_args.arg("RETRYCOUNT").arg(count),
754        }
755    }
756
757    /// Creates the pending message entry in the PEL
758    /// even if certain specified IDs are not already
759    /// in the PEL assigned to a different client.
760    #[must_use]
761    pub fn force(self) -> Self {
762        Self {
763            command_args: self.command_args.arg("FORCE"),
764        }
765    }
766
767    ///  Return just an array of IDs of messages successfully claimed,
768    /// without returning the actual message.
769    #[must_use]
770    pub fn just_id(self) -> Self {
771        Self {
772            command_args: self.command_args.arg("JUSTID"),
773        }
774    }
775}
776
777impl IntoArgs for XClaimOptions {
778    fn into_args(self, args: CommandArgs) -> CommandArgs {
779        args.arg(self.command_args)
780    }
781}
782
/// Options for the [`xgroup_create`](crate::StreamCommands::xgroup_create) command
///
/// Obtain an instance through `Default` and chain the builder methods.
#[derive(Default)]
pub struct XGroupCreateOptions {
    /// Raw command arguments collected by the builder methods.
    command_args: CommandArgs,
}
788
789impl XGroupCreateOptions {
790    /// By default, the XGROUP CREATE command insists that the target stream exists and returns an error when it doesn't.
791    ///  However, you can use the optional MKSTREAM subcommand as the last argument after the `id`
792    /// to automatically create the stream (with length of 0) if it doesn't exist
793    #[must_use]
794    pub fn mk_stream(self) -> Self {
795        Self {
796            command_args: self.command_args.arg("MKSTREAM"),
797        }
798    }
799
800    /// The optional entries_read named argument can be specified to enable consumer group lag tracking for an arbitrary ID.
801    /// An arbitrary ID is any ID that isn't the ID of the stream's first entry, its last entry or the zero ("0-0") ID.
802    /// This can be useful you know exactly how many entries are between the arbitrary ID (excluding it) and the stream's last entry.
803    /// In such cases, the entries_read can be set to the stream's entries_added subtracted with the number of entries.
804    #[must_use]
805    pub fn entries_read(self, entries_read: usize) -> Self {
806        Self {
807            command_args: self.command_args.arg("ENTRIESREAD").arg(entries_read),
808        }
809    }
810}
811
812impl IntoArgs for XGroupCreateOptions {
813    fn into_args(self, args: CommandArgs) -> CommandArgs {
814        args.arg(self.command_args)
815    }
816}
817
/// Result entry for the [`xinfo_consumers`](crate::StreamCommands::xinfo_consumers) command.
pub struct XConsumerInfo {
    /// the consumer's name (reply field `name`)
    pub name: String,

    /// the number of pending messages for the client,
    /// which are messages that were delivered but are yet to be acknowledged
    /// (reply field `pending`)
    pub pending: usize,

    /// the number of milliseconds that have passed
    /// since the consumer last interacted with the server
    /// (reply field `idle`)
    pub idle_millis: u64,
}
831
832impl FromValue for XConsumerInfo {
833    fn from_value(value: Value) -> Result<Self> {
834        let mut values: HashMap<String, Value> = value.into()?;
835
836        Ok(Self {
837            name: values.remove_with_result("name")?.into()?,
838            pending: values.remove_with_result("pending")?.into()?,
839            idle_millis: values.remove_with_result("idle")?.into()?,
840        })
841    }
842}
843
/// Result entry for the [`xinfo_groups`](crate::StreamCommands::xinfo_groups) command.
pub struct XGroupInfo {
    /// the consumer group's name (reply field `name`)
    pub name: String,

    /// the number of consumers in the group (reply field `consumers`)
    pub consumers: usize,

    /// the length of the group's pending entries list (PEL),
    /// which are messages that were delivered but are yet to be acknowledged
    /// (reply field `pending`)
    pub pending: usize,

    /// the ID of the last entry delivered to the group's consumers
    /// (reply field `last-delivered-id`)
    pub last_delivered_id: String,

    /// the logical "read counter" of the last entry delivered to the group's consumers
    /// (reply field `entries-read`)
    pub entries_read: Option<usize>,

    /// the number of entries in the stream that are still waiting to be delivered to the group's consumers,
    /// or a NULL when that number can't be determined (reply field `lag`).
    pub lag: Option<usize>,
}
866
867impl FromValue for XGroupInfo {
868    fn from_value(value: Value) -> Result<Self> {
869        let mut values: HashMap<String, Value> = value.into()?;
870
871        Ok(Self {
872            name: values.remove_with_result("name")?.into()?,
873            consumers: values.remove_with_result("consumers")?.into()?,
874            pending: values.remove_with_result("pending")?.into()?,
875            last_delivered_id: values.remove_with_result("last-delivered-id")?.into()?,
876            entries_read: values.remove_with_result("entries-read")?.into()?,
877            lag: values.remove_with_result("lag")?.into()?,
878        })
879    }
880}
881
/// Options for the [`xinfo_stream`](crate::StreamCommands::xinfo_stream) command
///
/// Obtain an instance through `Default` and chain the builder methods.
#[derive(Default)]
pub struct XInfoStreamOptions {
    /// Raw command arguments collected by the builder methods.
    command_args: CommandArgs,
}
887
888impl XInfoStreamOptions {
889    /// The optional FULL modifier provides a more verbose reply.
890    #[must_use]
891    pub fn full(self) -> Self {
892        Self {
893            command_args: self.command_args.arg("FULL"),
894        }
895    }
896
897    /// The COUNT option can be used to limit the number of stream and PEL entries that are returned
898    /// (The first `count` entries are returned).
899    #[must_use]
900    pub fn count(self, count: usize) -> Self {
901        Self {
902            command_args: self.command_args.arg("COUNT").arg(count),
903        }
904    }
905}
906
907impl IntoArgs for XInfoStreamOptions {
908    fn into_args(self, args: CommandArgs) -> CommandArgs {
909        args.arg(self.command_args)
910    }
911}
912
/// Stream info returned by the [`xinfo_stream`](crate::StreamCommands::xinfo_stream) command.
pub struct XStreamInfo {
    /// the number of entries in the stream (see [`xlen`](crate::StreamCommands::xlen))
    pub length: usize,

    /// the number of keys in the underlying radix data structure
    pub radix_tree_keys: usize,

    /// the number of nodes in the underlying radix data structure
    pub radix_tree_nodes: usize,

    /// the number of consumer groups defined for the stream
    pub groups: usize,

    /// the ID of the least-recently entry that was added to the stream
    /// (reply field `last-generated-id`)
    pub last_generated_id: String,

    /// the maximal entry ID that was deleted from the stream
    pub max_deleted_entry_id: String,

    /// the count of all entries added to the stream during its lifetime
    pub entries_added: usize,

    /// the ID and field-value tuples of the first entry in the stream
    pub first_entry: StreamEntry<String>,

    /// the ID and field-value tuples of the last entry in the stream
    pub last_entry: StreamEntry<String>,

    /// the value of the `recorded-first-entry-id` reply field
    // NOTE(review): presumably the ID the server records for the stream's first entry -
    // confirm against the XINFO STREAM documentation.
    pub recorded_first_entry_id: String,
}
944
945impl FromValue for XStreamInfo {
946    fn from_value(value: Value) -> Result<Self> {
947        let mut values: HashMap<String, Value> = value.into()?;
948
949        Ok(Self {
950            length: values.remove_with_result("length")?.into()?,
951            radix_tree_keys: values.remove_with_result("radix-tree-keys")?.into()?,
952            radix_tree_nodes: values.remove_with_result("radix-tree-nodes")?.into()?,
953            groups: values.remove_with_result("groups")?.into()?,
954            last_generated_id: values.remove_with_result("last-generated-id")?.into()?,
955            max_deleted_entry_id: values.remove_with_result("max-deleted-entry-id")?.into()?,
956            entries_added: values.remove_with_result("entries-added")?.into()?,
957            first_entry: values.remove_with_result("first-entry")?.into()?,
958            last_entry: values.remove_with_result("last-entry")?.into()?,
959            recorded_first_entry_id: values
960                .remove_with_result("recorded-first-entry-id")?
961                .into()?,
962        })
963    }
964}
965
966/// Options for the [`xread`](crate::StreamCommands::xread) command
967#[derive(Default)]
968pub struct XReadOptions {
969    command_args: CommandArgs,
970}
971
972impl XReadOptions {
973    #[must_use]
974    pub fn count(self, count: usize) -> Self {
975        Self {
976            command_args: self.command_args.arg("COUNT").arg(count),
977        }
978    }
979
980    #[must_use]
981    pub fn block(self, milliseconds: u64) -> Self {
982        Self {
983            command_args: self.command_args.arg("BLOCK").arg(milliseconds),
984        }
985    }
986}
987
988impl IntoArgs for XReadOptions {
989    fn into_args(self, args: CommandArgs) -> CommandArgs {
990        args.arg(self.command_args)
991    }
992}
993
994/// Options for the [`xreadgroup`](crate::StreamCommands::xreadgroup) command
995#[derive(Default)]
996pub struct XReadGroupOptions {
997    command_args: CommandArgs,
998}
999
1000impl XReadGroupOptions {
1001    #[must_use]
1002    pub fn count(self, count: usize) -> Self {
1003        Self {
1004            command_args: self.command_args.arg("COUNT").arg(count),
1005        }
1006    }
1007
1008    #[must_use]
1009    pub fn block(self, milliseconds: u64) -> Self {
1010        Self {
1011            command_args: self.command_args.arg("BLOCK").arg(milliseconds),
1012        }
1013    }
1014
1015    #[must_use]
1016    pub fn no_ack(self) -> Self {
1017        Self {
1018            command_args: self.command_args.arg("NOACK"),
1019        }
1020    }
1021}
1022
1023impl IntoArgs for XReadGroupOptions {
1024    fn into_args(self, args: CommandArgs) -> CommandArgs {
1025        args.arg(self.command_args)
1026    }
1027}
1028
1029/// Options for the [`xpending_with_options`](crate::StreamCommands::xpending_with_options) command
1030#[derive(Default)]
1031pub struct XPendingOptions {
1032    command_args: CommandArgs,
1033}
1034
1035impl XPendingOptions {
1036    #[must_use]
1037    pub fn idle(self, min_idle_time: u64) -> Self {
1038        Self {
1039            command_args: self.command_args.arg("IDLE").arg(min_idle_time),
1040        }
1041    }
1042
1043    #[must_use]
1044    pub fn start<S: Into<CommandArg>>(self, start: S) -> Self {
1045        Self {
1046            command_args: self.command_args.arg(start),
1047        }
1048    }
1049
1050    #[must_use]
1051    pub fn end<E: Into<CommandArg>>(self, end: E) -> Self {
1052        Self {
1053            command_args: self.command_args.arg(end),
1054        }
1055    }
1056
1057    #[must_use]
1058    pub fn count(self, count: usize) -> Self {
1059        Self {
1060            command_args: self.command_args.arg(count),
1061        }
1062    }
1063
1064    #[must_use]
1065    pub fn consumer<C: Into<CommandArg>>(self, consumer: C) -> Self {
1066        Self {
1067            command_args: self.command_args.arg(consumer),
1068        }
1069    }
1070}
1071
1072impl IntoArgs for XPendingOptions {
1073    fn into_args(self, args: CommandArgs) -> CommandArgs {
1074        args.arg(self.command_args)
1075    }
1076}
1077
1078/// Result for the [`xpending`](crate::StreamCommands::xpending) command
1079pub struct XPendingResult {
1080    pub num_pending_messages: usize,
1081    pub smallest_id: String,
1082    pub greatest_id: String,
1083    pub consumers: Vec<XPendingConsumer>,
1084}
1085
1086impl FromValue for XPendingResult {
1087    fn from_value(value: Value) -> Result<Self> {
1088        let (num_pending_messages, smallest_id, greatest_id, consumers): (
1089            usize,
1090            String,
1091            String,
1092            Vec<XPendingConsumer>,
1093        ) = value.into()?;
1094        Ok(Self {
1095            num_pending_messages,
1096            smallest_id,
1097            greatest_id,
1098            consumers,
1099        })
1100    }
1101}
1102
1103/// Customer info result for the [`xpending`](crate::StreamCommands::xpending) command
1104pub struct XPendingConsumer {
1105    pub consumer: String,
1106    pub num_messages: usize,
1107}
1108
1109impl FromValue for XPendingConsumer {
1110    fn from_value(value: Value) -> Result<Self> {
1111        let (consumer, num_messages): (String, usize) = value.into()?;
1112        Ok(Self {
1113            consumer,
1114            num_messages,
1115        })
1116    }
1117}
1118
1119/// Message result for the [`xpending_with_options`](crate::StreamCommands::xpending_with_options) command
1120pub struct XPendingMessageResult {
1121    pub message_id: String,
1122    pub consumer: String,
1123    pub elapsed_millis: u64,
1124    pub times_delivered: usize,
1125}
1126
1127impl FromValue for XPendingMessageResult {
1128    fn from_value(value: Value) -> Result<Self> {
1129        let (message_id, consumer, elapsed_millis, times_delivered): (String, String, u64, usize) =
1130            value.into()?;
1131        Ok(Self {
1132            message_id,
1133            consumer,
1134            elapsed_millis,
1135            times_delivered,
1136        })
1137    }
1138}