redis/commands/
streams.rs

1//! Defines types to use with the streams commands.
2
3#[cfg(feature = "streams")]
4use crate::{
5    errors::{invalid_type_error, ParsingError},
6    types::HashMap,
7    FromRedisValue, RedisWrite, ToRedisArgs, Value,
8};
9use crate::{from_redis_value, from_redis_value_ref, types::ToSingleRedisArg};
10
11// Stream Maxlen Enum
12
13/// Utility enum for passing `MAXLEN [= or ~] [COUNT]`
14/// arguments into `StreamCommands`.
15/// The enum value represents the count.
16#[derive(PartialEq, Eq, Clone, Debug, Copy)]
17#[non_exhaustive]
18pub enum StreamMaxlen {
19    /// Match an exact count
20    Equals(usize),
21    /// Match an approximate count
22    Approx(usize),
23}
24
25impl ToRedisArgs for StreamMaxlen {
26    fn write_redis_args<W>(&self, out: &mut W)
27    where
28        W: ?Sized + RedisWrite,
29    {
30        let (ch, val) = match *self {
31            StreamMaxlen::Equals(v) => ("=", v),
32            StreamMaxlen::Approx(v) => ("~", v),
33        };
34        out.write_arg(b"MAXLEN");
35        out.write_arg(ch.as_bytes());
36        val.write_redis_args(out);
37    }
38}
39
40/// Utility enum for passing the trim mode`[=|~]`
41/// arguments into `StreamCommands`.
42#[derive(Debug)]
43#[non_exhaustive]
44pub enum StreamTrimmingMode {
45    /// Match an exact count
46    Exact,
47    /// Match an approximate count
48    Approx,
49}
50
51impl ToRedisArgs for StreamTrimmingMode {
52    fn write_redis_args<W>(&self, out: &mut W)
53    where
54        W: ?Sized + RedisWrite,
55    {
56        match self {
57            Self::Exact => out.write_arg(b"="),
58            Self::Approx => out.write_arg(b"~"),
59        };
60    }
61}
62
63/// Utility enum for passing `<MAXLEN|MINID> [=|~] threshold [LIMIT count]`
64/// arguments into `StreamCommands`.
65/// The enum values the trimming mode (=|~), the threshold, and the optional limit
66#[derive(Debug)]
67#[non_exhaustive]
68pub enum StreamTrimStrategy {
69    /// Evicts entries as long as the streams length exceeds threshold.  With an optional limit.
70    MaxLen(StreamTrimmingMode, usize, Option<usize>),
71    /// Evicts entries with IDs lower than threshold, where threshold is a stream ID With an optional limit.
72    MinId(StreamTrimmingMode, String, Option<usize>),
73}
74
75impl StreamTrimStrategy {
76    /// Define a MAXLEN trim strategy with the given maximum number of entries
77    pub fn maxlen(trim: StreamTrimmingMode, max_entries: usize) -> Self {
78        Self::MaxLen(trim, max_entries, None)
79    }
80
81    /// Defines a MINID trim strategy with the given minimum stream ID
82    pub fn minid(trim: StreamTrimmingMode, stream_id: impl Into<String>) -> Self {
83        Self::MinId(trim, stream_id.into(), None)
84    }
85
86    /// Set a limit to the number of records to trim in a single operation
87    pub fn limit(self, limit: usize) -> Self {
88        match self {
89            StreamTrimStrategy::MaxLen(m, t, _) => StreamTrimStrategy::MaxLen(m, t, Some(limit)),
90            StreamTrimStrategy::MinId(m, t, _) => StreamTrimStrategy::MinId(m, t, Some(limit)),
91        }
92    }
93}
94
95impl ToRedisArgs for StreamTrimStrategy {
96    fn write_redis_args<W>(&self, out: &mut W)
97    where
98        W: ?Sized + RedisWrite,
99    {
100        let limit = match self {
101            StreamTrimStrategy::MaxLen(m, t, limit) => {
102                out.write_arg(b"MAXLEN");
103                m.write_redis_args(out);
104                t.write_redis_args(out);
105                limit
106            }
107            StreamTrimStrategy::MinId(m, t, limit) => {
108                out.write_arg(b"MINID");
109                m.write_redis_args(out);
110                t.write_redis_args(out);
111                limit
112            }
113        };
114        if let Some(limit) = limit {
115            out.write_arg(b"LIMIT");
116            limit.write_redis_args(out);
117        }
118    }
119}
120
121/// Builder options for [`xtrim_options`] command
122///
123/// [`xtrim_options`]: ../trait.Commands.html#method.xtrim_options
124///
125#[derive(Debug)]
126pub struct StreamTrimOptions {
127    strategy: StreamTrimStrategy,
128    deletion_policy: Option<StreamDeletionPolicy>,
129}
130
131impl StreamTrimOptions {
132    /// Define a MAXLEN trim strategy with the given maximum number of entries
133    pub fn maxlen(mode: StreamTrimmingMode, max_entries: usize) -> Self {
134        Self {
135            strategy: StreamTrimStrategy::maxlen(mode, max_entries),
136            deletion_policy: None,
137        }
138    }
139
140    /// Defines a MINID trim strategy with the given minimum stream ID
141    pub fn minid(mode: StreamTrimmingMode, stream_id: impl Into<String>) -> Self {
142        Self {
143            strategy: StreamTrimStrategy::minid(mode, stream_id),
144            deletion_policy: None,
145        }
146    }
147
148    /// Set a limit to the number of records to trim in a single operation
149    pub fn limit(mut self, limit: usize) -> Self {
150        self.strategy = self.strategy.limit(limit);
151        self
152    }
153
154    /// Set the deletion policy for the XTRIM operation
155    pub fn set_deletion_policy(mut self, deletion_policy: StreamDeletionPolicy) -> Self {
156        self.deletion_policy = Some(deletion_policy);
157        self
158    }
159}
160
161impl ToRedisArgs for StreamTrimOptions {
162    fn write_redis_args<W>(&self, out: &mut W)
163    where
164        W: ?Sized + RedisWrite,
165    {
166        self.strategy.write_redis_args(out);
167        if let Some(deletion_policy) = self.deletion_policy.as_ref() {
168            deletion_policy.write_redis_args(out);
169        }
170    }
171}
172
173/// Builder options for [`xadd_options`] command
174///
175/// [`xadd_options`]: ../trait.Commands.html#method.xadd_options
176///
177#[derive(Default, Debug)]
178pub struct StreamAddOptions {
179    nomkstream: bool,
180    trim: Option<StreamTrimStrategy>,
181    deletion_policy: Option<StreamDeletionPolicy>,
182}
183
184impl StreamAddOptions {
185    /// Set the NOMKSTREAM flag on which prevents creating a stream for the XADD operation
186    pub fn nomkstream(mut self) -> Self {
187        self.nomkstream = true;
188        self
189    }
190
191    /// Enable trimming when adding using the given trim strategy
192    pub fn trim(mut self, trim: StreamTrimStrategy) -> Self {
193        self.trim = Some(trim);
194        self
195    }
196
197    /// Set the deletion policy for the XADD operation
198    pub fn set_deletion_policy(mut self, deletion_policy: StreamDeletionPolicy) -> Self {
199        self.deletion_policy = Some(deletion_policy);
200        self
201    }
202}
203
204impl ToRedisArgs for StreamAddOptions {
205    fn write_redis_args<W>(&self, out: &mut W)
206    where
207        W: ?Sized + RedisWrite,
208    {
209        if self.nomkstream {
210            out.write_arg(b"NOMKSTREAM");
211        }
212        if let Some(strategy) = self.trim.as_ref() {
213            strategy.write_redis_args(out);
214        }
215        if let Some(deletion_policy) = self.deletion_policy.as_ref() {
216            deletion_policy.write_redis_args(out);
217        }
218    }
219}
220
/// Builder options for the [`xautoclaim_options`] command.
///
/// [`xautoclaim_options`]: ../trait.Commands.html#method.xautoclaim_options
///
#[derive(Default, Debug)]
pub struct StreamAutoClaimOptions {
    count: Option<usize>,
    justid: bool,
}

