rustis/commands/stream_commands.rs
1use crate::{
2 client::{prepare_command, PreparedCommand},
3 resp::{
4 cmd, CommandArgs, KeyValueArgsCollection, KeyValueCollectionResponse, PrimitiveResponse,
5 SingleArg, SingleArgCollection, ToArgs,
6 },
7};
8use serde::{de::DeserializeOwned, Deserialize};
9use std::collections::HashMap;
10
11/// A group of Redis commands related to [`Streams`](https://redis.io/docs/data-types/streams/)
12/// # See Also
13/// [Redis Generic Commands](https://redis.io/commands/?group=stream)
14/// [Streams tutorial](https://redis.io/docs/data-types/streams-tutorial/)
15pub trait StreamCommands<'a> {
16 /// The XACK command removes one or multiple messages
17 /// from the Pending Entries List (PEL) of a stream consumer group
18 ///
19 /// # Return
20 /// The command returns the number of messages successfully acknowledged.
21 /// Certain message IDs may no longer be part of the PEL (for example because they have already been acknowledged),
22 /// and XACK will not count them as successfully acknowledged.
23 ///
24 /// # See Also
25 /// [<https://redis.io/commands/xack/>](https://redis.io/commands/xack/)
26 fn xack<K, G, I, II>(self, key: K, group: G, ids: II) -> PreparedCommand<'a, Self, usize>
27 where
28 Self: Sized,
29 K: SingleArg,
30 G: SingleArg,
31 I: SingleArg,
32 II: SingleArgCollection<I>,
33 {
34 prepare_command(self, cmd("XACK").arg(key).arg(group).arg(ids))
35 }
36
37 /// Appends the specified stream entry to the stream at the specified key.
38 ///
39 /// # Return
40 /// the ID of the added entry.
41 ///
42 /// The ID is the one auto-generated if * is passed as ID argument,
43 /// otherwise the command just returns the same ID specified by the user during insertion.
44 ///
45 /// The command returns a Null reply when used with create_stream=false and the key doesn't exist.
46 ///
47 /// # See Also
48 /// [<https://redis.io/commands/xadd/>](https://redis.io/commands/xadd/)
49 fn xadd<K, I, F, V, FFVV, R>(
50 self,
51 key: K,
52 stream_id: I,
53 items: FFVV,
54 options: XAddOptions,
55 ) -> PreparedCommand<'a, Self, R>
56 where
57 Self: Sized,
58 K: SingleArg,
59 I: SingleArg,
60 F: SingleArg,
61 V: SingleArg,
62 FFVV: KeyValueArgsCollection<F, V>,
63 R: PrimitiveResponse,
64 {
65 prepare_command(
66 self,
67 cmd("XADD").arg(key).arg(options).arg(stream_id).arg(items),
68 )
69 }
70
71 /// This command transfers ownership of pending stream entries that match the specified criteria.
72 ///
73 /// # Return
74 /// An instance of StreamAutoClaimResult
75 ///
76 /// # See Also
77 /// [<https://redis.io/commands/xautoclaim/>](https://redis.io/commands/xautoclaim/)
78 fn xautoclaim<K, G, C, I, V>(
79 self,
80 key: K,
81 group: G,
82 consumer: C,
83 min_idle_time: u64,
84 start: I,
85 options: XAutoClaimOptions,
86 ) -> PreparedCommand<'a, Self, XAutoClaimResult<V>>
87 where
88 Self: Sized,
89 K: SingleArg,
90 G: SingleArg,
91 C: SingleArg,
92 I: SingleArg,
93 V: PrimitiveResponse + DeserializeOwned,
94 {
95 prepare_command(
96 self,
97 cmd("XAUTOCLAIM")
98 .arg(key)
99 .arg(group)
100 .arg(consumer)
101 .arg(min_idle_time)
102 .arg(start)
103 .arg(options),
104 )
105 }
106
107 /// In the context of a stream consumer group, this command changes the ownership of a pending message,
108 /// so that the new owner is the consumer specified as the command argument.
109 ///
110 /// # Return
111 /// The ID of the added entry.
112 ///
113 /// The ID is the one auto-generated if * is passed as ID argument,
114 /// otherwise the command just returns the same ID specified by the user during insertion.
115 ///
116 /// The command returns a Null reply when used with create_stream=false and the key doesn't exist.
117 ///
118 /// # See Also
119 /// [<https://redis.io/commands/xclaim/>](https://redis.io/commands/xclaim/)
120 fn xclaim<K, G, C, I, II, V>(
121 self,
122 key: K,
123 group: G,
124 consumer: C,
125 min_idle_time: u64,
126 ids: II,
127 options: XClaimOptions,
128 ) -> PreparedCommand<'a, Self, Vec<StreamEntry<V>>>
129 where
130 Self: Sized,
131 K: SingleArg,
132 G: SingleArg,
133 C: SingleArg,
134 I: SingleArg,
135 II: SingleArgCollection<I>,
136 V: PrimitiveResponse + DeserializeOwned,
137 {
138 prepare_command(
139 self,
140 cmd("XCLAIM")
141 .arg(key)
142 .arg(group)
143 .arg(consumer)
144 .arg(min_idle_time)
145 .arg(ids)
146 .arg(options),
147 )
148 }
149
150 /// Removes the specified entries from a stream, and returns the number of entries deleted.
151 ///
152 /// # Return
153 /// The number of entries actually deleted.
154 ///
155 /// # See Also
156 /// [<https://redis.io/commands/xdel/>](https://redis.io/commands/xdel/)
157 fn xdel<K, I, II>(self, key: K, ids: II) -> PreparedCommand<'a, Self, usize>
158 where
159 Self: Sized,
160 K: SingleArg,
161 I: SingleArg,
162 II: SingleArgCollection<I>,
163 {
164 prepare_command(self, cmd("XDEL").arg(key).arg(ids))
165 }
166
167 /// This command creates a new consumer group uniquely identified by `groupname` for the stream stored at `key`.
168 ///
169 /// # Return
170 /// * `true` success
171 /// * `false`failure
172 ///
173 /// # See Also
174 /// [<https://redis.io/commands/xgroup-create/>](https://redis.io/commands/xgroup-create/)
175 fn xgroup_create<K, G, I>(
176 self,
177 key: K,
178 groupname: G,
179 id: I,
180 options: XGroupCreateOptions,
181 ) -> PreparedCommand<'a, Self, bool>
182 where
183 Self: Sized,
184 K: SingleArg,
185 G: SingleArg,
186 I: SingleArg,
187 {
188 prepare_command(
189 self,
190 cmd("XGROUP")
191 .arg("CREATE")
192 .arg(key)
193 .arg(groupname)
194 .arg(id)
195 .arg(options),
196 )
197 }
198
199 /// Create a consumer named `consumername` in the consumer group `groupname``
200 /// of the stream that's stored at `key.
201 ///
202 /// # Return
203 /// * `true` success
204 /// * `false`failure
205 ///
206 /// # See Also
207 /// [<https://redis.io/commands/xgroup-createconsumer/>](https://redis.io/commands/xgroup-createconsumer/)
208 fn xgroup_createconsumer<K, G, C>(
209 self,
210 key: K,
211 groupname: G,
212 consumername: C,
213 ) -> PreparedCommand<'a, Self, bool>
214 where
215 Self: Sized,
216 K: SingleArg,
217 G: SingleArg,
218 C: SingleArg,
219 {
220 prepare_command(
221 self,
222 cmd("XGROUP")
223 .arg("CREATECONSUMER")
224 .arg(key)
225 .arg(groupname)
226 .arg(consumername),
227 )
228 }
229
230 /// The XGROUP DELCONSUMER command deletes a consumer from the consumer group.
231 ///
232 /// # Return
233 /// The number of pending messages that the consumer had before it was deleted
234 ///
235 /// # See Also
236 /// [<https://redis.io/commands/xgroup-delconsumer/>](https://redis.io/commands/xgroup-delconsumer/)
237 fn xgroup_delconsumer<K, G, C>(
238 self,
239 key: K,
240 groupname: G,
241 consumername: C,
242 ) -> PreparedCommand<'a, Self, usize>
243 where
244 Self: Sized,
245 K: SingleArg,
246 G: SingleArg,
247 C: SingleArg,
248 {
249 prepare_command(
250 self,
251 cmd("XGROUP")
252 .arg("DELCONSUMER")
253 .arg(key)
254 .arg(groupname)
255 .arg(consumername),
256 )
257 }
258
259 /// The XGROUP DESTROY command completely destroys a consumer group.
260 ///
261 /// # Return
262 /// * `true` success
263 /// * `false`failure
264 ///
265 /// # See Also
266 /// [<https://redis.io/commands/xgroup-destroy/>](https://redis.io/commands/xgroup-destroy/)
267 fn xgroup_destroy<K, G>(self, key: K, groupname: G) -> PreparedCommand<'a, Self, bool>
268 where
269 Self: Sized,
270 K: SingleArg,
271 G: SingleArg,
272 {
273 prepare_command(self, cmd("XGROUP").arg("DESTROY").arg(key).arg(groupname))
274 }
275
276 /// The command returns a helpful text describing the different XGROUP subcommands.
277 ///
278 /// # Return
279 /// An array of strings.
280 ///
281 /// # Example
282 /// ```
283 /// # use rustis::{
284 /// # client::Client,
285 /// # commands::StreamCommands,
286 /// # Result,
287 /// # };
288 /// #
289 /// # #[cfg_attr(feature = "tokio-runtime", tokio::main)]
290 /// # #[cfg_attr(feature = "async-std-runtime", async_std::main)]
291 /// # async fn main() -> Result<()> {
292 /// # let client = Client::connect("127.0.0.1:6379").await?;
293 /// let result: Vec<String> = client.xgroup_help().await?;
294 /// assert!(result.iter().any(|e| e == "HELP"));
295 /// # Ok(())
296 /// # }
297 /// ```
298 ///
299 /// # See Also
300 /// [<https://redis.io/commands/xgroup-help/>](https://redis.io/commands/xgroup-help/)
301 #[must_use]
302 fn xgroup_help(self) -> PreparedCommand<'a, Self, Vec<String>>
303 where
304 Self: Sized,
305 {
306 prepare_command(self, cmd("XGROUP").arg("HELP"))
307 }
308
309 /// Set the last delivered ID for a consumer group.
310 ///
311 /// # See Also
312 /// [<https://redis.io/commands/xgroup-setid/>](https://redis.io/commands/xgroup-setid/)
313 fn xgroup_setid<K, G, I>(
314 self,
315 key: K,
316 groupname: G,
317 id: I,
318 entries_read: Option<usize>,
319 ) -> PreparedCommand<'a, Self, ()>
320 where
321 Self: Sized,
322 K: SingleArg,
323 G: SingleArg,
324 I: SingleArg,
325 {
326 prepare_command(
327 self,
328 cmd("XGROUP")
329 .arg("SETID")
330 .arg(key)
331 .arg(groupname)
332 .arg(id)
333 .arg(entries_read.map(|e| ("ENTRIESREAD", e))),
334 )
335 }
336
337 /// This command returns the list of consumers that belong to the `groupname` consumer group of the stream stored at `key`.
338 ///
339 /// # Return
340 /// A collection of XConsumerInfo.
341 ///
342 /// # See Also
343 /// [<https://redis.io/commands/xinfo-consumers/>](https://redis.io/commands/xinfo-consumers/)
344 fn xinfo_consumers<K, G>(
345 self,
346 key: K,
347 groupname: G,
348 ) -> PreparedCommand<'a, Self, Vec<XConsumerInfo>>
349 where
350 Self: Sized,
351 K: SingleArg,
352 G: SingleArg,
353 {
354 prepare_command(self, cmd("XINFO").arg("CONSUMERS").arg(key).arg(groupname))
355 }
356
357 /// This command returns the list of consumers that belong
358 /// to the `groupname` consumer group of the stream stored at `key`.
359 ///
360 /// # Return
361 /// A collection of XGroupInfo.
362 ///
363 /// # See Also
364 /// [<https://redis.io/commands/xinfo-groups/>](https://redis.io/commands/xinfo-groups/)
365 fn xinfo_groups<K>(self, key: K) -> PreparedCommand<'a, Self, Vec<XGroupInfo>>
366 where
367 Self: Sized,
368 K: SingleArg,
369 {
370 prepare_command(self, cmd("XINFO").arg("GROUPS").arg(key))
371 }
372
373 /// The command returns a helpful text describing the different XINFO subcommands.
374 ///
375 /// # Return
376 /// An array of strings.
377 ///
378 /// # Example
379 /// ```
380 /// # use rustis::{
381 /// # client::Client,
382 /// # commands::StreamCommands,
383 /// # Result,
384 /// # };
385 /// #
386 /// # #[cfg_attr(feature = "tokio-runtime", tokio::main)]
387 /// # #[cfg_attr(feature = "async-std-runtime", async_std::main)]
388 /// # async fn main() -> Result<()> {
389 /// # let client = Client::connect("127.0.0.1:6379").await?;
390 /// let result: Vec<String> = client.xinfo_help().await?;
391 /// assert!(result.iter().any(|e| e == "HELP"));
392 /// # Ok(())
393 /// # }
394 /// ```
395 ///
396 /// # See Also
397 /// [<https://redis.io/commands/xinfo-help/>](https://redis.io/commands/xinfo-help/)
398 #[must_use]
399 fn xinfo_help(self) -> PreparedCommand<'a, Self, Vec<String>>
400 where
401 Self: Sized,
402 {
403 prepare_command(self, cmd("XINFO").arg("HELP"))
404 }
405
406 /// This command returns information about the stream stored at `key`.
407 ///
408 /// # Return
409 /// A collection of XGroupInfo.
410 ///
411 /// # See Also
412 /// [<https://redis.io/commands/xinfo-stream/>](https://redis.io/commands/xinfo-stream/)
413 fn xinfo_stream<K>(
414 self,
415 key: K,
416 options: XInfoStreamOptions,
417 ) -> PreparedCommand<'a, Self, XStreamInfo>
418 where
419 Self: Sized,
420 K: SingleArg,
421 {
422 prepare_command(self, cmd("XINFO").arg("STREAM").arg(key).arg(options))
423 }
424
425 /// Returns the number of entries inside a stream.
426 ///
427 /// # Return
428 /// The number of entries of the stream at `key`.
429 ///
430 /// # See Also
431 /// [<https://redis.io/commands/xrange/>](https://redis.io/commands/xrange/)
432 fn xlen<K>(self, key: K) -> PreparedCommand<'a, Self, usize>
433 where
434 Self: Sized,
435 K: SingleArg,
436 {
437 prepare_command(self, cmd("XLEN").arg(key))
438 }
439
440 /// The XPENDING command is the interface to inspect the list of pending messages.
441 ///
442 /// # See Also
443 /// [<https://redis.io/commands/xpending/>](https://redis.io/commands/xpending/)
444 fn xpending<K, G>(self, key: K, group: G) -> PreparedCommand<'a, Self, XPendingResult>
445 where
446 Self: Sized,
447 K: SingleArg,
448 G: SingleArg,
449 {
450 prepare_command(self, cmd("XPENDING").arg(key).arg(group))
451 }
452
453 /// The XPENDING command is the interface to inspect the list of pending messages.
454 ///
455 /// # See Also
456 /// [<https://redis.io/commands/xpending/>](https://redis.io/commands/xpending/)
457 fn xpending_with_options<K, G>(
458 self,
459 key: K,
460 group: G,
461 options: XPendingOptions,
462 ) -> PreparedCommand<'a, Self, Vec<XPendingMessageResult>>
463 where
464 Self: Sized,
465 K: SingleArg,
466 G: SingleArg,
467 {
468 prepare_command(self, cmd("XPENDING").arg(key).arg(group).arg(options))
469 }
470
471 /// The command returns the stream entries matching a given range of IDs.
472 ///
473 /// # Return
474 /// A collection of StreamEntry
475 ///
476 /// The command returns the entries with IDs matching the specified range.
477 /// The returned entries are complete, that means that the ID and all the fields they are composed are returned.
478 /// Moreover, the entries are returned with their fields and values in the exact same order as XADD added them.
479 ///
480 /// # See Also
481 /// [<https://redis.io/commands/xrange/>](https://redis.io/commands/xrange/)
482 fn xrange<K, S, E, V>(
483 self,
484 key: K,
485 start: S,
486 end: E,
487 count: Option<usize>,
488 ) -> PreparedCommand<'a, Self, Vec<StreamEntry<V>>>
489 where
490 Self: Sized,
491 K: SingleArg,
492 S: SingleArg,
493 E: SingleArg,
494 V: PrimitiveResponse + DeserializeOwned,
495 {
496 prepare_command(
497 self,
498 cmd("XRANGE")
499 .arg(key)
500 .arg(start)
501 .arg(end)
502 .arg(count.map(|c| ("COUNT", c))),
503 )
504 }
505
506 /// Read data from one or multiple streams,
507 /// only returning entries with an ID greater than the last received ID reported by the caller.
508 ///
509 /// # Return
510 /// A collection of XReadStreamResult
511 ///
512 /// # See Also
513 /// [<https://redis.io/commands/xread/>](https://redis.io/commands/xread/)
514 fn xread<K, KK, I, II, V, R>(
515 self,
516 options: XReadOptions,
517 keys: KK,
518 ids: II,
519 ) -> PreparedCommand<'a, Self, R>
520 where
521 Self: Sized,
522 K: SingleArg,
523 KK: SingleArgCollection<K>,
524 I: SingleArg,
525 II: SingleArgCollection<I>,
526 V: PrimitiveResponse + DeserializeOwned,
527 R: KeyValueCollectionResponse<String, Vec<StreamEntry<V>>>,
528 {
529 prepare_command(
530 self,
531 cmd("XREAD").arg(options).arg("STREAMS").arg(keys).arg(ids),
532 )
533 }
534
535 /// The XREADGROUP command is a special version of the [`xread`](StreamCommands::xread)
536 /// command with support for consumer groups.
537 ///
538 /// # Return
539 /// A collection of XReadStreamResult
540 ///
541 /// # See Also
542 /// [<https://redis.io/commands/xreadgroup/>](https://redis.io/commands/xreadgroup/)
543 fn xreadgroup<G, C, K, KK, I, II, V, R>(
544 self,
545 group: G,
546 consumer: C,
547 options: XReadGroupOptions,
548 keys: KK,
549 ids: II,
550 ) -> PreparedCommand<'a, Self, R>
551 where
552 Self: Sized,
553 G: SingleArg,
554 C: SingleArg,
555 K: SingleArg,
556 KK: SingleArgCollection<K>,
557 I: SingleArg,
558 II: SingleArgCollection<I>,
559 V: PrimitiveResponse + DeserializeOwned,
560 R: KeyValueCollectionResponse<String, Vec<StreamEntry<V>>>,
561 {
562 prepare_command(
563 self,
564 cmd("XREADGROUP")
565 .arg("GROUP")
566 .arg(group)
567 .arg(consumer)
568 .arg(options)
569 .arg("STREAMS")
570 .arg(keys)
571 .arg(ids),
572 )
573 }
574
575 /// This command is exactly like [`xrange`](StreamCommands::xrange),
576 /// but with the notable difference of returning the entries in reverse order,
577 /// and also taking the start-end range in reverse order
578 ///
579 /// # Return
580 /// A collection of StreamEntry
581 ///
582 /// # See Also
583 /// [<https://redis.io/commands/xrevrange/>](https://redis.io/commands/xrevrange/)
584 fn xrevrange<K, E, S, V>(
585 self,
586 key: K,
587 end: E,
588 start: S,
589 count: Option<usize>,
590 ) -> PreparedCommand<'a, Self, Vec<StreamEntry<V>>>
591 where
592 Self: Sized,
593 K: SingleArg,
594 E: SingleArg,
595 S: SingleArg,
596 V: PrimitiveResponse + DeserializeOwned,
597 {
598 prepare_command(
599 self,
600 cmd("XREVRANGE")
601 .arg(key)
602 .arg(end)
603 .arg(start)
604 .arg(count.map(|c| ("COUNT", c))),
605 )
606 }
607
608 /// XTRIM trims the stream by evicting older entries (entries with lower IDs) if needed.
609 ///
610 /// # Return
611 /// The number of entries deleted from the stream.
612 ///
613 /// # See Also
614 /// [<https://redis.io/commands/xtrim/>](https://redis.io/commands/xtrim/)
615 fn xtrim<K>(self, key: K, options: XTrimOptions) -> PreparedCommand<'a, Self, usize>
616 where
617 Self: Sized,
618 K: SingleArg,
619 {
620 prepare_command(self, cmd("XTRIM").arg(key).arg(options))
621 }
622}
623
624/// Stream Add options for the [`xadd`](StreamCommands::xadd) command.
625#[derive(Default)]
626pub struct XAddOptions {
627 command_args: CommandArgs,
628}
629
630impl XAddOptions {
631 #[must_use]
632 pub fn no_mk_stream(mut self) -> Self {
633 Self {
634 command_args: self.command_args.arg("NOMKSTREAM").build(),
635 }
636 }
637
638 #[must_use]
639 pub fn trim_options(mut self, trim_options: XTrimOptions) -> Self {
640 Self {
641 command_args: self.command_args.arg(trim_options).build(),
642 }
643 }
644}
645
646impl ToArgs for XAddOptions {
647 fn write_args(&self, args: &mut CommandArgs) {
648 args.arg(&self.command_args);
649 }
650}
651
652/// Stream Trim operator for the [`xadd`](StreamCommands::xadd)
653/// and [`xtrim`](StreamCommands::xtrim) commands
654#[derive(Default)]
655pub enum XTrimOperator {
656 #[default]
657 None,
658 /// =
659 Equal,
660 /// ~
661 Approximately,
662}
663
664impl ToArgs for XTrimOperator {
665 fn write_args(&self, args: &mut CommandArgs) {
666 match self {
667 XTrimOperator::None => {}
668 XTrimOperator::Equal => {
669 args.arg("=");
670 }
671 XTrimOperator::Approximately => {
672 args.arg("~");
673 }
674 }
675 }
676}
677
678/// Stream Trim options for the [`xadd`](StreamCommands::xadd)
679/// and [`xtrim`](StreamCommands::xtrim) commands
680#[derive(Default)]
681pub struct XTrimOptions {
682 command_args: CommandArgs,
683}
684
685impl XTrimOptions {
686 #[must_use]
687 pub fn max_len(operator: XTrimOperator, threshold: i64) -> Self {
688 Self {
689 command_args: CommandArgs::default()
690 .arg("MAXLEN")
691 .arg(operator)
692 .arg(threshold)
693 .build(),
694 }
695 }
696
697 #[must_use]
698 pub fn min_id<I: SingleArg>(operator: XTrimOperator, threshold_id: I) -> Self {
699 Self {
700 command_args: CommandArgs::default()
701 .arg("MINID")
702 .arg(operator)
703 .arg(threshold_id)
704 .build(),
705 }
706 }
707
708 #[must_use]
709 pub fn limit(mut self, count: usize) -> Self {
710 Self {
711 command_args: self.command_args.arg("LIMIT").arg(count).build(),
712 }
713 }
714}
715
716impl ToArgs for XTrimOptions {
717 fn write_args(&self, args: &mut CommandArgs) {
718 args.arg(&self.command_args);
719 }
720}
721
722/// Options for the [`xautoclaim`](StreamCommands::xautoclaim) command
723#[derive(Default)]
724pub struct XAutoClaimOptions {
725 command_args: CommandArgs,
726}
727
728impl XAutoClaimOptions {
729 #[must_use]
730 pub fn count(mut self, count: usize) -> Self {
731 Self {
732 command_args: self.command_args.arg("COUNT").arg(count).build(),
733 }
734 }
735
736 #[must_use]
737 pub fn just_id(mut self) -> Self {
738 Self {
739 command_args: self.command_args.arg("JUSTID").build(),
740 }
741 }
742}
743
744impl ToArgs for XAutoClaimOptions {
745 fn write_args(&self, args: &mut CommandArgs) {
746 args.arg(&self.command_args);
747 }
748}
749
750/// Result for the [`xrange`](StreamCommands::xrange) and other associated commands.
751#[derive(Deserialize)]
752pub struct StreamEntry<V>
753where
754 V: PrimitiveResponse,
755{
756 /// The stream Id
757 pub stream_id: String,
758 /// entries with their fields and values in the exact same
759 /// order as [`xadd`](StreamCommands::xadd) added them.
760 pub items: HashMap<String, V>,
761}
762
763/// Result for the [`xautoclaim`](StreamCommands::xautoclaim) command.
764#[derive(Deserialize)]
765pub struct XAutoClaimResult<V>
766where
767 V: PrimitiveResponse,
768{
769 /// A stream ID to be used as the `start` argument for
770 /// the next call to [`xautoclaim`](StreamCommands::xautoclaim).
771 pub start_stream_id: String,
772 /// An array containing all the successfully claimed messages in
773 /// the same format as [`xrange`](StreamCommands::xrange).
774 pub entries: Vec<StreamEntry<V>>,
775 /// An array containing message IDs that no longer exist in the stream,
776 /// and were deleted from the PEL in which they were found.
777 pub deleted_ids: Vec<String>,
778}
779
780/// Options for the [`xclaim`](StreamCommands::xclaim) command
781#[derive(Default)]
782pub struct XClaimOptions {
783 command_args: CommandArgs,
784}
785
786impl XClaimOptions {
787 /// Set the idle time (last time it was delivered) of the message.
788 #[must_use]
789 pub fn idle_time(mut self, idle_time_millis: u64) -> Self {
790 Self {
791 command_args: self.command_args.arg("IDLE").arg(idle_time_millis).build(),
792 }
793 }
794
795 /// This is the same as `idle_time` but instead of a relative amount of milliseconds,
796 /// it sets the idle time to a specific Unix time (in milliseconds).
797 #[must_use]
798 pub fn time(mut self, unix_time_milliseconds: u64) -> Self {
799 Self {
800 command_args: self
801 .command_args
802 .arg("TIME")
803 .arg(unix_time_milliseconds)
804 .build(),
805 }
806 }
807
808 /// Set the retry counter to the specified value.
809 #[must_use]
810 pub fn retry_count(mut self, count: usize) -> Self {
811 Self {
812 command_args: self.command_args.arg("RETRYCOUNT").arg(count).build(),
813 }
814 }
815
816 /// Creates the pending message entry in the PEL
817 /// even if certain specified IDs are not already
818 /// in the PEL assigned to a different client.
819 #[must_use]
820 pub fn force(mut self) -> Self {
821 Self {
822 command_args: self.command_args.arg("FORCE").build(),
823 }
824 }
825
826 /// Return just an array of IDs of messages successfully claimed,
827 /// without returning the actual message.
828 #[must_use]
829 pub fn just_id(mut self) -> Self {
830 Self {
831 command_args: self.command_args.arg("JUSTID").build(),
832 }
833 }
834}
835
836impl ToArgs for XClaimOptions {
837 fn write_args(&self, args: &mut CommandArgs) {
838 args.arg(&self.command_args);
839 }
840}
841
842/// Options for the [`xgroup_create`](StreamCommands::xgroup_create) command
843#[derive(Default)]
844pub struct XGroupCreateOptions {
845 command_args: CommandArgs,
846}
847
848impl XGroupCreateOptions {
849 /// By default, the XGROUP CREATE command insists that the target stream exists and returns an error when it doesn't.
850 /// However, you can use the optional MKSTREAM subcommand as the last argument after the `id`
851 /// to automatically create the stream (with length of 0) if it doesn't exist
852 #[must_use]
853 pub fn mk_stream(mut self) -> Self {
854 Self {
855 command_args: self.command_args.arg("MKSTREAM").build(),
856 }
857 }
858
859 /// The optional entries_read named argument can be specified to enable consumer group lag tracking for an arbitrary ID.
860 /// An arbitrary ID is any ID that isn't the ID of the stream's first entry, its last entry or the zero ("0-0") ID.
861 /// This can be useful you know exactly how many entries are between the arbitrary ID (excluding it) and the stream's last entry.
862 /// In such cases, the entries_read can be set to the stream's entries_added subtracted with the number of entries.
863 #[must_use]
864 pub fn entries_read(mut self, entries_read: usize) -> Self {
865 Self {
866 command_args: self
867 .command_args
868 .arg("ENTRIESREAD")
869 .arg(entries_read)
870 .build(),
871 }
872 }
873}
874
875impl ToArgs for XGroupCreateOptions {
876 fn write_args(&self, args: &mut CommandArgs) {
877 args.arg(&self.command_args);
878 }
879}
880
881/// Result entry for the [`xinfo_consumers`](StreamCommands::xinfo_consumers) command.
882#[derive(Deserialize)]
883pub struct XConsumerInfo {
884 /// the consumer's name
885 pub name: String,
886
887 /// the number of pending messages for the client,
888 /// which are messages that were delivered but are yet to be acknowledged
889 pub pending: usize,
890
891 /// the number of milliseconds that have passed
892 /// since the consumer last interacted with the server
893 #[serde(rename = "idle")]
894 pub idle_millis: u64,
895}
896
897/// Result entry for the [`xinfo_groups`](StreamCommands::xinfo_groups) command.
898#[derive(Deserialize)]
899#[serde(rename_all = "kebab-case")]
900pub struct XGroupInfo {
901 /// the consumer group's name
902 pub name: String,
903
904 /// the number of consumers in the group
905 pub consumers: usize,
906
907 /// the length of the group's pending entries list (PEL),
908 /// which are messages that were delivered but are yet to be acknowledged
909 pub pending: usize,
910
911 /// the ID of the last entry delivered the group's consumers
912 pub last_delivered_id: String,
913
914 /// the logical "read counter" of the last entry delivered to group's consumers
915 pub entries_read: Option<usize>,
916
917 /// the number of entries in the stream that are still waiting to be delivered to the group's consumers,
918 /// or a NULL when that number can't be determined.
919 pub lag: Option<usize>,
920}
921
922/// Options for the [`xinfo_stream`](StreamCommands::xinfo_stream) command
923#[derive(Default)]
924pub struct XInfoStreamOptions {
925 command_args: CommandArgs,
926}
927
928impl XInfoStreamOptions {
929 /// The optional FULL modifier provides a more verbose reply.
930 #[must_use]
931 pub fn full(mut self) -> Self {
932 Self {
933 command_args: self.command_args.arg("FULL").build(),
934 }
935 }
936
937 /// The COUNT option can be used to limit the number of stream and PEL entries that are returned
938 /// (The first `count` entries are returned).
939 #[must_use]
940 pub fn count(mut self, count: usize) -> Self {
941 Self {
942 command_args: self.command_args.arg("COUNT").arg(count).build(),
943 }
944 }
945}
946
947impl ToArgs for XInfoStreamOptions {
948 fn write_args(&self, args: &mut CommandArgs) {
949 args.arg(&self.command_args);
950 }
951}
952
953/// Stream info returned by the [`xinfo_stream`](StreamCommands::xinfo_stream) command.
954#[derive(Deserialize)]
955#[serde(rename_all = "kebab-case")]
956pub struct XStreamInfo {
957 /// the number of entries in the stream (see [`xlen`](StreamCommands::xlen))
958 pub length: usize,
959
960 /// the number of keys in the underlying radix data structure
961 pub radix_tree_keys: usize,
962
963 /// the number of nodes in the underlying radix data structure
964 pub radix_tree_nodes: usize,
965
966 /// the number of consumer groups defined for the stream
967 pub groups: usize,
968
969 /// the ID of the least-recently entry that was added to the stream
970 pub last_generated_id: String,
971
972 /// the maximal entry ID that was deleted from the stream
973 pub max_deleted_entry_id: String,
974
975 /// the count of all entries added to the stream during its lifetime
976 pub entries_added: usize,
977
978 /// the ID and field-value tuples of the first entry in the stream
979 pub first_entry: StreamEntry<String>,
980
981 /// the ID and field-value tuples of the last entry in the stream
982 pub last_entry: StreamEntry<String>,
983
984 pub recorded_first_entry_id: String,
985}
986
987/// Options for the [`xread`](StreamCommands::xread) command
988#[derive(Default)]
989pub struct XReadOptions {
990 command_args: CommandArgs,
991}
992
993impl XReadOptions {
994 #[must_use]
995 pub fn count(mut self, count: usize) -> Self {
996 Self {
997 command_args: self.command_args.arg("COUNT").arg(count).build(),
998 }
999 }
1000
1001 #[must_use]
1002 pub fn block(mut self, milliseconds: u64) -> Self {
1003 Self {
1004 command_args: self.command_args.arg("BLOCK").arg(milliseconds).build(),
1005 }
1006 }
1007}
1008
1009impl ToArgs for XReadOptions {
1010 fn write_args(&self, args: &mut CommandArgs) {
1011 args.arg(&self.command_args);
1012 }
1013}
1014
1015/// Options for the [`xreadgroup`](StreamCommands::xreadgroup) command
1016#[derive(Default)]
1017pub struct XReadGroupOptions {
1018 command_args: CommandArgs,
1019}
1020
1021impl XReadGroupOptions {
1022 #[must_use]
1023 pub fn count(mut self, count: usize) -> Self {
1024 Self {
1025 command_args: self.command_args.arg("COUNT").arg(count).build(),
1026 }
1027 }
1028
1029 #[must_use]
1030 pub fn block(mut self, milliseconds: u64) -> Self {
1031 Self {
1032 command_args: self.command_args.arg("BLOCK").arg(milliseconds).build(),
1033 }
1034 }
1035
1036 #[must_use]
1037 pub fn no_ack(mut self) -> Self {
1038 Self {
1039 command_args: self.command_args.arg("NOACK").build(),
1040 }
1041 }
1042}
1043
1044impl ToArgs for XReadGroupOptions {
1045 fn write_args(&self, args: &mut CommandArgs) {
1046 args.arg(&self.command_args);
1047 }
1048}
1049
1050/// Options for the [`xpending_with_options`](StreamCommands::xpending_with_options) command
1051#[derive(Default)]
1052pub struct XPendingOptions {
1053 command_args: CommandArgs,
1054}
1055
1056impl XPendingOptions {
1057 #[must_use]
1058 pub fn idle(mut self, min_idle_time: u64) -> Self {
1059 Self {
1060 command_args: self.command_args.arg("IDLE").arg(min_idle_time).build(),
1061 }
1062 }
1063
1064 #[must_use]
1065 pub fn start<S: SingleArg>(mut self, start: S) -> Self {
1066 Self {
1067 command_args: self.command_args.arg(start).build(),
1068 }
1069 }
1070
1071 #[must_use]
1072 pub fn end<E: SingleArg>(mut self, end: E) -> Self {
1073 Self {
1074 command_args: self.command_args.arg(end).build(),
1075 }
1076 }
1077
1078 #[must_use]
1079 pub fn count(mut self, count: usize) -> Self {
1080 Self {
1081 command_args: self.command_args.arg(count).build(),
1082 }
1083 }
1084
1085 #[must_use]
1086 pub fn consumer<C: SingleArg>(mut self, consumer: C) -> Self {
1087 Self {
1088 command_args: self.command_args.arg(consumer).build(),
1089 }
1090 }
1091}
1092
1093impl ToArgs for XPendingOptions {
1094 fn write_args(&self, args: &mut CommandArgs) {
1095 args.arg(&self.command_args);
1096 }
1097}
1098
1099/// Result for the [`xpending`](StreamCommands::xpending) command
1100#[derive(Deserialize)]
1101pub struct XPendingResult {
1102 pub num_pending_messages: usize,
1103 pub smallest_id: String,
1104 pub greatest_id: String,
1105 pub consumers: Vec<XPendingConsumer>,
1106}
1107
1108/// Customer info result for the [`xpending`](StreamCommands::xpending) command
1109#[derive(Deserialize)]
1110pub struct XPendingConsumer {
1111 pub consumer: String,
1112 pub num_messages: usize,
1113}
1114
1115/// Message result for the [`xpending_with_options`](StreamCommands::xpending_with_options) command
1116#[derive(Deserialize)]
1117pub struct XPendingMessageResult {
1118 pub message_id: String,
1119 pub consumer: String,
1120 pub elapsed_millis: u64,
1121 pub times_delivered: usize,
1122}