rustis/commands/stream_commands.rs
1use crate::{
2 client::{prepare_command, PreparedCommand},
3 resp::{
4 cmd, CommandArgs, KeyValueArgsCollection, KeyValueCollectionResponse, PrimitiveResponse,
5 SingleArg, SingleArgCollection, ToArgs,
6 },
7};
8use serde::{de::DeserializeOwned, Deserialize};
9use std::collections::HashMap;
10
11/// A group of Redis commands related to [`Streams`](https://redis.io/docs/data-types/streams/)
12/// # See Also
13/// [Redis Generic Commands](https://redis.io/commands/?group=stream)
14/// [Streams tutorial](https://redis.io/docs/data-types/streams-tutorial/)
15pub trait StreamCommands<'a> {
16 /// The XACK command removes one or multiple messages
17 /// from the Pending Entries List (PEL) of a stream consumer group
18 ///
19 /// # Return
20 /// The command returns the number of messages successfully acknowledged.
21 /// Certain message IDs may no longer be part of the PEL (for example because they have already been acknowledged),
22 /// and XACK will not count them as successfully acknowledged.
23 ///
24 /// # See Also
25 /// [<https://redis.io/commands/xack/>](https://redis.io/commands/xack/)
26 fn xack<K, G, I, II>(self, key: K, group: G, ids: II) -> PreparedCommand<'a, Self, usize>
27 where
28 Self: Sized,
29 K: SingleArg,
30 G: SingleArg,
31 I: SingleArg,
32 II: SingleArgCollection<I>,
33 {
34 prepare_command(self, cmd("XACK").arg(key).arg(group).arg(ids))
35 }
36
37 /// Appends the specified stream entry to the stream at the specified key.
38 ///
39 /// # Return
40 /// the ID of the added entry.
41 ///
42 /// The ID is the one auto-generated if * is passed as ID argument,
43 /// otherwise the command just returns the same ID specified by the user during insertion.
44 ///
45 /// The command returns a Null reply when used with create_stream=false and the key doesn't exist.
46 ///
47 /// # See Also
48 /// [<https://redis.io/commands/xadd/>](https://redis.io/commands/xadd/)
49 fn xadd<K, I, F, V, FFVV, R>(
50 self,
51 key: K,
52 stream_id: I,
53 items: FFVV,
54 options: XAddOptions,
55 ) -> PreparedCommand<'a, Self, R>
56 where
57 Self: Sized,
58 K: SingleArg,
59 I: SingleArg,
60 F: SingleArg,
61 V: SingleArg,
62 FFVV: KeyValueArgsCollection<F, V>,
63 R: PrimitiveResponse,
64 {
65 prepare_command(
66 self,
67 cmd("XADD").arg(key).arg(options).arg(stream_id).arg(items),
68 )
69 }
70
71 /// This command transfers ownership of pending stream entries that match the specified criteria.
72 ///
73 /// # Return
74 /// An instance of StreamAutoClaimResult
75 ///
76 /// # See Also
77 /// [<https://redis.io/commands/xautoclaim/>](https://redis.io/commands/xautoclaim/)
78 fn xautoclaim<K, G, C, I, V>(
79 self,
80 key: K,
81 group: G,
82 consumer: C,
83 min_idle_time: u64,
84 start: I,
85 options: XAutoClaimOptions,
86 ) -> PreparedCommand<'a, Self, XAutoClaimResult<V>>
87 where
88 Self: Sized,
89 K: SingleArg,
90 G: SingleArg,
91 C: SingleArg,
92 I: SingleArg,
93 V: PrimitiveResponse + DeserializeOwned,
94 {
95 prepare_command(
96 self,
97 cmd("XAUTOCLAIM")
98 .arg(key)
99 .arg(group)
100 .arg(consumer)
101 .arg(min_idle_time)
102 .arg(start)
103 .arg(options),
104 )
105 }
106
107 /// In the context of a stream consumer group, this command changes the ownership of a pending message,
108 /// so that the new owner is the consumer specified as the command argument.
109 ///
110 /// # Return
111 /// The ID of the added entry.
112 ///
113 /// The ID is the one auto-generated if * is passed as ID argument,
114 /// otherwise the command just returns the same ID specified by the user during insertion.
115 ///
116 /// The command returns a Null reply when used with create_stream=false and the key doesn't exist.
117 ///
118 /// # See Also
119 /// [<https://redis.io/commands/xclaim/>](https://redis.io/commands/xclaim/)
120 fn xclaim<K, G, C, I, II, V>(
121 self,
122 key: K,
123 group: G,
124 consumer: C,
125 min_idle_time: u64,
126 ids: II,
127 options: XClaimOptions,
128 ) -> PreparedCommand<'a, Self, Vec<StreamEntry<V>>>
129 where
130 Self: Sized,
131 K: SingleArg,
132 G: SingleArg,
133 C: SingleArg,
134 I: SingleArg,
135 II: SingleArgCollection<I>,
136 V: PrimitiveResponse + DeserializeOwned,
137 {
138 prepare_command(
139 self,
140 cmd("XCLAIM")
141 .arg(key)
142 .arg(group)
143 .arg(consumer)
144 .arg(min_idle_time)
145 .arg(ids)
146 .arg(options),
147 )
148 }
149
150 /// Removes the specified entries from a stream, and returns the number of entries deleted.
151 ///
152 /// # Return
153 /// The number of entries actually deleted.
154 ///
155 /// # See Also
156 /// [<https://redis.io/commands/xdel/>](https://redis.io/commands/xdel/)
157 fn xdel<K, I, II>(self, key: K, ids: II) -> PreparedCommand<'a, Self, usize>
158 where
159 Self: Sized,
160 K: SingleArg,
161 I: SingleArg,
162 II: SingleArgCollection<I>,
163 {
164 prepare_command(self, cmd("XDEL").arg(key).arg(ids))
165 }
166
167 /// This command creates a new consumer group uniquely identified by `groupname` for the stream stored at `key`.
168 ///
169 /// # Return
170 /// * `true` success
171 /// * `false`failure
172 ///
173 /// # See Also
174 /// [<https://redis.io/commands/xgroup-create/>](https://redis.io/commands/xgroup-create/)
175 fn xgroup_create<K, G, I>(
176 self,
177 key: K,
178 groupname: G,
179 id: I,
180 options: XGroupCreateOptions,
181 ) -> PreparedCommand<'a, Self, bool>
182 where
183 Self: Sized,
184 K: SingleArg,
185 G: SingleArg,
186 I: SingleArg,
187 {
188 prepare_command(
189 self,
190 cmd("XGROUP")
191 .arg("CREATE")
192 .arg(key)
193 .arg(groupname)
194 .arg(id)
195 .arg(options),
196 )
197 }
198
199 /// Create a consumer named `consumername` in the consumer group `groupname``
200 /// of the stream that's stored at `key.
201 ///
202 /// # Return
203 /// * `true` success
204 /// * `false`failure
205 ///
206 /// # See Also
207 /// [<https://redis.io/commands/xgroup-createconsumer/>](https://redis.io/commands/xgroup-createconsumer/)
208 fn xgroup_createconsumer<K, G, C>(
209 self,
210 key: K,
211 groupname: G,
212 consumername: C,
213 ) -> PreparedCommand<'a, Self, bool>
214 where
215 Self: Sized,
216 K: SingleArg,
217 G: SingleArg,
218 C: SingleArg,
219 {
220 prepare_command(
221 self,
222 cmd("XGROUP")
223 .arg("CREATECONSUMER")
224 .arg(key)
225 .arg(groupname)
226 .arg(consumername),
227 )
228 }
229
230 /// The XGROUP DELCONSUMER command deletes a consumer from the consumer group.
231 ///
232 /// # Return
233 /// The number of pending messages that the consumer had before it was deleted
234 ///
235 /// # See Also
236 /// [<https://redis.io/commands/xgroup-delconsumer/>](https://redis.io/commands/xgroup-delconsumer/)
237 fn xgroup_delconsumer<K, G, C>(
238 self,
239 key: K,
240 groupname: G,
241 consumername: C,
242 ) -> PreparedCommand<'a, Self, usize>
243 where
244 Self: Sized,
245 K: SingleArg,
246 G: SingleArg,
247 C: SingleArg,
248 {
249 prepare_command(
250 self,
251 cmd("XGROUP")
252 .arg("DELCONSUMER")
253 .arg(key)
254 .arg(groupname)
255 .arg(consumername),
256 )
257 }
258
259 /// The XGROUP DESTROY command completely destroys a consumer group.
260 ///
261 /// # Return
262 /// * `true` success
263 /// * `false`failure
264 ///
265 /// # See Also
266 /// [<https://redis.io/commands/xgroup-destroy/>](https://redis.io/commands/xgroup-destroy/)
267 fn xgroup_destroy<K, G>(self, key: K, groupname: G) -> PreparedCommand<'a, Self, bool>
268 where
269 Self: Sized,
270 K: SingleArg,
271 G: SingleArg,
272 {
273 prepare_command(self, cmd("XGROUP").arg("DESTROY").arg(key).arg(groupname))
274 }
275
276 /// Set the last delivered ID for a consumer group.
277 ///
278 /// # See Also
279 /// [<https://redis.io/commands/xgroup-setid/>](https://redis.io/commands/xgroup-setid/)
280 fn xgroup_setid<K, G, I>(
281 self,
282 key: K,
283 groupname: G,
284 id: I,
285 entries_read: Option<usize>,
286 ) -> PreparedCommand<'a, Self, ()>
287 where
288 Self: Sized,
289 K: SingleArg,
290 G: SingleArg,
291 I: SingleArg,
292 {
293 prepare_command(
294 self,
295 cmd("XGROUP")
296 .arg("SETID")
297 .arg(key)
298 .arg(groupname)
299 .arg(id)
300 .arg(entries_read.map(|e| ("ENTRIESREAD", e))),
301 )
302 }
303
304 /// This command returns the list of consumers that belong to the `groupname` consumer group of the stream stored at `key`.
305 ///
306 /// # Return
307 /// A collection of XConsumerInfo.
308 ///
309 /// # See Also
310 /// [<https://redis.io/commands/xinfo-consumers/>](https://redis.io/commands/xinfo-consumers/)
311 fn xinfo_consumers<K, G>(
312 self,
313 key: K,
314 groupname: G,
315 ) -> PreparedCommand<'a, Self, Vec<XConsumerInfo>>
316 where
317 Self: Sized,
318 K: SingleArg,
319 G: SingleArg,
320 {
321 prepare_command(self, cmd("XINFO").arg("CONSUMERS").arg(key).arg(groupname))
322 }
323
324 /// This command returns the list of consumers that belong
325 /// to the `groupname` consumer group of the stream stored at `key`.
326 ///
327 /// # Return
328 /// A collection of XGroupInfo.
329 ///
330 /// # See Also
331 /// [<https://redis.io/commands/xinfo-groups/>](https://redis.io/commands/xinfo-groups/)
332 fn xinfo_groups<K>(self, key: K) -> PreparedCommand<'a, Self, Vec<XGroupInfo>>
333 where
334 Self: Sized,
335 K: SingleArg,
336 {
337 prepare_command(self, cmd("XINFO").arg("GROUPS").arg(key))
338 }
339
340 /// This command returns information about the stream stored at `key`.
341 ///
342 /// # Return
343 /// A collection of XGroupInfo.
344 ///
345 /// # See Also
346 /// [<https://redis.io/commands/xinfo-stream/>](https://redis.io/commands/xinfo-stream/)
347 fn xinfo_stream<K>(
348 self,
349 key: K,
350 options: XInfoStreamOptions,
351 ) -> PreparedCommand<'a, Self, XStreamInfo>
352 where
353 Self: Sized,
354 K: SingleArg,
355 {
356 prepare_command(self, cmd("XINFO").arg("STREAM").arg(key).arg(options))
357 }
358
359 /// Returns the number of entries inside a stream.
360 ///
361 /// # Return
362 /// The number of entries of the stream at `key`.
363 ///
364 /// # See Also
365 /// [<https://redis.io/commands/xrange/>](https://redis.io/commands/xrange/)
366 fn xlen<K>(self, key: K) -> PreparedCommand<'a, Self, usize>
367 where
368 Self: Sized,
369 K: SingleArg,
370 {
371 prepare_command(self, cmd("XLEN").arg(key))
372 }
373
374 /// The XPENDING command is the interface to inspect the list of pending messages.
375 ///
376 /// # See Also
377 /// [<https://redis.io/commands/xpending/>](https://redis.io/commands/xpending/)
378 fn xpending<K, G>(self, key: K, group: G) -> PreparedCommand<'a, Self, XPendingResult>
379 where
380 Self: Sized,
381 K: SingleArg,
382 G: SingleArg,
383 {
384 prepare_command(self, cmd("XPENDING").arg(key).arg(group))
385 }
386
387 /// The XPENDING command is the interface to inspect the list of pending messages.
388 ///
389 /// # See Also
390 /// [<https://redis.io/commands/xpending/>](https://redis.io/commands/xpending/)
391 fn xpending_with_options<K, G>(
392 self,
393 key: K,
394 group: G,
395 options: XPendingOptions,
396 ) -> PreparedCommand<'a, Self, Vec<XPendingMessageResult>>
397 where
398 Self: Sized,
399 K: SingleArg,
400 G: SingleArg,
401 {
402 prepare_command(self, cmd("XPENDING").arg(key).arg(group).arg(options))
403 }
404
405 /// The command returns the stream entries matching a given range of IDs.
406 ///
407 /// # Return
408 /// A collection of StreamEntry
409 ///
410 /// The command returns the entries with IDs matching the specified range.
411 /// The returned entries are complete, that means that the ID and all the fields they are composed are returned.
412 /// Moreover, the entries are returned with their fields and values in the exact same order as XADD added them.
413 ///
414 /// # See Also
415 /// [<https://redis.io/commands/xrange/>](https://redis.io/commands/xrange/)
416 fn xrange<K, S, E, V>(
417 self,
418 key: K,
419 start: S,
420 end: E,
421 count: Option<usize>,
422 ) -> PreparedCommand<'a, Self, Vec<StreamEntry<V>>>
423 where
424 Self: Sized,
425 K: SingleArg,
426 S: SingleArg,
427 E: SingleArg,
428 V: PrimitiveResponse + DeserializeOwned,
429 {
430 prepare_command(
431 self,
432 cmd("XRANGE")
433 .arg(key)
434 .arg(start)
435 .arg(end)
436 .arg(count.map(|c| ("COUNT", c))),
437 )
438 }
439
440 /// Read data from one or multiple streams,
441 /// only returning entries with an ID greater than the last received ID reported by the caller.
442 ///
443 /// # Return
444 /// A collection of XReadStreamResult
445 ///
446 /// # See Also
447 /// [<https://redis.io/commands/xread/>](https://redis.io/commands/xread/)
448 fn xread<K, KK, I, II, V, R>(
449 self,
450 options: XReadOptions,
451 keys: KK,
452 ids: II,
453 ) -> PreparedCommand<'a, Self, R>
454 where
455 Self: Sized,
456 K: SingleArg,
457 KK: SingleArgCollection<K>,
458 I: SingleArg,
459 II: SingleArgCollection<I>,
460 V: PrimitiveResponse + DeserializeOwned,
461 R: KeyValueCollectionResponse<String, Vec<StreamEntry<V>>>,
462 {
463 prepare_command(
464 self,
465 cmd("XREAD").arg(options).arg("STREAMS").arg(keys).arg(ids),
466 )
467 }
468
469 /// The XREADGROUP command is a special version of the [`xread`](StreamCommands::xread)
470 /// command with support for consumer groups.
471 ///
472 /// # Return
473 /// A collection of XReadStreamResult
474 ///
475 /// # See Also
476 /// [<https://redis.io/commands/xreadgroup/>](https://redis.io/commands/xreadgroup/)
477 fn xreadgroup<G, C, K, KK, I, II, V, R>(
478 self,
479 group: G,
480 consumer: C,
481 options: XReadGroupOptions,
482 keys: KK,
483 ids: II,
484 ) -> PreparedCommand<'a, Self, R>
485 where
486 Self: Sized,
487 G: SingleArg,
488 C: SingleArg,
489 K: SingleArg,
490 KK: SingleArgCollection<K>,
491 I: SingleArg,
492 II: SingleArgCollection<I>,
493 V: PrimitiveResponse + DeserializeOwned,
494 R: KeyValueCollectionResponse<String, Vec<StreamEntry<V>>>,
495 {
496 prepare_command(
497 self,
498 cmd("XREADGROUP")
499 .arg("GROUP")
500 .arg(group)
501 .arg(consumer)
502 .arg(options)
503 .arg("STREAMS")
504 .arg(keys)
505 .arg(ids),
506 )
507 }
508
509 /// This command is exactly like [`xrange`](StreamCommands::xrange),
510 /// but with the notable difference of returning the entries in reverse order,
511 /// and also taking the start-end range in reverse order
512 ///
513 /// # Return
514 /// A collection of StreamEntry
515 ///
516 /// # See Also
517 /// [<https://redis.io/commands/xrevrange/>](https://redis.io/commands/xrevrange/)
518 fn xrevrange<K, E, S, V>(
519 self,
520 key: K,
521 end: E,
522 start: S,
523 count: Option<usize>,
524 ) -> PreparedCommand<'a, Self, Vec<StreamEntry<V>>>
525 where
526 Self: Sized,
527 K: SingleArg,
528 E: SingleArg,
529 S: SingleArg,
530 V: PrimitiveResponse + DeserializeOwned,
531 {
532 prepare_command(
533 self,
534 cmd("XREVRANGE")
535 .arg(key)
536 .arg(end)
537 .arg(start)
538 .arg(count.map(|c| ("COUNT", c))),
539 )
540 }
541
542 /// XTRIM trims the stream by evicting older entries (entries with lower IDs) if needed.
543 ///
544 /// # Return
545 /// The number of entries deleted from the stream.
546 ///
547 /// # See Also
548 /// [<https://redis.io/commands/xtrim/>](https://redis.io/commands/xtrim/)
549 fn xtrim<K>(self, key: K, options: XTrimOptions) -> PreparedCommand<'a, Self, usize>
550 where
551 Self: Sized,
552 K: SingleArg,
553 {
554 prepare_command(self, cmd("XTRIM").arg(key).arg(options))
555 }
556}
557
558/// Stream Add options for the [`xadd`](StreamCommands::xadd) command.
559#[derive(Default)]
560pub struct XAddOptions {
561 command_args: CommandArgs,
562}
563
564impl XAddOptions {
565 #[must_use]
566 pub fn no_mk_stream(mut self) -> Self {
567 Self {
568 command_args: self.command_args.arg("NOMKSTREAM").build(),
569 }
570 }
571
572 #[must_use]
573 pub fn trim_options(mut self, trim_options: XTrimOptions) -> Self {
574 Self {
575 command_args: self.command_args.arg(trim_options).build(),
576 }
577 }
578}
579
580impl ToArgs for XAddOptions {
581 fn write_args(&self, args: &mut CommandArgs) {
582 args.arg(&self.command_args);
583 }
584}
585
586/// Stream Trim operator for the [`xadd`](StreamCommands::xadd)
587/// and [`xtrim`](StreamCommands::xtrim) commands
588#[derive(Default)]
589pub enum XTrimOperator {
590 #[default]
591 None,
592 /// =
593 Equal,
594 /// ~
595 Approximately,
596}
597
598impl ToArgs for XTrimOperator {
599 fn write_args(&self, args: &mut CommandArgs) {
600 match self {
601 XTrimOperator::None => {}
602 XTrimOperator::Equal => {
603 args.arg("=");
604 }
605 XTrimOperator::Approximately => {
606 args.arg("~");
607 }
608 }
609 }
610}
611
612/// Stream Trim options for the [`xadd`](StreamCommands::xadd)
613/// and [`xtrim`](StreamCommands::xtrim) commands
614#[derive(Default)]
615pub struct XTrimOptions {
616 command_args: CommandArgs,
617}
618
619impl XTrimOptions {
620 #[must_use]
621 pub fn max_len(operator: XTrimOperator, threshold: i64) -> Self {
622 Self {
623 command_args: CommandArgs::default()
624 .arg("MAXLEN")
625 .arg(operator)
626 .arg(threshold)
627 .build(),
628 }
629 }
630
631 #[must_use]
632 pub fn min_id<I: SingleArg>(operator: XTrimOperator, threshold_id: I) -> Self {
633 Self {
634 command_args: CommandArgs::default()
635 .arg("MINID")
636 .arg(operator)
637 .arg(threshold_id)
638 .build(),
639 }
640 }
641
642 #[must_use]
643 pub fn limit(mut self, count: usize) -> Self {
644 Self {
645 command_args: self.command_args.arg("LIMIT").arg(count).build(),
646 }
647 }
648}
649
650impl ToArgs for XTrimOptions {
651 fn write_args(&self, args: &mut CommandArgs) {
652 args.arg(&self.command_args);
653 }
654}
655
656/// Options for the [`xautoclaim`](StreamCommands::xautoclaim) command
657#[derive(Default)]
658pub struct XAutoClaimOptions {
659 command_args: CommandArgs,
660}
661
662impl XAutoClaimOptions {
663 #[must_use]
664 pub fn count(mut self, count: usize) -> Self {
665 Self {
666 command_args: self.command_args.arg("COUNT").arg(count).build(),
667 }
668 }
669
670 #[must_use]
671 pub fn just_id(mut self) -> Self {
672 Self {
673 command_args: self.command_args.arg("JUSTID").build(),
674 }
675 }
676}
677
678impl ToArgs for XAutoClaimOptions {
679 fn write_args(&self, args: &mut CommandArgs) {
680 args.arg(&self.command_args);
681 }
682}
683
684/// Result for the [`xrange`](StreamCommands::xrange) and other associated commands.
685#[derive(Deserialize)]
686pub struct StreamEntry<V>
687where
688 V: PrimitiveResponse,
689{
690 /// The stream Id
691 pub stream_id: String,
692 /// entries with their fields and values in the exact same
693 /// order as [`xadd`](StreamCommands::xadd) added them.
694 pub items: HashMap<String, V>,
695}
696
697/// Result for the [`xautoclaim`](StreamCommands::xautoclaim) command.
698#[derive(Deserialize)]
699pub struct XAutoClaimResult<V>
700where
701 V: PrimitiveResponse,
702{
703 /// A stream ID to be used as the `start` argument for
704 /// the next call to [`xautoclaim`](StreamCommands::xautoclaim).
705 pub start_stream_id: String,
706 /// An array containing all the successfully claimed messages in
707 /// the same format as [`xrange`](StreamCommands::xrange).
708 pub entries: Vec<StreamEntry<V>>,
709 /// An array containing message IDs that no longer exist in the stream,
710 /// and were deleted from the PEL in which they were found.
711 pub deleted_ids: Vec<String>,
712}
713
714/// Options for the [`xclaim`](StreamCommands::xclaim) command
715#[derive(Default)]
716pub struct XClaimOptions {
717 command_args: CommandArgs,
718}
719
720impl XClaimOptions {
721 /// Set the idle time (last time it was delivered) of the message.
722 #[must_use]
723 pub fn idle_time(mut self, idle_time_millis: u64) -> Self {
724 Self {
725 command_args: self.command_args.arg("IDLE").arg(idle_time_millis).build(),
726 }
727 }
728
729 /// This is the same as `idle_time` but instead of a relative amount of milliseconds,
730 /// it sets the idle time to a specific Unix time (in milliseconds).
731 #[must_use]
732 pub fn time(mut self, unix_time_milliseconds: u64) -> Self {
733 Self {
734 command_args: self
735 .command_args
736 .arg("TIME")
737 .arg(unix_time_milliseconds)
738 .build(),
739 }
740 }
741
742 /// Set the retry counter to the specified value.
743 #[must_use]
744 pub fn retry_count(mut self, count: usize) -> Self {
745 Self {
746 command_args: self.command_args.arg("RETRYCOUNT").arg(count).build(),
747 }
748 }
749
750 /// Creates the pending message entry in the PEL
751 /// even if certain specified IDs are not already
752 /// in the PEL assigned to a different client.
753 #[must_use]
754 pub fn force(mut self) -> Self {
755 Self {
756 command_args: self.command_args.arg("FORCE").build(),
757 }
758 }
759
760 /// Return just an array of IDs of messages successfully claimed,
761 /// without returning the actual message.
762 #[must_use]
763 pub fn just_id(mut self) -> Self {
764 Self {
765 command_args: self.command_args.arg("JUSTID").build(),
766 }
767 }
768}
769
770impl ToArgs for XClaimOptions {
771 fn write_args(&self, args: &mut CommandArgs) {
772 args.arg(&self.command_args);
773 }
774}
775
776/// Options for the [`xgroup_create`](StreamCommands::xgroup_create) command
777#[derive(Default)]
778pub struct XGroupCreateOptions {
779 command_args: CommandArgs,
780}
781
782impl XGroupCreateOptions {
783 /// By default, the XGROUP CREATE command insists that the target stream exists and returns an error when it doesn't.
784 /// However, you can use the optional MKSTREAM subcommand as the last argument after the `id`
785 /// to automatically create the stream (with length of 0) if it doesn't exist
786 #[must_use]
787 pub fn mk_stream(mut self) -> Self {
788 Self {
789 command_args: self.command_args.arg("MKSTREAM").build(),
790 }
791 }
792
793 /// The optional entries_read named argument can be specified to enable consumer group lag tracking for an arbitrary ID.
794 /// An arbitrary ID is any ID that isn't the ID of the stream's first entry, its last entry or the zero ("0-0") ID.
795 /// This can be useful you know exactly how many entries are between the arbitrary ID (excluding it) and the stream's last entry.
796 /// In such cases, the entries_read can be set to the stream's entries_added subtracted with the number of entries.
797 #[must_use]
798 pub fn entries_read(mut self, entries_read: usize) -> Self {
799 Self {
800 command_args: self
801 .command_args
802 .arg("ENTRIESREAD")
803 .arg(entries_read)
804 .build(),
805 }
806 }
807}
808
809impl ToArgs for XGroupCreateOptions {
810 fn write_args(&self, args: &mut CommandArgs) {
811 args.arg(&self.command_args);
812 }
813}
814
815/// Result entry for the [`xinfo_consumers`](StreamCommands::xinfo_consumers) command.
816#[derive(Deserialize)]
817pub struct XConsumerInfo {
818 /// the consumer's name
819 pub name: String,
820
821 /// the number of pending messages for the client,
822 /// which are messages that were delivered but are yet to be acknowledged
823 pub pending: usize,
824
825 /// the number of milliseconds that have passed
826 /// since the consumer last interacted with the server
827 #[serde(rename = "idle")]
828 pub idle_millis: u64,
829}
830
831/// Result entry for the [`xinfo_groups`](StreamCommands::xinfo_groups) command.
832#[derive(Deserialize)]
833#[serde(rename_all = "kebab-case")]
834pub struct XGroupInfo {
835 /// the consumer group's name
836 pub name: String,
837
838 /// the number of consumers in the group
839 pub consumers: usize,
840
841 /// the length of the group's pending entries list (PEL),
842 /// which are messages that were delivered but are yet to be acknowledged
843 pub pending: usize,
844
845 /// the ID of the last entry delivered the group's consumers
846 pub last_delivered_id: String,
847
848 /// the logical "read counter" of the last entry delivered to group's consumers
849 pub entries_read: Option<usize>,
850
851 /// the number of entries in the stream that are still waiting to be delivered to the group's consumers,
852 /// or a NULL when that number can't be determined.
853 pub lag: Option<usize>,
854}
855
856/// Options for the [`xinfo_stream`](StreamCommands::xinfo_stream) command
857#[derive(Default)]
858pub struct XInfoStreamOptions {
859 command_args: CommandArgs,
860}
861
862impl XInfoStreamOptions {
863 /// The optional FULL modifier provides a more verbose reply.
864 #[must_use]
865 pub fn full(mut self) -> Self {
866 Self {
867 command_args: self.command_args.arg("FULL").build(),
868 }
869 }
870
871 /// The COUNT option can be used to limit the number of stream and PEL entries that are returned
872 /// (The first `count` entries are returned).
873 #[must_use]
874 pub fn count(mut self, count: usize) -> Self {
875 Self {
876 command_args: self.command_args.arg("COUNT").arg(count).build(),
877 }
878 }
879}
880
881impl ToArgs for XInfoStreamOptions {
882 fn write_args(&self, args: &mut CommandArgs) {
883 args.arg(&self.command_args);
884 }
885}
886
887/// Stream info returned by the [`xinfo_stream`](StreamCommands::xinfo_stream) command.
888#[derive(Deserialize)]
889#[serde(rename_all = "kebab-case")]
890pub struct XStreamInfo {
891 /// the number of entries in the stream (see [`xlen`](StreamCommands::xlen))
892 pub length: usize,
893
894 /// the number of keys in the underlying radix data structure
895 pub radix_tree_keys: usize,
896
897 /// the number of nodes in the underlying radix data structure
898 pub radix_tree_nodes: usize,
899
900 /// the number of consumer groups defined for the stream
901 pub groups: usize,
902
903 /// the ID of the least-recently entry that was added to the stream
904 pub last_generated_id: String,
905
906 /// the maximal entry ID that was deleted from the stream
907 pub max_deleted_entry_id: String,
908
909 /// the count of all entries added to the stream during its lifetime
910 pub entries_added: usize,
911
912 /// the ID and field-value tuples of the first entry in the stream
913 pub first_entry: StreamEntry<String>,
914
915 /// the ID and field-value tuples of the last entry in the stream
916 pub last_entry: StreamEntry<String>,
917
918 pub recorded_first_entry_id: String,
919}
920
921/// Options for the [`xread`](StreamCommands::xread) command
922#[derive(Default)]
923pub struct XReadOptions {
924 command_args: CommandArgs,
925}
926
927impl XReadOptions {
928 #[must_use]
929 pub fn count(mut self, count: usize) -> Self {
930 Self {
931 command_args: self.command_args.arg("COUNT").arg(count).build(),
932 }
933 }
934
935 #[must_use]
936 pub fn block(mut self, milliseconds: u64) -> Self {
937 Self {
938 command_args: self.command_args.arg("BLOCK").arg(milliseconds).build(),
939 }
940 }
941}
942
943impl ToArgs for XReadOptions {
944 fn write_args(&self, args: &mut CommandArgs) {
945 args.arg(&self.command_args);
946 }
947}
948
949/// Options for the [`xreadgroup`](StreamCommands::xreadgroup) command
950#[derive(Default)]
951pub struct XReadGroupOptions {
952 command_args: CommandArgs,
953}
954
955impl XReadGroupOptions {
956 #[must_use]
957 pub fn count(mut self, count: usize) -> Self {
958 Self {
959 command_args: self.command_args.arg("COUNT").arg(count).build(),
960 }
961 }
962
963 #[must_use]
964 pub fn block(mut self, milliseconds: u64) -> Self {
965 Self {
966 command_args: self.command_args.arg("BLOCK").arg(milliseconds).build(),
967 }
968 }
969
970 #[must_use]
971 pub fn no_ack(mut self) -> Self {
972 Self {
973 command_args: self.command_args.arg("NOACK").build(),
974 }
975 }
976}
977
978impl ToArgs for XReadGroupOptions {
979 fn write_args(&self, args: &mut CommandArgs) {
980 args.arg(&self.command_args);
981 }
982}
983
984/// Options for the [`xpending_with_options`](StreamCommands::xpending_with_options) command
985#[derive(Default)]
986pub struct XPendingOptions {
987 command_args: CommandArgs,
988}
989
990impl XPendingOptions {
991 #[must_use]
992 pub fn idle(mut self, min_idle_time: u64) -> Self {
993 Self {
994 command_args: self.command_args.arg("IDLE").arg(min_idle_time).build(),
995 }
996 }
997
998 #[must_use]
999 pub fn start<S: SingleArg>(mut self, start: S) -> Self {
1000 Self {
1001 command_args: self.command_args.arg(start).build(),
1002 }
1003 }
1004
1005 #[must_use]
1006 pub fn end<E: SingleArg>(mut self, end: E) -> Self {
1007 Self {
1008 command_args: self.command_args.arg(end).build(),
1009 }
1010 }
1011
1012 #[must_use]
1013 pub fn count(mut self, count: usize) -> Self {
1014 Self {
1015 command_args: self.command_args.arg(count).build(),
1016 }
1017 }
1018
1019 #[must_use]
1020 pub fn consumer<C: SingleArg>(mut self, consumer: C) -> Self {
1021 Self {
1022 command_args: self.command_args.arg(consumer).build(),
1023 }
1024 }
1025}
1026
1027impl ToArgs for XPendingOptions {
1028 fn write_args(&self, args: &mut CommandArgs) {
1029 args.arg(&self.command_args);
1030 }
1031}
1032
1033/// Result for the [`xpending`](StreamCommands::xpending) command
1034#[derive(Deserialize)]
1035pub struct XPendingResult {
1036 pub num_pending_messages: usize,
1037 pub smallest_id: String,
1038 pub greatest_id: String,
1039 pub consumers: Vec<XPendingConsumer>,
1040}
1041
1042/// Customer info result for the [`xpending`](StreamCommands::xpending) command
1043#[derive(Deserialize)]
1044pub struct XPendingConsumer {
1045 pub consumer: String,
1046 pub num_messages: usize,
1047}
1048
1049/// Message result for the [`xpending_with_options`](StreamCommands::xpending_with_options) command
1050#[derive(Deserialize)]
1051pub struct XPendingMessageResult {
1052 pub message_id: String,
1053 pub consumer: String,
1054 pub elapsed_millis: u64,
1055 pub times_delivered: usize,
1056}