1use redis::{
2 from_redis_value, FromRedisValue, RedisError, RedisResult, RedisWrite, ToRedisArgs, Value,
3};
4use std::collections::HashMap;
5use std::fmt::{Debug, Display};
6use std::str;
7
8#[derive(PartialEq, Eq, Clone, Debug, Copy)]
11pub enum TsAggregationType {
12 Avg(u64),
13 Sum(u64),
14 Min(u64),
15 Max(u64),
16 Range(u64),
17 Count(u64),
18 First(u64),
19 Last(u64),
20 StdP(u64),
21 StdS(u64),
22 VarP(u64),
23 VarS(u64),
24 Twa(u64),
25}
26
27impl ToRedisArgs for TsAggregationType {
28 fn write_redis_args<W>(&self, out: &mut W)
29 where
30 W: ?Sized + RedisWrite,
31 {
32 let (t, val) = match *self {
33 TsAggregationType::Avg(v) => ("avg", v),
34 TsAggregationType::Sum(v) => ("sum", v),
35 TsAggregationType::Min(v) => ("min", v),
36 TsAggregationType::Max(v) => ("max", v),
37 TsAggregationType::Range(v) => ("range", v),
38 TsAggregationType::Count(v) => ("count", v),
39 TsAggregationType::First(v) => ("first", v),
40 TsAggregationType::Last(v) => ("last", v),
41 TsAggregationType::StdP(v) => ("std.p", v),
42 TsAggregationType::StdS(v) => ("std.s", v),
43 TsAggregationType::VarP(v) => ("var.p", v),
44 TsAggregationType::VarS(v) => ("var.s", v),
45 TsAggregationType::Twa(v) => ("twa", v),
46 };
47
48 out.write_arg(b"AGGREGATION");
49 out.write_arg(t.as_bytes());
50 val.write_redis_args(out);
51 }
52}
53
54#[derive(PartialEq, Eq, Clone, Copy, Debug)]
60pub enum TsAlign {
61 Start,
62 End,
63 Ts(u64),
64}
65
66impl ToRedisArgs for TsAlign {
67 fn write_redis_args<W>(&self, out: &mut W)
68 where
69 W: ?Sized + RedisWrite,
70 {
71 out.write_arg(b"ALIGN");
72 match self {
73 TsAlign::Start => out.write_arg(b"-"),
74 TsAlign::End => out.write_arg(b"+"),
75 TsAlign::Ts(v) => v.write_redis_args(out),
76 }
77 }
78}
79
80#[derive(PartialEq, Eq, Clone, Debug, Copy)]
85pub enum TsBucketTimestamp {
86 Low,
87 High,
88 Mid,
89}
90
91impl ToRedisArgs for TsBucketTimestamp {
92 fn write_redis_args<W>(&self, out: &mut W)
93 where
94 W: ?Sized + RedisWrite,
95 {
96 out.write_arg(b"BUCKETTIMESTAMP");
97 match self {
98 TsBucketTimestamp::Low => out.write_arg(b"-"),
99 TsBucketTimestamp::High => out.write_arg(b"+"),
100 TsBucketTimestamp::Mid => out.write_arg(b"~"),
101 }
102 }
103}
104
105#[derive(Clone, Debug, Copy)]
106pub enum Integer {
107 Usize(usize),
108 U8(u8),
109 U16(u16),
110 U32(u32),
111 U64(u64),
112 Isize(isize),
113 I8(i8),
114 I16(i16),
115 I32(i32),
116 I64(i64),
117}
118
119impl ToRedisArgs for Integer {
120 fn write_redis_args<W>(&self, out: &mut W)
121 where
122 W: ?Sized + RedisWrite,
123 {
124 match self {
125 Integer::Usize(v) => v.write_redis_args(out),
126 Integer::U8(v) => v.write_redis_args(out),
127 Integer::U16(v) => v.write_redis_args(out),
128 Integer::U32(v) => v.write_redis_args(out),
129 Integer::U64(v) => v.write_redis_args(out),
130 Integer::Isize(v) => v.write_redis_args(out),
131 Integer::I8(v) => v.write_redis_args(out),
132 Integer::I16(v) => v.write_redis_args(out),
133 Integer::I32(v) => v.write_redis_args(out),
134 Integer::I64(v) => v.write_redis_args(out),
135 }
136 }
137}
138
139impl From<usize> for Integer {
140 fn from(value: usize) -> Self {
141 Integer::Usize(value)
142 }
143}
144
145impl From<u8> for Integer {
146 fn from(value: u8) -> Self {
147 Integer::U8(value)
148 }
149}
150
151impl From<u16> for Integer {
152 fn from(value: u16) -> Self {
153 Integer::U16(value)
154 }
155}
156
157impl From<u32> for Integer {
158 fn from(value: u32) -> Self {
159 Integer::U32(value)
160 }
161}
162
163impl From<u64> for Integer {
164 fn from(value: u64) -> Self {
165 Integer::U64(value)
166 }
167}
168
169impl From<isize> for Integer {
170 fn from(value: isize) -> Self {
171 Integer::Isize(value)
172 }
173}
174
175impl From<i8> for Integer {
176 fn from(value: i8) -> Self {
177 Integer::I8(value)
178 }
179}
180
181impl From<i16> for Integer {
182 fn from(value: i16) -> Self {
183 Integer::I16(value)
184 }
185}
186
187impl From<i32> for Integer {
188 fn from(value: i32) -> Self {
189 Integer::I32(value)
190 }
191}
192
193impl From<i64> for Integer {
194 fn from(value: i64) -> Self {
195 Integer::I64(value)
196 }
197}
198
199#[derive(Default, Debug, Clone)]
214pub struct TsRangeQuery {
215 from: Option<Integer>,
216 to: Option<Integer>,
217 latest: bool,
218 filter_by_ts: Vec<Integer>,
219 filter_by_value: Option<(f64, f64)>,
220 count: Option<u64>,
221 align: Option<TsAlign>,
222 aggregation_type: Option<TsAggregationType>,
223 bucket_timestamp: Option<TsBucketTimestamp>,
224 empty: bool,
225}
226
227impl TsRangeQuery {
228 pub fn from<T: Into<Integer>>(mut self, from: T) -> Self {
231 self.from = Some(Into::into(from));
232 self
233 }
234
235 pub fn to<T: Into<Integer>>(mut self, to: T) -> Self {
238 self.to = Some(Into::into(to));
239 self
240 }
241
242 pub fn latest(mut self, latest: bool) -> Self {
244 self.latest = latest;
245 self
246 }
247
248 pub fn filter_by_ts<T: Into<Integer>>(mut self, ts: Vec<T>) -> Self {
251 self.filter_by_ts = ts.into_iter().map(|v| Into::into(v)).collect();
252 self
253 }
254
255 pub fn filter_by_value(mut self, min: f64, max: f64) -> Self {
257 self.filter_by_value = Some((min, max));
258 self
259 }
260
261 pub fn count(mut self, count: u64) -> Self {
263 self.count = Some(count);
264 self
265 }
266
267 pub fn align(mut self, align: TsAlign) -> Self {
270 self.align = Some(align);
271 self
272 }
273
274 pub fn aggregation_type(mut self, aggregation_type: TsAggregationType) -> Self {
276 self.aggregation_type = Some(aggregation_type);
277 self
278 }
279
280 pub fn bucket_timestamp(mut self, bucket_timestamp: TsBucketTimestamp) -> Self {
283 self.bucket_timestamp = Some(bucket_timestamp);
284 self
285 }
286
287 pub fn empty(mut self, empty: bool) -> Self {
289 self.empty = empty;
290 self
291 }
292}
293
294impl ToRedisArgs for TsRangeQuery {
295 fn write_redis_args<W>(&self, out: &mut W)
296 where
297 W: ?Sized + RedisWrite,
298 {
299 if let Some(ref from) = self.from {
300 from.write_redis_args(out);
301 } else {
302 out.write_arg(b"-");
303 }
304
305 if let Some(ref to) = self.to {
306 to.write_redis_args(out);
307 } else {
308 out.write_arg(b"+");
309 }
310
311 if self.latest {
312 out.write_arg(b"LATEST");
313 }
314
315 if !self.filter_by_ts.is_empty() {
316 out.write_arg(b"FILTER_BY_TS");
317 for ts in self.filter_by_ts.iter() {
318 ts.write_redis_args(out);
319 }
320 }
321
322 if let Some((min, max)) = self.filter_by_value {
323 out.write_arg(b"FILTER_BY_VALUE");
324 min.write_redis_args(out);
325 max.write_redis_args(out);
326 }
327
328 if let Some(count) = self.count {
329 out.write_arg(b"COUNT");
330 count.write_redis_args(out);
331 }
332
333 if let Some(ref agg) = self.aggregation_type {
334 if let Some(ref align) = self.align {
335 align.write_redis_args(out);
336 }
337
338 agg.write_redis_args(out);
339
340 if let Some(ref bkt_ts) = self.bucket_timestamp {
341 bkt_ts.write_redis_args(out);
342 }
343
344 if self.empty {
345 out.write_arg(b"EMPTY")
346 }
347 }
348 }
349}
350
351#[derive(PartialEq, Eq, Clone, Debug)]
357pub enum TsDuplicatePolicy {
358 Block,
359 First,
360 Last,
361 Min,
362 Max,
363 Other(String),
364}
365
366impl ToRedisArgs for TsDuplicatePolicy {
367 fn write_redis_args<W>(&self, out: &mut W)
368 where
369 W: ?Sized + RedisWrite,
370 {
371 let policy = match self {
372 TsDuplicatePolicy::Block => "BLOCK",
373 TsDuplicatePolicy::First => "FIRST",
374 TsDuplicatePolicy::Last => "LAST",
375 TsDuplicatePolicy::Min => "MIN",
376 TsDuplicatePolicy::Max => "MAX",
377 TsDuplicatePolicy::Other(v) => v.as_str(),
378 };
379 out.write_arg(b"DUPLICATE_POLICY");
380 out.write_arg(policy.as_bytes());
381 }
382}
383
384impl FromRedisValue for TsDuplicatePolicy {
385 fn from_redis_value(v: &Value) -> RedisResult<Self> {
386 let string: String = from_redis_value(v)?;
387 let res = match string.as_str() {
388 "block" => TsDuplicatePolicy::Block,
389 "first" => TsDuplicatePolicy::First,
390 "last" => TsDuplicatePolicy::Last,
391 "min" => TsDuplicatePolicy::Min,
392 "max" => TsDuplicatePolicy::Max,
393 v => TsDuplicatePolicy::Other(v.to_string()),
394 };
395 Ok(res)
396 }
397}
398
399#[derive(Default, Debug, Clone)]
403pub struct TsOptions {
404 retention_time: Option<u64>,
405 uncompressed: bool,
406 labels: Option<Vec<Vec<u8>>>,
407 duplicate_policy: Option<TsDuplicatePolicy>,
408 chunk_size: Option<u64>,
409}
410
411impl TsOptions {
428 pub fn retention_time(mut self, time: u64) -> Self {
430 self.retention_time = Some(time);
431 self
432 }
433
434 pub fn uncompressed(mut self, value: bool) -> Self {
438 self.uncompressed = value;
439 self
440 }
441
442 pub fn labels(mut self, labels: Vec<(&str, &str)>) -> Self {
446 if !labels.is_empty() {
447 self.labels = Some(ToRedisArgs::to_redis_args(&labels));
448 } else {
449 self.labels = None;
450 }
451 self
452 }
453
454 pub fn label(mut self, name: &str, value: &str) -> Self {
456 let mut l = ToRedisArgs::to_redis_args(&vec![(name, value)]);
457 let mut res: Vec<Vec<u8>> = vec![];
458 if let Some(mut cur) = self.labels {
459 res.append(&mut cur);
460 }
461 res.append(&mut l);
462 self.labels = Some(res);
463 self
464 }
465
466 pub fn duplicate_policy(mut self, policy: TsDuplicatePolicy) -> Self {
468 self.duplicate_policy = Some(policy);
469 self
470 }
471
472 pub fn chunk_size(mut self, size: u64) -> Self {
474 self.chunk_size = Some(size);
475 self
476 }
477}
478
479impl ToRedisArgs for TsOptions {
480 fn write_redis_args<W>(&self, out: &mut W)
481 where
482 W: ?Sized + RedisWrite,
483 {
484 if let Some(ref rt) = self.retention_time {
485 out.write_arg(b"RETENTION");
486 out.write_arg(format!("{rt}").as_bytes());
487 }
488
489 if self.uncompressed {
490 out.write_arg(b"UNCOMPRESSED");
491 }
492
493 if let Some(ref policy) = self.duplicate_policy {
494 policy.write_redis_args(out);
495 }
496
497 if let Some(ref alloc) = self.chunk_size {
498 out.write_arg(b"CHUNK_SIZE");
499 out.write_arg(format!("{alloc}").as_bytes());
500 }
501
502 if let Some(ref l) = self.labels {
503 out.write_arg(b"LABELS");
504 for arg in l {
505 out.write_arg(arg);
506 }
507 }
508 }
509}
510
511#[derive(Debug, Default, Clone)]
514pub struct TsFilterOptions {
515 with_labels: bool,
516 filters: Vec<TsFilter>,
517}
518
519impl TsFilterOptions {
536 pub fn with_labels(mut self, value: bool) -> Self {
539 self.with_labels = value;
540 self
541 }
542
543 pub fn equals<L: Display + ToRedisArgs, V: Display + ToRedisArgs>(
545 mut self,
546 name: L,
547 value: V,
548 ) -> Self {
549 self.filters.push(TsFilter {
550 name: format!("{name}"),
551 value: format!("{value}"),
552 compare: TsCompare::Eq,
553 });
554 self
555 }
556
557 pub fn not_equals<L: Display + ToRedisArgs, V: Display + ToRedisArgs>(
559 mut self,
560 name: L,
561 value: V,
562 ) -> Self {
563 self.filters.push(TsFilter {
564 name: format!("{name}"),
565 value: format!("{value}"),
566 compare: TsCompare::NotEq,
567 });
568 self
569 }
570
571 pub fn in_set<L: Display + ToRedisArgs, V: Display + ToRedisArgs>(
573 mut self,
574 name: L,
575 values: Vec<V>,
576 ) -> Self {
577 let set = format!(
578 "({})",
579 values
580 .iter()
581 .map(|v| { format!("{v}") })
582 .collect::<Vec<String>>()
583 .join(",")
584 );
585 self.filters.push(TsFilter {
586 name: format!("{name}"),
587 value: set,
588 compare: TsCompare::Eq,
589 });
590 self
591 }
592
593 pub fn not_in_set<L: Display + ToRedisArgs, V: Display + ToRedisArgs>(
595 mut self,
596 name: L,
597 values: Vec<V>,
598 ) -> Self {
599 let set = format!(
600 "({})",
601 values
602 .iter()
603 .map(|v| { format!("{v}") })
604 .collect::<Vec<String>>()
605 .join(",")
606 );
607 self.filters.push(TsFilter {
608 name: format!("{name}"),
609 value: set,
610 compare: TsCompare::NotEq,
611 });
612 self
613 }
614
615 pub fn has_label<L: Display + ToRedisArgs>(mut self, name: L) -> Self {
617 self.filters.push(TsFilter {
618 name: format!("{name}"),
619 value: "".to_string(),
620 compare: TsCompare::NotEq,
621 });
622 self
623 }
624
625 pub fn not_has_label<L: Display + ToRedisArgs>(mut self, name: L) -> Self {
627 self.filters.push(TsFilter {
628 name: format!("{name}"),
629 value: "".to_string(),
630 compare: TsCompare::Eq,
631 });
632 self
633 }
634
635 pub fn get_filters(self) -> Vec<TsFilter> {
636 self.filters
637 }
638}
639
640impl ToRedisArgs for TsFilterOptions {
641 fn write_redis_args<W>(&self, out: &mut W)
642 where
643 W: ?Sized + RedisWrite,
644 {
645 if self.with_labels {
646 out.write_arg(b"WITHLABELS");
647 }
648 out.write_arg(b"FILTER");
649
650 for f in self.filters.iter() {
651 f.write_redis_args(out)
652 }
653 }
654}
655
656#[derive(Debug, Default)]
658pub struct TsInfo {
659 pub total_samples: u64,
660 pub memory_usage: u64,
661 pub first_timestamp: u64,
662 pub last_timestamp: u64,
663 pub retention_time: u64,
664 pub chunk_count: u64,
665 pub max_samples_per_chunk: u16,
666 pub chunk_size: u64,
667 pub duplicate_policy: Option<TsDuplicatePolicy>,
668 pub labels: Vec<(String, String)>,
669 pub source_key: Option<String>,
670 pub rules: Vec<(String, u64, String)>,
671}
672
673impl FromRedisValue for TsInfo {
674 fn from_redis_value(v: &Value) -> RedisResult<Self> {
675 match *v {
676 Value::Bulk(ref values) => {
677 let mut result = TsInfo::default();
678 let mut map: HashMap<String, Value> = HashMap::new();
679
680 for pair in values.chunks(2) {
681 map.insert(from_redis_value(&pair[0])?, pair[1].clone());
682 }
683
684 if let Some(v) = map.get("totalSamples") {
687 result.total_samples = from_redis_value(v)?;
688 }
689
690 if let Some(v) = map.get("memoryUsage") {
691 result.memory_usage = from_redis_value(v)?;
692 }
693
694 if let Some(v) = map.get("firstTimestamp") {
695 result.first_timestamp = from_redis_value(v)?;
696 }
697
698 if let Some(v) = map.get("lastTimestamp") {
699 result.last_timestamp = from_redis_value(v)?;
700 }
701
702 if let Some(v) = map.get("retentionTime") {
703 result.retention_time = from_redis_value(v)?;
704 }
705
706 if let Some(v) = map.get("chunkCount") {
707 result.chunk_count = from_redis_value(v)?;
708 }
709
710 if let Some(v) = map.get("maxSamplesPerChunk") {
711 result.max_samples_per_chunk = from_redis_value(v)?;
712 }
713
714 if let Some(v) = map.get("chunkSize") {
715 result.chunk_size = from_redis_value(v)?;
716 }
717
718 if let Some(v) = map.get("sourceKey") {
719 result.source_key = from_redis_value(v)?;
720 }
721
722 if let Some(v) = map.get("duplicatePolicy") {
723 result.duplicate_policy = from_redis_value(v)?;
724 }
725
726 result.rules = match map.get("rules") {
727 Some(Value::Bulk(ref values)) => values
728 .iter()
729 .flat_map(|value| match value {
730 Value::Bulk(ref vs) => Some((
731 from_redis_value(&vs[0]).unwrap(),
732 from_redis_value(&vs[1]).unwrap(),
733 from_redis_value(&vs[2]).unwrap(),
734 )),
735 _ => None,
736 })
737 .collect(),
738 _ => vec![],
739 };
740
741 result.labels = match map.get("labels") {
742 Some(Value::Bulk(ref values)) => values
743 .iter()
744 .flat_map(|value| match value {
745 Value::Bulk(ref vs) => Some((
746 from_redis_value(&vs[0]).unwrap(),
747 from_redis_value(&vs[1]).unwrap(),
748 )),
749 _ => None,
750 })
751 .collect(),
752 _ => vec![],
753 };
754
755 Ok(result)
756 }
757 _ => Err(RedisError::from(std::io::Error::new(
758 std::io::ErrorKind::Other,
759 "no_ts_info_data",
760 ))),
761 }
762 }
763}
764
765#[derive(Debug)]
768pub struct TsMget<TS: FromRedisValue, V: FromRedisValue> {
769 pub values: Vec<TsMgetEntry<TS, V>>,
770}
771
772impl<TS: Default + FromRedisValue, V: Default + FromRedisValue> FromRedisValue for TsMget<TS, V> {
773 fn from_redis_value(v: &Value) -> RedisResult<Self> {
774 let res = match *v {
775 Value::Bulk(ref values) => TsMget {
776 values: FromRedisValue::from_redis_values(values)?,
777 },
778 _ => TsMget { values: vec![] },
779 };
780 Ok(res)
781 }
782}
783
784#[derive(Debug, Default)]
787pub struct TsMgetEntry<TS: FromRedisValue, V: FromRedisValue> {
788 pub key: String,
789 pub labels: Vec<(String, String)>,
790 pub value: Option<(TS, V)>,
791}
792
793impl<TS: Default + FromRedisValue, V: Default + FromRedisValue> FromRedisValue
794 for TsMgetEntry<TS, V>
795{
796 fn from_redis_value(v: &Value) -> RedisResult<Self> {
797 match *v {
798 Value::Bulk(ref values) => {
799 let result = TsMgetEntry::<TS, V> {
800 key: from_redis_value(&values[0])?,
801 labels: match values[1] {
802 Value::Bulk(ref vs) => vs
803 .iter()
804 .flat_map(|value| match value {
805 Value::Bulk(ref v) => Some((
806 from_redis_value(&v[0]).unwrap(),
807 from_redis_value(&v[1]).unwrap(),
808 )),
809 _ => None,
810 })
811 .collect(),
812 _ => vec![],
813 },
814 value: match values[2] {
815 Value::Bulk(ref vs) if !vs.is_empty() => Some((
816 from_redis_value(&vs[0]).unwrap(),
817 from_redis_value(&vs[1]).unwrap(),
818 )),
819 _ => None,
820 },
821 };
822
823 Ok(result)
824 }
825 _ => Err(RedisError::from(std::io::Error::new(
826 std::io::ErrorKind::Other,
827 "no_mget_data",
828 ))),
829 }
830 }
831}
832
833#[derive(Debug)]
836pub struct TsRange<TS: FromRedisValue + Copy, V: FromRedisValue + Copy> {
837 pub values: Vec<(TS, V)>,
838}
839
840impl<TS: FromRedisValue + Copy, V: FromRedisValue + Copy> FromRedisValue for TsRange<TS, V> {
841 fn from_redis_value(v: &Value) -> RedisResult<Self> {
842 match *v {
843 Value::Bulk(ref values) => {
844 let items: Vec<TsValueReply<TS, V>> = FromRedisValue::from_redis_values(values)?;
845 Ok(TsRange {
846 values: items.iter().map(|i| (i.ts, i.value)).collect(),
847 })
848 }
849 _ => Err(RedisError::from(std::io::Error::new(
850 std::io::ErrorKind::Other,
851 "no_range_data",
852 ))),
853 }
854 }
855}
856
857#[derive(Debug)]
860pub struct TsMrange<TS: FromRedisValue + Copy, V: FromRedisValue + Copy> {
861 pub values: Vec<TsMrangeEntry<TS, V>>,
862}
863
864impl<TS: Default + FromRedisValue + Copy, V: Default + FromRedisValue + Copy> FromRedisValue
865 for TsMrange<TS, V>
866{
867 fn from_redis_value(v: &Value) -> RedisResult<Self> {
868 let res = match *v {
869 Value::Bulk(ref values) => TsMrange {
870 values: FromRedisValue::from_redis_values(values)?,
871 },
872 _ => TsMrange { values: vec![] },
873 };
874 Ok(res)
875 }
876}
877
878#[derive(Debug, Default)]
881pub struct TsMrangeEntry<TS: FromRedisValue + Copy, V: FromRedisValue + Copy> {
882 pub key: String,
883 pub labels: Vec<(String, String)>,
884 pub values: Vec<(TS, V)>,
885}
886
887impl<TS: Default + FromRedisValue + Copy, V: Default + FromRedisValue + Copy> FromRedisValue
888 for TsMrangeEntry<TS, V>
889{
890 fn from_redis_value(v: &Value) -> RedisResult<Self> {
891 match *v {
892 Value::Bulk(ref values) => {
893 let result = TsMrangeEntry::<TS, V> {
894 key: from_redis_value(&values[0]).unwrap(),
895 labels: match values[1] {
896 Value::Bulk(ref vs) => vs
897 .iter()
898 .flat_map(|value| match value {
899 Value::Bulk(ref v) => Some((
900 from_redis_value(&v[0]).unwrap(),
901 from_redis_value(&v[1]).unwrap(),
902 )),
903 _ => None,
904 })
905 .collect(),
906 _ => vec![],
907 },
908 values: match values[2] {
909 Value::Bulk(ref vs) => {
910 let items: Vec<TsValueReply<TS, V>> =
911 FromRedisValue::from_redis_values(vs)?;
912 items.iter().map(|i| (i.ts, i.value)).collect()
913 }
914 _ => vec![],
915 },
916 };
917
918 Ok(result)
919 }
920 _ => Err(RedisError::from(std::io::Error::new(
921 std::io::ErrorKind::Other,
922 "no_mget_data",
923 ))),
924 }
925 }
926}
927
928#[derive(Debug)]
929struct TsValueReply<TS: FromRedisValue, V: FromRedisValue> {
930 pub ts: TS,
931 pub value: V,
932}
933
934impl<TS: FromRedisValue, V: FromRedisValue> FromRedisValue for TsValueReply<TS, V> {
935 fn from_redis_value(v: &Value) -> RedisResult<Self> {
936 match *v {
937 Value::Bulk(ref values) => Ok(TsValueReply {
938 ts: from_redis_value(&values[0]).unwrap(),
939 value: from_redis_value(&values[1]).unwrap(),
940 }),
941 _ => Err(RedisError::from(std::io::Error::new(
942 std::io::ErrorKind::Other,
943 "no_value_data",
944 ))),
945 }
946 }
947}
948
949#[derive(PartialEq, Eq, Clone, Debug, Copy)]
950enum TsCompare {
951 Eq,
952 NotEq,
953}
954
955impl ToRedisArgs for TsCompare {
956 fn write_redis_args<W>(&self, out: &mut W)
957 where
958 W: ?Sized + RedisWrite,
959 {
960 let val = match *self {
961 TsCompare::Eq => "=",
962 TsCompare::NotEq => "!=",
963 };
964
965 val.write_redis_args(out);
966 }
967}
968
969#[derive(Debug, Clone)]
970pub struct TsFilter {
971 name: String,
972 value: String,
973 compare: TsCompare,
974}
975
976impl ToRedisArgs for TsFilter {
977 fn write_redis_args<W>(&self, out: &mut W)
978 where
979 W: ?Sized + RedisWrite,
980 {
981 let comp = match self.compare {
982 TsCompare::Eq => "=",
983 TsCompare::NotEq => "!=",
984 };
985
986 let arg = format!("{}{}{}", self.name, comp, self.value);
987 out.write_arg(arg.as_bytes());
988 }
989}