impl StreamAutoClaimOptions {
    /// Sets the maximum number of elements to claim per stream.
    pub fn count(self, n: usize) -> Self {
        Self {
            count: Some(n),
            ..self
        }
    }

    /// Turns on the `JUSTID` cmd arg. Be advised: the response
    /// type changes with this option.
    pub fn with_justid(self) -> Self {
        Self {
            justid: true,
            ..self
        }
    }
}
245
246impl ToRedisArgs for StreamAutoClaimOptions {
247    fn write_redis_args<W>(&self, out: &mut W)
248    where
249        W: ?Sized + RedisWrite,
250    {
251        if let Some(ref count) = self.count {
252            out.write_arg(b"COUNT");
253            out.write_arg(format!("{count}").as_bytes());
254        }
255        if self.justid {
256            out.write_arg(b"JUSTID");
257        }
258    }
259}
260
/// Builder options for the [`xclaim_options`] command.
///
/// [`xclaim_options`]: ../trait.Commands.html#method.xclaim_options
///
#[derive(Default, Debug)]
pub struct StreamClaimOptions {
    /// Value of the `IDLE <milliseconds>` cmd arg.
    idle: Option<usize>,
    /// Value of the `TIME <Unix epoch milliseconds>` cmd arg.
    time: Option<usize>,
    /// Value of the `RETRYCOUNT <count>` cmd arg.
    retry: Option<usize>,
    /// Whether the `FORCE` cmd arg is sent.
    force: bool,
    /// Whether the `JUSTID` cmd arg is sent. Be advised: the response
    /// type changes with this option.
    justid: bool,
    /// Value of the `LASTID <lastid>` cmd arg.
    lastid: Option<String>,
}

impl StreamClaimOptions {
    /// Sets the `IDLE <milliseconds>` cmd arg.
    pub fn idle(self, ms: usize) -> Self {
        Self {
            idle: Some(ms),
            ..self
        }
    }

    /// Sets the `TIME <Unix epoch milliseconds>` cmd arg.
    pub fn time(self, ms_time: usize) -> Self {
        Self {
            time: Some(ms_time),
            ..self
        }
    }

    /// Sets the `RETRYCOUNT <count>` cmd arg.
    pub fn retry(self, count: usize) -> Self {
        Self {
            retry: Some(count),
            ..self
        }
    }

    /// Turns on the `FORCE` cmd arg.
    pub fn with_force(self) -> Self {
        Self {
            force: true,
            ..self
        }
    }

    /// Turns on the `JUSTID` cmd arg. Be advised: the response
    /// type changes with this option.
    pub fn with_justid(self) -> Self {
        Self {
            justid: true,
            ..self
        }
    }

