rustis/commands/
stream_commands.rs

1use crate::{
2    client::{prepare_command, PreparedCommand},
3    resp::{
4        cmd, CommandArgs, KeyValueArgsCollection, KeyValueCollectionResponse, PrimitiveResponse,
5        SingleArg, SingleArgCollection, ToArgs,
6    },
7};
8use serde::{de::DeserializeOwned, Deserialize};
9use std::collections::HashMap;
10
11/// A group of Redis commands related to [`Streams`](https://redis.io/docs/data-types/streams/)
12/// # See Also
13/// [Redis Generic Commands](https://redis.io/commands/?group=stream)
14/// [Streams tutorial](https://redis.io/docs/data-types/streams-tutorial/)
15pub trait StreamCommands<'a> {
16    /// The XACK command removes one or multiple messages
17    /// from the Pending Entries List (PEL) of a stream consumer group
18    ///
19    /// # Return
20    /// The command returns the number of messages successfully acknowledged.
21    /// Certain message IDs may no longer be part of the PEL (for example because they have already been acknowledged),
22    /// and XACK will not count them as successfully acknowledged.
23    ///
24    /// # See Also
25    /// [<https://redis.io/commands/xack/>](https://redis.io/commands/xack/)
26    fn xack<K, G, I, II>(self, key: K, group: G, ids: II) -> PreparedCommand<'a, Self, usize>
27    where
28        Self: Sized,
29        K: SingleArg,
30        G: SingleArg,
31        I: SingleArg,
32        II: SingleArgCollection<I>,
33    {
34        prepare_command(self, cmd("XACK").arg(key).arg(group).arg(ids))
35    }
36
37    /// Appends the specified stream entry to the stream at the specified key.
38    ///
39    /// # Return
40    /// the ID of the added entry.
41    ///
42    /// The ID is the one auto-generated if * is passed as ID argument,
43    /// otherwise the command just returns the same ID specified by the user during insertion.
44    ///
45    /// The command returns a Null reply when used with create_stream=false and the key doesn't exist.
46    ///
47    /// # See Also
48    /// [<https://redis.io/commands/xadd/>](https://redis.io/commands/xadd/)
49    fn xadd<K, I, F, V, FFVV, R>(
50        self,
51        key: K,
52        stream_id: I,
53        items: FFVV,
54        options: XAddOptions,
55    ) -> PreparedCommand<'a, Self, R>
56    where
57        Self: Sized,
58        K: SingleArg,
59        I: SingleArg,
60        F: SingleArg,
61        V: SingleArg,
62        FFVV: KeyValueArgsCollection<F, V>,
63        R: PrimitiveResponse,
64    {
65        prepare_command(
66            self,
67            cmd("XADD").arg(key).arg(options).arg(stream_id).arg(items),
68        )
69    }
70
71    /// This command transfers ownership of pending stream entries that match the specified criteria.
72    ///
73    /// # Return
74    /// An instance of StreamAutoClaimResult
75    ///
76    /// # See Also
77    /// [<https://redis.io/commands/xautoclaim/>](https://redis.io/commands/xautoclaim/)
78    fn xautoclaim<K, G, C, I, V>(
79        self,
80        key: K,
81        group: G,
82        consumer: C,
83        min_idle_time: u64,
84        start: I,
85        options: XAutoClaimOptions,
86    ) -> PreparedCommand<'a, Self, XAutoClaimResult<V>>
87    where
88        Self: Sized,
89        K: SingleArg,
90        G: SingleArg,
91        C: SingleArg,
92        I: SingleArg,
93        V: PrimitiveResponse + DeserializeOwned,
94    {
95        prepare_command(
96            self,
97            cmd("XAUTOCLAIM")
98                .arg(key)
99                .arg(group)
100                .arg(consumer)
101                .arg(min_idle_time)
102                .arg(start)
103                .arg(options),
104        )
105    }
106
107    /// In the context of a stream consumer group, this command changes the ownership of a pending message,
108    /// so that the new owner is the consumer specified as the command argument.
109    ///
110    /// # Return
111    /// The ID of the added entry.
112    ///
113    /// The ID is the one auto-generated if * is passed as ID argument,
114    /// otherwise the command just returns the same ID specified by the user during insertion.
115    ///
116    /// The command returns a Null reply when used with create_stream=false and the key doesn't exist.
117    ///
118    /// # See Also
119    /// [<https://redis.io/commands/xclaim/>](https://redis.io/commands/xclaim/)
120    fn xclaim<K, G, C, I, II, V>(
121        self,
122        key: K,
123        group: G,
124        consumer: C,
125        min_idle_time: u64,
126        ids: II,
127        options: XClaimOptions,
128    ) -> PreparedCommand<'a, Self, Vec<StreamEntry<V>>>
129    where
130        Self: Sized,
131        K: SingleArg,
132        G: SingleArg,
133        C: SingleArg,
134        I: SingleArg,
135        II: SingleArgCollection<I>,
136        V: PrimitiveResponse + DeserializeOwned,
137    {
138        prepare_command(
139            self,
140            cmd("XCLAIM")
141                .arg(key)
142                .arg(group)
143                .arg(consumer)
144                .arg(min_idle_time)
145                .arg(ids)
146                .arg(options),
147        )
148    }
149
150    /// Removes the specified entries from a stream, and returns the number of entries deleted.
151    ///
152    /// # Return
153    /// The number of entries actually deleted.
154    ///
155    /// # See Also
156    /// [<https://redis.io/commands/xdel/>](https://redis.io/commands/xdel/)
157    fn xdel<K, I, II>(self, key: K, ids: II) -> PreparedCommand<'a, Self, usize>
158    where
159        Self: Sized,
160        K: SingleArg,
161        I: SingleArg,
162        II: SingleArgCollection<I>,
163    {
164        prepare_command(self, cmd("XDEL").arg(key).arg(ids))
165    }
166
167    /// This command creates a new consumer group uniquely identified by `groupname` for the stream stored at `key`.
168    ///
169    /// # Return
170    /// * `true` success
171    /// * `false`failure
172    ///
173    /// # See Also
174    /// [<https://redis.io/commands/xgroup-create/>](https://redis.io/commands/xgroup-create/)
175    fn xgroup_create<K, G, I>(
176        self,
177        key: K,
178        groupname: G,
179        id: I,
180        options: XGroupCreateOptions,
181    ) -> PreparedCommand<'a, Self, bool>
182    where
183        Self: Sized,
184        K: SingleArg,
185        G: SingleArg,
186        I: SingleArg,
187    {
188        prepare_command(
189            self,
190            cmd("XGROUP")
191                .arg("CREATE")
192                .arg(key)
193                .arg(groupname)
194                .arg(id)
195                .arg(options),
196        )
197    }
198
199    /// Create a consumer named `consumername` in the consumer group `groupname``
200    /// of the stream that's stored at `key.
201    ///
202    /// # Return
203    /// * `true` success
204    /// * `false`failure
205    ///
206    /// # See Also
207    /// [<https://redis.io/commands/xgroup-createconsumer/>](https://redis.io/commands/xgroup-createconsumer/)
208    fn xgroup_createconsumer<K, G, C>(
209        self,
210        key: K,
211        groupname: G,
212        consumername: C,
213    ) -> PreparedCommand<'a, Self, bool>
214    where
215        Self: Sized,
216        K: SingleArg,
217        G: SingleArg,
218        C: SingleArg,
219    {
220        prepare_command(
221            self,
222            cmd("XGROUP")
223                .arg("CREATECONSUMER")
224                .arg(key)
225                .arg(groupname)
226                .arg(consumername),
227        )
228    }
229
230    /// The XGROUP DELCONSUMER command deletes a consumer from the consumer group.
231    ///
232    /// # Return
233    /// The number of pending messages that the consumer had before it was deleted
234    ///
235    /// # See Also
236    /// [<https://redis.io/commands/xgroup-delconsumer/>](https://redis.io/commands/xgroup-delconsumer/)
237    fn xgroup_delconsumer<K, G, C>(
238        self,
239        key: K,
240        groupname: G,
241        consumername: C,
242    ) -> PreparedCommand<'a, Self, usize>
243    where
244        Self: Sized,
245        K: SingleArg,
246        G: SingleArg,
247        C: SingleArg,
248    {
249        prepare_command(
250            self,
251            cmd("XGROUP")
252                .arg("DELCONSUMER")
253                .arg(key)
254                .arg(groupname)
255                .arg(consumername),
256        )
257    }
258
259    /// The XGROUP DESTROY command completely destroys a consumer group.
260    ///
261    /// # Return
262    /// * `true` success
263    /// * `false`failure
264    ///
265    /// # See Also
266    /// [<https://redis.io/commands/xgroup-destroy/>](https://redis.io/commands/xgroup-destroy/)
267    fn xgroup_destroy<K, G>(self, key: K, groupname: G) -> PreparedCommand<'a, Self, bool>
268    where
269        Self: Sized,
270        K: SingleArg,
271        G: SingleArg,
272    {
273        prepare_command(self, cmd("XGROUP").arg("DESTROY").arg(key).arg(groupname))
274    }
275
276    /// The command returns a helpful text describing the different XGROUP subcommands.
277    ///
278    /// # Return
279    /// An array of strings.
280    ///
281    /// # Example
282    /// ```
283    /// # use rustis::{
284    /// #    client::Client,
285    /// #    commands::StreamCommands,
286    /// #    Result,
287    /// # };
288    /// #
289    /// # #[cfg_attr(feature = "tokio-runtime", tokio::main)]
290    /// # #[cfg_attr(feature = "async-std-runtime", async_std::main)]
291    /// # async fn main() -> Result<()> {
292    /// #    let client = Client::connect("127.0.0.1:6379").await?;
293    /// let result: Vec<String> = client.xgroup_help().await?;
294    /// assert!(result.iter().any(|e| e == "HELP"));
295    /// #   Ok(())
296    /// # }
297    /// ```
298    ///
299    /// # See Also
300    /// [<https://redis.io/commands/xgroup-help/>](https://redis.io/commands/xgroup-help/)
301    #[must_use]
302    fn xgroup_help(self) -> PreparedCommand<'a, Self, Vec<String>>
303    where
304        Self: Sized,
305    {
306        prepare_command(self, cmd("XGROUP").arg("HELP"))
307    }
308
309    /// Set the last delivered ID for a consumer group.
310    ///
311    /// # See Also
312    /// [<https://redis.io/commands/xgroup-setid/>](https://redis.io/commands/xgroup-setid/)
313    fn xgroup_setid<K, G, I>(
314        self,
315        key: K,
316        groupname: G,
317        id: I,
318        entries_read: Option<usize>,
319    ) -> PreparedCommand<'a, Self, ()>
320    where
321        Self: Sized,
322        K: SingleArg,
323        G: SingleArg,
324        I: SingleArg,
325    {
326        prepare_command(
327            self,
328            cmd("XGROUP")
329                .arg("SETID")
330                .arg(key)
331                .arg(groupname)
332                .arg(id)
333                .arg(entries_read.map(|e| ("ENTRIESREAD", e))),
334        )
335    }
336
337    /// This command returns the list of consumers that belong to the `groupname` consumer group of the stream stored at `key`.
338    ///
339    /// # Return
340    /// A collection of XConsumerInfo.
341    ///
342    /// # See Also
343    /// [<https://redis.io/commands/xinfo-consumers/>](https://redis.io/commands/xinfo-consumers/)
344    fn xinfo_consumers<K, G>(
345        self,
346        key: K,
347        groupname: G,
348    ) -> PreparedCommand<'a, Self, Vec<XConsumerInfo>>
349    where
350        Self: Sized,
351        K: SingleArg,
352        G: SingleArg,
353    {
354        prepare_command(self, cmd("XINFO").arg("CONSUMERS").arg(key).arg(groupname))
355    }
356
357    /// This command returns the list of consumers that belong
358    /// to the `groupname` consumer group of the stream stored at `key`.
359    ///
360    /// # Return
361    /// A collection of XGroupInfo.
362    ///
363    /// # See Also
364    /// [<https://redis.io/commands/xinfo-groups/>](https://redis.io/commands/xinfo-groups/)
365    fn xinfo_groups<K>(self, key: K) -> PreparedCommand<'a, Self, Vec<XGroupInfo>>
366    where
367        Self: Sized,
368        K: SingleArg,
369    {
370        prepare_command(self, cmd("XINFO").arg("GROUPS").arg(key))
371    }
372
373    /// The command returns a helpful text describing the different XINFO subcommands.
374    ///
375    /// # Return
376    /// An array of strings.
377    ///
378    /// # Example
379    /// ```
380    /// # use rustis::{
381    /// #    client::Client,
382    /// #    commands::StreamCommands,
383    /// #    Result,
384    /// # };
385    /// #
386    /// # #[cfg_attr(feature = "tokio-runtime", tokio::main)]
387    /// # #[cfg_attr(feature = "async-std-runtime", async_std::main)]
388    /// # async fn main() -> Result<()> {
389    /// #    let client = Client::connect("127.0.0.1:6379").await?;
390    /// let result: Vec<String> = client.xinfo_help().await?;
391    /// assert!(result.iter().any(|e| e == "HELP"));
392    /// #   Ok(())
393    /// # }
394    /// ```
395    ///
396    /// # See Also
397    /// [<https://redis.io/commands/xinfo-help/>](https://redis.io/commands/xinfo-help/)
398    #[must_use]
399    fn xinfo_help(self) -> PreparedCommand<'a, Self, Vec<String>>
400    where
401        Self: Sized,
402    {
403        prepare_command(self, cmd("XINFO").arg("HELP"))
404    }
405
406    /// This command returns information about the stream stored at `key`.
407    ///
408    /// # Return
409    /// A collection of XGroupInfo.
410    ///
411    /// # See Also
412    /// [<https://redis.io/commands/xinfo-stream/>](https://redis.io/commands/xinfo-stream/)
413    fn xinfo_stream<K>(
414        self,
415        key: K,
416        options: XInfoStreamOptions,
417    ) -> PreparedCommand<'a, Self, XStreamInfo>
418    where
419        Self: Sized,
420        K: SingleArg,
421    {
422        prepare_command(self, cmd("XINFO").arg("STREAM").arg(key).arg(options))
423    }
424
425    /// Returns the number of entries inside a stream.
426    ///
427    /// # Return
428    /// The number of entries of the stream at `key`.
429    ///
430    /// # See Also
431    /// [<https://redis.io/commands/xrange/>](https://redis.io/commands/xrange/)
432    fn xlen<K>(self, key: K) -> PreparedCommand<'a, Self, usize>
433    where
434        Self: Sized,
435        K: SingleArg,
436    {
437        prepare_command(self, cmd("XLEN").arg(key))
438    }
439
440    /// The XPENDING command is the interface to inspect the list of pending messages.
441    ///
442    /// # See Also
443    /// [<https://redis.io/commands/xpending/>](https://redis.io/commands/xpending/)
444    fn xpending<K, G>(self, key: K, group: G) -> PreparedCommand<'a, Self, XPendingResult>
445    where
446        Self: Sized,
447        K: SingleArg,
448        G: SingleArg,
449    {
450        prepare_command(self, cmd("XPENDING").arg(key).arg(group))
451    }
452
453    /// The XPENDING command is the interface to inspect the list of pending messages.
454    ///
455    /// # See Also
456    /// [<https://redis.io/commands/xpending/>](https://redis.io/commands/xpending/)
457    fn xpending_with_options<K, G>(
458        self,
459        key: K,
460        group: G,
461        options: XPendingOptions,
462    ) -> PreparedCommand<'a, Self, Vec<XPendingMessageResult>>
463    where
464        Self: Sized,
465        K: SingleArg,
466        G: SingleArg,
467    {
468        prepare_command(self, cmd("XPENDING").arg(key).arg(group).arg(options))
469    }
470
471    /// The command returns the stream entries matching a given range of IDs.
472    ///
473    /// # Return
474    /// A collection of StreamEntry
475    ///
476    /// The command returns the entries with IDs matching the specified range.
477    /// The returned entries are complete, that means that the ID and all the fields they are composed are returned.
478    /// Moreover, the entries are returned with their fields and values in the exact same order as XADD added them.
479    ///
480    /// # See Also
481    /// [<https://redis.io/commands/xrange/>](https://redis.io/commands/xrange/)
482    fn xrange<K, S, E, V>(
483        self,
484        key: K,
485        start: S,
486        end: E,
487        count: Option<usize>,
488    ) -> PreparedCommand<'a, Self, Vec<StreamEntry<V>>>
489    where
490        Self: Sized,
491        K: SingleArg,
492        S: SingleArg,
493        E: SingleArg,
494        V: PrimitiveResponse + DeserializeOwned,
495    {
496        prepare_command(
497            self,
498            cmd("XRANGE")
499                .arg(key)
500                .arg(start)
501                .arg(end)
502                .arg(count.map(|c| ("COUNT", c))),
503        )
504    }
505
506    /// Read data from one or multiple streams,
507    /// only returning entries with an ID greater than the last received ID reported by the caller.
508    ///
509    /// # Return
510    /// A collection of XReadStreamResult
511    ///
512    /// # See Also
513    /// [<https://redis.io/commands/xread/>](https://redis.io/commands/xread/)
514    fn xread<K, KK, I, II, V, R>(
515        self,
516        options: XReadOptions,
517        keys: KK,
518        ids: II,
519    ) -> PreparedCommand<'a, Self, R>
520    where
521        Self: Sized,
522        K: SingleArg,
523        KK: SingleArgCollection<K>,
524        I: SingleArg,
525        II: SingleArgCollection<I>,
526        V: PrimitiveResponse + DeserializeOwned,
527        R: KeyValueCollectionResponse<String, Vec<StreamEntry<V>>>,
528    {
529        prepare_command(
530            self,
531            cmd("XREAD").arg(options).arg("STREAMS").arg(keys).arg(ids),
532        )
533    }
534
535    /// The XREADGROUP command is a special version of the [`xread`](StreamCommands::xread)
536    /// command with support for consumer groups.
537    ///
538    /// # Return
539    /// A collection of XReadStreamResult
540    ///
541    /// # See Also
542    /// [<https://redis.io/commands/xreadgroup/>](https://redis.io/commands/xreadgroup/)
543    fn xreadgroup<G, C, K, KK, I, II, V, R>(
544        self,
545        group: G,
546        consumer: C,
547        options: XReadGroupOptions,
548        keys: KK,
549        ids: II,
550    ) -> PreparedCommand<'a, Self, R>
551    where
552        Self: Sized,
553        G: SingleArg,
554        C: SingleArg,
555        K: SingleArg,
556        KK: SingleArgCollection<K>,
557        I: SingleArg,
558        II: SingleArgCollection<I>,
559        V: PrimitiveResponse + DeserializeOwned,
560        R: KeyValueCollectionResponse<String, Vec<StreamEntry<V>>>,
561    {
562        prepare_command(
563            self,
564            cmd("XREADGROUP")
565                .arg("GROUP")
566                .arg(group)
567                .arg(consumer)
568                .arg(options)
569                .arg("STREAMS")
570                .arg(keys)
571                .arg(ids),
572        )
573    }
574
575    /// This command is exactly like [`xrange`](StreamCommands::xrange),
576    /// but with the notable difference of returning the entries in reverse order,
577    /// and also taking the start-end range in reverse order
578    ///
579    /// # Return
580    /// A collection of StreamEntry
581    ///
582    /// # See Also
583    /// [<https://redis.io/commands/xrevrange/>](https://redis.io/commands/xrevrange/)
584    fn xrevrange<K, E, S, V>(
585        self,
586        key: K,
587        end: E,
588        start: S,
589        count: Option<usize>,
590    ) -> PreparedCommand<'a, Self, Vec<StreamEntry<V>>>
591    where
592        Self: Sized,
593        K: SingleArg,
594        E: SingleArg,
595        S: SingleArg,
596        V: PrimitiveResponse + DeserializeOwned,
597    {
598        prepare_command(
599            self,
600            cmd("XREVRANGE")
601                .arg(key)
602                .arg(end)
603                .arg(start)
604                .arg(count.map(|c| ("COUNT", c))),
605        )
606    }
607
608    /// XTRIM trims the stream by evicting older entries (entries with lower IDs) if needed.
609    ///
610    /// # Return
611    /// The number of entries deleted from the stream.
612    ///
613    /// # See Also
614    /// [<https://redis.io/commands/xtrim/>](https://redis.io/commands/xtrim/)
615    fn xtrim<K>(self, key: K, options: XTrimOptions) -> PreparedCommand<'a, Self, usize>
616    where
617        Self: Sized,
618        K: SingleArg,
619    {
620        prepare_command(self, cmd("XTRIM").arg(key).arg(options))
621    }
622}
623
624/// Stream Add options for the [`xadd`](StreamCommands::xadd) command.
625#[derive(Default)]
626pub struct XAddOptions {
627    command_args: CommandArgs,
628}
629
630impl XAddOptions {
631    #[must_use]
632    pub fn no_mk_stream(mut self) -> Self {
633        Self {
634            command_args: self.command_args.arg("NOMKSTREAM").build(),
635        }
636    }
637
638    #[must_use]
639    pub fn trim_options(mut self, trim_options: XTrimOptions) -> Self {
640        Self {
641            command_args: self.command_args.arg(trim_options).build(),
642        }
643    }
644}
645
646impl ToArgs for XAddOptions {
647    fn write_args(&self, args: &mut CommandArgs) {
648        args.arg(&self.command_args);
649    }
650}
651
652/// Stream Trim operator for the [`xadd`](StreamCommands::xadd)
653/// and [`xtrim`](StreamCommands::xtrim) commands
654#[derive(Default)]
655pub enum XTrimOperator {
656    #[default]
657    None,
658    /// =
659    Equal,
660    /// ~
661    Approximately,
662}
663
664impl ToArgs for XTrimOperator {
665    fn write_args(&self, args: &mut CommandArgs) {
666        match self {
667            XTrimOperator::None => {}
668            XTrimOperator::Equal => {
669                args.arg("=");
670            }
671            XTrimOperator::Approximately => {
672                args.arg("~");
673            }
674        }
675    }
676}
677
678/// Stream Trim options for the [`xadd`](StreamCommands::xadd)
679/// and [`xtrim`](StreamCommands::xtrim) commands
680#[derive(Default)]
681pub struct XTrimOptions {
682    command_args: CommandArgs,
683}
684
685impl XTrimOptions {
686    #[must_use]
687    pub fn max_len(operator: XTrimOperator, threshold: i64) -> Self {
688        Self {
689            command_args: CommandArgs::default()
690                .arg("MAXLEN")
691                .arg(operator)
692                .arg(threshold)
693                .build(),
694        }
695    }
696
697    #[must_use]
698    pub fn min_id<I: SingleArg>(operator: XTrimOperator, threshold_id: I) -> Self {
699        Self {
700            command_args: CommandArgs::default()
701                .arg("MINID")
702                .arg(operator)
703                .arg(threshold_id)
704                .build(),
705        }
706    }
707
708    #[must_use]
709    pub fn limit(mut self, count: usize) -> Self {
710        Self {
711            command_args: self.command_args.arg("LIMIT").arg(count).build(),
712        }
713    }
714}
715
716impl ToArgs for XTrimOptions {
717    fn write_args(&self, args: &mut CommandArgs) {
718        args.arg(&self.command_args);
719    }
720}
721
722/// Options for the [`xautoclaim`](StreamCommands::xautoclaim) command
723#[derive(Default)]
724pub struct XAutoClaimOptions {
725    command_args: CommandArgs,
726}
727
728impl XAutoClaimOptions {
729    #[must_use]
730    pub fn count(mut self, count: usize) -> Self {
731        Self {
732            command_args: self.command_args.arg("COUNT").arg(count).build(),
733        }
734    }
735
736    #[must_use]
737    pub fn just_id(mut self) -> Self {
738        Self {
739            command_args: self.command_args.arg("JUSTID").build(),
740        }
741    }
742}
743
744impl ToArgs for XAutoClaimOptions {
745    fn write_args(&self, args: &mut CommandArgs) {
746        args.arg(&self.command_args);
747    }
748}
749
750/// Result for the [`xrange`](StreamCommands::xrange) and other associated commands.
751#[derive(Deserialize)]
752pub struct StreamEntry<V>
753where
754    V: PrimitiveResponse,
755{
756    /// The stream Id
757    pub stream_id: String,
758    /// entries with their fields and values in the exact same
759    /// order as [`xadd`](StreamCommands::xadd) added them.
760    pub items: HashMap<String, V>,
761}
762
763/// Result for the [`xautoclaim`](StreamCommands::xautoclaim) command.
764#[derive(Deserialize)]
765pub struct XAutoClaimResult<V>
766where
767    V: PrimitiveResponse,
768{
769    /// A stream ID to be used as the `start` argument for
770    /// the next call to [`xautoclaim`](StreamCommands::xautoclaim).
771    pub start_stream_id: String,
772    /// An array containing all the successfully claimed messages in
773    /// the same format as [`xrange`](StreamCommands::xrange).
774    pub entries: Vec<StreamEntry<V>>,
775    /// An array containing message IDs that no longer exist in the stream,
776    /// and were deleted from the PEL in which they were found.
777    pub deleted_ids: Vec<String>,
778}
779
780/// Options for the [`xclaim`](StreamCommands::xclaim) command
781#[derive(Default)]
782pub struct XClaimOptions {
783    command_args: CommandArgs,
784}
785
786impl XClaimOptions {
787    /// Set the idle time (last time it was delivered) of the message.
788    #[must_use]
789    pub fn idle_time(mut self, idle_time_millis: u64) -> Self {
790        Self {
791            command_args: self.command_args.arg("IDLE").arg(idle_time_millis).build(),
792        }
793    }
794
795    ///  This is the same as `idle_time` but instead of a relative amount of milliseconds,
796    /// it sets the idle time to a specific Unix time (in milliseconds).
797    #[must_use]
798    pub fn time(mut self, unix_time_milliseconds: u64) -> Self {
799        Self {
800            command_args: self
801                .command_args
802                .arg("TIME")
803                .arg(unix_time_milliseconds)
804                .build(),
805        }
806    }
807
808    /// Set the retry counter to the specified value.
809    #[must_use]
810    pub fn retry_count(mut self, count: usize) -> Self {
811        Self {
812            command_args: self.command_args.arg("RETRYCOUNT").arg(count).build(),
813        }
814    }
815
816    /// Creates the pending message entry in the PEL
817    /// even if certain specified IDs are not already
818    /// in the PEL assigned to a different client.
819    #[must_use]
820    pub fn force(mut self) -> Self {
821        Self {
822            command_args: self.command_args.arg("FORCE").build(),
823        }
824    }
825
826    ///  Return just an array of IDs of messages successfully claimed,
827    /// without returning the actual message.
828    #[must_use]
829    pub fn just_id(mut self) -> Self {
830        Self {
831            command_args: self.command_args.arg("JUSTID").build(),
832        }
833    }
834}
835
836impl ToArgs for XClaimOptions {
837    fn write_args(&self, args: &mut CommandArgs) {
838        args.arg(&self.command_args);
839    }
840}
841
842/// Options for the [`xgroup_create`](StreamCommands::xgroup_create) command
843#[derive(Default)]
844pub struct XGroupCreateOptions {
845    command_args: CommandArgs,
846}
847
848impl XGroupCreateOptions {
849    /// By default, the XGROUP CREATE command insists that the target stream exists and returns an error when it doesn't.
850    ///  However, you can use the optional MKSTREAM subcommand as the last argument after the `id`
851    /// to automatically create the stream (with length of 0) if it doesn't exist
852    #[must_use]
853    pub fn mk_stream(mut self) -> Self {
854        Self {
855            command_args: self.command_args.arg("MKSTREAM").build(),
856        }
857    }
858
859    /// The optional entries_read named argument can be specified to enable consumer group lag tracking for an arbitrary ID.
860    /// An arbitrary ID is any ID that isn't the ID of the stream's first entry, its last entry or the zero ("0-0") ID.
861    /// This can be useful you know exactly how many entries are between the arbitrary ID (excluding it) and the stream's last entry.
862    /// In such cases, the entries_read can be set to the stream's entries_added subtracted with the number of entries.
863    #[must_use]
864    pub fn entries_read(mut self, entries_read: usize) -> Self {
865        Self {
866            command_args: self
867                .command_args
868                .arg("ENTRIESREAD")
869                .arg(entries_read)
870                .build(),
871        }
872    }
873}
874
875impl ToArgs for XGroupCreateOptions {
876    fn write_args(&self, args: &mut CommandArgs) {
877        args.arg(&self.command_args);
878    }
879}
880
881/// Result entry for the [`xinfo_consumers`](StreamCommands::xinfo_consumers) command.
882#[derive(Deserialize)]
883pub struct XConsumerInfo {
884    /// the consumer's name
885    pub name: String,
886
887    /// the number of pending messages for the client,
888    /// which are messages that were delivered but are yet to be acknowledged
889    pub pending: usize,
890
891    /// the number of milliseconds that have passed
892    /// since the consumer last interacted with the server
893    #[serde(rename = "idle")]
894    pub idle_millis: u64,
895}
896
897/// Result entry for the [`xinfo_groups`](StreamCommands::xinfo_groups) command.
898#[derive(Deserialize)]
899#[serde(rename_all = "kebab-case")]
900pub struct XGroupInfo {
901    /// the consumer group's name
902    pub name: String,
903
904    /// the number of consumers in the group
905    pub consumers: usize,
906
907    /// the length of the group's pending entries list (PEL),
908    /// which are messages that were delivered but are yet to be acknowledged
909    pub pending: usize,
910
911    /// the ID of the last entry delivered the group's consumers
912    pub last_delivered_id: String,
913
914    /// the logical "read counter" of the last entry delivered to group's consumers
915    pub entries_read: Option<usize>,
916
917    /// the number of entries in the stream that are still waiting to be delivered to the group's consumers,
918    /// or a NULL when that number can't be determined.
919    pub lag: Option<usize>,
920}
921
922/// Options for the [`xinfo_stream`](StreamCommands::xinfo_stream) command
923#[derive(Default)]
924pub struct XInfoStreamOptions {
925    command_args: CommandArgs,
926}
927
928impl XInfoStreamOptions {
929    /// The optional FULL modifier provides a more verbose reply.
930    #[must_use]
931    pub fn full(mut self) -> Self {
932        Self {
933            command_args: self.command_args.arg("FULL").build(),
934        }
935    }
936
937    /// The COUNT option can be used to limit the number of stream and PEL entries that are returned
938    /// (The first `count` entries are returned).
939    #[must_use]
940    pub fn count(mut self, count: usize) -> Self {
941        Self {
942            command_args: self.command_args.arg("COUNT").arg(count).build(),
943        }
944    }
945}
946
947impl ToArgs for XInfoStreamOptions {
948    fn write_args(&self, args: &mut CommandArgs) {
949        args.arg(&self.command_args);
950    }
951}
952
953/// Stream info returned by the [`xinfo_stream`](StreamCommands::xinfo_stream) command.
954#[derive(Deserialize)]
955#[serde(rename_all = "kebab-case")]
956pub struct XStreamInfo {
957    /// the number of entries in the stream (see [`xlen`](StreamCommands::xlen))
958    pub length: usize,
959
960    /// the number of keys in the underlying radix data structure
961    pub radix_tree_keys: usize,
962
963    /// the number of nodes in the underlying radix data structure
964    pub radix_tree_nodes: usize,
965
966    /// the number of consumer groups defined for the stream
967    pub groups: usize,
968
969    /// the ID of the least-recently entry that was added to the stream
970    pub last_generated_id: String,
971
972    /// the maximal entry ID that was deleted from the stream
973    pub max_deleted_entry_id: String,
974
975    /// the count of all entries added to the stream during its lifetime
976    pub entries_added: usize,
977
978    /// the ID and field-value tuples of the first entry in the stream
979    pub first_entry: StreamEntry<String>,
980
981    /// the ID and field-value tuples of the last entry in the stream
982    pub last_entry: StreamEntry<String>,
983
984    pub recorded_first_entry_id: String,
985}
986
987/// Options for the [`xread`](StreamCommands::xread) command
988#[derive(Default)]
989pub struct XReadOptions {
990    command_args: CommandArgs,
991}
992
993impl XReadOptions {
994    #[must_use]
995    pub fn count(mut self, count: usize) -> Self {
996        Self {
997            command_args: self.command_args.arg("COUNT").arg(count).build(),
998        }
999    }
1000
1001    #[must_use]
1002    pub fn block(mut self, milliseconds: u64) -> Self {
1003        Self {
1004            command_args: self.command_args.arg("BLOCK").arg(milliseconds).build(),
1005        }
1006    }
1007}
1008
1009impl ToArgs for XReadOptions {
1010    fn write_args(&self, args: &mut CommandArgs) {
1011        args.arg(&self.command_args);
1012    }
1013}
1014
1015/// Options for the [`xreadgroup`](StreamCommands::xreadgroup) command
1016#[derive(Default)]
1017pub struct XReadGroupOptions {
1018    command_args: CommandArgs,
1019}
1020
1021impl XReadGroupOptions {
1022    #[must_use]
1023    pub fn count(mut self, count: usize) -> Self {
1024        Self {
1025            command_args: self.command_args.arg("COUNT").arg(count).build(),
1026        }
1027    }
1028
1029    #[must_use]
1030    pub fn block(mut self, milliseconds: u64) -> Self {
1031        Self {
1032            command_args: self.command_args.arg("BLOCK").arg(milliseconds).build(),
1033        }
1034    }
1035
1036    #[must_use]
1037    pub fn no_ack(mut self) -> Self {
1038        Self {
1039            command_args: self.command_args.arg("NOACK").build(),
1040        }
1041    }
1042}
1043
1044impl ToArgs for XReadGroupOptions {
1045    fn write_args(&self, args: &mut CommandArgs) {
1046        args.arg(&self.command_args);
1047    }
1048}
1049
1050/// Options for the [`xpending_with_options`](StreamCommands::xpending_with_options) command
1051#[derive(Default)]
1052pub struct XPendingOptions {
1053    command_args: CommandArgs,
1054}
1055
1056impl XPendingOptions {
1057    #[must_use]
1058    pub fn idle(mut self, min_idle_time: u64) -> Self {
1059        Self {
1060            command_args: self.command_args.arg("IDLE").arg(min_idle_time).build(),
1061        }
1062    }
1063
1064    #[must_use]
1065    pub fn start<S: SingleArg>(mut self, start: S) -> Self {
1066        Self {
1067            command_args: self.command_args.arg(start).build(),
1068        }
1069    }
1070
1071    #[must_use]
1072    pub fn end<E: SingleArg>(mut self, end: E) -> Self {
1073        Self {
1074            command_args: self.command_args.arg(end).build(),
1075        }
1076    }
1077
1078    #[must_use]
1079    pub fn count(mut self, count: usize) -> Self {
1080        Self {
1081            command_args: self.command_args.arg(count).build(),
1082        }
1083    }
1084
1085    #[must_use]
1086    pub fn consumer<C: SingleArg>(mut self, consumer: C) -> Self {
1087        Self {
1088            command_args: self.command_args.arg(consumer).build(),
1089        }
1090    }
1091}
1092
1093impl ToArgs for XPendingOptions {
1094    fn write_args(&self, args: &mut CommandArgs) {
1095        args.arg(&self.command_args);
1096    }
1097}
1098
1099/// Result for the [`xpending`](StreamCommands::xpending) command
1100#[derive(Deserialize)]
1101pub struct XPendingResult {
1102    pub num_pending_messages: usize,
1103    pub smallest_id: String,
1104    pub greatest_id: String,
1105    pub consumers: Vec<XPendingConsumer>,
1106}
1107
1108/// Customer info result for the [`xpending`](StreamCommands::xpending) command
1109#[derive(Deserialize)]
1110pub struct XPendingConsumer {
1111    pub consumer: String,
1112    pub num_messages: usize,
1113}
1114
1115/// Message result for the [`xpending_with_options`](StreamCommands::xpending_with_options) command
1116#[derive(Deserialize)]
1117pub struct XPendingMessageResult {
1118    pub message_id: String,
1119    pub consumer: String,
1120    pub elapsed_millis: u64,
1121    pub times_delivered: usize,
1122}