1use std::collections::HashMap;
2
3use crate::{
4 prepare_command,
5 resp::{
6 cmd, CommandArg, CommandArgs, FromKeyValueValueArray, FromValue, HashMapExt, IntoArgs,
7 KeyValueArgOrCollection, SingleArgOrCollection, Value,
8 },
9 PreparedCommand, Result,
10};
11
12pub trait StreamCommands {
17 fn xack<K, G, I, II>(&mut self, key: K, group: G, ids: II) -> PreparedCommand<Self, usize>
28 where
29 Self: Sized,
30 K: Into<CommandArg>,
31 G: Into<CommandArg>,
32 I: Into<CommandArg>,
33 II: SingleArgOrCollection<I>,
34 {
35 prepare_command(self, cmd("XACK").arg(key).arg(group).arg(ids))
36 }
37
38 fn xadd<K, I, F, V, FFVV, R>(
51 &mut self,
52 key: K,
53 stream_id: I,
54 items: FFVV,
55 options: XAddOptions,
56 ) -> PreparedCommand<Self, R>
57 where
58 Self: Sized,
59 K: Into<CommandArg>,
60 I: Into<CommandArg>,
61 F: Into<CommandArg>,
62 V: Into<CommandArg>,
63 FFVV: KeyValueArgOrCollection<F, V>,
64 R: FromValue,
65 {
66 prepare_command(
67 self,
68 cmd("XADD").arg(key).arg(options).arg(stream_id).arg(items),
69 )
70 }
71
72 fn xautoclaim<K, G, C, I, V>(
80 &mut self,
81 key: K,
82 group: G,
83 consumer: C,
84 min_idle_time: u64,
85 start: I,
86 options: XAutoClaimOptions,
87 ) -> PreparedCommand<Self, XAutoClaimResult<V>>
88 where
89 Self: Sized,
90 K: Into<CommandArg>,
91 G: Into<CommandArg>,
92 C: Into<CommandArg>,
93 I: Into<CommandArg>,
94 V: FromValue,
95 {
96 prepare_command(
97 self,
98 cmd("XAUTOCLAIM")
99 .arg(key)
100 .arg(group)
101 .arg(consumer)
102 .arg(min_idle_time)
103 .arg(start)
104 .arg(options),
105 )
106 }
107
108 fn xclaim<K, G, C, I, II, V>(
122 &mut self,
123 key: K,
124 group: G,
125 consumer: C,
126 min_idle_time: u64,
127 ids: II,
128 options: XClaimOptions,
129 ) -> PreparedCommand<Self, Vec<StreamEntry<V>>>
130 where
131 Self: Sized,
132 K: Into<CommandArg>,
133 G: Into<CommandArg>,
134 C: Into<CommandArg>,
135 I: Into<CommandArg>,
136 II: SingleArgOrCollection<I>,
137 V: FromValue,
138 {
139 prepare_command(
140 self,
141 cmd("XCLAIM")
142 .arg(key)
143 .arg(group)
144 .arg(consumer)
145 .arg(min_idle_time)
146 .arg(ids)
147 .arg(options),
148 )
149 }
150
151 fn xdel<K, I, II>(&mut self, key: K, ids: II) -> PreparedCommand<Self, usize>
159 where
160 Self: Sized,
161 K: Into<CommandArg>,
162 I: Into<CommandArg>,
163 II: SingleArgOrCollection<I>,
164 {
165 prepare_command(self, cmd("XDEL").arg(key).arg(ids))
166 }
167
168 fn xgroup_create<K, G, I>(
177 &mut self,
178 key: K,
179 groupname: G,
180 id: I,
181 options: XGroupCreateOptions,
182 ) -> PreparedCommand<Self, bool>
183 where
184 Self: Sized,
185 K: Into<CommandArg>,
186 G: Into<CommandArg>,
187 I: Into<CommandArg>,
188 {
189 prepare_command(
190 self,
191 cmd("XGROUP")
192 .arg("CREATE")
193 .arg(key)
194 .arg(groupname)
195 .arg(id)
196 .arg(options),
197 )
198 }
199
200 fn xgroup_createconsumer<K, G, C>(
210 &mut self,
211 key: K,
212 groupname: G,
213 consumername: C,
214 ) -> PreparedCommand<Self, bool>
215 where
216 Self: Sized,
217 K: Into<CommandArg>,
218 G: Into<CommandArg>,
219 C: Into<CommandArg>,
220 {
221 prepare_command(
222 self,
223 cmd("XGROUP")
224 .arg("CREATECONSUMER")
225 .arg(key)
226 .arg(groupname)
227 .arg(consumername),
228 )
229 }
230
231 fn xgroup_delconsumer<K, G, C>(
239 &mut self,
240 key: K,
241 groupname: G,
242 consumername: C,
243 ) -> PreparedCommand<Self, usize>
244 where
245 Self: Sized,
246 K: Into<CommandArg>,
247 G: Into<CommandArg>,
248 C: Into<CommandArg>,
249 {
250 prepare_command(
251 self,
252 cmd("XGROUP")
253 .arg("DELCONSUMER")
254 .arg(key)
255 .arg(groupname)
256 .arg(consumername),
257 )
258 }
259
260 fn xgroup_destroy<K, G>(&mut self, key: K, groupname: G) -> PreparedCommand<Self, bool>
269 where
270 Self: Sized,
271 K: Into<CommandArg>,
272 G: Into<CommandArg>,
273 {
274 prepare_command(self, cmd("XGROUP").arg("DESTROY").arg(key).arg(groupname))
275 }
276
277 fn xgroup_setid<K, G, I>(
282 &mut self,
283 key: K,
284 groupname: G,
285 id: I,
286 entries_read: Option<usize>,
287 ) -> PreparedCommand<Self, ()>
288 where
289 Self: Sized,
290 K: Into<CommandArg>,
291 G: Into<CommandArg>,
292 I: Into<CommandArg>,
293 {
294 prepare_command(
295 self,
296 cmd("XGROUP")
297 .arg("SETID")
298 .arg(key)
299 .arg(groupname)
300 .arg(id)
301 .arg(entries_read.map(|e| ("ENTRIESREAD", e))),
302 )
303 }
304
305 fn xinfo_consumers<K, G>(
313 &mut self,
314 key: K,
315 groupname: G,
316 ) -> PreparedCommand<Self, Vec<XConsumerInfo>>
317 where
318 Self: Sized,
319 K: Into<CommandArg>,
320 G: Into<CommandArg>,
321 {
322 prepare_command(self, cmd("XINFO").arg("CONSUMERS").arg(key).arg(groupname))
323 }
324
325 fn xinfo_groups<K>(&mut self, key: K) -> PreparedCommand<Self, Vec<XGroupInfo>>
334 where
335 Self: Sized,
336 K: Into<CommandArg>,
337 {
338 prepare_command(self, cmd("XINFO").arg("GROUPS").arg(key))
339 }
340
341 fn xinfo_stream<K>(
349 &mut self,
350 key: K,
351 options: XInfoStreamOptions,
352 ) -> PreparedCommand<Self, XStreamInfo>
353 where
354 Self: Sized,
355 K: Into<CommandArg>,
356 {
357 prepare_command(self, cmd("XINFO").arg("STREAM").arg(key).arg(options))
358 }
359
360 fn xlen<K>(&mut self, key: K) -> PreparedCommand<Self, usize>
368 where
369 Self: Sized,
370 K: Into<CommandArg>,
371 {
372 prepare_command(self, cmd("XLEN").arg(key))
373 }
374
375 fn xpending<K, G>(&mut self, key: K, group: G) -> PreparedCommand<Self, XPendingResult>
380 where
381 Self: Sized,
382 K: Into<CommandArg>,
383 G: Into<CommandArg>,
384 {
385 prepare_command(self, cmd("XPENDING").arg(key).arg(group))
386 }
387
388 fn xpending_with_options<K, G>(
393 &mut self,
394 key: K,
395 group: G,
396 options: XPendingOptions,
397 ) -> PreparedCommand<Self, Vec<XPendingMessageResult>>
398 where
399 Self: Sized,
400 K: Into<CommandArg>,
401 G: Into<CommandArg>,
402 {
403 prepare_command(self, cmd("XPENDING").arg(key).arg(group).arg(options))
404 }
405
406 fn xrange<K, S, E, V>(
418 &mut self,
419 key: K,
420 start: S,
421 end: E,
422 count: Option<usize>,
423 ) -> PreparedCommand<Self, Vec<StreamEntry<V>>>
424 where
425 Self: Sized,
426 K: Into<CommandArg>,
427 S: Into<CommandArg>,
428 E: Into<CommandArg>,
429 V: FromValue,
430 {
431 prepare_command(
432 self,
433 cmd("XRANGE")
434 .arg(key)
435 .arg(start)
436 .arg(end)
437 .arg(count.map(|c| ("COUNT", c))),
438 )
439 }
440
441 fn xread<K, KK, I, II, V, R>(
450 &mut self,
451 options: XReadOptions,
452 keys: KK,
453 ids: II,
454 ) -> PreparedCommand<Self, R>
455 where
456 Self: Sized,
457 K: Into<CommandArg>,
458 KK: SingleArgOrCollection<K>,
459 I: Into<CommandArg>,
460 II: SingleArgOrCollection<I>,
461 V: FromValue,
462 R: FromKeyValueValueArray<String, Vec<StreamEntry<V>>>,
463 {
464 prepare_command(
465 self,
466 cmd("XREAD").arg(options).arg("STREAMS").arg(keys).arg(ids),
467 )
468 }
469
470 fn xreadgroup<G, C, K, KK, I, II, V, R>(
479 &mut self,
480 group: G,
481 consumer: C,
482 options: XReadGroupOptions,
483 keys: KK,
484 ids: II,
485 ) -> PreparedCommand<Self, R>
486 where
487 Self: Sized,
488 G: Into<CommandArg>,
489 C: Into<CommandArg>,
490 K: Into<CommandArg>,
491 KK: SingleArgOrCollection<K>,
492 I: Into<CommandArg>,
493 II: SingleArgOrCollection<I>,
494 V: FromValue,
495 R: FromKeyValueValueArray<String, Vec<StreamEntry<V>>>,
496 {
497 prepare_command(
498 self,
499 cmd("XREADGROUP")
500 .arg("GROUP")
501 .arg(group)
502 .arg(consumer)
503 .arg(options)
504 .arg("STREAMS")
505 .arg(keys)
506 .arg(ids),
507 )
508 }
509
510 fn xrevrange<K, E, S, V>(
520 &mut self,
521 key: K,
522 end: E,
523 start: S,
524 count: Option<usize>,
525 ) -> PreparedCommand<Self, Vec<StreamEntry<V>>>
526 where
527 Self: Sized,
528 K: Into<CommandArg>,
529 E: Into<CommandArg>,
530 S: Into<CommandArg>,
531 V: FromValue,
532 {
533 prepare_command(
534 self,
535 cmd("XREVRANGE")
536 .arg(key)
537 .arg(end)
538 .arg(start)
539 .arg(count.map(|c| ("COUNT", c))),
540 )
541 }
542
543 fn xtrim<K>(&mut self, key: K, options: XTrimOptions) -> PreparedCommand<Self, usize>
551 where
552 Self: Sized,
553 K: Into<CommandArg>,
554 {
555 prepare_command(self, cmd("XTRIM").arg(key).arg(options))
556 }
557}
558
559#[derive(Default)]
561pub struct XAddOptions {
562 command_args: CommandArgs,
563}
564
565impl XAddOptions {
566 #[must_use]
567 pub fn no_mk_stream(self) -> Self {
568 Self {
569 command_args: self.command_args.arg("NOMKSTREAM"),
570 }
571 }
572
573 #[must_use]
574 pub fn trim_options(self, trim_options: XTrimOptions) -> Self {
575 Self {
576 command_args: self.command_args.arg(trim_options),
577 }
578 }
579}
580
581impl IntoArgs for XAddOptions {
582 fn into_args(self, args: CommandArgs) -> CommandArgs {
583 args.arg(self.command_args)
584 }
585}
586
587pub enum XTrimOperator {
590 None,
591 Equal,
593 Approximately,
595}
596
597impl IntoArgs for XTrimOperator {
598 fn into_args(self, args: CommandArgs) -> CommandArgs {
599 match self {
600 XTrimOperator::None => args,
601 XTrimOperator::Equal => args.arg(CommandArg::Str("=")),
602 XTrimOperator::Approximately => args.arg(CommandArg::Str("~")),
603 }
604 }
605}
606
607impl Default for XTrimOperator {
608 fn default() -> Self {
609 XTrimOperator::None
610 }
611}
612
613#[derive(Default)]
616pub struct XTrimOptions {
617 command_args: CommandArgs,
618}
619
620impl XTrimOptions {
621 #[must_use]
622 pub fn max_len(operator: XTrimOperator, threshold: i64) -> Self {
623 Self {
624 command_args: CommandArgs::default()
625 .arg("MAXLEN")
626 .arg(operator)
627 .arg(threshold),
628 }
629 }
630
631 #[must_use]
632 pub fn min_id<I: Into<CommandArg>>(operator: XTrimOperator, threshold_id: I) -> Self {
633 Self {
634 command_args: CommandArgs::default()
635 .arg("MINID")
636 .arg(operator)
637 .arg(threshold_id),
638 }
639 }
640
641 #[must_use]
642 pub fn limit(self, count: usize) -> Self {
643 Self {
644 command_args: self.command_args.arg("LIMIT").arg(count),
645 }
646 }
647}
648
649impl IntoArgs for XTrimOptions {
650 fn into_args(self, args: CommandArgs) -> CommandArgs {
651 args.arg(self.command_args)
652 }
653}
654
655#[derive(Default)]
657pub struct XAutoClaimOptions {
658 command_args: CommandArgs,
659}
660
661impl XAutoClaimOptions {
662 #[must_use]
663 pub fn count(self, count: usize) -> Self {
664 Self {
665 command_args: self.command_args.arg("COUNT").arg(count),
666 }
667 }
668
669 #[must_use]
670 pub fn just_id(self) -> Self {
671 Self {
672 command_args: self.command_args.arg("JUSTID"),
673 }
674 }
675}
676
677impl IntoArgs for XAutoClaimOptions {
678 fn into_args(self, args: CommandArgs) -> CommandArgs {
679 args.arg(self.command_args)
680 }
681}
682
683pub struct StreamEntry<V>
684where
685 V: FromValue,
686{
687 pub stream_id: String,
688 pub items: HashMap<String, V>,
689}
690
691impl<V> FromValue for StreamEntry<V>
692where
693 V: FromValue,
694{
695 fn from_value(value: Value) -> Result<Self> {
696 let (stream_id, items): (String, HashMap<String, V>) = value.into()?;
697 Ok(Self { stream_id, items })
698 }
699}
700
701pub struct XAutoClaimResult<V>
702where
703 V: FromValue,
704{
705 pub start_stream_id: String,
706 pub entries: Vec<StreamEntry<V>>,
707 pub deleted_id: Vec<String>,
708}
709
710impl<V> FromValue for XAutoClaimResult<V>
711where
712 V: FromValue,
713{
714 fn from_value(value: Value) -> Result<Self> {
715 let (start_stream_id, entries, deleted_id): (String, Vec<StreamEntry<V>>, Vec<String>) =
716 value.into()?;
717 Ok(Self {
718 start_stream_id,
719 entries,
720 deleted_id,
721 })
722 }
723}
724
725#[derive(Default)]
727pub struct XClaimOptions {
728 command_args: CommandArgs,
729}
730
731impl XClaimOptions {
732 #[must_use]
734 pub fn idle_time(self, idle_time_millis: u64) -> Self {
735 Self {
736 command_args: self.command_args.arg("IDLE").arg(idle_time_millis),
737 }
738 }
739
740 #[must_use]
743 pub fn time(self, unix_time_milliseconds: u64) -> Self {
744 Self {
745 command_args: self.command_args.arg("TIME").arg(unix_time_milliseconds),
746 }
747 }
748
749 #[must_use]
751 pub fn retry_count(self, count: usize) -> Self {
752 Self {
753 command_args: self.command_args.arg("RETRYCOUNT").arg(count),
754 }
755 }
756
757 #[must_use]
761 pub fn force(self) -> Self {
762 Self {
763 command_args: self.command_args.arg("FORCE"),
764 }
765 }
766
767 #[must_use]
770 pub fn just_id(self) -> Self {
771 Self {
772 command_args: self.command_args.arg("JUSTID"),
773 }
774 }
775}
776
777impl IntoArgs for XClaimOptions {
778 fn into_args(self, args: CommandArgs) -> CommandArgs {
779 args.arg(self.command_args)
780 }
781}
782
783#[derive(Default)]
785pub struct XGroupCreateOptions {
786 command_args: CommandArgs,
787}
788
789impl XGroupCreateOptions {
790 #[must_use]
794 pub fn mk_stream(self) -> Self {
795 Self {
796 command_args: self.command_args.arg("MKSTREAM"),
797 }
798 }
799
800 #[must_use]
805 pub fn entries_read(self, entries_read: usize) -> Self {
806 Self {
807 command_args: self.command_args.arg("ENTRIESREAD").arg(entries_read),
808 }
809 }
810}
811
812impl IntoArgs for XGroupCreateOptions {
813 fn into_args(self, args: CommandArgs) -> CommandArgs {
814 args.arg(self.command_args)
815 }
816}
817
818pub struct XConsumerInfo {
820 pub name: String,
822
823 pub pending: usize,
826
827 pub idle_millis: u64,
830}
831
832impl FromValue for XConsumerInfo {
833 fn from_value(value: Value) -> Result<Self> {
834 let mut values: HashMap<String, Value> = value.into()?;
835
836 Ok(Self {
837 name: values.remove_with_result("name")?.into()?,
838 pending: values.remove_with_result("pending")?.into()?,
839 idle_millis: values.remove_with_result("idle")?.into()?,
840 })
841 }
842}
843
844pub struct XGroupInfo {
846 pub name: String,
848
849 pub consumers: usize,
851
852 pub pending: usize,
855
856 pub last_delivered_id: String,
858
859 pub entries_read: Option<usize>,
861
862 pub lag: Option<usize>,
865}
866
867impl FromValue for XGroupInfo {
868 fn from_value(value: Value) -> Result<Self> {
869 let mut values: HashMap<String, Value> = value.into()?;
870
871 Ok(Self {
872 name: values.remove_with_result("name")?.into()?,
873 consumers: values.remove_with_result("consumers")?.into()?,
874 pending: values.remove_with_result("pending")?.into()?,
875 last_delivered_id: values.remove_with_result("last-delivered-id")?.into()?,
876 entries_read: values.remove_with_result("entries-read")?.into()?,
877 lag: values.remove_with_result("lag")?.into()?,
878 })
879 }
880}
881
882#[derive(Default)]
884pub struct XInfoStreamOptions {
885 command_args: CommandArgs,
886}
887
888impl XInfoStreamOptions {
889 #[must_use]
891 pub fn full(self) -> Self {
892 Self {
893 command_args: self.command_args.arg("FULL"),
894 }
895 }
896
897 #[must_use]
900 pub fn count(self, count: usize) -> Self {
901 Self {
902 command_args: self.command_args.arg("COUNT").arg(count),
903 }
904 }
905}
906
907impl IntoArgs for XInfoStreamOptions {
908 fn into_args(self, args: CommandArgs) -> CommandArgs {
909 args.arg(self.command_args)
910 }
911}
912
913pub struct XStreamInfo {
915 pub length: usize,
917
918 pub radix_tree_keys: usize,
920
921 pub radix_tree_nodes: usize,
923
924 pub groups: usize,
926
927 pub last_generated_id: String,
929
930 pub max_deleted_entry_id: String,
932
933 pub entries_added: usize,
935
936 pub first_entry: StreamEntry<String>,
938
939 pub last_entry: StreamEntry<String>,
941
942 pub recorded_first_entry_id: String,
943}
944
945impl FromValue for XStreamInfo {
946 fn from_value(value: Value) -> Result<Self> {
947 let mut values: HashMap<String, Value> = value.into()?;
948
949 Ok(Self {
950 length: values.remove_with_result("length")?.into()?,
951 radix_tree_keys: values.remove_with_result("radix-tree-keys")?.into()?,
952 radix_tree_nodes: values.remove_with_result("radix-tree-nodes")?.into()?,
953 groups: values.remove_with_result("groups")?.into()?,
954 last_generated_id: values.remove_with_result("last-generated-id")?.into()?,
955 max_deleted_entry_id: values.remove_with_result("max-deleted-entry-id")?.into()?,
956 entries_added: values.remove_with_result("entries-added")?.into()?,
957 first_entry: values.remove_with_result("first-entry")?.into()?,
958 last_entry: values.remove_with_result("last-entry")?.into()?,
959 recorded_first_entry_id: values
960 .remove_with_result("recorded-first-entry-id")?
961 .into()?,
962 })
963 }
964}
965
966#[derive(Default)]
968pub struct XReadOptions {
969 command_args: CommandArgs,
970}
971
972impl XReadOptions {
973 #[must_use]
974 pub fn count(self, count: usize) -> Self {
975 Self {
976 command_args: self.command_args.arg("COUNT").arg(count),
977 }
978 }
979
980 #[must_use]
981 pub fn block(self, milliseconds: u64) -> Self {
982 Self {
983 command_args: self.command_args.arg("BLOCK").arg(milliseconds),
984 }
985 }
986}
987
988impl IntoArgs for XReadOptions {
989 fn into_args(self, args: CommandArgs) -> CommandArgs {
990 args.arg(self.command_args)
991 }
992}
993
994#[derive(Default)]
996pub struct XReadGroupOptions {
997 command_args: CommandArgs,
998}
999
1000impl XReadGroupOptions {
1001 #[must_use]
1002 pub fn count(self, count: usize) -> Self {
1003 Self {
1004 command_args: self.command_args.arg("COUNT").arg(count),
1005 }
1006 }
1007
1008 #[must_use]
1009 pub fn block(self, milliseconds: u64) -> Self {
1010 Self {
1011 command_args: self.command_args.arg("BLOCK").arg(milliseconds),
1012 }
1013 }
1014
1015 #[must_use]
1016 pub fn no_ack(self) -> Self {
1017 Self {
1018 command_args: self.command_args.arg("NOACK"),
1019 }
1020 }
1021}
1022
1023impl IntoArgs for XReadGroupOptions {
1024 fn into_args(self, args: CommandArgs) -> CommandArgs {
1025 args.arg(self.command_args)
1026 }
1027}
1028
1029#[derive(Default)]
1031pub struct XPendingOptions {
1032 command_args: CommandArgs,
1033}
1034
1035impl XPendingOptions {
1036 #[must_use]
1037 pub fn idle(self, min_idle_time: u64) -> Self {
1038 Self {
1039 command_args: self.command_args.arg("IDLE").arg(min_idle_time),
1040 }
1041 }
1042
1043 #[must_use]
1044 pub fn start<S: Into<CommandArg>>(self, start: S) -> Self {
1045 Self {
1046 command_args: self.command_args.arg(start),
1047 }
1048 }
1049
1050 #[must_use]
1051 pub fn end<E: Into<CommandArg>>(self, end: E) -> Self {
1052 Self {
1053 command_args: self.command_args.arg(end),
1054 }
1055 }
1056
1057 #[must_use]
1058 pub fn count(self, count: usize) -> Self {
1059 Self {
1060 command_args: self.command_args.arg(count),
1061 }
1062 }
1063
1064 #[must_use]
1065 pub fn consumer<C: Into<CommandArg>>(self, consumer: C) -> Self {
1066 Self {
1067 command_args: self.command_args.arg(consumer),
1068 }
1069 }
1070}
1071
1072impl IntoArgs for XPendingOptions {
1073 fn into_args(self, args: CommandArgs) -> CommandArgs {
1074 args.arg(self.command_args)
1075 }
1076}
1077
1078pub struct XPendingResult {
1080 pub num_pending_messages: usize,
1081 pub smallest_id: String,
1082 pub greatest_id: String,
1083 pub consumers: Vec<XPendingConsumer>,
1084}
1085
1086impl FromValue for XPendingResult {
1087 fn from_value(value: Value) -> Result<Self> {
1088 let (num_pending_messages, smallest_id, greatest_id, consumers): (
1089 usize,
1090 String,
1091 String,
1092 Vec<XPendingConsumer>,
1093 ) = value.into()?;
1094 Ok(Self {
1095 num_pending_messages,
1096 smallest_id,
1097 greatest_id,
1098 consumers,
1099 })
1100 }
1101}
1102
1103pub struct XPendingConsumer {
1105 pub consumer: String,
1106 pub num_messages: usize,
1107}
1108
1109impl FromValue for XPendingConsumer {
1110 fn from_value(value: Value) -> Result<Self> {
1111 let (consumer, num_messages): (String, usize) = value.into()?;
1112 Ok(Self {
1113 consumer,
1114 num_messages,
1115 })
1116 }
1117}
1118
1119pub struct XPendingMessageResult {
1121 pub message_id: String,
1122 pub consumer: String,
1123 pub elapsed_millis: u64,
1124 pub times_delivered: usize,
1125}
1126
1127impl FromValue for XPendingMessageResult {
1128 fn from_value(value: Value) -> Result<Self> {
1129 let (message_id, consumer, elapsed_millis, times_delivered): (String, String, u64, usize) =
1130 value.into()?;
1131 Ok(Self {
1132 message_id,
1133 consumer,
1134 elapsed_millis,
1135 times_delivered,
1136 })
1137 }
1138}