    /// Sets the `LASTID <lastid>` cmd arg.
    pub fn with_lastid(self, lastid: impl Into<String>) -> Self {
        Self {
            lastid: Some(lastid.into()),
            ..self
        }
    }
}
320
321impl ToRedisArgs for StreamClaimOptions {
322    fn write_redis_args<W>(&self, out: &mut W)
323    where
324        W: ?Sized + RedisWrite,
325    {
326        if let Some(ref ms) = self.idle {
327            out.write_arg(b"IDLE");
328            out.write_arg(format!("{ms}").as_bytes());
329        }
330        if let Some(ref ms_time) = self.time {
331            out.write_arg(b"TIME");
332            out.write_arg(format!("{ms_time}").as_bytes());
333        }
334        if let Some(ref count) = self.retry {
335            out.write_arg(b"RETRYCOUNT");
336            out.write_arg(format!("{count}").as_bytes());
337        }
338        if self.force {
339            out.write_arg(b"FORCE");
340        }
341        if self.justid {
342            out.write_arg(b"JUSTID");
343        }
344        if let Some(ref lastid) = self.lastid {
345            out.write_arg(b"LASTID");
346            lastid.write_redis_args(out);
347        }
348    }
349}
350
351/// Argument to `StreamReadOptions`
352/// Represents the Redis `GROUP <groupname> <consumername>` cmd arg.
353/// This option will toggle the cmd from `XREAD` to `XREADGROUP`
354type SRGroup = Option<(Vec<Vec<u8>>, Vec<Vec<u8>>)>;
355/// Builder options for [`xread_options`] command.
356///
357/// [`xread_options`]: ../trait.Commands.html#method.xread_options
358///
359#[derive(Default, Debug)]
360pub struct StreamReadOptions {
361    /// Set the `BLOCK <milliseconds>` cmd arg.
362    block: Option<usize>,
363    /// Set the `COUNT <count>` cmd arg.
364    count: Option<usize>,
365    /// Set the `NOACK` cmd arg.
366    noack: Option<bool>,
367    /// Set the `GROUP <groupname> <consumername>` cmd arg.
368    /// This option will toggle the cmd from XREAD to XREADGROUP.
369    group: SRGroup,
370    /// Set the `CLAIM <min-idle-time>` cmd arg.
371    /// The `<min-idle-time>` is specified in milliseconds.
372    claim: Option<usize>,
373}
374
375impl StreamReadOptions {
376    /// Indicates whether the command is participating in a group
377    /// and generating ACKs
378    pub fn read_only(&self) -> bool {
379        self.group.is_none()
380    }
381
382    /// Sets the command so that it avoids adding the message
383    /// to the PEL in cases where reliability is not a requirement
384    /// and the occasional message loss is acceptable.
385    pub fn noack(mut self) -> Self {
386        self.noack = Some(true);
387        self
388    }
389
390    /// Sets the block time in milliseconds.
391    pub fn block(mut self, ms: usize) -> Self {
392        self.block = Some(ms);
393        self
394    }
395
396    /// Sets the maximum number of elements to return per stream.
397    pub fn count(mut self, n: usize) -> Self {
398        self.count = Some(n);
399        self
400    }
401
402    /// Sets the name of a consumer group associated to the stream.
403    pub fn group<GN: ToRedisArgs, CN: ToRedisArgs>(
404        mut self,
405        group_name: GN,
406        consumer_name: CN,
407    ) -> Self {
408        self.group = Some((
409            ToRedisArgs::to_redis_args(&group_name),
410            ToRedisArgs::to_redis_args(&consumer_name),
411        ));
412        self
413    }
414
415    /// Set the minimum idle time for the CLAIM parameter.
416    pub fn claim(mut self, min_idle_time: usize) -> Self {
417        self.claim = Some(min_idle_time);
418        self
419    }
420}
421
422impl ToRedisArgs for StreamReadOptions {
423    fn write_redis_args<W>(&self, out: &mut W)
424    where
425        W: ?Sized + RedisWrite,
426    {
427        if let Some(ref group) = self.group {
428            out.write_arg(b"GROUP");
429            for i in &group.0 {
430                out.write_arg(i);
431            }
432            for i in &group.1 {
433                out.write_arg(i);
434            }
435        }
436
437        if let Some(ref ms) = self.block {
438            out.write_arg(b"BLOCK");
439            out.write_arg(format!("{ms}").as_bytes());
440        }
441
442        if let Some(ref n) = self.count {
443            out.write_arg(b"COUNT");
444            out.write_arg(format!("{n}").as_bytes());
445        }
446
447        if self.group.is_some() {
448            // noack is only available w/ xreadgroup
449            if self.noack == Some(true) {
450                out.write_arg(b"NOACK");
451            }
452            // claim is only available w/ xreadgroup
453            if let Some(ref min_idle_time) = self.claim {
454                out.write_arg(b"CLAIM");
455                out.write_arg(format!("{min_idle_time}").as_bytes());
456            }
457        }
458    }
459}
460
/// Reply type used with the [`xautoclaim_options`] command.
///
/// [`xautoclaim_options`]: ../trait.Commands.html#method.xautoclaim_options
///
#[derive(Default, Debug, Clone)]
pub struct StreamAutoClaimReply {
    /// The next stream id to use as the start argument for the next xautoclaim
    pub next_stream_id: String,
    /// The entries claimed for the consumer. When JUSTID is enabled, only the `id` of each
    /// entry is filled in and its map is left empty.
    pub claimed: Vec<StreamId>,
    /// The list of stream ids that were removed due to no longer being in the stream.
    /// Left empty when the reply has only two elements and so carries no such list.
    pub deleted_ids: Vec<String>,
    /// If set, this means that the reply contained invalid nil entries, that were skipped during parsing.
    ///
    /// This should only happen when using Redis 6, see <https://github.com/redis-rs/redis-rs/issues/1798>
    pub invalid_entries: bool,
}
478
/// Reply type used with [`xread`] or [`xread_options`] commands.
///
/// [`xread`]: ../trait.Commands.html#method.xread
/// [`xread_options`]: ../trait.Commands.html#method.xread_options
///
#[derive(Default, Debug, Clone)]
pub struct StreamReadReply {
    /// One [`StreamKey`] per stream key in the reply, each holding the
    /// entries that were read for that key.
    pub keys: Vec<StreamKey>,
}
489
/// Reply type used with [`xrange`], [`xrange_count`], [`xrange_all`], [`xrevrange`], [`xrevrange_count`], [`xrevrange_all`] commands.
///
/// Represents stream entries matching a given range of `id`'s.
///
/// [`xrange`]: ../trait.Commands.html#method.xrange
/// [`xrange_count`]: ../trait.Commands.html#method.xrange_count
/// [`xrange_all`]: ../trait.Commands.html#method.xrange_all
/// [`xrevrange`]: ../trait.Commands.html#method.xrevrange
/// [`xrevrange_count`]: ../trait.Commands.html#method.xrevrange_count
/// [`xrevrange_all`]: ../trait.Commands.html#method.xrevrange_all
///
#[derive(Default, Debug, Clone)]
pub struct StreamRangeReply {
    /// One [`StreamId`] per entry in the reply, holding the entry ID and its field/value map.
    pub ids: Vec<StreamId>,
}
506
/// Reply type used with [`xclaim`] command.
///
/// Represents that ownership of the specified messages was changed.
///
/// [`xclaim`]: ../trait.Commands.html#method.xclaim
///
#[derive(Default, Debug, Clone)]
pub struct StreamClaimReply {
    /// One [`StreamId`] per entry in the reply, holding the entry ID and its field/value map.
    pub ids: Vec<StreamId>,
}
518
519/// Reply type used with [`xpending`] command.
520///
521/// Data returned here were fetched from the stream without
522/// having been acknowledged.
523///
524/// [`xpending`]: ../trait.Commands.html#method.xpending
525///
526#[derive(Debug, Clone, Default)]
527#[non_exhaustive]
528pub enum StreamPendingReply {
529    /// The stream is empty.
530    #[default]
531    Empty,
532    /// Data with payload exists in the stream.
533    Data(StreamPendingData),
534}
535
536impl StreamPendingReply {
537    /// Returns how many records are in the reply.
538    pub fn count(&self) -> usize {
539        match self {
540            StreamPendingReply::Empty => 0,
541            StreamPendingReply::Data(x) => x.count,
542        }
543    }
544}
545
/// Inner reply type when an [`xpending`] command has data.
///
/// [`xpending`]: ../trait.Commands.html#method.xpending
#[derive(Default, Debug, Clone)]
pub struct StreamPendingData {
    /// Number of records in the reply; this is the value returned by
    /// `StreamPendingReply::count` for a reply that has data.
    pub count: usize,
    /// ID for the first pending record.
    pub start_id: String,
    /// ID for the final pending record.
    pub end_id: String,
    /// Every consumer in the consumer group with at
    /// least one pending message,
    /// and the number of pending messages it has.
    pub consumers: Vec<StreamInfoConsumer>,
}
562
/// Reply type used with [`xpending_count`] and
/// [`xpending_consumer_count`] commands.
///
/// The data returned here has been fetched from the stream without
/// any acknowledgement.
///
/// [`xpending_count`]: ../trait.Commands.html#method.xpending_count
/// [`xpending_consumer_count`]: ../trait.Commands.html#method.xpending_consumer_count
///
#[derive(Default, Debug, Clone)]
pub struct StreamPendingCountReply {
    /// One [`StreamPendingId`] per pending message: the message ID, the consumer
    /// that currently owns it, the time since it was last delivered, and how many
    /// times it has been delivered.
    pub ids: Vec<StreamPendingId>,
}
579
/// Reply type used with [`xinfo_stream`] command, containing
/// general information about the stream stored at the specified key.
///
/// The very first and last entries in the stream are included,
/// in order to give some sense of what the stream content is.
///
/// [`xinfo_stream`]: ../trait.Commands.html#method.xinfo_stream
///
#[derive(Default, Debug, Clone)]
pub struct StreamInfoStreamReply {
    /// The last generated ID, which may not be the same as the last
    /// entry ID in case some entry was deleted.
    pub last_generated_id: String,
    /// Details about the radix tree representing the stream, mostly
    /// useful for optimization and debugging tasks.
    pub radix_tree_keys: usize,
    /// The number of consumer groups associated with the stream.
    pub groups: usize,
    /// Number of elements of the stream.
    pub length: usize,
    /// The very first entry in the stream.
    pub first_entry: StreamId,
    /// The very last entry in the stream.
    pub last_entry: StreamId,
}
605
/// Reply type used with [`xinfo_consumers`] command, an array of every
/// consumer in a specific consumer group.
///
/// [`xinfo_consumers`]: ../trait.Commands.html#method.xinfo_consumers
///
#[derive(Default, Debug, Clone)]
pub struct StreamInfoConsumersReply {
    /// An array of every consumer in a specific consumer group.
    pub consumers: Vec<StreamInfoConsumer>,
}
616
/// Reply type used with [`xinfo_groups`] command.
///
/// This output represents all the consumer groups associated with
/// the stream.
///
/// [`xinfo_groups`]: ../trait.Commands.html#method.xinfo_groups
///
#[derive(Default, Debug, Clone)]
pub struct StreamInfoGroupsReply {
    /// All the consumer groups associated with the stream, one [`StreamInfoGroup`] each.
    pub groups: Vec<StreamInfoGroup>,
}
629
/// A consumer parsed from [`xinfo_consumers`] command.
///
/// [`xinfo_consumers`]: ../trait.Commands.html#method.xinfo_consumers
///
#[derive(Default, Debug, Clone)]
pub struct StreamInfoConsumer {
    /// Name of the consumer.
    pub name: String,
    /// Number of pending messages for this specific consumer.
    pub pending: usize,
    /// This consumer's idle time in milliseconds.
    pub idle: usize,
}
643
/// A group parsed from [`xinfo_groups`] command.
///
/// [`xinfo_groups`]: ../trait.Commands.html#method.xinfo_groups
///
#[derive(Default, Debug, Clone)]
pub struct StreamInfoGroup {
    /// The group name.
    pub name: String,
    /// Number of consumers known in the group.
    pub consumers: usize,
    /// Number of pending messages (delivered but not yet acknowledged) in the group.
    pub pending: usize,
    /// Last ID delivered to this group.
    pub last_delivered_id: String,
    /// The logical "read counter" of the last entry delivered to the group's consumers
    /// (or `None` if the server does not provide the value).
    pub entries_read: Option<usize>,
    /// The number of entries in the stream that are still waiting to be delivered to the
    /// group's consumers, or `None` when that number can't be determined.
    pub lag: Option<usize>,
}
665
/// Represents a pending message parsed from [`xpending`] methods.
///
/// [`xpending`]: ../trait.Commands.html#method.xpending
#[derive(Default, Debug, Clone)]
pub struct StreamPendingId {
    /// The ID of the message.
    pub id: String,
    /// The name of the consumer that fetched the message and
    /// has yet to acknowledge it. We call it the current owner
    /// of the message.
    pub consumer: String,
    /// The number of milliseconds that elapsed since the
    /// last time this message was delivered to this consumer.
    pub last_delivered_ms: usize,
    /// The number of times this message was delivered.
    pub times_delivered: usize,
}
683
/// Represents a stream `key` and the entries (`id`'s) parsed for it from `xread` methods.
#[derive(Default, Debug, Clone)]
pub struct StreamKey {
    /// The stream `key`.
    pub key: String,
    /// The parsed stream `id`'s, one [`StreamId`] per entry returned for this key.
    pub ids: Vec<StreamId>,
}
692
/// Represents a stream `id` and its field/values as a `HashMap`.
///
/// Also contains optional PEL information if the message was fetched with XREADGROUP with a `claim` option;
/// those two fields are `None` otherwise.
#[derive(Default, Debug, Clone, PartialEq)]
pub struct StreamId {
    /// The stream `id` (entry ID) of this particular message.
    pub id: String,
    /// All fields in this message, associated with their respective values.
    pub map: HashMap<String, Value>,
    /// The number of milliseconds that elapsed since the last time this entry was delivered to a consumer.
    pub milliseconds_elapsed_from_delivery: Option<usize>,
    /// The number of times this entry was delivered.
    pub delivered_count: Option<usize>,
}
706
707impl StreamId {
708    /// Converts a `Value::Array` into a `StreamId`.
709    fn from_array_value(v: Value) -> Result<Self, ParsingError> {
710        let mut stream_id = StreamId::default();
711        if let Value::Array(mut values) = v {
712            if let Some(v) = values.first_mut() {
713                stream_id.id = from_redis_value(std::mem::take(v))?;
714            }
715            if let Some(v) = values.first_mut() {
716                stream_id.map = from_redis_value(std::mem::take(v))?;
717            }
718        }
719
720        Ok(stream_id)
721    }
722
723    /// Fetches value of a given field and converts it to the specified
724    /// type.
725    pub fn get<T: FromRedisValue>(&self, key: &str) -> Option<T> {
726        match self.map.get(key) {
727            Some(x) => from_redis_value_ref(x).ok(),
728            None => None,
729        }
730    }
731
732    /// Does the message contain a particular field?
733    pub fn contains_key(&self, key: &str) -> bool {
734        self.map.contains_key(key)
735    }
736
737    /// Returns how many field/value pairs exist in this message.
738    pub fn len(&self) -> usize {
739        self.map.len()
740    }
741
742    /// Returns true if there are no field/value pairs in this message.
743    pub fn is_empty(&self) -> bool {
744        self.len() == 0
745    }
746}
747
/// Shape of the "full" (non-JUSTID) list of claimed entries:
/// a list of `{ id => { field => value } }` maps.
type SACRows = Vec<HashMap<String, HashMap<String, Value>>>;

