1use std::time::{Duration, SystemTime, UNIX_EPOCH};
2
3use redis::{FromRedisValue, RedisError, RedisResult, RedisWrite, ToRedisArgs, Value};
4use serde::{Deserialize, Serialize};
5use uuid::Uuid;
6
/// Converts a borrowed Redis `Value` into a typed field, producing a `Result` whose
/// error is a `RedisError` of kind `TypeError`.
///
/// Invoked as `parse_value!(Kind, value, "field")` where:
/// - `Kind` is `String` or `Uuid`,
/// - `value` is an identifier bound to a `&Value`,
/// - `"field"` is a string literal spliced into the error text via `concat!`.
///
/// Both arms accept `Value::SimpleString` and `Value::BulkString`; every other
/// variant yields "`<field>` must be a string".
macro_rules! parse_value {
    (String, $value:ident, $field:literal) => {
        match $value {
            Value::SimpleString(s) => Ok(s.clone()),
            // Bulk strings are raw bytes, so they must be validated as UTF-8 first.
            Value::BulkString(s) => String::from_utf8(s.clone()).map_err(|_| {
                RedisError::from((
                    redis::ErrorKind::TypeError,
                    concat!("Invalid string for ", $field),
                ))
            }),
            _ => Err(RedisError::from((
                redis::ErrorKind::TypeError,
                concat!($field, " must be a string"),
            ))),
        }
    };
    (Uuid, $value:ident, $field:literal) => {
        match $value {
            Value::SimpleString(s) => Uuid::parse_str(s).map_err(|_| {
                RedisError::from((
                    redis::ErrorKind::TypeError,
                    concat!("Invalid UUID format for ", $field),
                ))
            }),
            // Fully qualified on purpose: this is the same free function the rest of the
            // file calls, and it does not depend on the newer inherent `str::from_utf8`.
            // Invalid UTF-8 and an unparsable UUID share one error message.
            Value::BulkString(s) => std::str::from_utf8(s)
                .ok()
                .and_then(|text| Uuid::parse_str(text).ok())
                .ok_or_else(|| {
                    RedisError::from((
                        redis::ErrorKind::TypeError,
                        concat!("Invalid UUID format for ", $field),
                    ))
                }),
            _ => Err(RedisError::from((
                redis::ErrorKind::TypeError,
                concat!($field, " must be a string"),
            ))),
        }
    };
}
47
/// A message with one of two shapes: an event plus cursor, or a subscription
/// confirmation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SierraMessage {
    /// Carries an [`Event`] together with a `cursor` value.
    // NOTE(review): presumably `cursor` is the position to resume from — confirm with the producer.
    Event { event: Event, cursor: u64 },
    /// Carries the `subscription_count` reported alongside a confirmation.
    SubscriptionConfirmed { subscription_count: i64 },
}
56
/// Fields decoded from a "hello" map reply.
///
/// The `FromRedisValue` implementation in this file requires all four map entries
/// (`server`, `version`, `peer_id`, `num_partitions`) to be present.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct HelloResp {
    /// Value of the `server` map entry.
    pub server: String,
    /// Value of the `version` map entry.
    pub version: String,
    /// Value of the `peer_id` map entry.
    pub peer_id: String,
    /// Value of the `num_partitions` map entry, sent as an integer reply.
    pub num_partitions: u16,
}
64
65impl FromRedisValue for HelloResp {
66 fn from_redis_value(value: &Value) -> RedisResult<Self> {
67 match value {
68 Value::Map(fields) => {
69 let mut server = None;
70 let mut version = None;
71 let mut peer_id = None;
72 let mut num_partitions = None;
73
74 for (key, val) in fields {
76 let field_name = match key {
77 Value::SimpleString(s) => s.as_str(),
78 Value::BulkString(s) => std::str::from_utf8(s).map_err(|_| {
79 RedisError::from((
80 redis::ErrorKind::TypeError,
81 "Invalid string for hello resp map key",
82 ))
83 })?,
84 _ => continue,
85 };
86
87 match field_name {
88 "server" => {
89 server = Some(parse_value!(String, val, "server")?);
90 }
91 "version" => {
92 version = Some(parse_value!(String, val, "version")?);
93 }
94 "peer_id" => {
95 peer_id = Some(parse_value!(String, val, "peer_id")?);
96 }
97 "num_partitions" => {
98 num_partitions = Some(match val {
99 Value::Int(n) => *n as u16,
100 _ => {
101 return Err(RedisError::from((
102 redis::ErrorKind::TypeError,
103 "num_partitions must be an integer",
104 )));
105 }
106 });
107 }
108 _ => {} }
110 }
111
112 let server = server.ok_or_else(|| {
114 RedisError::from((
115 redis::ErrorKind::TypeError,
116 "Missing required field: server",
117 ))
118 })?;
119 let version = version.ok_or_else(|| {
120 RedisError::from((
121 redis::ErrorKind::TypeError,
122 "Missing required field: version",
123 ))
124 })?;
125 let peer_id = peer_id.ok_or_else(|| {
126 RedisError::from((
127 redis::ErrorKind::TypeError,
128 "Missing required field: peer_id",
129 ))
130 })?;
131 let num_partitions = num_partitions.ok_or_else(|| {
132 RedisError::from((
133 redis::ErrorKind::TypeError,
134 "Missing required field: num_partitions",
135 ))
136 })?;
137
138 Ok(HelloResp {
139 server,
140 version,
141 peer_id,
142 num_partitions,
143 })
144 }
145 _ => Err(RedisError::from((
146 redis::ErrorKind::TypeError,
147 "Hello resp must be a Redis map",
148 ))),
149 }
150 }
151}
152
/// Fields decoded from the map reply describing a single appended event.
///
/// The `FromRedisValue` implementation in this file requires every field to be present.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AppendInfo {
    /// UUID parsed from the `event_id` string entry.
    pub event_id: Uuid,
    /// UUID parsed from the `partition_key` string entry.
    pub partition_key: Uuid,
    /// Value of the `partition_id` integer entry.
    pub partition_id: u16,
    /// Value of the `partition_sequence` integer entry.
    pub partition_sequence: u64,
    /// Value of the `stream_version` integer entry.
    pub stream_version: u64,
    /// Decoded from the `timestamp` integer entry, read as milliseconds since the Unix epoch.
    pub timestamp: SystemTime,
}
162
163impl FromRedisValue for AppendInfo {
164 fn from_redis_value(value: &Value) -> RedisResult<Self> {
165 match value {
166 Value::Map(fields) => {
167 let mut event_id = None;
168 let mut partition_key = None;
169 let mut partition_id = None;
170 let mut partition_sequence = None;
171 let mut stream_version = None;
172 let mut timestamp = None;
173
174 for (key, val) in fields {
176 let field_name = match key {
177 Value::SimpleString(s) => s.as_str(),
178 _ => continue,
179 };
180
181 match field_name {
182 "event_id" => {
183 event_id = Some(parse_value!(Uuid, val, "event_id")?);
184 }
185 "partition_key" => {
186 partition_key = Some(parse_value!(Uuid, val, "partition_key")?);
187 }
188 "partition_id" => {
189 partition_id = Some(match val {
190 Value::Int(n) => *n as u16,
191 _ => {
192 return Err(RedisError::from((
193 redis::ErrorKind::TypeError,
194 "partition_id must be an integer",
195 )));
196 }
197 });
198 }
199 "partition_sequence" => {
200 partition_sequence = Some(match val {
201 Value::Int(n) => *n as u64,
202 _ => {
203 return Err(RedisError::from((
204 redis::ErrorKind::TypeError,
205 "partition_sequence must be an integer",
206 )));
207 }
208 });
209 }
210 "stream_version" => {
211 stream_version = Some(match val {
212 Value::Int(n) => *n as u64,
213 _ => {
214 return Err(RedisError::from((
215 redis::ErrorKind::TypeError,
216 "stream_version must be an integer",
217 )));
218 }
219 });
220 }
221 "timestamp" => {
222 timestamp = Some(match val {
223 Value::Int(ms) => {
224 let duration = Duration::from_millis(*ms as u64);
225 UNIX_EPOCH + duration
226 }
227 _ => {
228 return Err(RedisError::from((
229 redis::ErrorKind::TypeError,
230 "timestamp must be an integer",
231 )));
232 }
233 });
234 }
235 _ => {} }
237 }
238
239 let event_id = event_id.ok_or_else(|| {
241 RedisError::from((
242 redis::ErrorKind::TypeError,
243 "Missing required field: event_id",
244 ))
245 })?;
246 let partition_key = partition_key.ok_or_else(|| {
247 RedisError::from((
248 redis::ErrorKind::TypeError,
249 "Missing required field: partition_key",
250 ))
251 })?;
252 let partition_id = partition_id.ok_or_else(|| {
253 RedisError::from((
254 redis::ErrorKind::TypeError,
255 "Missing required field: partition_id",
256 ))
257 })?;
258 let partition_sequence = partition_sequence.ok_or_else(|| {
259 RedisError::from((
260 redis::ErrorKind::TypeError,
261 "Missing required field: partition_sequence",
262 ))
263 })?;
264 let stream_version = stream_version.ok_or_else(|| {
265 RedisError::from((
266 redis::ErrorKind::TypeError,
267 "Missing required field: stream_version",
268 ))
269 })?;
270 let timestamp = timestamp.ok_or_else(|| {
271 RedisError::from((
272 redis::ErrorKind::TypeError,
273 "Missing required field: timestamp",
274 ))
275 })?;
276
277 Ok(AppendInfo {
278 event_id,
279 partition_key,
280 partition_id,
281 partition_sequence,
282 stream_version,
283 timestamp,
284 })
285 }
286 _ => Err(RedisError::from((
287 redis::ErrorKind::TypeError,
288 "Append info must be a Redis map",
289 ))),
290 }
291 }
292}
293
/// A list of events together with a `has_more` flag, decoded from a Redis map reply.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct EventBatch {
    /// Events decoded from the `events` array entry, in reply order.
    pub events: Vec<Event>,
    /// Value of the `has_more` boolean entry.
    // NOTE(review): presumably signals that further events can be fetched — confirm with the server contract.
    pub has_more: bool,
}
299
300impl FromRedisValue for EventBatch {
301 fn from_redis_value(value: &Value) -> RedisResult<Self> {
302 match value {
303 Value::Map(fields) => {
304 let mut has_more = None;
305 let mut events = None;
306
307 for (key, val) in fields {
309 let field_name = match key {
310 Value::SimpleString(s) => s.as_str(),
311 _ => continue,
312 };
313
314 match field_name {
315 "has_more" => {
316 has_more = Some(match val {
317 Value::Boolean(b) => *b,
318 _ => {
319 return Err(RedisError::from((
320 redis::ErrorKind::TypeError,
321 "has_more must be a boolean",
322 )));
323 }
324 });
325 }
326 "events" => {
327 events = Some(match val {
328 Value::Array(event_values) => event_values
329 .iter()
330 .map(Event::from_redis_value)
331 .collect::<Result<Vec<_>, _>>()?,
332 _ => {
333 return Err(RedisError::from((
334 redis::ErrorKind::TypeError,
335 "events must be an array",
336 )));
337 }
338 });
339 }
340 _ => {} }
342 }
343
344 let has_more = has_more.ok_or_else(|| {
346 RedisError::from((
347 redis::ErrorKind::TypeError,
348 "Missing required field: has_more",
349 ))
350 })?;
351 let events = events.ok_or_else(|| {
352 RedisError::from((
353 redis::ErrorKind::TypeError,
354 "Missing required field: events",
355 ))
356 })?;
357
358 Ok(EventBatch { events, has_more })
359 }
360 _ => Err(RedisError::from((
361 redis::ErrorKind::TypeError,
362 "Event batch must be a Redis map",
363 ))),
364 }
365 }
366}
367
/// A single event decoded from a Redis map reply.
///
/// The `FromRedisValue` implementation in this file requires every field to be present.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct Event {
    /// UUID parsed from the `event_id` string entry.
    pub event_id: Uuid,

    /// UUID parsed from the `partition_key` string entry.
    pub partition_key: Uuid,

    /// Value of the `partition_id` integer entry.
    pub partition_id: u16,

    /// UUID parsed from the `transaction_id` string entry.
    pub transaction_id: Uuid,

    /// Value of the `partition_sequence` integer entry.
    pub partition_sequence: u64,

    /// Value of the `stream_version` integer entry.
    pub stream_version: u64,

    /// Decoded from the `timestamp` integer entry, read as milliseconds since the Unix epoch.
    pub timestamp: SystemTime,

    /// Value of the `stream_id` string entry.
    pub stream_id: String,

    /// Value of the `event_name` string entry.
    pub event_name: String,

    /// Raw bytes of the `metadata` entry; a nil reply decodes to an empty vector.
    pub metadata: Vec<u8>,

    /// Raw bytes of the `payload` entry; a nil reply decodes to an empty vector.
    pub payload: Vec<u8>,
}
443
444impl Event {
445 pub fn sequence_or_version_for_stream(&self) -> u64 {
451 self.stream_version
452 }
453
454 pub fn sequence_for_partition(&self) -> u64 {
460 self.partition_sequence
461 }
462
463 pub fn version_for_stream(&self) -> u64 {
468 self.stream_version
469 }
470}
471
472impl FromRedisValue for Event {
473 fn from_redis_value(value: &Value) -> RedisResult<Self> {
474 match value {
475 Value::Map(fields) => {
476 let mut event_id = None;
477 let mut partition_key = None;
478 let mut partition_id = None;
479 let mut transaction_id = None;
480 let mut partition_sequence = None;
481 let mut stream_version = None;
482 let mut timestamp = None;
483 let mut stream_id = None;
484 let mut event_name = None;
485 let mut metadata = None;
486 let mut payload = None;
487
488 for (key, val) in fields {
490 let field_name = match key {
491 Value::SimpleString(s) => s.as_str(),
492 _ => continue,
493 };
494
495 match field_name {
496 "event_id" => {
497 event_id = Some(parse_value!(Uuid, val, "event_id")?);
498 }
499 "partition_key" => {
500 partition_key = Some(parse_value!(Uuid, val, "partition_key")?);
501 }
502 "partition_id" => {
503 partition_id = Some(match val {
504 Value::Int(n) => *n as u16,
505 _ => {
506 return Err(RedisError::from((
507 redis::ErrorKind::TypeError,
508 "partition_id must be an integer",
509 )));
510 }
511 });
512 }
513 "transaction_id" => {
514 transaction_id = Some(parse_value!(Uuid, val, "transaction_id")?);
515 }
516 "partition_sequence" => {
517 partition_sequence = Some(match val {
518 Value::Int(n) => *n as u64,
519 _ => {
520 return Err(RedisError::from((
521 redis::ErrorKind::TypeError,
522 "partition_sequence must be an integer",
523 )));
524 }
525 });
526 }
527 "stream_version" => {
528 stream_version = Some(match val {
529 Value::Int(n) => *n as u64,
530 _ => {
531 return Err(RedisError::from((
532 redis::ErrorKind::TypeError,
533 "stream_version must be an integer",
534 )));
535 }
536 });
537 }
538 "timestamp" => {
539 timestamp = Some(match val {
540 Value::Int(ms) => {
541 let duration = Duration::from_millis(*ms as u64);
542 UNIX_EPOCH + duration
543 }
544 _ => {
545 return Err(RedisError::from((
546 redis::ErrorKind::TypeError,
547 "timestamp must be an integer",
548 )));
549 }
550 });
551 }
552 "stream_id" => {
553 stream_id = Some(parse_value!(String, val, "stream_id")?);
554 }
555 "event_name" => {
556 event_name = Some(parse_value!(String, val, "event_name")?);
557 }
558 "metadata" => {
559 metadata = Some(match val {
560 Value::BulkString(data) => data.clone(),
561 Value::Nil => Vec::new(), _ => {
563 return Err(RedisError::from((
564 redis::ErrorKind::TypeError,
565 "metadata must be bulk data",
566 )));
567 }
568 });
569 }
570 "payload" => {
571 payload = Some(match val {
572 Value::BulkString(data) => data.clone(),
573 Value::Nil => Vec::new(),
574 _ => {
575 return Err(RedisError::from((
576 redis::ErrorKind::TypeError,
577 "payload must be bulk data",
578 )));
579 }
580 });
581 }
582 _ => {} }
584 }
585
586 let event_id = event_id.ok_or_else(|| {
588 RedisError::from((
589 redis::ErrorKind::TypeError,
590 "Missing required field: event_id",
591 ))
592 })?;
593 let partition_key = partition_key.ok_or_else(|| {
594 RedisError::from((
595 redis::ErrorKind::TypeError,
596 "Missing required field: partition_key",
597 ))
598 })?;
599 let partition_id = partition_id.ok_or_else(|| {
600 RedisError::from((
601 redis::ErrorKind::TypeError,
602 "Missing required field: partition_id",
603 ))
604 })?;
605 let transaction_id = transaction_id.ok_or_else(|| {
606 RedisError::from((
607 redis::ErrorKind::TypeError,
608 "Missing required field: transaction_id",
609 ))
610 })?;
611 let partition_sequence = partition_sequence.ok_or_else(|| {
612 RedisError::from((
613 redis::ErrorKind::TypeError,
614 "Missing required field: partition_sequence",
615 ))
616 })?;
617 let stream_version = stream_version.ok_or_else(|| {
618 RedisError::from((
619 redis::ErrorKind::TypeError,
620 "Missing required field: stream_version",
621 ))
622 })?;
623 let timestamp = timestamp.ok_or_else(|| {
624 RedisError::from((
625 redis::ErrorKind::TypeError,
626 "Missing required field: timestamp",
627 ))
628 })?;
629 let stream_id = stream_id.ok_or_else(|| {
630 RedisError::from((
631 redis::ErrorKind::TypeError,
632 "Missing required field: stream_id",
633 ))
634 })?;
635 let event_name = event_name.ok_or_else(|| {
636 RedisError::from((
637 redis::ErrorKind::TypeError,
638 "Missing required field: event_name",
639 ))
640 })?;
641 let metadata = metadata.ok_or_else(|| {
642 RedisError::from((
643 redis::ErrorKind::TypeError,
644 "Missing required field: metadata",
645 ))
646 })?;
647 let payload = payload.ok_or_else(|| {
648 RedisError::from((
649 redis::ErrorKind::TypeError,
650 "Missing required field: payload",
651 ))
652 })?;
653
654 Ok(Event {
655 event_id,
656 partition_key,
657 partition_id,
658 transaction_id,
659 partition_sequence,
660 stream_version,
661 timestamp,
662 stream_id,
663 event_name,
664 metadata,
665 payload,
666 })
667 }
668 _ => Err(RedisError::from((
669 redis::ErrorKind::TypeError,
670 "Event must be a Redis map",
671 ))),
672 }
673 }
674}
675
/// One bound of a range argument: an open start, a concrete value, or an open end.
///
/// The variant order matters: the derived `Ord` sorts `Start` before every `Value(_)`
/// and every `Value(_)` before `End`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum RangeValue {
    /// Written to Redis as `-` by the `ToRedisArgs` implementation.
    Start,
    /// Written to Redis as the decimal form of the wrapped number.
    Value(u64),
    /// Written to Redis as `+` by the `ToRedisArgs` implementation.
    End, }
