1#[cfg(feature = "streams")]
4use crate::{
5 errors::{invalid_type_error, ParsingError},
6 types::HashMap,
7 FromRedisValue, RedisWrite, ToRedisArgs, Value,
8};
9use crate::{from_redis_value, from_redis_value_ref, types::ToSingleRedisArg};
10
11#[derive(PartialEq, Eq, Clone, Debug, Copy)]
17#[non_exhaustive]
18pub enum StreamMaxlen {
19 Equals(usize),
21 Approx(usize),
23}
24
25impl ToRedisArgs for StreamMaxlen {
26 fn write_redis_args<W>(&self, out: &mut W)
27 where
28 W: ?Sized + RedisWrite,
29 {
30 let (ch, val) = match *self {
31 StreamMaxlen::Equals(v) => ("=", v),
32 StreamMaxlen::Approx(v) => ("~", v),
33 };
34 out.write_arg(b"MAXLEN");
35 out.write_arg(ch.as_bytes());
36 val.write_redis_args(out);
37 }
38}
39
40#[derive(Debug)]
43#[non_exhaustive]
44pub enum StreamTrimmingMode {
45 Exact,
47 Approx,
49}
50
51impl ToRedisArgs for StreamTrimmingMode {
52 fn write_redis_args<W>(&self, out: &mut W)
53 where
54 W: ?Sized + RedisWrite,
55 {
56 match self {
57 Self::Exact => out.write_arg(b"="),
58 Self::Approx => out.write_arg(b"~"),
59 };
60 }
61}
62
63#[derive(Debug)]
67#[non_exhaustive]
68pub enum StreamTrimStrategy {
69 MaxLen(StreamTrimmingMode, usize, Option<usize>),
71 MinId(StreamTrimmingMode, String, Option<usize>),
73}
74
75impl StreamTrimStrategy {
76 pub fn maxlen(trim: StreamTrimmingMode, max_entries: usize) -> Self {
78 Self::MaxLen(trim, max_entries, None)
79 }
80
81 pub fn minid(trim: StreamTrimmingMode, stream_id: impl Into<String>) -> Self {
83 Self::MinId(trim, stream_id.into(), None)
84 }
85
86 pub fn limit(self, limit: usize) -> Self {
88 match self {
89 StreamTrimStrategy::MaxLen(m, t, _) => StreamTrimStrategy::MaxLen(m, t, Some(limit)),
90 StreamTrimStrategy::MinId(m, t, _) => StreamTrimStrategy::MinId(m, t, Some(limit)),
91 }
92 }
93}
94
95impl ToRedisArgs for StreamTrimStrategy {
96 fn write_redis_args<W>(&self, out: &mut W)
97 where
98 W: ?Sized + RedisWrite,
99 {
100 let limit = match self {
101 StreamTrimStrategy::MaxLen(m, t, limit) => {
102 out.write_arg(b"MAXLEN");
103 m.write_redis_args(out);
104 t.write_redis_args(out);
105 limit
106 }
107 StreamTrimStrategy::MinId(m, t, limit) => {
108 out.write_arg(b"MINID");
109 m.write_redis_args(out);
110 t.write_redis_args(out);
111 limit
112 }
113 };
114 if let Some(limit) = limit {
115 out.write_arg(b"LIMIT");
116 limit.write_redis_args(out);
117 }
118 }
119}
120
121#[derive(Debug)]
126pub struct StreamTrimOptions {
127 strategy: StreamTrimStrategy,
128 deletion_policy: Option<StreamDeletionPolicy>,
129}
130
131impl StreamTrimOptions {
132 pub fn maxlen(mode: StreamTrimmingMode, max_entries: usize) -> Self {
134 Self {
135 strategy: StreamTrimStrategy::maxlen(mode, max_entries),
136 deletion_policy: None,
137 }
138 }
139
140 pub fn minid(mode: StreamTrimmingMode, stream_id: impl Into<String>) -> Self {
142 Self {
143 strategy: StreamTrimStrategy::minid(mode, stream_id),
144 deletion_policy: None,
145 }
146 }
147
148 pub fn limit(mut self, limit: usize) -> Self {
150 self.strategy = self.strategy.limit(limit);
151 self
152 }
153
154 pub fn set_deletion_policy(mut self, deletion_policy: StreamDeletionPolicy) -> Self {
156 self.deletion_policy = Some(deletion_policy);
157 self
158 }
159}
160
161impl ToRedisArgs for StreamTrimOptions {
162 fn write_redis_args<W>(&self, out: &mut W)
163 where
164 W: ?Sized + RedisWrite,
165 {
166 self.strategy.write_redis_args(out);
167 if let Some(deletion_policy) = self.deletion_policy.as_ref() {
168 deletion_policy.write_redis_args(out);
169 }
170 }
171}
172
173#[derive(Default, Debug)]
178pub struct StreamAddOptions {
179 nomkstream: bool,
180 trim: Option<StreamTrimStrategy>,
181 deletion_policy: Option<StreamDeletionPolicy>,
182}
183
184impl StreamAddOptions {
185 pub fn nomkstream(mut self) -> Self {
187 self.nomkstream = true;
188 self
189 }
190
191 pub fn trim(mut self, trim: StreamTrimStrategy) -> Self {
193 self.trim = Some(trim);
194 self
195 }
196
197 pub fn set_deletion_policy(mut self, deletion_policy: StreamDeletionPolicy) -> Self {
199 self.deletion_policy = Some(deletion_policy);
200 self
201 }
202}
203
204impl ToRedisArgs for StreamAddOptions {
205 fn write_redis_args<W>(&self, out: &mut W)
206 where
207 W: ?Sized + RedisWrite,
208 {
209 if self.nomkstream {
210 out.write_arg(b"NOMKSTREAM");
211 }
212 if let Some(strategy) = self.trim.as_ref() {
213 strategy.write_redis_args(out);
214 }
215 if let Some(deletion_policy) = self.deletion_policy.as_ref() {
216 deletion_policy.write_redis_args(out);
217 }
218 }
219}
220
221#[derive(Default, Debug)]
226pub struct StreamAutoClaimOptions {
227 count: Option<usize>,
228 justid: bool,
229}
230
231impl StreamAutoClaimOptions {
232 pub fn count(mut self, n: usize) -> Self {
234 self.count = Some(n);
235 self
236 }
237
238 pub fn with_justid(mut self) -> Self {
241 self.justid = true;
242 self
243 }
244}
245
246impl ToRedisArgs for StreamAutoClaimOptions {
247 fn write_redis_args<W>(&self, out: &mut W)
248 where
249 W: ?Sized + RedisWrite,
250 {
251 if let Some(ref count) = self.count {
252 out.write_arg(b"COUNT");
253 out.write_arg(format!("{count}").as_bytes());
254 }
255 if self.justid {
256 out.write_arg(b"JUSTID");
257 }
258 }
259}
260
261#[derive(Default, Debug)]
266pub struct StreamClaimOptions {
267 idle: Option<usize>,
269 time: Option<usize>,
271 retry: Option<usize>,
273 force: bool,
275 justid: bool,
278 lastid: Option<String>,
280}
281
282impl StreamClaimOptions {
283 pub fn idle(mut self, ms: usize) -> Self {
285 self.idle = Some(ms);
286 self
287 }
288
289 pub fn time(mut self, ms_time: usize) -> Self {
291 self.time = Some(ms_time);
292 self
293 }
294
295 pub fn retry(mut self, count: usize) -> Self {
297 self.retry = Some(count);
298 self
299 }
300
301 pub fn with_force(mut self) -> Self {
303 self.force = true;
304 self
305 }
306
307 pub fn with_justid(mut self) -> Self {
310 self.justid = true;
311 self
312 }
313
314 pub fn with_lastid(mut self, lastid: impl Into<String>) -> Self {
316 self.lastid = Some(lastid.into());
317 self
318 }
319}
320
321impl ToRedisArgs for StreamClaimOptions {
322 fn write_redis_args<W>(&self, out: &mut W)
323 where
324 W: ?Sized + RedisWrite,
325 {
326 if let Some(ref ms) = self.idle {
327 out.write_arg(b"IDLE");
328 out.write_arg(format!("{ms}").as_bytes());
329 }
330 if let Some(ref ms_time) = self.time {
331 out.write_arg(b"TIME");
332 out.write_arg(format!("{ms_time}").as_bytes());
333 }
334 if let Some(ref count) = self.retry {
335 out.write_arg(b"RETRYCOUNT");
336 out.write_arg(format!("{count}").as_bytes());
337 }
338 if self.force {
339 out.write_arg(b"FORCE");
340 }
341 if self.justid {
342 out.write_arg(b"JUSTID");
343 }
344 if let Some(ref lastid) = self.lastid {
345 out.write_arg(b"LASTID");
346 lastid.write_redis_args(out);
347 }
348 }
349}
350
351type SRGroup = Option<(Vec<Vec<u8>>, Vec<Vec<u8>>)>;
355#[derive(Default, Debug)]
360pub struct StreamReadOptions {
361 block: Option<usize>,
363 count: Option<usize>,
365 noack: Option<bool>,
367 group: SRGroup,
370 claim: Option<usize>,
373}
374
375impl StreamReadOptions {
376 pub fn read_only(&self) -> bool {
379 self.group.is_none()
380 }
381
382 pub fn noack(mut self) -> Self {
386 self.noack = Some(true);
387 self
388 }
389
390 pub fn block(mut self, ms: usize) -> Self {
392 self.block = Some(ms);
393 self
394 }
395
396 pub fn count(mut self, n: usize) -> Self {
398 self.count = Some(n);
399 self
400 }
401
402 pub fn group<GN: ToRedisArgs, CN: ToRedisArgs>(
404 mut self,
405 group_name: GN,
406 consumer_name: CN,
407 ) -> Self {
408 self.group = Some((
409 ToRedisArgs::to_redis_args(&group_name),
410 ToRedisArgs::to_redis_args(&consumer_name),
411 ));
412 self
413 }
414
415 pub fn claim(mut self, min_idle_time: usize) -> Self {
417 self.claim = Some(min_idle_time);
418 self
419 }
420}
421
422impl ToRedisArgs for StreamReadOptions {
423 fn write_redis_args<W>(&self, out: &mut W)
424 where
425 W: ?Sized + RedisWrite,
426 {
427 if let Some(ref group) = self.group {
428 out.write_arg(b"GROUP");
429 for i in &group.0 {
430 out.write_arg(i);
431 }
432 for i in &group.1 {
433 out.write_arg(i);
434 }
435 }
436
437 if let Some(ref ms) = self.block {
438 out.write_arg(b"BLOCK");
439 out.write_arg(format!("{ms}").as_bytes());
440 }
441
442 if let Some(ref n) = self.count {
443 out.write_arg(b"COUNT");
444 out.write_arg(format!("{n}").as_bytes());
445 }
446
447 if self.group.is_some() {
448 if self.noack == Some(true) {
450 out.write_arg(b"NOACK");
451 }
452 if let Some(ref min_idle_time) = self.claim {
454 out.write_arg(b"CLAIM");
455 out.write_arg(format!("{min_idle_time}").as_bytes());
456 }
457 }
458 }
459}
460
461#[derive(Default, Debug, Clone)]
466pub struct StreamAutoClaimReply {
467 pub next_stream_id: String,
469 pub claimed: Vec<StreamId>,
471 pub deleted_ids: Vec<String>,
473 pub invalid_entries: bool,
477}
478
479#[derive(Default, Debug, Clone)]
485pub struct StreamReadReply {
486 pub keys: Vec<StreamKey>,
488}
489
490#[derive(Default, Debug, Clone)]
502pub struct StreamRangeReply {
503 pub ids: Vec<StreamId>,
505}
506
507#[derive(Default, Debug, Clone)]
514pub struct StreamClaimReply {
515 pub ids: Vec<StreamId>,
517}
518
519#[derive(Debug, Clone, Default)]
527#[non_exhaustive]
528pub enum StreamPendingReply {
529 #[default]
531 Empty,
532 Data(StreamPendingData),
534}
535
536impl StreamPendingReply {
537 pub fn count(&self) -> usize {
539 match self {
540 StreamPendingReply::Empty => 0,
541 StreamPendingReply::Data(x) => x.count,
542 }
543 }
544}
545
546#[derive(Default, Debug, Clone)]
550pub struct StreamPendingData {
551 pub count: usize,
553 pub start_id: String,
555 pub end_id: String,
557 pub consumers: Vec<StreamInfoConsumer>,
561}
562
563#[derive(Default, Debug, Clone)]
573pub struct StreamPendingCountReply {
574 pub ids: Vec<StreamPendingId>,
578}
579
580#[derive(Default, Debug, Clone)]
589pub struct StreamInfoStreamReply {
590 pub last_generated_id: String,
593 pub radix_tree_keys: usize,
596 pub groups: usize,
598 pub length: usize,
600 pub first_entry: StreamId,
602 pub last_entry: StreamId,
604}
605
606#[derive(Default, Debug, Clone)]
612pub struct StreamInfoConsumersReply {
613 pub consumers: Vec<StreamInfoConsumer>,
615}
616
617#[derive(Default, Debug, Clone)]
625pub struct StreamInfoGroupsReply {
626 pub groups: Vec<StreamInfoGroup>,
628}
629
630#[derive(Default, Debug, Clone)]
635pub struct StreamInfoConsumer {
636 pub name: String,
638 pub pending: usize,
640 pub idle: usize,
642}
643
644#[derive(Default, Debug, Clone)]
649pub struct StreamInfoGroup {
650 pub name: String,
652 pub consumers: usize,
654 pub pending: usize,
656 pub last_delivered_id: String,
658 pub entries_read: Option<usize>,
661 pub lag: Option<usize>,
664}
665
666#[derive(Default, Debug, Clone)]
670pub struct StreamPendingId {
671 pub id: String,
673 pub consumer: String,
677 pub last_delivered_ms: usize,
680 pub times_delivered: usize,
682}
683
684#[derive(Default, Debug, Clone)]
686pub struct StreamKey {
687 pub key: String,
689 pub ids: Vec<StreamId>,
691}
692
693#[derive(Default, Debug, Clone, PartialEq)]
696pub struct StreamId {
697 pub id: String,
699 pub map: HashMap<String, Value>,
701 pub milliseconds_elapsed_from_delivery: Option<usize>,
703 pub delivered_count: Option<usize>,
705}
706
707impl StreamId {
708 fn from_array_value(v: Value) -> Result<Self, ParsingError> {
710 let mut stream_id = StreamId::default();
711 if let Value::Array(mut values) = v {
712 if let Some(v) = values.first_mut() {
713 stream_id.id = from_redis_value(std::mem::take(v))?;
714 }
715 if let Some(v) = values.first_mut() {
716 stream_id.map = from_redis_value(std::mem::take(v))?;
717 }
718 }
719
720 Ok(stream_id)
721 }
722
723 pub fn get<T: FromRedisValue>(&self, key: &str) -> Option<T> {
726 match self.map.get(key) {
727 Some(x) => from_redis_value_ref(x).ok(),
728 None => None,
729 }
730 }
731
732 pub fn contains_key(&self, key: &str) -> bool {
734 self.map.contains_key(key)
735 }
736
737 pub fn len(&self) -> usize {
739 self.map.len()
740 }
741
742 pub fn is_empty(&self) -> bool {
744 self.len() == 0
745 }
746}
747
748type SACRows = Vec<HashMap<String, HashMap<String, Value>>>;
749
750impl FromRedisValue for StreamAutoClaimReply {
751 fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
752 let Value::Array(mut items) = v else {
753 invalid_type_error!("Not a array response", v);
754 };
755
756 if items.len() > 3 || items.len() < 2 {
757 invalid_type_error!("Incorrect number of items", &items);
758 }
759
760 let deleted_ids = if items.len() == 3 {
761 from_redis_value(items.pop().unwrap())?
762 } else {
763 Vec::new()
764 };
765 let claimed = items.pop().unwrap();
767 let next_stream_id = from_redis_value(items.pop().unwrap())?;
768
769 let Value::Array(arr) = &claimed else {
770 invalid_type_error!("Incorrect type", claimed)
771 };
772 let Some(entry) = arr.iter().find(|val| !matches!(val, Value::Nil)) else {
773 return Ok(Self {
774 next_stream_id,
775 claimed: Vec::new(),
776 deleted_ids,
777 invalid_entries: !arr.is_empty(),
778 });
779 };
780 let (claimed, invalid_entries) = match entry {
781 Value::BulkString(_) => {
782 let claimed_count = arr.len();
784 let ids: Vec<Option<String>> = from_redis_value(claimed)?;
785
786 let claimed: Vec<_> = ids
787 .into_iter()
788 .filter_map(|id| {
789 id.map(|id| StreamId {
790 id,
791 ..Default::default()
792 })
793 })
794 .collect();
795 let invalid_entries = claimed.len() < claimed_count;
797 (claimed, invalid_entries)
798 }
799 Value::Array(_) => {
800 let claimed_count = arr.len();
802 let rows: SACRows = from_redis_value(claimed)?;
803
804 let claimed: Vec<_> = rows
805 .into_iter()
806 .flat_map(|row| {
807 row.into_iter().map(|(id, map)| StreamId {
808 id,
809 map,
810 milliseconds_elapsed_from_delivery: None,
811 delivered_count: None,
812 })
813 })
814 .collect();
815 let invalid_entries = claimed.len() < claimed_count;
817 (claimed, invalid_entries)
818 }
819 _ => invalid_type_error!("Incorrect type", claimed),
820 };
821
822 Ok(Self {
823 next_stream_id,
824 claimed,
825 deleted_ids,
826 invalid_entries,
827 })
828 }
829}
830
831type SRRows = Vec<HashMap<String, Vec<HashMap<String, HashMap<String, Value>>>>>;
832type SRClaimRows =
833 Vec<HashMap<String, Vec<(String, HashMap<String, Value>, Option<usize>, Option<usize>)>>>;
834
835impl FromRedisValue for StreamReadReply {
836 fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
837 if let Ok(rows) = from_redis_value::<SRRows>(v.clone()) {
839 return Ok(Self::from_standard_rows(rows));
840 }
841
842 if let Ok(rows) = from_redis_value::<SRClaimRows>(v.clone()) {
845 return Ok(Self::from_claim_rows(rows));
846 }
847
848 invalid_type_error!("Could not parse StreamReadReply in any known format", v)
849 }
850}
851
852impl StreamReadReply {
853 fn from_standard_rows(rows: SRRows) -> Self {
854 let keys = rows
855 .into_iter()
856 .flat_map(|row| {
857 row.into_iter().map(|(key, entries)| StreamKey {
858 key,
859 ids: entries
860 .into_iter()
861 .flat_map(|id_row| {
862 id_row.into_iter().map(|(id, map)| StreamId {
863 id,
864 map,
865 milliseconds_elapsed_from_delivery: None,
866 delivered_count: None,
867 })
868 })
869 .collect(),
870 })
871 })
872 .collect();
873 StreamReadReply { keys }
874 }
875
876 fn from_claim_rows(rows: SRClaimRows) -> Self {
877 let keys = rows
878 .into_iter()
879 .flat_map(|row| {
880 row.into_iter().map(|(key, entries)| StreamKey {
881 key,
882 ids: entries
883 .into_iter()
884 .map(
885 |(id, map, milliseconds_elapsed_from_delivery, delivered_count)| {
886 StreamId {
887 id,
888 map,
889 milliseconds_elapsed_from_delivery,
890 delivered_count,
891 }
892 },
893 )
894 .collect(),
895 })
896 })
897 .collect();
898 StreamReadReply { keys }
899 }
900}
901
902impl FromRedisValue for StreamRangeReply {
903 fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
904 let rows: Vec<HashMap<String, HashMap<String, Value>>> = from_redis_value(v)?;
905 let ids: Vec<StreamId> = rows
906 .into_iter()
907 .flat_map(|row| {
908 row.into_iter().map(|(id, map)| StreamId {
909 id,
910 map,
911 milliseconds_elapsed_from_delivery: None,
912 delivered_count: None,
913 })
914 })
915 .collect();
916 Ok(StreamRangeReply { ids })
917 }
918}
919
920impl FromRedisValue for StreamClaimReply {
921 fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
922 let rows: Vec<HashMap<String, HashMap<String, Value>>> = from_redis_value(v)?;
923 let ids: Vec<StreamId> = rows
924 .into_iter()
925 .flat_map(|row| {
926 row.into_iter().map(|(id, map)| StreamId {
927 id,
928 map,
929 milliseconds_elapsed_from_delivery: None,
930 delivered_count: None,
931 })
932 })
933 .collect();
934 Ok(StreamClaimReply { ids })
935 }
936}
937
938type SPRInner = (
939 usize,
940 Option<String>,
941 Option<String>,
942 Vec<Option<(String, String)>>,
943);
944impl FromRedisValue for StreamPendingReply {
945 fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
946 let (count, start, end, consumer_data): SPRInner = from_redis_value(v)?;
947
948 if count == 0 {
949 Ok(StreamPendingReply::Empty)
950 } else {
951 let mut result = StreamPendingData::default();
952
953 let start_id = start.ok_or_else(|| {
954 ParsingError::from(arcstr::literal!(
955 "IllegalState: Non-zero pending expects start id"
956 ))
957 })?;
958
959 let end_id = end.ok_or_else(|| {
960 ParsingError::from(arcstr::literal!(
961 "IllegalState: Non-zero pending expects end id"
962 ))
963 })?;
964
965 result.count = count;
966 result.start_id = start_id;
967 result.end_id = end_id;
968
969 result.consumers = consumer_data
970 .into_iter()
971 .flatten()
972 .map(|(name, pending)| StreamInfoConsumer {
973 name,
974 pending: pending.parse().unwrap_or_default(),
975 ..Default::default()
976 })
977 .collect();
978
979 Ok(StreamPendingReply::Data(result))
980 }
981 }
982}
983
984impl FromRedisValue for StreamPendingCountReply {
985 fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
986 let mut reply = StreamPendingCountReply::default();
987 match v {
988 Value::Array(outer_tuple) => {
989 for outer in outer_tuple {
990 match outer {
991 Value::Array(inner_tuple) => match &inner_tuple[..] {
992 [Value::BulkString(id_bytes), Value::BulkString(consumer_bytes), Value::Int(last_delivered_ms_u64), Value::Int(times_delivered_u64)] =>
993 {
994 let id = String::from_utf8(id_bytes.to_vec())?;
995 let consumer = String::from_utf8(consumer_bytes.to_vec())?;
996 let last_delivered_ms = *last_delivered_ms_u64 as usize;
997 let times_delivered = *times_delivered_u64 as usize;
998 reply.ids.push(StreamPendingId {
999 id,
1000 consumer,
1001 last_delivered_ms,
1002 times_delivered,
1003 });
1004 }
1005 _ => fail!(ParsingError::from(arcstr::literal!(
1006 "Cannot parse redis data (3)"
1007 ))),
1008 },
1009 _ => fail!(ParsingError::from(arcstr::literal!(
1010 "Cannot parse redis data (2)"
1011 ))),
1012 }
1013 }
1014 }
1015 _ => fail!(ParsingError::from(arcstr::literal!(
1016 "Cannot parse redis data (1)"
1017 ))),
1018 };
1019 Ok(reply)
1020 }
1021}
1022
1023impl FromRedisValue for StreamInfoStreamReply {
1024 fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
1025 let mut map: HashMap<String, Value> = from_redis_value(v)?;
1026 let mut reply = StreamInfoStreamReply::default();
1027 if let Some(v) = map.remove("last-generated-id") {
1028 reply.last_generated_id = from_redis_value(v)?;
1029 }
1030 if let Some(v) = map.remove("radix-tree-nodes") {
1031 reply.radix_tree_keys = from_redis_value(v)?;
1032 }
1033 if let Some(v) = map.remove("groups") {
1034 reply.groups = from_redis_value(v)?;
1035 }
1036 if let Some(v) = map.remove("length") {
1037 reply.length = from_redis_value(v)?;
1038 }
1039 if let Some(v) = map.remove("first-entry") {
1040 reply.first_entry = StreamId::from_array_value(v)?;
1041 }
1042 if let Some(v) = map.remove("last-entry") {
1043 reply.last_entry = StreamId::from_array_value(v)?;
1044 }
1045 Ok(reply)
1046 }
1047}
1048
1049impl FromRedisValue for StreamInfoConsumersReply {
1050 fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
1051 let consumers: Vec<HashMap<String, Value>> = from_redis_value(v)?;
1052 let mut reply = StreamInfoConsumersReply::default();
1053 for mut map in consumers {
1054 let mut c = StreamInfoConsumer::default();
1055 if let Some(v) = map.remove("name") {
1056 c.name = from_redis_value(v)?;
1057 }
1058 if let Some(v) = map.remove("pending") {
1059 c.pending = from_redis_value(v)?;
1060 }
1061 if let Some(v) = map.remove("idle") {
1062 c.idle = from_redis_value(v)?;
1063 }
1064 reply.consumers.push(c);
1065 }
1066
1067 Ok(reply)
1068 }
1069}
1070
1071impl FromRedisValue for StreamInfoGroupsReply {
1072 fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
1073 let groups: Vec<HashMap<String, Value>> = from_redis_value(v)?;
1074 let mut reply = StreamInfoGroupsReply::default();
1075 for mut map in groups {
1076 let mut g = StreamInfoGroup::default();
1077 if let Some(v) = map.remove("name") {
1078 g.name = from_redis_value(v)?;
1079 }
1080 if let Some(v) = map.remove("pending") {
1081 g.pending = from_redis_value(v)?;
1082 }
1083 if let Some(v) = map.remove("consumers") {
1084 g.consumers = from_redis_value(v)?;
1085 }
1086 if let Some(v) = map.remove("last-delivered-id") {
1087 g.last_delivered_id = from_redis_value(v)?;
1088 }
1089 if let Some(v) = map.remove("entries-read") {
1090 g.entries_read = if let Value::Nil = v {
1091 None
1092 } else {
1093 Some(from_redis_value(v)?)
1094 };
1095 }
1096 if let Some(v) = map.remove("lag") {
1097 g.lag = if let Value::Nil = v {
1098 None
1099 } else {
1100 Some(from_redis_value(v)?)
1101 };
1102 }
1103 reply.groups.push(g);
1104 }
1105 Ok(reply)
1106 }
1107}
1108
1109#[derive(Debug, Clone, Default)]
1111#[non_exhaustive]
1112pub enum StreamDeletionPolicy {
1113 #[default]
1115 KeepRef,
1116 DelRef,
1118 Acked,
1120}
1121
1122impl ToRedisArgs for StreamDeletionPolicy {
1123 fn write_redis_args<W>(&self, out: &mut W)
1124 where
1125 W: ?Sized + RedisWrite,
1126 {
1127 match self {
1128 StreamDeletionPolicy::KeepRef => out.write_arg(b"KEEPREF"),
1129 StreamDeletionPolicy::DelRef => out.write_arg(b"DELREF"),
1130 StreamDeletionPolicy::Acked => out.write_arg(b"ACKED"),
1131 }
1132 }
1133}
1134impl ToSingleRedisArg for StreamDeletionPolicy {}
1135
1136#[cfg(feature = "streams")]
1138#[cfg_attr(docsrs, doc(cfg(feature = "streams")))]
1139#[derive(Debug, PartialEq, Eq)]
1140#[non_exhaustive]
1141pub enum XDelExStatusCode {
1142 IdNotFound = -1,
1144 Deleted = 1,
1146 NotDeletedUnacknowledgedOrStillReferenced = 2,
1149}
1150
1151#[cfg(feature = "streams")]
1152impl FromRedisValue for XDelExStatusCode {
1153 fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
1154 match v {
1155 Value::Int(code) => match code {
1156 -1 => Ok(XDelExStatusCode::IdNotFound),
1157 1 => Ok(XDelExStatusCode::Deleted),
1158 2 => Ok(XDelExStatusCode::NotDeletedUnacknowledgedOrStillReferenced),
1159 _ => Err(format!("Invalid XDelExStatusCode status code: {code}").into()),
1160 },
1161 _ => Err(arcstr::literal!("Response type not XAckDelStatusCode compatible").into()),
1162 }
1163 }
1164}
1165
1166#[cfg(feature = "streams")]
1168#[cfg_attr(docsrs, doc(cfg(feature = "streams")))]
1169#[derive(Debug, PartialEq, Eq)]
1170#[non_exhaustive]
1171pub enum XAckDelStatusCode {
1172 IdNotFound = -1,
1174 AcknowledgedAndDeleted = 1,
1176 AcknowledgedNotDeletedStillReferenced = 2,
1178}
1179
1180#[cfg(feature = "streams")]
1181impl FromRedisValue for XAckDelStatusCode {
1182 fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
1183 match v {
1184 Value::Int(code) => match code {
1185 -1 => Ok(XAckDelStatusCode::IdNotFound),
1186 1 => Ok(XAckDelStatusCode::AcknowledgedAndDeleted),
1187 2 => Ok(XAckDelStatusCode::AcknowledgedNotDeletedStillReferenced),
1188 _ => Err(arcstr::literal!("Invalid XAckDelStatusCode status code: {code}").into()),
1189 },
1190 _ => Err(arcstr::literal!("Response type not XAckDelStatusCode compatible").into()),
1191 }
1192 }
1193}
1194
1195#[cfg(test)]
1196mod tests {
1197 use super::*;
1198
1199 fn assert_command_eq(object: impl ToRedisArgs, expected: &[u8]) {
1200 let mut out: Vec<Vec<u8>> = Vec::new();
1201
1202 object.write_redis_args(&mut out);
1203
1204 let mut cmd: Vec<u8> = Vec::new();
1205
1206 out.iter_mut().for_each(|item| {
1207 cmd.append(item);
1208 cmd.push(b' ');
1209 });
1210
1211 cmd.pop();
1212
1213 assert_eq!(cmd, expected);
1214 }
1215
1216 mod stream_auto_claim_reply {
1217 use super::*;
1218 use crate::Value;
1219
1220 #[test]
1221 fn short_response() {
1222 let value = Value::Array(vec![Value::BulkString("1713465536578-0".into())]);
1223
1224 StreamAutoClaimReply::from_redis_value(value).unwrap_err();
1225 }
1226
1227 #[test]
1228 fn parses_none_claimed_response() {
1229 let value = Value::Array(vec![
1230 Value::BulkString("0-0".into()),
1231 Value::Array(vec![]),
1232 Value::Array(vec![]),
1233 ]);
1234
1235 let reply: StreamAutoClaimReply = FromRedisValue::from_redis_value(value).unwrap();
1236
1237 assert_eq!(reply.next_stream_id.as_str(), "0-0");
1238 assert_eq!(reply.claimed.len(), 0);
1239 assert_eq!(reply.deleted_ids.len(), 0);
1240 }
1241
1242 #[test]
1243 fn parses_response() {
1244 let value = Value::Array(vec![
1245 Value::BulkString("1713465536578-0".into()),
1246 Value::Array(vec![
1247 Value::Array(vec![
1248 Value::BulkString("1713465533411-0".into()),
1249 Value::Array(vec![
1251 Value::BulkString("name".into()),
1252 Value::BulkString("test".into()),
1253 Value::BulkString("other".into()),
1254 Value::BulkString("whaterver".into()),
1255 ]),
1256 ]),
1257 Value::Array(vec![
1258 Value::BulkString("1713465536069-0".into()),
1259 Value::Array(vec![
1260 Value::BulkString("name".into()),
1261 Value::BulkString("another test".into()),
1262 Value::BulkString("other".into()),
1263 Value::BulkString("something".into()),
1264 ]),
1265 ]),
1266 ]),
1267 Value::Array(vec![Value::BulkString("123456789-0".into())]),
1268 ]);
1269
1270 let reply: StreamAutoClaimReply = FromRedisValue::from_redis_value(value).unwrap();
1271
1272 assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1273 assert_eq!(reply.claimed.len(), 2);
1274 assert_eq!(reply.claimed[0].id.as_str(), "1713465533411-0");
1275 assert!(
1276 matches!(reply.claimed[0].map.get("name"), Some(Value::BulkString(v)) if v == "test".as_bytes())
1277 );
1278 assert_eq!(reply.claimed[1].id.as_str(), "1713465536069-0");
1279 assert_eq!(reply.deleted_ids.len(), 1);
1280 assert!(reply.deleted_ids.contains(&"123456789-0".to_string()))
1281 }
1282
1283 #[test]
1284 fn parses_v6_response() {
1285 let value = Value::Array(vec![
1286 Value::BulkString("1713465536578-0".into()),
1287 Value::Array(vec![
1288 Value::Array(vec![
1289 Value::BulkString("1713465533411-0".into()),
1290 Value::Array(vec![
1291 Value::BulkString("name".into()),
1292 Value::BulkString("test".into()),
1293 Value::BulkString("other".into()),
1294 Value::BulkString("whaterver".into()),
1295 ]),
1296 ]),
1297 Value::Array(vec![
1298 Value::BulkString("1713465536069-0".into()),
1299 Value::Array(vec![
1300 Value::BulkString("name".into()),
1301 Value::BulkString("another test".into()),
1302 Value::BulkString("other".into()),
1303 Value::BulkString("something".into()),
1304 ]),
1305 ]),
1306 ]),
1307 ]);
1309
1310 let reply: StreamAutoClaimReply = FromRedisValue::from_redis_value(value).unwrap();
1311
1312 assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1313 assert_eq!(reply.claimed.len(), 2);
1314 let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
1315 assert!(ids.contains(&"1713465533411-0"));
1316 assert!(ids.contains(&"1713465536069-0"));
1317 assert_eq!(reply.deleted_ids.len(), 0);
1318 }
1319
1320 #[test]
1321 fn parses_justid_response() {
1322 let value = Value::Array(vec![
1323 Value::BulkString("1713465536578-0".into()),
1324 Value::Array(vec![
1325 Value::BulkString("1713465533411-0".into()),
1326 Value::BulkString("1713465536069-0".into()),
1327 ]),
1328 Value::Array(vec![Value::BulkString("123456789-0".into())]),
1329 ]);
1330
1331 let reply: StreamAutoClaimReply = FromRedisValue::from_redis_value(value).unwrap();
1332
1333 assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1334 assert_eq!(reply.claimed.len(), 2);
1335 let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
1336 assert!(ids.contains(&"1713465533411-0"));
1337 assert!(ids.contains(&"1713465536069-0"));
1338 assert_eq!(reply.deleted_ids.len(), 1);
1339 assert!(reply.deleted_ids.contains(&"123456789-0".to_string()))
1340 }
1341
1342 #[test]
1343 fn parses_v6_justid_response() {
1344 let value = Value::Array(vec![
1345 Value::BulkString("1713465536578-0".into()),
1346 Value::Array(vec![
1347 Value::BulkString("1713465533411-0".into()),
1348 Value::BulkString("1713465536069-0".into()),
1349 ]),
1350 ]);
1352
1353 let reply: StreamAutoClaimReply = FromRedisValue::from_redis_value(value).unwrap();
1354
1355 assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1356 assert_eq!(reply.claimed.len(), 2);
1357 let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
1358 assert!(ids.contains(&"1713465533411-0"));
1359 assert!(ids.contains(&"1713465536069-0"));
1360 assert_eq!(reply.deleted_ids.len(), 0);
1361 }
1362 }
1363
1364 mod stream_trim_options {
1365 use super::*;
1366
1367 #[test]
1368 fn maxlen_trim() {
1369 let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Approx, 10);
1370
1371 assert_command_eq(options, b"MAXLEN ~ 10");
1372 }
1373
1374 #[test]
1375 fn maxlen_exact_trim() {
1376 let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Exact, 10);
1377
1378 assert_command_eq(options, b"MAXLEN = 10");
1379 }
1380
1381 #[test]
1382 fn maxlen_trim_limit() {
1383 let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Approx, 10).limit(5);
1384
1385 assert_command_eq(options, b"MAXLEN ~ 10 LIMIT 5");
1386 }
1387 #[test]
1388 fn minid_trim_limit() {
1389 let options = StreamTrimOptions::minid(StreamTrimmingMode::Exact, "123456-7").limit(5);
1390
1391 assert_command_eq(options, b"MINID = 123456-7 LIMIT 5");
1392 }
1393 }
1394
1395 mod stream_add_options {
1396 use super::*;
1397
1398 #[test]
1399 fn the_default() {
1400 let options = StreamAddOptions::default();
1401
1402 assert_command_eq(options, b"");
1403 }
1404
1405 #[test]
1406 fn with_maxlen_trim() {
1407 let options = StreamAddOptions::default()
1408 .trim(StreamTrimStrategy::maxlen(StreamTrimmingMode::Exact, 10));
1409
1410 assert_command_eq(options, b"MAXLEN = 10");
1411 }
1412
1413 #[test]
1414 fn with_nomkstream() {
1415 let options = StreamAddOptions::default().nomkstream();
1416
1417 assert_command_eq(options, b"NOMKSTREAM");
1418 }
1419
1420 #[test]
1421 fn with_nomkstream_and_maxlen_trim() {
1422 let options = StreamAddOptions::default()
1423 .nomkstream()
1424 .trim(StreamTrimStrategy::maxlen(StreamTrimmingMode::Exact, 10));
1425
1426 assert_command_eq(options, b"NOMKSTREAM MAXLEN = 10");
1427 }
1428 }
1429}