impl FromRedisValue for StreamAutoClaimReply {
    /// Parses a reply made of two or three elements:
    /// the next stream id, the claimed entries and, when present, the deleted ids.
    ///
    /// The claimed entries are parsed either as plain ids (JUSTID response) or as
    /// full entries, depending on the type of the first non-nil element.
    /// Nil entries are skipped and reported through `invalid_entries`.
    fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
        let Value::Array(mut items) = v else {
            invalid_type_error!("Not a array response", v);
        };

        if items.len() > 3 || items.len() < 2 {
            invalid_type_error!("Incorrect number of items", &items);
        }

        // The elements are popped from the back, so the optional third
        // element (deleted ids) has to be taken first.
        let deleted_ids = if items.len() == 3 {
            from_redis_value(items.pop().unwrap())?
        } else {
            Vec::new()
        };
        // safe, because we've checked for length beforehand
        let claimed = items.pop().unwrap();
        let next_stream_id = from_redis_value(items.pop().unwrap())?;

        let Value::Array(arr) = &claimed else {
            invalid_type_error!("Incorrect type", claimed)
        };
        // The first non-nil element decides how the whole list is parsed.
        // When there is none, nothing was claimed; a non-empty list then
        // consisted only of nil entries.
        let Some(entry) = arr.iter().find(|val| !matches!(val, Value::Nil)) else {
            return Ok(Self {
                next_stream_id,
                claimed: Vec::new(),
                deleted_ids,
                invalid_entries: !arr.is_empty(),
            });
        };
        let (claimed, invalid_entries) = match entry {
            Value::BulkString(_) => {
                // JUSTID response
                // The length is read before `claimed` is moved into the parser.
                let claimed_count = arr.len();
                let ids: Vec<Option<String>> = from_redis_value(claimed)?;

                // Only the id is known here; the remaining fields stay at their defaults.
                let claimed: Vec<_> = ids
                    .into_iter()
                    .filter_map(|id| {
                        id.map(|id| StreamId {
                            id,
                            ..Default::default()
                        })
                    })
                    .collect();
                // This means that some nil entries were filtered
                let invalid_entries = claimed.len() < claimed_count;
                (claimed, invalid_entries)
            }
            Value::Array(_) => {
                // full response
                // The length is read before `claimed` is moved into the parser.
                let claimed_count = arr.len();
                let rows: SACRows = from_redis_value(claimed)?;

                // Every `(id, map)` pair of every row becomes one `StreamId`.
                let claimed: Vec<_> = rows
                    .into_iter()
                    .flat_map(|row| {
                        row.into_iter().map(|(id, map)| StreamId {
                            id,
                            map,
                            milliseconds_elapsed_from_delivery: None,
                            delivered_count: None,
                        })
                    })
                    .collect();
                // This means that some nil entries were filtered
                let invalid_entries = claimed.len() < claimed_count;
                (claimed, invalid_entries)
            }
            _ => invalid_type_error!("Incorrect type", claimed),
        };

