rustis/commands/
stream_commands.rs

1use crate::{
2    client::{prepare_command, PreparedCommand},
3    resp::{
4        cmd, CommandArgs, KeyValueArgsCollection, KeyValueCollectionResponse, PrimitiveResponse,
5        SingleArg, SingleArgCollection, ToArgs,
6    },
7};
8use serde::{de::DeserializeOwned, Deserialize};
9use std::collections::HashMap;
10
11/// A group of Redis commands related to [`Streams`](https://redis.io/docs/data-types/streams/)
12/// # See Also
13/// [Redis Generic Commands](https://redis.io/commands/?group=stream)
14/// [Streams tutorial](https://redis.io/docs/data-types/streams-tutorial/)
15pub trait StreamCommands<'a> {
16    /// The XACK command removes one or multiple messages
17    /// from the Pending Entries List (PEL) of a stream consumer group
18    ///
19    /// # Return
20    /// The command returns the number of messages successfully acknowledged.
21    /// Certain message IDs may no longer be part of the PEL (for example because they have already been acknowledged),
22    /// and XACK will not count them as successfully acknowledged.
23    ///
24    /// # See Also
25    /// [<https://redis.io/commands/xack/>](https://redis.io/commands/xack/)
26    fn xack<K, G, I, II>(self, key: K, group: G, ids: II) -> PreparedCommand<'a, Self, usize>
27    where
28        Self: Sized,
29        K: SingleArg,
30        G: SingleArg,
31        I: SingleArg,
32        II: SingleArgCollection<I>,
33    {
34        prepare_command(self, cmd("XACK").arg(key).arg(group).arg(ids))
35    }
36
37    /// Appends the specified stream entry to the stream at the specified key.
38    ///
39    /// # Return
40    /// the ID of the added entry.
41    ///
42    /// The ID is the one auto-generated if * is passed as ID argument,
43    /// otherwise the command just returns the same ID specified by the user during insertion.
44    ///
45    /// The command returns a Null reply when used with create_stream=false and the key doesn't exist.
46    ///
47    /// # See Also
48    /// [<https://redis.io/commands/xadd/>](https://redis.io/commands/xadd/)
49    fn xadd<K, I, F, V, FFVV, R>(
50        self,
51        key: K,
52        stream_id: I,
53        items: FFVV,
54        options: XAddOptions,
55    ) -> PreparedCommand<'a, Self, R>
56    where
57        Self: Sized,
58        K: SingleArg,
59        I: SingleArg,
60        F: SingleArg,
61        V: SingleArg,
62        FFVV: KeyValueArgsCollection<F, V>,
63        R: PrimitiveResponse,
64    {
65        prepare_command(
66            self,
67            cmd("XADD").arg(key).arg(options).arg(stream_id).arg(items),
68        )
69    }
70
71    /// This command transfers ownership of pending stream entries that match the specified criteria.
72    ///
73    /// # Return
74    /// An instance of StreamAutoClaimResult
75    ///
76    /// # See Also
77    /// [<https://redis.io/commands/xautoclaim/>](https://redis.io/commands/xautoclaim/)
78    fn xautoclaim<K, G, C, I, V>(
79        self,
80        key: K,
81        group: G,
82        consumer: C,
83        min_idle_time: u64,
84        start: I,
85        options: XAutoClaimOptions,
86    ) -> PreparedCommand<'a, Self, XAutoClaimResult<V>>
87    where
88        Self: Sized,
89        K: SingleArg,
90        G: SingleArg,
91        C: SingleArg,
92        I: SingleArg,
93        V: PrimitiveResponse + DeserializeOwned,
94    {
95        prepare_command(
96            self,
97            cmd("XAUTOCLAIM")
98                .arg(key)
99                .arg(group)
100                .arg(consumer)
101                .arg(min_idle_time)
102                .arg(start)
103                .arg(options),
104        )
105    }
106
107    /// In the context of a stream consumer group, this command changes the ownership of a pending message,
108    /// so that the new owner is the consumer specified as the command argument.
109    ///
110    /// # Return
111    /// The ID of the added entry.
112    ///
113    /// The ID is the one auto-generated if * is passed as ID argument,
114    /// otherwise the command just returns the same ID specified by the user during insertion.
115    ///
116    /// The command returns a Null reply when used with create_stream=false and the key doesn't exist.
117    ///
118    /// # See Also
119    /// [<https://redis.io/commands/xclaim/>](https://redis.io/commands/xclaim/)
120    fn xclaim<K, G, C, I, II, V>(
121        self,
122        key: K,
123        group: G,
124        consumer: C,
125        min_idle_time: u64,
126        ids: II,
127        options: XClaimOptions,
128    ) -> PreparedCommand<'a, Self, Vec<StreamEntry<V>>>
129    where
130        Self: Sized,
131        K: SingleArg,
132        G: SingleArg,
133        C: SingleArg,
134        I: SingleArg,
135        II: SingleArgCollection<I>,
136        V: PrimitiveResponse + DeserializeOwned,
137    {
138        prepare_command(
139            self,
140            cmd("XCLAIM")
141                .arg(key)
142                .arg(group)
143                .arg(consumer)
144                .arg(min_idle_time)
145                .arg(ids)
146                .arg(options),
147        )
148    }
149
150    /// Removes the specified entries from a stream, and returns the number of entries deleted.
151    ///
152    /// # Return
153    /// The number of entries actually deleted.
154    ///
155    /// # See Also
156    /// [<https://redis.io/commands/xdel/>](https://redis.io/commands/xdel/)
157    fn xdel<K, I, II>(self, key: K, ids: II) -> PreparedCommand<'a, Self, usize>
158    where
159        Self: Sized,
160        K: SingleArg,
161        I: SingleArg,
162        II: SingleArgCollection<I>,
163    {
164        prepare_command(self, cmd("XDEL").arg(key).arg(ids))
165    }
166
167    /// This command creates a new consumer group uniquely identified by `groupname` for the stream stored at `key`.
168    ///
169    /// # Return
170    /// * `true` success
171    /// * `false`failure
172    ///
173    /// # See Also
174    /// [<https://redis.io/commands/xgroup-create/>](https://redis.io/commands/xgroup-create/)
175    fn xgroup_create<K, G, I>(
176        self,
177        key: K,
178        groupname: G,
179        id: I,
180        options: XGroupCreateOptions,
181    ) -> PreparedCommand<'a, Self, bool>
182    where
183        Self: Sized,
184        K: SingleArg,
185        G: SingleArg,
186        I: SingleArg,
187    {
188        prepare_command(
189            self,
190            cmd("XGROUP")
191                .arg("CREATE")
192                .arg(key)
193                .arg(groupname)
194                .arg(id)
195                .arg(options),
196        )
197    }
198
199    /// Create a consumer named `consumername` in the consumer group `groupname``
200    /// of the stream that's stored at `key.
201    ///
202    /// # Return
203    /// * `true` success
204    /// * `false`failure
205    ///
206    /// # See Also
207    /// [<https://redis.io/commands/xgroup-createconsumer/>](https://redis.io/commands/xgroup-createconsumer/)
208    fn xgroup_createconsumer<K, G, C>(
209        self,
210        key: K,
211        groupname: G,
212        consumername: C,
213    ) -> PreparedCommand<'a, Self, bool>
214    where
215        Self: Sized,
216        K: SingleArg,
217        G: SingleArg,
218        C: SingleArg,
219    {
220        prepare_command(
221            self,
222            cmd("XGROUP")
223                .arg("CREATECONSUMER")
224                .arg(key)
225                .arg(groupname)
226                .arg(consumername),
227        )
228    }
229
230    /// The XGROUP DELCONSUMER command deletes a consumer from the consumer group.
231    ///
232    /// # Return
233    /// The number of pending messages that the consumer had before it was deleted
234    ///
235    /// # See Also
236    /// [<https://redis.io/commands/xgroup-delconsumer/>](https://redis.io/commands/xgroup-delconsumer/)
237    fn xgroup_delconsumer<K, G, C>(
238        self,
239        key: K,
240        groupname: G,
241        consumername: C,
242    ) -> PreparedCommand<'a, Self, usize>
243    where
244        Self: Sized,
245        K: SingleArg,
246        G: SingleArg,
247        C: SingleArg,
248    {
249        prepare_command(
250            self,
251            cmd("XGROUP")
252                .arg("DELCONSUMER")
253                .arg(key)
254                .arg(groupname)
255                .arg(consumername),
256        )
257    }
258
259    /// The XGROUP DESTROY command completely destroys a consumer group.
260    ///
261    /// # Return
262    /// * `true` success
263    /// * `false`failure
264    ///
265    /// # See Also
266    /// [<https://redis.io/commands/xgroup-destroy/>](https://redis.io/commands/xgroup-destroy/)
267    fn xgroup_destroy<K, G>(self, key: K, groupname: G) -> PreparedCommand<'a, Self, bool>
268    where
269        Self: Sized,
270        K: SingleArg,
271        G: SingleArg,
272    {
273        prepare_command(self, cmd("XGROUP").arg("DESTROY").arg(key).arg(groupname))
274    }
275
276    /// Set the last delivered ID for a consumer group.
277    ///
278    /// # See Also
279    /// [<https://redis.io/commands/xgroup-setid/>](https://redis.io/commands/xgroup-setid/)
280    fn xgroup_setid<K, G, I>(
281        self,
282        key: K,
283        groupname: G,
284        id: I,
285        entries_read: Option<usize>,
286    ) -> PreparedCommand<'a, Self, ()>
287    where
288        Self: Sized,
289        K: SingleArg,
290        G: SingleArg,
291        I: SingleArg,
292    {
293        prepare_command(
294            self,
295            cmd("XGROUP")
296                .arg("SETID")
297                .arg(key)
298                .arg(groupname)
299                .arg(id)
300                .arg(entries_read.map(|e| ("ENTRIESREAD", e))),
301        )
302    }
303
304    /// This command returns the list of consumers that belong to the `groupname` consumer group of the stream stored at `key`.
305    ///
306    /// # Return
307    /// A collection of XConsumerInfo.
308    ///
309    /// # See Also
310    /// [<https://redis.io/commands/xinfo-consumers/>](https://redis.io/commands/xinfo-consumers/)
311    fn xinfo_consumers<K, G>(
312        self,
313        key: K,
314        groupname: G,
315    ) -> PreparedCommand<'a, Self, Vec<XConsumerInfo>>
316    where
317        Self: Sized,
318        K: SingleArg,
319        G: SingleArg,
320    {
321        prepare_command(self, cmd("XINFO").arg("CONSUMERS").arg(key).arg(groupname))
322    }
323
324    /// This command returns the list of consumers that belong
325    /// to the `groupname` consumer group of the stream stored at `key`.
326    ///
327    /// # Return
328    /// A collection of XGroupInfo.
329    ///
330    /// # See Also
331    /// [<https://redis.io/commands/xinfo-groups/>](https://redis.io/commands/xinfo-groups/)
332    fn xinfo_groups<K>(self, key: K) -> PreparedCommand<'a, Self, Vec<XGroupInfo>>
333    where
334        Self: Sized,
335        K: SingleArg,
336    {
337        prepare_command(self, cmd("XINFO").arg("GROUPS").arg(key))
338    }
339
340    /// This command returns information about the stream stored at `key`.
341    ///
342    /// # Return
343    /// A collection of XGroupInfo.
344    ///
345    /// # See Also
346    /// [<https://redis.io/commands/xinfo-stream/>](https://redis.io/commands/xinfo-stream/)
347    fn xinfo_stream<K>(
348        self,
349        key: K,
350        options: XInfoStreamOptions,
351    ) -> PreparedCommand<'a, Self, XStreamInfo>
352    where
353        Self: Sized,
354        K: SingleArg,
355    {
356        prepare_command(self, cmd("XINFO").arg("STREAM").arg(key).arg(options))
357    }
358
359    /// Returns the number of entries inside a stream.
360    ///
361    /// # Return
362    /// The number of entries of the stream at `key`.
363    ///
364    /// # See Also
365    /// [<https://redis.io/commands/xrange/>](https://redis.io/commands/xrange/)
366    fn xlen<K>(self, key: K) -> PreparedCommand<'a, Self, usize>
367    where
368        Self: Sized,
369        K: SingleArg,
370    {
371        prepare_command(self, cmd("XLEN").arg(key))
372    }
373
374    /// The XPENDING command is the interface to inspect the list of pending messages.
375    ///
376    /// # See Also
377    /// [<https://redis.io/commands/xpending/>](https://redis.io/commands/xpending/)
378    fn xpending<K, G>(self, key: K, group: G) -> PreparedCommand<'a, Self, XPendingResult>
379    where
380        Self: Sized,
381        K: SingleArg,
382        G: SingleArg,
383    {
384        prepare_command(self, cmd("XPENDING").arg(key).arg(group))
385    }
386
387    /// The XPENDING command is the interface to inspect the list of pending messages.
388    ///
389    /// # See Also
390    /// [<https://redis.io/commands/xpending/>](https://redis.io/commands/xpending/)
391    fn xpending_with_options<K, G>(
392        self,
393        key: K,
394        group: G,
395        options: XPendingOptions,
396    ) -> PreparedCommand<'a, Self, Vec<XPendingMessageResult>>
397    where
398        Self: Sized,
399        K: SingleArg,
400        G: SingleArg,
401    {
402        prepare_command(self, cmd("XPENDING").arg(key).arg(group).arg(options))
403    }
404
405    /// The command returns the stream entries matching a given range of IDs.
406    ///
407    /// # Return
408    /// A collection of StreamEntry
409    ///
410    /// The command returns the entries with IDs matching the specified range.
411    /// The returned entries are complete, that means that the ID and all the fields they are composed are returned.
412    /// Moreover, the entries are returned with their fields and values in the exact same order as XADD added them.
413    ///
414    /// # See Also
415    /// [<https://redis.io/commands/xrange/>](https://redis.io/commands/xrange/)
416    fn xrange<K, S, E, V>(
417        self,
418        key: K,
419        start: S,
420        end: E,
421        count: Option<usize>,
422    ) -> PreparedCommand<'a, Self, Vec<StreamEntry<V>>>
423    where
424        Self: Sized,
425        K: SingleArg,
426        S: SingleArg,
427        E: SingleArg,
428        V: PrimitiveResponse + DeserializeOwned,
429    {
430        prepare_command(
431            self,
432            cmd("XRANGE")
433                .arg(key)
434                .arg(start)
435                .arg(end)
436                .arg(count.map(|c| ("COUNT", c))),
437        )
438    }
439
440    /// Read data from one or multiple streams,
441    /// only returning entries with an ID greater than the last received ID reported by the caller.
442    ///
443    /// # Return
444    /// A collection of XReadStreamResult
445    ///
446    /// # See Also
447    /// [<https://redis.io/commands/xread/>](https://redis.io/commands/xread/)
448    fn xread<K, KK, I, II, V, R>(
449        self,
450        options: XReadOptions,
451        keys: KK,
452        ids: II,
453    ) -> PreparedCommand<'a, Self, R>
454    where
455        Self: Sized,
456        K: SingleArg,
457        KK: SingleArgCollection<K>,
458        I: SingleArg,
459        II: SingleArgCollection<I>,
460        V: PrimitiveResponse + DeserializeOwned,
461        R: KeyValueCollectionResponse<String, Vec<StreamEntry<V>>>,
462    {
463        prepare_command(
464            self,
465            cmd("XREAD").arg(options).arg("STREAMS").arg(keys).arg(ids),
466        )
467    }
468
469    /// The XREADGROUP command is a special version of the [`xread`](StreamCommands::xread)
470    /// command with support for consumer groups.
471    ///
472    /// # Return
473    /// A collection of XReadStreamResult
474    ///
475    /// # See Also
476    /// [<https://redis.io/commands/xreadgroup/>](https://redis.io/commands/xreadgroup/)
477    fn xreadgroup<G, C, K, KK, I, II, V, R>(
478        self,
479        group: G,
480        consumer: C,
481        options: XReadGroupOptions,
482        keys: KK,
483        ids: II,
484    ) -> PreparedCommand<'a, Self, R>
485    where
486        Self: Sized,
487        G: SingleArg,
488        C: SingleArg,
489        K: SingleArg,
490        KK: SingleArgCollection<K>,
491        I: SingleArg,
492        II: SingleArgCollection<I>,
493        V: PrimitiveResponse + DeserializeOwned,
494        R: KeyValueCollectionResponse<String, Vec<StreamEntry<V>>>,
495    {
496        prepare_command(
497            self,
498            cmd("XREADGROUP")
499                .arg("GROUP")
500                .arg(group)
501                .arg(consumer)
502                .arg(options)
503                .arg("STREAMS")
504                .arg(keys)
505                .arg(ids),
506        )
507    }
508
509    /// This command is exactly like [`xrange`](StreamCommands::xrange),
510    /// but with the notable difference of returning the entries in reverse order,
511    /// and also taking the start-end range in reverse order
512    ///
513    /// # Return
514    /// A collection of StreamEntry
515    ///
516    /// # See Also
517    /// [<https://redis.io/commands/xrevrange/>](https://redis.io/commands/xrevrange/)
518    fn xrevrange<K, E, S, V>(
519        self,
520        key: K,
521        end: E,
522        start: S,
523        count: Option<usize>,
524    ) -> PreparedCommand<'a, Self, Vec<StreamEntry<V>>>
525    where
526        Self: Sized,
527        K: SingleArg,
528        E: SingleArg,
529        S: SingleArg,
530        V: PrimitiveResponse + DeserializeOwned,
531    {
532        prepare_command(
533            self,
534            cmd("XREVRANGE")
535                .arg(key)
536                .arg(end)
537                .arg(start)
538                .arg(count.map(|c| ("COUNT", c))),
539        )
540    }
541
542    /// XTRIM trims the stream by evicting older entries (entries with lower IDs) if needed.
543    ///
544    /// # Return
545    /// The number of entries deleted from the stream.
546    ///
547    /// # See Also
548    /// [<https://redis.io/commands/xtrim/>](https://redis.io/commands/xtrim/)
549    fn xtrim<K>(self, key: K, options: XTrimOptions) -> PreparedCommand<'a, Self, usize>
550    where
551        Self: Sized,
552        K: SingleArg,
553    {
554        prepare_command(self, cmd("XTRIM").arg(key).arg(options))
555    }
556}
557
558/// Stream Add options for the [`xadd`](StreamCommands::xadd) command.
559#[derive(Default)]
560pub struct XAddOptions {
561    command_args: CommandArgs,
562}
563
564impl XAddOptions {
565    #[must_use]
566    pub fn no_mk_stream(mut self) -> Self {
567        Self {
568            command_args: self.command_args.arg("NOMKSTREAM").build(),
569        }
570    }
571
572    #[must_use]
573    pub fn trim_options(mut self, trim_options: XTrimOptions) -> Self {
574        Self {
575            command_args: self.command_args.arg(trim_options).build(),
576        }
577    }
578}
579
580impl ToArgs for XAddOptions {
581    fn write_args(&self, args: &mut CommandArgs) {
582        args.arg(&self.command_args);
583    }
584}
585
/// Stream Trim operator for the [`xadd`](StreamCommands::xadd)
/// and [`xtrim`](StreamCommands::xtrim) commands
///
/// The default is [`XTrimOperator::None`], for which no operator token is written.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum XTrimOperator {
    /// No operator token
    #[default]
    None,
    /// =
    Equal,
    /// ~
    Approximately,
}
597
598impl ToArgs for XTrimOperator {
599    fn write_args(&self, args: &mut CommandArgs) {
600        match self {
601            XTrimOperator::None => {}
602            XTrimOperator::Equal => {
603                args.arg("=");
604            }
605            XTrimOperator::Approximately => {
606                args.arg("~");
607            }
608        }
609    }
610}
611
612/// Stream Trim options for the [`xadd`](StreamCommands::xadd)
613/// and [`xtrim`](StreamCommands::xtrim) commands
614#[derive(Default)]
615pub struct XTrimOptions {
616    command_args: CommandArgs,
617}
618
619impl XTrimOptions {
620    #[must_use]
621    pub fn max_len(operator: XTrimOperator, threshold: i64) -> Self {
622        Self {
623            command_args: CommandArgs::default()
624                .arg("MAXLEN")
625                .arg(operator)
626                .arg(threshold)
627                .build(),
628        }
629    }
630
631    #[must_use]
632    pub fn min_id<I: SingleArg>(operator: XTrimOperator, threshold_id: I) -> Self {
633        Self {
634            command_args: CommandArgs::default()
635                .arg("MINID")
636                .arg(operator)
637                .arg(threshold_id)
638                .build(),
639        }
640    }
641
642    #[must_use]
643    pub fn limit(mut self, count: usize) -> Self {
644        Self {
645            command_args: self.command_args.arg("LIMIT").arg(count).build(),
646        }
647    }
648}
649
650impl ToArgs for XTrimOptions {
651    fn write_args(&self, args: &mut CommandArgs) {
652        args.arg(&self.command_args);
653    }
654}
655
656/// Options for the [`xautoclaim`](StreamCommands::xautoclaim) command
657#[derive(Default)]
658pub struct XAutoClaimOptions {
659    command_args: CommandArgs,
660}
661
662impl XAutoClaimOptions {
663    #[must_use]
664    pub fn count(mut self, count: usize) -> Self {
665        Self {
666            command_args: self.command_args.arg("COUNT").arg(count).build(),
667        }
668    }
669
670    #[must_use]
671    pub fn just_id(mut self) -> Self {
672        Self {
673            command_args: self.command_args.arg("JUSTID").build(),
674        }
675    }
676}
677
678impl ToArgs for XAutoClaimOptions {
679    fn write_args(&self, args: &mut CommandArgs) {
680        args.arg(&self.command_args);
681    }
682}
683
684/// Result for the [`xrange`](StreamCommands::xrange) and other associated commands.
685#[derive(Deserialize)]
686pub struct StreamEntry<V>
687where
688    V: PrimitiveResponse,
689{
690    /// The stream Id
691    pub stream_id: String,
692    /// entries with their fields and values in the exact same
693    /// order as [`xadd`](StreamCommands::xadd) added them.
694    pub items: HashMap<String, V>,
695}
696
697/// Result for the [`xautoclaim`](StreamCommands::xautoclaim) command.
698#[derive(Deserialize)]
699pub struct XAutoClaimResult<V>
700where
701    V: PrimitiveResponse,
702{
703    /// A stream ID to be used as the `start` argument for
704    /// the next call to [`xautoclaim`](StreamCommands::xautoclaim).
705    pub start_stream_id: String,
706    /// An array containing all the successfully claimed messages in
707    /// the same format as [`xrange`](StreamCommands::xrange).
708    pub entries: Vec<StreamEntry<V>>,
709    /// An array containing message IDs that no longer exist in the stream,
710    /// and were deleted from the PEL in which they were found.
711    pub deleted_ids: Vec<String>,
712}
713
714/// Options for the [`xclaim`](StreamCommands::xclaim) command
715#[derive(Default)]
716pub struct XClaimOptions {
717    command_args: CommandArgs,
718}
719
720impl XClaimOptions {
721    /// Set the idle time (last time it was delivered) of the message.
722    #[must_use]
723    pub fn idle_time(mut self, idle_time_millis: u64) -> Self {
724        Self {
725            command_args: self.command_args.arg("IDLE").arg(idle_time_millis).build(),
726        }
727    }
728
729    ///  This is the same as `idle_time` but instead of a relative amount of milliseconds,
730    /// it sets the idle time to a specific Unix time (in milliseconds).
731    #[must_use]
732    pub fn time(mut self, unix_time_milliseconds: u64) -> Self {
733        Self {
734            command_args: self
735                .command_args
736                .arg("TIME")
737                .arg(unix_time_milliseconds)
738                .build(),
739        }
740    }
741
742    /// Set the retry counter to the specified value.
743    #[must_use]
744    pub fn retry_count(mut self, count: usize) -> Self {
745        Self {
746            command_args: self.command_args.arg("RETRYCOUNT").arg(count).build(),
747        }
748    }
749
750    /// Creates the pending message entry in the PEL
751    /// even if certain specified IDs are not already
752    /// in the PEL assigned to a different client.
753    #[must_use]
754    pub fn force(mut self) -> Self {
755        Self {
756            command_args: self.command_args.arg("FORCE").build(),
757        }
758    }
759
760    ///  Return just an array of IDs of messages successfully claimed,
761    /// without returning the actual message.
762    #[must_use]
763    pub fn just_id(mut self) -> Self {
764        Self {
765            command_args: self.command_args.arg("JUSTID").build(),
766        }
767    }
768}
769
770impl ToArgs for XClaimOptions {
771    fn write_args(&self, args: &mut CommandArgs) {
772        args.arg(&self.command_args);
773    }
774}
775
776/// Options for the [`xgroup_create`](StreamCommands::xgroup_create) command
777#[derive(Default)]
778pub struct XGroupCreateOptions {
779    command_args: CommandArgs,
780}
781
782impl XGroupCreateOptions {
783    /// By default, the XGROUP CREATE command insists that the target stream exists and returns an error when it doesn't.
784    ///  However, you can use the optional MKSTREAM subcommand as the last argument after the `id`
785    /// to automatically create the stream (with length of 0) if it doesn't exist
786    #[must_use]
787    pub fn mk_stream(mut self) -> Self {
788        Self {
789            command_args: self.command_args.arg("MKSTREAM").build(),
790        }
791    }
792
793    /// The optional entries_read named argument can be specified to enable consumer group lag tracking for an arbitrary ID.
794    /// An arbitrary ID is any ID that isn't the ID of the stream's first entry, its last entry or the zero ("0-0") ID.
795    /// This can be useful you know exactly how many entries are between the arbitrary ID (excluding it) and the stream's last entry.
796    /// In such cases, the entries_read can be set to the stream's entries_added subtracted with the number of entries.
797    #[must_use]
798    pub fn entries_read(mut self, entries_read: usize) -> Self {
799        Self {
800            command_args: self
801                .command_args
802                .arg("ENTRIESREAD")
803                .arg(entries_read)
804                .build(),
805        }
806    }
807}
808
809impl ToArgs for XGroupCreateOptions {
810    fn write_args(&self, args: &mut CommandArgs) {
811        args.arg(&self.command_args);
812    }
813}
814
815/// Result entry for the [`xinfo_consumers`](StreamCommands::xinfo_consumers) command.
816#[derive(Deserialize)]
817pub struct XConsumerInfo {
818    /// the consumer's name
819    pub name: String,
820
821    /// the number of pending messages for the client,
822    /// which are messages that were delivered but are yet to be acknowledged
823    pub pending: usize,
824
825    /// the number of milliseconds that have passed
826    /// since the consumer last interacted with the server
827    #[serde(rename = "idle")]
828    pub idle_millis: u64,
829}
830
831/// Result entry for the [`xinfo_groups`](StreamCommands::xinfo_groups) command.
832#[derive(Deserialize)]
833#[serde(rename_all = "kebab-case")]
834pub struct XGroupInfo {
835    /// the consumer group's name
836    pub name: String,
837
838    /// the number of consumers in the group
839    pub consumers: usize,
840
841    /// the length of the group's pending entries list (PEL),
842    /// which are messages that were delivered but are yet to be acknowledged
843    pub pending: usize,
844
845    /// the ID of the last entry delivered the group's consumers
846    pub last_delivered_id: String,
847
848    /// the logical "read counter" of the last entry delivered to group's consumers
849    pub entries_read: Option<usize>,
850
851    /// the number of entries in the stream that are still waiting to be delivered to the group's consumers,
852    /// or a NULL when that number can't be determined.
853    pub lag: Option<usize>,
854}
855
856/// Options for the [`xinfo_stream`](StreamCommands::xinfo_stream) command
857#[derive(Default)]
858pub struct XInfoStreamOptions {
859    command_args: CommandArgs,
860}
861
862impl XInfoStreamOptions {
863    /// The optional FULL modifier provides a more verbose reply.
864    #[must_use]
865    pub fn full(mut self) -> Self {
866        Self {
867            command_args: self.command_args.arg("FULL").build(),
868        }
869    }
870
871    /// The COUNT option can be used to limit the number of stream and PEL entries that are returned
872    /// (The first `count` entries are returned).
873    #[must_use]
874    pub fn count(mut self, count: usize) -> Self {
875        Self {
876            command_args: self.command_args.arg("COUNT").arg(count).build(),
877        }
878    }
879}
880
881impl ToArgs for XInfoStreamOptions {
882    fn write_args(&self, args: &mut CommandArgs) {
883        args.arg(&self.command_args);
884    }
885}
886
887/// Stream info returned by the [`xinfo_stream`](StreamCommands::xinfo_stream) command.
888#[derive(Deserialize)]
889#[serde(rename_all = "kebab-case")]
890pub struct XStreamInfo {
891    /// the number of entries in the stream (see [`xlen`](StreamCommands::xlen))
892    pub length: usize,
893
894    /// the number of keys in the underlying radix data structure
895    pub radix_tree_keys: usize,
896
897    /// the number of nodes in the underlying radix data structure
898    pub radix_tree_nodes: usize,
899
900    /// the number of consumer groups defined for the stream
901    pub groups: usize,
902
903    /// the ID of the least-recently entry that was added to the stream
904    pub last_generated_id: String,
905
906    /// the maximal entry ID that was deleted from the stream
907    pub max_deleted_entry_id: String,
908
909    /// the count of all entries added to the stream during its lifetime
910    pub entries_added: usize,
911
912    /// the ID and field-value tuples of the first entry in the stream
913    pub first_entry: StreamEntry<String>,
914
915    /// the ID and field-value tuples of the last entry in the stream
916    pub last_entry: StreamEntry<String>,
917
918    pub recorded_first_entry_id: String,
919}
920
921/// Options for the [`xread`](StreamCommands::xread) command
922#[derive(Default)]
923pub struct XReadOptions {
924    command_args: CommandArgs,
925}
926
927impl XReadOptions {
928    #[must_use]
929    pub fn count(mut self, count: usize) -> Self {
930        Self {
931            command_args: self.command_args.arg("COUNT").arg(count).build(),
932        }
933    }
934
935    #[must_use]
936    pub fn block(mut self, milliseconds: u64) -> Self {
937        Self {
938            command_args: self.command_args.arg("BLOCK").arg(milliseconds).build(),
939        }
940    }
941}
942
943impl ToArgs for XReadOptions {
944    fn write_args(&self, args: &mut CommandArgs) {
945        args.arg(&self.command_args);
946    }
947}
948
949/// Options for the [`xreadgroup`](StreamCommands::xreadgroup) command
950#[derive(Default)]
951pub struct XReadGroupOptions {
952    command_args: CommandArgs,
953}
954
955impl XReadGroupOptions {
956    #[must_use]
957    pub fn count(mut self, count: usize) -> Self {
958        Self {
959            command_args: self.command_args.arg("COUNT").arg(count).build(),
960        }
961    }
962
963    #[must_use]
964    pub fn block(mut self, milliseconds: u64) -> Self {
965        Self {
966            command_args: self.command_args.arg("BLOCK").arg(milliseconds).build(),
967        }
968    }
969
970    #[must_use]
971    pub fn no_ack(mut self) -> Self {
972        Self {
973            command_args: self.command_args.arg("NOACK").build(),
974        }
975    }
976}
977
978impl ToArgs for XReadGroupOptions {
979    fn write_args(&self, args: &mut CommandArgs) {
980        args.arg(&self.command_args);
981    }
982}
983
984/// Options for the [`xpending_with_options`](StreamCommands::xpending_with_options) command
985#[derive(Default)]
986pub struct XPendingOptions {
987    command_args: CommandArgs,
988}
989
990impl XPendingOptions {
991    #[must_use]
992    pub fn idle(mut self, min_idle_time: u64) -> Self {
993        Self {
994            command_args: self.command_args.arg("IDLE").arg(min_idle_time).build(),
995        }
996    }
997
998    #[must_use]
999    pub fn start<S: SingleArg>(mut self, start: S) -> Self {
1000        Self {
1001            command_args: self.command_args.arg(start).build(),
1002        }
1003    }
1004
1005    #[must_use]
1006    pub fn end<E: SingleArg>(mut self, end: E) -> Self {
1007        Self {
1008            command_args: self.command_args.arg(end).build(),
1009        }
1010    }
1011
1012    #[must_use]
1013    pub fn count(mut self, count: usize) -> Self {
1014        Self {
1015            command_args: self.command_args.arg(count).build(),
1016        }
1017    }
1018
1019    #[must_use]
1020    pub fn consumer<C: SingleArg>(mut self, consumer: C) -> Self {
1021        Self {
1022            command_args: self.command_args.arg(consumer).build(),
1023        }
1024    }
1025}
1026
1027impl ToArgs for XPendingOptions {
1028    fn write_args(&self, args: &mut CommandArgs) {
1029        args.arg(&self.command_args);
1030    }
1031}
1032
1033/// Result for the [`xpending`](StreamCommands::xpending) command
1034#[derive(Deserialize)]
1035pub struct XPendingResult {
1036    pub num_pending_messages: usize,
1037    pub smallest_id: String,
1038    pub greatest_id: String,
1039    pub consumers: Vec<XPendingConsumer>,
1040}
1041
1042/// Customer info result for the [`xpending`](StreamCommands::xpending) command
1043#[derive(Deserialize)]
1044pub struct XPendingConsumer {
1045    pub consumer: String,
1046    pub num_messages: usize,
1047}
1048
1049/// Message result for the [`xpending_with_options`](StreamCommands::xpending_with_options) command
1050#[derive(Deserialize)]
1051pub struct XPendingMessageResult {
1052    pub message_id: String,
1053    pub consumer: String,
1054    pub elapsed_millis: u64,
1055    pub times_delivered: usize,
1056}