682
683impl From<u64> for RangeValue {
684 fn from(value: u64) -> Self {
685 RangeValue::Value(value)
686 }
687}
688
689impl ToRedisArgs for RangeValue {
690 fn write_redis_args<W>(&self, out: &mut W)
691 where
692 W: ?Sized + RedisWrite,
693 {
694 match self {
695 RangeValue::Start => out.write_arg(b"-"),
696 RangeValue::End => out.write_arg(b"+"),
697 RangeValue::Value(v) => out.write_arg(v.to_string().as_bytes()),
698 }
699 }
700}
701
/// Fields decoded from the map reply describing several events appended together.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct MultiAppendInfo {
    /// UUID parsed from the `partition_key` string entry.
    pub partition_key: Uuid,
    /// Value of the `partition_id` integer entry; also copied into every entry of `events`.
    pub partition_id: u16,
    /// Value of the `first_partition_sequence` integer entry.
    pub first_partition_sequence: u64,
    /// Value of the `last_partition_sequence` integer entry.
    pub last_partition_sequence: u64,
    /// Per-event details in reply order. On decode, the entry at index `i` receives
    /// `partition_sequence = first_partition_sequence + i`.
    pub events: Vec<EventInfo>,
}
711
/// Per-event details decoded from a Redis map reply.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct EventInfo {
    /// UUID parsed from the `event_id` string entry.
    pub event_id: Uuid,
    /// Value of the `partition_id` integer entry. When decoded as part of a
    /// [`MultiAppendInfo`], it is overwritten with the batch-level partition id.
    pub partition_id: u16,
    /// Value of the `partition_sequence` integer entry. When decoded as part of a
    /// [`MultiAppendInfo`], it is overwritten from the batch's first sequence plus the
    /// entry's index.
    pub partition_sequence: u64,
    /// Value of the `stream_id` string entry.
    pub stream_id: String,
    /// Value of the `stream_version` integer entry.
    pub stream_version: u64,
    /// Decoded from the `timestamp` integer entry, read as milliseconds since the Unix epoch.
    pub timestamp: SystemTime,
}
722
723impl EventInfo {
724 fn from_redis_value_inner(value: &Value, is_multi: bool) -> RedisResult<Self> {
725 match value {
726 Value::Map(fields) => {
727 let mut event_id = None;
728 let mut partition_id = None;
729 let mut partition_sequence = None;
730 let mut stream_id = None;
731 let mut stream_version = None;
732 let mut timestamp = None;
733
734 for (key, val) in fields {
736 let field_name = match key {
737 Value::SimpleString(s) => s.as_str(),
738 _ => continue,
739 };
740
741 match field_name {
742 "event_id" => {
743 event_id = Some(parse_value!(Uuid, val, "event_id")?);
744 }
745 "partition_id" => {
746 partition_id = Some(match val {
747 Value::Int(n) => *n as u16,
748 _ => {
749 return Err(RedisError::from((
750 redis::ErrorKind::TypeError,
751 "partition_id must be an integer",
752 )));
753 }
754 });
755 }
756 "partition_sequence" => {
757 partition_sequence = Some(match val {
758 Value::Int(n) => *n as u64,
759 _ => {
760 return Err(RedisError::from((
761 redis::ErrorKind::TypeError,
762 "partition_sequence must be an integer",
763 )));
764 }
765 });
766 }
767 "stream_id" => {
768 stream_id = Some(parse_value!(String, val, "stream_id")?);
769 }
770 "stream_version" => {
771 stream_version = Some(match val {
772 Value::Int(n) => *n as u64,
773 _ => {
774 return Err(RedisError::from((
775 redis::ErrorKind::TypeError,
776 "stream_version must be an integer",
777 )));
778 }
779 });
780 }
781 "timestamp" => {
782 timestamp = Some(match val {
783 Value::Int(ms) => {
784 let duration = Duration::from_millis(*ms as u64);
785 UNIX_EPOCH + duration
786 }
787 _ => {
788 return Err(RedisError::from((
789 redis::ErrorKind::TypeError,
790 "timestamp must be an integer",
791 )));
792 }
793 });
794 }
795 _ => {} }
797 }
798
799 let event_id = event_id.ok_or_else(|| {
801 RedisError::from((
802 redis::ErrorKind::TypeError,
803 "Missing required field: event_id",
804 ))
805 })?;
806 let partition_id = partition_id.or(is_multi.then_some(0)).ok_or_else(|| {
807 RedisError::from((
808 redis::ErrorKind::TypeError,
809 "Missing required field: partition_id",
810 ))
811 })?;
812 let partition_sequence =
813 partition_sequence
814 .or(is_multi.then_some(0))
815 .ok_or_else(|| {
816 RedisError::from((
817 redis::ErrorKind::TypeError,
818 "Missing required field: partition_sequence",
819 ))
820 })?;
821 let stream_id = stream_id.ok_or_else(|| {
822 RedisError::from((
823 redis::ErrorKind::TypeError,
824 "Missing required field: stream_id",
825 ))
826 })?;
827 let stream_version = stream_version.ok_or_else(|| {
828 RedisError::from((
829 redis::ErrorKind::TypeError,
830 "Missing required field: stream_version",
831 ))
832 })?;
833 let timestamp = timestamp.ok_or_else(|| {
834 RedisError::from((
835 redis::ErrorKind::TypeError,
836 "Missing required field: timestamp",
837 ))
838 })?;
839
840 Ok(EventInfo {
841 event_id,
842 partition_id,
843 partition_sequence,
844 stream_id,
845 stream_version,
846 timestamp,
847 })
848 }
849 _ => Err(RedisError::from((
850 redis::ErrorKind::TypeError,
851 "Event info must be a Redis map",
852 ))),
853 }
854 }
855}
856
857impl FromRedisValue for EventInfo {
858 fn from_redis_value(value: &Value) -> RedisResult<Self> {
859 Self::from_redis_value_inner(value, false)
860 }
861}
862
863impl FromRedisValue for MultiAppendInfo {
864 fn from_redis_value(value: &Value) -> RedisResult<Self> {
865 match value {
866 Value::Map(fields) => {
867 let mut partition_key = None;
868 let mut partition_id = None;
869 let mut first_partition_sequence = None;
870 let mut last_partition_sequence = None;
871 let mut events = None;
872
873 for (key, val) in fields {
875 let field_name = match key {
876 Value::SimpleString(s) => s.as_str(),
877 _ => continue,
878 };
879
880 match field_name {
881 "partition_key" => {
882 partition_key = Some(parse_value!(Uuid, val, "partition_key")?);
883 }
884 "partition_id" => {
885 partition_id = Some(match val {
886 Value::Int(n) => *n as u16,
887 _ => {
888 return Err(RedisError::from((
889 redis::ErrorKind::TypeError,
890 "partition_id must be an integer",
891 )));
892 }
893 });
894 }
895 "first_partition_sequence" => {
896 first_partition_sequence = Some(match val {
897 Value::Int(n) => *n as u64,
898 _ => {
899 return Err(RedisError::from((
900 redis::ErrorKind::TypeError,
901 "first_partition_sequence must be an integer",
902 )));
903 }
904 });
905 }
906 "last_partition_sequence" => {
907 last_partition_sequence = Some(match val {
908 Value::Int(n) => *n as u64,
909 _ => {
910 return Err(RedisError::from((
911 redis::ErrorKind::TypeError,
912 "last_partition_sequence must be an integer",
913 )));
914 }
915 });
916 }
917 "events" => {
918 events = Some(match val {
919 Value::Array(event_values) => event_values
920 .iter()
921 .map(|value| EventInfo::from_redis_value_inner(value, true))
922 .collect::<Result<Vec<_>, _>>()?,
923 _ => {
924 return Err(RedisError::from((
925 redis::ErrorKind::TypeError,
926 "events must be an array",
927 )));
928 }
929 });
930 }
931 _ => {} }
933 }
934
935 let partition_key = partition_key.ok_or_else(|| {
937 RedisError::from((
938 redis::ErrorKind::TypeError,
939 "Missing required field: partition_key",
940 ))
941 })?;
942 let partition_id = partition_id.ok_or_else(|| {
943 RedisError::from((
944 redis::ErrorKind::TypeError,
945 "Missing required field: partition_id",
946 ))
947 })?;
948 let first_partition_sequence = first_partition_sequence.ok_or_else(|| {
949 RedisError::from((
950 redis::ErrorKind::TypeError,
951 "Missing required field: first_partition_sequence",
952 ))
953 })?;
954 let last_partition_sequence = last_partition_sequence.ok_or_else(|| {
955 RedisError::from((
956 redis::ErrorKind::TypeError,
957 "Missing required field: last_partition_sequence",
958 ))
959 })?;
960 let events = events
961 .ok_or_else(|| {
962 RedisError::from((
963 redis::ErrorKind::TypeError,
964 "Missing required field: events",
965 ))
966 })?
967 .into_iter()
968 .enumerate()
969 .map(|(i, mut event)| {
970 event.partition_id = partition_id;
971 event.partition_sequence = first_partition_sequence + i as u64;
972 event
973 })
974 .collect();
975
976 Ok(MultiAppendInfo {
977 partition_key,
978 partition_id,
979 first_partition_sequence,
980 last_partition_sequence,
981 events,
982 })
983 }
984 _ => Err(RedisError::from((
985 redis::ErrorKind::TypeError,
986 "Multi append info must be a Redis map",
987 ))),
988 }
989 }
990}
991
/// Subscription details decoded from a Redis map reply.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct SubscriptionInfo {
    /// Value of the `subscription_type` string entry.
    pub subscription_type: String,
    /// Value of the `channel` string entry.
    pub channel: String,
    /// Value of the `active_subscriptions` integer entry, kept as the raw signed integer.
    pub active_subscriptions: i64,
}
1002
1003impl FromRedisValue for SubscriptionInfo {
1004 fn from_redis_value(value: &Value) -> RedisResult<Self> {
1005 match value {
1006 Value::Map(fields) => {
1007 let mut subscription_type = None;
1008 let mut channel = None;
1009 let mut active_subscriptions = None;
1010
1011 for (key, val) in fields {
1013 let field_name = match key {
1014 Value::SimpleString(s) => s.as_str(),
1015 _ => continue,
1016 };
1017
1018 match field_name {
1019 "subscription_type" => {
1020 subscription_type =
1021 Some(parse_value!(String, val, "subscription_type")?);
1022 }
1023 "channel" => {
1024 channel = Some(parse_value!(String, val, "channel")?);
1025 }
1026 "active_subscriptions" => {
1027 active_subscriptions = Some(match val {
1028 Value::Int(n) => *n,
1029 _ => {
1030 return Err(RedisError::from((
1031 redis::ErrorKind::TypeError,
1032 "active_subscriptions must be an integer",
1033 )));
1034 }
1035 });
1036 }
1037 _ => {} }
1039 }
1040
1041 let subscription_type = subscription_type.ok_or_else(|| {
1043 RedisError::from((
1044 redis::ErrorKind::TypeError,
1045 "Missing required field: subscription_type",
1046 ))
1047 })?;
1048 let channel = channel.ok_or_else(|| {
1049 RedisError::from((
1050 redis::ErrorKind::TypeError,
1051 "Missing required field: channel",
1052 ))
1053 })?;
1054 let active_subscriptions = active_subscriptions.ok_or_else(|| {
1055 RedisError::from((
1056 redis::ErrorKind::TypeError,
1057 "Missing required field: active_subscriptions",
1058 ))
1059 })?;
1060
1061 Ok(SubscriptionInfo {
1062 subscription_type,
1063 channel,
1064 active_subscriptions,
1065 })
1066 }
1067 _ => Err(RedisError::from((
1068 redis::ErrorKind::TypeError,
1069 "Subscription info must be a Redis map",
1070 ))),
1071 }
1072 }
1073}