        Ok(Self {
            next_stream_id,
            claimed,
            deleted_ids,
            invalid_entries,
        })
    }
}
830
831type SRRows = Vec<HashMap<String, Vec<HashMap<String, HashMap<String, Value>>>>>;
832type SRClaimRows =
833    Vec<HashMap<String, Vec<(String, HashMap<String, Value>, Option<usize>, Option<usize>)>>>;
834
835impl FromRedisValue for StreamReadReply {
836    fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
837        // Try to parse as the standard format first
838        if let Ok(rows) = from_redis_value::<SRRows>(v.clone()) {
839            return Ok(Self::from_standard_rows(rows));
840        }
841
842        // If that fails, try to parse as XREADGROUP with CLAIM format
843        // Format: [[stream_name, [[id, [field, value, ...], ms_elapsed, delivery_count], ...]]]
844        if let Ok(rows) = from_redis_value::<SRClaimRows>(v.clone()) {
845            return Ok(Self::from_claim_rows(rows));
846        }
847
848        invalid_type_error!("Could not parse StreamReadReply in any known format", v)
849    }
850}
851
852impl StreamReadReply {
853    fn from_standard_rows(rows: SRRows) -> Self {
854        let keys = rows
855            .into_iter()
856            .flat_map(|row| {
857                row.into_iter().map(|(key, entries)| StreamKey {
858                    key,
859                    ids: entries
860                        .into_iter()
861                        .flat_map(|id_row| {
862                            id_row.into_iter().map(|(id, map)| StreamId {
863                                id,
864                                map,
865                                milliseconds_elapsed_from_delivery: None,
866                                delivered_count: None,
867                            })
868                        })
869                        .collect(),
870                })
871            })
872            .collect();
873        StreamReadReply { keys }
874    }
875
876    fn from_claim_rows(rows: SRClaimRows) -> Self {
877        let keys = rows
878            .into_iter()
879            .flat_map(|row| {
880                row.into_iter().map(|(key, entries)| StreamKey {
881                    key,
882                    ids: entries
883                        .into_iter()
884                        .map(
885                            |(id, map, milliseconds_elapsed_from_delivery, delivered_count)| {
886                                StreamId {
887                                    id,
888                                    map,
889                                    milliseconds_elapsed_from_delivery,
890                                    delivered_count,
891                                }
892                            },
893                        )
894                        .collect(),
895                })
896            })
897            .collect();
898        StreamReadReply { keys }
899    }
900}
901
902impl FromRedisValue for StreamRangeReply {
903    fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
904        let rows: Vec<HashMap<String, HashMap<String, Value>>> = from_redis_value(v)?;
905        let ids: Vec<StreamId> = rows
906            .into_iter()
907            .flat_map(|row| {
908                row.into_iter().map(|(id, map)| StreamId {
909                    id,
910                    map,
911                    milliseconds_elapsed_from_delivery: None,
912                    delivered_count: None,
913                })
914            })
915            .collect();
916        Ok(StreamRangeReply { ids })
917    }
918}
919
920impl FromRedisValue for StreamClaimReply {
921    fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
922        let rows: Vec<HashMap<String, HashMap<String, Value>>> = from_redis_value(v)?;
923        let ids: Vec<StreamId> = rows
924            .into_iter()
925            .flat_map(|row| {
926                row.into_iter().map(|(id, map)| StreamId {
927                    id,
928                    map,
929                    milliseconds_elapsed_from_delivery: None,
930                    delivered_count: None,
931                })
932            })
933            .collect();
934        Ok(StreamClaimReply { ids })
935    }
936}
937
938type SPRInner = (
939    usize,
940    Option<String>,
941    Option<String>,
942    Vec<Option<(String, String)>>,
943);
944impl FromRedisValue for StreamPendingReply {
945    fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
946        let (count, start, end, consumer_data): SPRInner = from_redis_value(v)?;
947
948        if count == 0 {
949            Ok(StreamPendingReply::Empty)
950        } else {
951            let mut result = StreamPendingData::default();
952
953            let start_id = start.ok_or_else(|| {
954                ParsingError::from(arcstr::literal!(
955                    "IllegalState: Non-zero pending expects start id"
956                ))
957            })?;
958
959            let end_id = end.ok_or_else(|| {
960                ParsingError::from(arcstr::literal!(
961                    "IllegalState: Non-zero pending expects end id"
962                ))
963            })?;
964
965            result.count = count;
966            result.start_id = start_id;
967            result.end_id = end_id;
968
969            result.consumers = consumer_data
970                .into_iter()
971                .flatten()
972                .map(|(name, pending)| StreamInfoConsumer {
973                    name,
974                    pending: pending.parse().unwrap_or_default(),
975                    ..Default::default()
976                })
977                .collect();
978
979            Ok(StreamPendingReply::Data(result))
980        }
981    }
982}
983
984impl FromRedisValue for StreamPendingCountReply {
985    fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
986        let mut reply = StreamPendingCountReply::default();
987        match v {
988            Value::Array(outer_tuple) => {
989                for outer in outer_tuple {
990                    match outer {
991                        Value::Array(inner_tuple) => match &inner_tuple[..] {
992                            [Value::BulkString(id_bytes), Value::BulkString(consumer_bytes), Value::Int(last_delivered_ms_u64), Value::Int(times_delivered_u64)] =>
993                            {
994                                let id = String::from_utf8(id_bytes.to_vec())?;
995                                let consumer = String::from_utf8(consumer_bytes.to_vec())?;
996                                let last_delivered_ms = *last_delivered_ms_u64 as usize;
997                                let times_delivered = *times_delivered_u64 as usize;
998                                reply.ids.push(StreamPendingId {
999                                    id,
1000                                    consumer,
1001                                    last_delivered_ms,
1002                                    times_delivered,
1003                                });
1004                            }
1005                            _ => fail!(ParsingError::from(arcstr::literal!(
1006                                "Cannot parse redis data (3)"
1007                            ))),
1008                        },
1009                        _ => fail!(ParsingError::from(arcstr::literal!(
1010                            "Cannot parse redis data (2)"
1011                        ))),
1012                    }
1013                }
1014            }
1015            _ => fail!(ParsingError::from(arcstr::literal!(
1016                "Cannot parse redis data (1)"
1017            ))),
1018        };
1019        Ok(reply)
1020    }
1021}
1022
1023impl FromRedisValue for StreamInfoStreamReply {
1024    fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
1025        let mut map: HashMap<String, Value> = from_redis_value(v)?;
1026        let mut reply = StreamInfoStreamReply::default();
1027        if let Some(v) = map.remove("last-generated-id") {
1028            reply.last_generated_id = from_redis_value(v)?;
1029        }
1030        if let Some(v) = map.remove("radix-tree-nodes") {
1031            reply.radix_tree_keys = from_redis_value(v)?;
1032        }
1033        if let Some(v) = map.remove("groups") {
1034            reply.groups = from_redis_value(v)?;
1035        }
1036        if let Some(v) = map.remove("length") {
1037            reply.length = from_redis_value(v)?;
1038        }
1039        if let Some(v) = map.remove("first-entry") {
1040            reply.first_entry = StreamId::from_array_value(v)?;
1041        }
1042        if let Some(v) = map.remove("last-entry") {
1043            reply.last_entry = StreamId::from_array_value(v)?;
1044        }
1045        Ok(reply)
1046    }
1047}
1048
1049impl FromRedisValue for StreamInfoConsumersReply {
1050    fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
1051        let consumers: Vec<HashMap<String, Value>> = from_redis_value(v)?;
1052        let mut reply = StreamInfoConsumersReply::default();
1053        for mut map in consumers {
1054            let mut c = StreamInfoConsumer::default();
1055            if let Some(v) = map.remove("name") {
1056                c.name = from_redis_value(v)?;
1057            }
1058            if let Some(v) = map.remove("pending") {
1059                c.pending = from_redis_value(v)?;
1060            }
1061            if let Some(v) = map.remove("idle") {
1062                c.idle = from_redis_value(v)?;
1063            }
1064            reply.consumers.push(c);
1065        }
1066
1067        Ok(reply)
1068    }
1069}
1070
1071impl FromRedisValue for StreamInfoGroupsReply {
1072    fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
1073        let groups: Vec<HashMap<String, Value>> = from_redis_value(v)?;
1074        let mut reply = StreamInfoGroupsReply::default();
1075        for mut map in groups {
1076            let mut g = StreamInfoGroup::default();
1077            if let Some(v) = map.remove("name") {
1078                g.name = from_redis_value(v)?;
1079            }
1080            if let Some(v) = map.remove("pending") {
1081                g.pending = from_redis_value(v)?;
1082            }
1083            if let Some(v) = map.remove("consumers") {
1084                g.consumers = from_redis_value(v)?;
1085            }
1086            if let Some(v) = map.remove("last-delivered-id") {
1087                g.last_delivered_id = from_redis_value(v)?;
1088            }
1089            if let Some(v) = map.remove("entries-read") {
1090                g.entries_read = if let Value::Nil = v {
1091                    None
1092                } else {
1093                    Some(from_redis_value(v)?)
1094                };
1095            }
1096            if let Some(v) = map.remove("lag") {
1097                g.lag = if let Value::Nil = v {
1098                    None
1099                } else {
1100                    Some(from_redis_value(v)?)
1101                };
1102            }
1103            reply.groups.push(g);
1104        }
1105        Ok(reply)
1106    }
1107}
1108
1109/// Deletion policy for stream entries.
1110#[derive(Debug, Clone, Default)]
1111#[non_exhaustive]
1112pub enum StreamDeletionPolicy {
1113    /// Preserve existing references to the deleted entries in all consumer groups' PEL.
1114    #[default]
1115    KeepRef,
1116    /// Delete the entry from the stream and from all the consumer groups' PELs.
1117    DelRef,
1118    /// Delete the entry from the stream and from all the consumer groups' PELs, but only if the entry is acknowledged by all the groups.
1119    Acked,
1120}
1121
1122impl ToRedisArgs for StreamDeletionPolicy {
1123    fn write_redis_args<W>(&self, out: &mut W)
1124    where
1125        W: ?Sized + RedisWrite,
1126    {
1127        match self {
1128            StreamDeletionPolicy::KeepRef => out.write_arg(b"KEEPREF"),
1129            StreamDeletionPolicy::DelRef => out.write_arg(b"DELREF"),
1130            StreamDeletionPolicy::Acked => out.write_arg(b"ACKED"),
1131        }
1132    }
1133}
1134impl ToSingleRedisArg for StreamDeletionPolicy {}
1135
/// Status codes returned by the `XDELEX` command
#[cfg(feature = "streams")]
#[cfg_attr(docsrs, doc(cfg(feature = "streams")))]
#[derive(Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum XDelExStatusCode {
    /// No entry with the given id exists in the stream
    IdNotFound = -1,
    /// The entry was deleted from the stream
    Deleted = 1,
    /// The entry was not deleted because it has either not been delivered to any consumer
    /// or still has references in the consumer groups' Pending Entries List (PEL)
    NotDeletedUnacknowledgedOrStillReferenced = 2,
}

#[cfg(feature = "streams")]
impl FromRedisValue for XDelExStatusCode {
    /// Converts an integer reply into an [`XDelExStatusCode`].
    ///
    /// # Errors
    ///
    /// Fails when the value is not an integer, or when the integer is not one
    /// of the known codes (`-1`, `1`, `2`).
    fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
        match v {
            Value::Int(-1) => Ok(XDelExStatusCode::IdNotFound),
            Value::Int(1) => Ok(XDelExStatusCode::Deleted),
            Value::Int(2) => Ok(XDelExStatusCode::NotDeletedUnacknowledgedOrStillReferenced),
            Value::Int(code) => {
                Err(format!("Invalid XDelExStatusCode status code: {code}").into())
            }
            // Name this type in the message (the original mentioned
            // `XAckDelStatusCode`, which pointed at the wrong command).
            _ => Err(arcstr::literal!("Response type not XDelExStatusCode compatible").into()),
        }
    }
}
1165
/// Status codes returned by the `XACKDEL` command
#[cfg(feature = "streams")]
#[cfg_attr(docsrs, doc(cfg(feature = "streams")))]
#[derive(Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum XAckDelStatusCode {
    /// No entry with the given id exists in the stream
    IdNotFound = -1,
    /// The entry was acknowledged and deleted from the stream
    AcknowledgedAndDeleted = 1,
    /// The entry was acknowledged but not deleted because it has references in the consumer groups' Pending Entries List (PEL)
    AcknowledgedNotDeletedStillReferenced = 2,
}

#[cfg(feature = "streams")]
impl FromRedisValue for XAckDelStatusCode {
    /// Converts an integer reply into an [`XAckDelStatusCode`].
    ///
    /// # Errors
    ///
    /// Fails when the value is not an integer, or when the integer is not one
    /// of the known codes (`-1`, `1`, `2`).
    fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
        match v {
            Value::Int(-1) => Ok(XAckDelStatusCode::IdNotFound),
            Value::Int(1) => Ok(XAckDelStatusCode::AcknowledgedAndDeleted),
            Value::Int(2) => Ok(XAckDelStatusCode::AcknowledgedNotDeletedStillReferenced),
            // `format!` is required here: `arcstr::literal!` does not
            // interpolate, so the message would contain the text `{code}`.
            Value::Int(code) => {
                Err(format!("Invalid XAckDelStatusCode status code: {code}").into())
            }
            _ => Err(arcstr::literal!("Response type not XAckDelStatusCode compatible").into()),
        }
    }
}
1194
1195#[cfg(test)]
1196mod tests {
1197    use super::*;
1198
1199    fn assert_command_eq(object: impl ToRedisArgs, expected: &[u8]) {
1200        let mut out: Vec<Vec<u8>> = Vec::new();
1201
1202        object.write_redis_args(&mut out);
1203
1204        let mut cmd: Vec<u8> = Vec::new();
1205
1206        out.iter_mut().for_each(|item| {
1207            cmd.append(item);
1208            cmd.push(b' ');
1209        });
1210
1211        cmd.pop();
1212
1213        assert_eq!(cmd, expected);
1214    }
1215
1216    mod stream_auto_claim_reply {
1217        use super::*;
1218        use crate::Value;
1219
1220        #[test]
1221        fn short_response() {
1222            let value = Value::Array(vec![Value::BulkString("1713465536578-0".into())]);
1223
1224            StreamAutoClaimReply::from_redis_value(value).unwrap_err();
1225        }
1226
1227        #[test]
1228        fn parses_none_claimed_response() {
1229            let value = Value::Array(vec![
1230                Value::BulkString("0-0".into()),
1231                Value::Array(vec![]),
1232                Value::Array(vec![]),
1233            ]);
1234
1235            let reply: StreamAutoClaimReply = FromRedisValue::from_redis_value(value).unwrap();
1236
1237            assert_eq!(reply.next_stream_id.as_str(), "0-0");
1238            assert_eq!(reply.claimed.len(), 0);
1239            assert_eq!(reply.deleted_ids.len(), 0);
1240        }
1241
1242        #[test]
1243        fn parses_response() {
1244            let value = Value::Array(vec![
1245                Value::BulkString("1713465536578-0".into()),
1246                Value::Array(vec![
1247                    Value::Array(vec![
1248                        Value::BulkString("1713465533411-0".into()),
1249                        // Both RESP2 and RESP3 expose this map as an array of key/values
1250                        Value::Array(vec![
1251                            Value::BulkString("name".into()),
1252                            Value::BulkString("test".into()),
1253                            Value::BulkString("other".into()),
1254                            Value::BulkString("whaterver".into()),
1255                        ]),
1256                    ]),
1257                    Value::Array(vec![
1258                        Value::BulkString("1713465536069-0".into()),
1259                        Value::Array(vec![
1260                            Value::BulkString("name".into()),
1261                            Value::BulkString("another test".into()),
1262                            Value::BulkString("other".into()),
1263                            Value::BulkString("something".into()),
1264                        ]),
1265                    ]),
1266                ]),
1267                Value::Array(vec![Value::BulkString("123456789-0".into())]),
1268            ]);
1269
1270            let reply: StreamAutoClaimReply = FromRedisValue::from_redis_value(value).unwrap();
1271
1272            assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1273            assert_eq!(reply.claimed.len(), 2);
1274            assert_eq!(reply.claimed[0].id.as_str(), "1713465533411-0");
1275            assert!(
1276                matches!(reply.claimed[0].map.get("name"), Some(Value::BulkString(v)) if v == "test".as_bytes())
1277            );
1278            assert_eq!(reply.claimed[1].id.as_str(), "1713465536069-0");
1279            assert_eq!(reply.deleted_ids.len(), 1);
1280            assert!(reply.deleted_ids.contains(&"123456789-0".to_string()))
1281        }
1282
1283        #[test]
1284        fn parses_v6_response() {
1285            let value = Value::Array(vec![
1286                Value::BulkString("1713465536578-0".into()),
1287                Value::Array(vec![
1288                    Value::Array(vec![
1289                        Value::BulkString("1713465533411-0".into()),
1290                        Value::Array(vec![
1291                            Value::BulkString("name".into()),
1292                            Value::BulkString("test".into()),
1293                            Value::BulkString("other".into()),
1294                            Value::BulkString("whaterver".into()),
1295                        ]),
1296                    ]),
1297                    Value::Array(vec![
1298                        Value::BulkString("1713465536069-0".into()),
1299                        Value::Array(vec![
1300                            Value::BulkString("name".into()),
1301                            Value::BulkString("another test".into()),
1302                            Value::BulkString("other".into()),
1303                            Value::BulkString("something".into()),
1304                        ]),
1305                    ]),
1306                ]),
1307                // V6 and lower lack the deleted_ids array
1308            ]);
1309
1310            let reply: StreamAutoClaimReply = FromRedisValue::from_redis_value(value).unwrap();
1311
1312            assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1313            assert_eq!(reply.claimed.len(), 2);
1314            let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
1315            assert!(ids.contains(&"1713465533411-0"));
1316            assert!(ids.contains(&"1713465536069-0"));
1317            assert_eq!(reply.deleted_ids.len(), 0);
1318        }
1319
1320        #[test]
1321        fn parses_justid_response() {
1322            let value = Value::Array(vec![
1323                Value::BulkString("1713465536578-0".into()),
1324                Value::Array(vec![
1325                    Value::BulkString("1713465533411-0".into()),
1326                    Value::BulkString("1713465536069-0".into()),
1327                ]),
1328                Value::Array(vec![Value::BulkString("123456789-0".into())]),
1329            ]);
1330
1331            let reply: StreamAutoClaimReply = FromRedisValue::from_redis_value(value).unwrap();
1332
1333            assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1334            assert_eq!(reply.claimed.len(), 2);
1335            let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
1336            assert!(ids.contains(&"1713465533411-0"));
1337            assert!(ids.contains(&"1713465536069-0"));
1338            assert_eq!(reply.deleted_ids.len(), 1);
1339            assert!(reply.deleted_ids.contains(&"123456789-0".to_string()))
1340        }
1341
1342        #[test]
1343        fn parses_v6_justid_response() {
1344            let value = Value::Array(vec![
1345                Value::BulkString("1713465536578-0".into()),
1346                Value::Array(vec![
1347                    Value::BulkString("1713465533411-0".into()),
1348                    Value::BulkString("1713465536069-0".into()),
1349                ]),
1350                // V6 and lower lack the deleted_ids array
1351            ]);
1352
1353            let reply: StreamAutoClaimReply = FromRedisValue::from_redis_value(value).unwrap();
1354
1355            assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1356            assert_eq!(reply.claimed.len(), 2);
1357            let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
1358            assert!(ids.contains(&"1713465533411-0"));
1359            assert!(ids.contains(&"1713465536069-0"));
1360            assert_eq!(reply.deleted_ids.len(), 0);
1361        }
1362    }
1363
1364    mod stream_trim_options {
1365        use super::*;
1366
1367        #[test]
1368        fn maxlen_trim() {
1369            let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Approx, 10);
1370
1371            assert_command_eq(options, b"MAXLEN ~ 10");
1372        }
1373
1374        #[test]
1375        fn maxlen_exact_trim() {
1376            let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Exact, 10);
1377
1378            assert_command_eq(options, b"MAXLEN = 10");
1379        }
1380
1381        #[test]
1382        fn maxlen_trim_limit() {
1383            let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Approx, 10).limit(5);
1384
1385            assert_command_eq(options, b"MAXLEN ~ 10 LIMIT 5");
1386        }
1387        #[test]
1388        fn minid_trim_limit() {
1389            let options = StreamTrimOptions::minid(StreamTrimmingMode::Exact, "123456-7").limit(5);
1390
1391            assert_command_eq(options, b"MINID = 123456-7 LIMIT 5");
1392        }
1393    }
1394
1395    mod stream_add_options {
1396        use super::*;
1397
1398        #[test]
1399        fn the_default() {
1400            let options = StreamAddOptions::default();
1401
1402            assert_command_eq(options, b"");
1403        }
1404
1405        #[test]
1406        fn with_maxlen_trim() {
1407            let options = StreamAddOptions::default()
1408                .trim(StreamTrimStrategy::maxlen(StreamTrimmingMode::Exact, 10));
1409
1410            assert_command_eq(options, b"MAXLEN = 10");
1411        }
1412
1413        #[test]
1414        fn with_nomkstream() {
1415            let options = StreamAddOptions::default().nomkstream();
1416
1417            assert_command_eq(options, b"NOMKSTREAM");
1418        }
1419
1420        #[test]
1421        fn with_nomkstream_and_maxlen_trim() {
1422            let options = StreamAddOptions::default()
1423                .nomkstream()
1424                .trim(StreamTrimStrategy::maxlen(StreamTrimmingMode::Exact, 10));
1425
1426            assert_command_eq(options, b"NOMKSTREAM MAXLEN = 10");
1427        }
1428    }
1429}