redis_ts/
types.rs

1use redis::{
2    from_redis_value, FromRedisValue, RedisError, RedisResult, RedisWrite, ToRedisArgs, Value,
3};
4use std::collections::HashMap;
5use std::fmt::{Debug, Display};
6use std::str;
7
/// A redis time series aggregation together with its time bucket.
///
/// Each variant selects one aggregator and carries the bucket value as a
/// `u64`. When serialized, the variant becomes the aggregator keyword
/// (`avg`, `sum`, `std.p`, ...) and the payload becomes the bucket argument.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TsAggregationType {
    /// Sent as `avg`.
    Avg(u64),
    /// Sent as `sum`.
    Sum(u64),
    /// Sent as `min`.
    Min(u64),
    /// Sent as `max`.
    Max(u64),
    /// Sent as `range`.
    Range(u64),
    /// Sent as `count`.
    Count(u64),
    /// Sent as `first`.
    First(u64),
    /// Sent as `last`.
    Last(u64),
    /// Sent as `std.p`.
    StdP(u64),
    /// Sent as `std.s`.
    StdS(u64),
    /// Sent as `var.p`.
    VarP(u64),
    /// Sent as `var.s`.
    VarS(u64),
    /// Sent as `twa`.
    Twa(u64),
}
26
27impl ToRedisArgs for TsAggregationType {
28    fn write_redis_args<W>(&self, out: &mut W)
29    where
30        W: ?Sized + RedisWrite,
31    {
32        let (t, val) = match *self {
33            TsAggregationType::Avg(v) => ("avg", v),
34            TsAggregationType::Sum(v) => ("sum", v),
35            TsAggregationType::Min(v) => ("min", v),
36            TsAggregationType::Max(v) => ("max", v),
37            TsAggregationType::Range(v) => ("range", v),
38            TsAggregationType::Count(v) => ("count", v),
39            TsAggregationType::First(v) => ("first", v),
40            TsAggregationType::Last(v) => ("last", v),
41            TsAggregationType::StdP(v) => ("std.p", v),
42            TsAggregationType::StdS(v) => ("std.s", v),
43            TsAggregationType::VarP(v) => ("var.p", v),
44            TsAggregationType::VarS(v) => ("var.s", v),
45            TsAggregationType::Twa(v) => ("twa", v),
46        };
47
48        out.write_arg(b"AGGREGATION");
49        out.write_arg(t.as_bytes());
50        val.write_redis_args(out);
51    }
52}
53
/// Time bucket alignment control for AGGREGATION. It moves the reference
/// timestamp on which buckets are defined, which in turn shifts the bucket
/// timestamps.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TsAlign {
    /// Use the query start interval time as reference (sent as `-`).
    Start,
    /// Use the query end interval time as reference (sent as `+`).
    End,
    /// Align the reference timestamp to this specific time.
    Ts(u64),
}
65
66impl ToRedisArgs for TsAlign {
67    fn write_redis_args<W>(&self, out: &mut W)
68    where
69        W: ?Sized + RedisWrite,
70    {
71        out.write_arg(b"ALIGN");
72        match self {
73            TsAlign::Start => out.write_arg(b"-"),
74            TsAlign::End => out.write_arg(b"+"),
75            TsAlign::Ts(v) => v.write_redis_args(out),
76        }
77    }
78}
79
/// Controls which timestamp is reported for each aggregation bucket.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TsBucketTimestamp {
    /// The bucket's start time (default). Sent as `-`.
    Low,
    /// The bucket's end time. Sent as `+`.
    High,
    /// The bucket's mid time (rounded down if not an integer). Sent as `~`.
    Mid,
}
90
91impl ToRedisArgs for TsBucketTimestamp {
92    fn write_redis_args<W>(&self, out: &mut W)
93    where
94        W: ?Sized + RedisWrite,
95    {
96        out.write_arg(b"BUCKETTIMESTAMP");
97        match self {
98            TsBucketTimestamp::Low => out.write_arg(b"-"),
99            TsBucketTimestamp::High => out.write_arg(b"+"),
100            TsBucketTimestamp::Mid => out.write_arg(b"~"),
101        }
102    }
103}
104
/// Wrapper that can hold any of the primitive integer types.
///
/// It lets the range query builder accept timestamps of any integer type:
/// each variant stores the value in its original type and forwards it
/// unchanged when written as a redis argument.
#[derive(Debug, Clone, Copy)]
pub enum Integer {
    /// A `usize` value.
    Usize(usize),
    /// A `u8` value.
    U8(u8),
    /// A `u16` value.
    U16(u16),
    /// A `u32` value.
    U32(u32),
    /// A `u64` value.
    U64(u64),
    /// An `isize` value.
    Isize(isize),
    /// An `i8` value.
    I8(i8),
    /// An `i16` value.
    I16(i16),
    /// An `i32` value.
    I32(i32),
    /// An `i64` value.
    I64(i64),
}
118
119impl ToRedisArgs for Integer {
120    fn write_redis_args<W>(&self, out: &mut W)
121    where
122        W: ?Sized + RedisWrite,
123    {
124        match self {
125            Integer::Usize(v) => v.write_redis_args(out),
126            Integer::U8(v) => v.write_redis_args(out),
127            Integer::U16(v) => v.write_redis_args(out),
128            Integer::U32(v) => v.write_redis_args(out),
129            Integer::U64(v) => v.write_redis_args(out),
130            Integer::Isize(v) => v.write_redis_args(out),
131            Integer::I8(v) => v.write_redis_args(out),
132            Integer::I16(v) => v.write_redis_args(out),
133            Integer::I32(v) => v.write_redis_args(out),
134            Integer::I64(v) => v.write_redis_args(out),
135        }
136    }
137}
138
139impl From<usize> for Integer {
140    fn from(value: usize) -> Self {
141        Integer::Usize(value)
142    }
143}
144
145impl From<u8> for Integer {
146    fn from(value: u8) -> Self {
147        Integer::U8(value)
148    }
149}
150
151impl From<u16> for Integer {
152    fn from(value: u16) -> Self {
153        Integer::U16(value)
154    }
155}
156
157impl From<u32> for Integer {
158    fn from(value: u32) -> Self {
159        Integer::U32(value)
160    }
161}
162
163impl From<u64> for Integer {
164    fn from(value: u64) -> Self {
165        Integer::U64(value)
166    }
167}
168
169impl From<isize> for Integer {
170    fn from(value: isize) -> Self {
171        Integer::Isize(value)
172    }
173}
174
175impl From<i8> for Integer {
176    fn from(value: i8) -> Self {
177        Integer::I8(value)
178    }
179}
180
181impl From<i16> for Integer {
182    fn from(value: i16) -> Self {
183        Integer::I16(value)
184    }
185}
186
187impl From<i32> for Integer {
188    fn from(value: i32) -> Self {
189        Integer::I32(value)
190    }
191}
192
193impl From<i64> for Integer {
194    fn from(value: i64) -> Self {
195        Integer::I64(value)
196    }
197}
198
199/// Let's you build a ts range query with all options via a builder pattern:
200///
201/// ```rust
202/// use redis_ts::{TsAggregationType, TsBucketTimestamp, TsRangeQuery};
203/// let query = TsRangeQuery::default()
204///     .from(1234)
205///     .to(5678)
206///     .latest(true)
207///     .filter_by_value(1.0, 5.0)
208///     .aggregation_type(TsAggregationType::Avg(5000))
209///     .bucket_timestamp(TsBucketTimestamp::High)
210///     .empty(true);
211/// ```
212///
213#[derive(Default, Debug, Clone)]
214pub struct TsRangeQuery {
215    from: Option<Integer>,
216    to: Option<Integer>,
217    latest: bool,
218    filter_by_ts: Vec<Integer>,
219    filter_by_value: Option<(f64, f64)>,
220    count: Option<u64>,
221    align: Option<TsAlign>,
222    aggregation_type: Option<TsAggregationType>,
223    bucket_timestamp: Option<TsBucketTimestamp>,
224    empty: bool,
225}
226
227impl TsRangeQuery {
228    /// Start timestamp of the series to query. Defaults to '-' (earliest sample)
229    /// if left empty.
230    pub fn from<T: Into<Integer>>(mut self, from: T) -> Self {
231        self.from = Some(Into::into(from));
232        self
233    }
234
235    /// End timestamp of the series to query. Defaults to '+' (latest sample)
236    /// if left empty.
237    pub fn to<T: Into<Integer>>(mut self, to: T) -> Self {
238        self.to = Some(Into::into(to));
239        self
240    }
241
242    /// Will enable the LATEST flag on the query.
243    pub fn latest(mut self, latest: bool) -> Self {
244        self.latest = latest;
245        self
246    }
247
248    /// Will enable the FILTER_BY_TS option with given timestamps. Will only
249    /// be added if the given Vec contains any ts values.
250    pub fn filter_by_ts<T: Into<Integer>>(mut self, ts: Vec<T>) -> Self {
251        self.filter_by_ts = ts.into_iter().map(|v| Into::into(v)).collect();
252        self
253    }
254
255    /// Will enable the FILTER_BY_VALUE option with given min and max values.
256    pub fn filter_by_value(mut self, min: f64, max: f64) -> Self {
257        self.filter_by_value = Some((min, max));
258        self
259    }
260
261    /// Determines the max amount of returned samples.
262    pub fn count(mut self, count: u64) -> Self {
263        self.count = Some(count);
264        self
265    }
266
267    /// Controls the aggregation alignment. Will only be added if the query actually
268    /// contains aggregation params.
269    pub fn align(mut self, align: TsAlign) -> Self {
270        self.align = Some(align);
271        self
272    }
273
274    /// The type of aggregation to be performed on the series.
275    pub fn aggregation_type(mut self, aggregation_type: TsAggregationType) -> Self {
276        self.aggregation_type = Some(aggregation_type);
277        self
278    }
279
280    /// Controls reporting of aggregation bucket timestamps. Will only be added if the
281    /// query actually contains aggregation params.
282    pub fn bucket_timestamp(mut self, bucket_timestamp: TsBucketTimestamp) -> Self {
283        self.bucket_timestamp = Some(bucket_timestamp);
284        self
285    }
286
287    /// Enables the EMPTY flag on the query.
288    pub fn empty(mut self, empty: bool) -> Self {
289        self.empty = empty;
290        self
291    }
292}
293
294impl ToRedisArgs for TsRangeQuery {
295    fn write_redis_args<W>(&self, out: &mut W)
296    where
297        W: ?Sized + RedisWrite,
298    {
299        if let Some(ref from) = self.from {
300            from.write_redis_args(out);
301        } else {
302            out.write_arg(b"-");
303        }
304
305        if let Some(ref to) = self.to {
306            to.write_redis_args(out);
307        } else {
308            out.write_arg(b"+");
309        }
310
311        if self.latest {
312            out.write_arg(b"LATEST");
313        }
314
315        if !self.filter_by_ts.is_empty() {
316            out.write_arg(b"FILTER_BY_TS");
317            for ts in self.filter_by_ts.iter() {
318                ts.write_redis_args(out);
319            }
320        }
321
322        if let Some((min, max)) = self.filter_by_value {
323            out.write_arg(b"FILTER_BY_VALUE");
324            min.write_redis_args(out);
325            max.write_redis_args(out);
326        }
327
328        if let Some(count) = self.count {
329            out.write_arg(b"COUNT");
330            count.write_redis_args(out);
331        }
332
333        if let Some(ref agg) = self.aggregation_type {
334            if let Some(ref align) = self.align {
335                align.write_redis_args(out);
336            }
337
338            agg.write_redis_args(out);
339
340            if let Some(ref bkt_ts) = self.bucket_timestamp {
341                bkt_ts.write_redis_args(out);
342            }
343
344            if self.empty {
345                out.write_arg(b"EMPTY")
346            }
347        }
348    }
349}
350
/// Different options for handling inserts of duplicate values.
///
/// Any policy string that is not one of the known lower-case names is kept
/// verbatim in `Other` when a policy is decoded from a reply.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TsDuplicatePolicy {
    /// The behaviour redis time series was using before: prevents all
    /// inserts of values older or equal to the latest value in the series.
    Block,
    /// Simply ignores the new value (as opposed to returning an error).
    First,
    /// Uses the new value.
    Last,
    /// Uses the lower value.
    Min,
    /// Uses the higher value.
    Max,
    /// A policy name not covered by the variants above.
    Other(String),
}
365
366impl ToRedisArgs for TsDuplicatePolicy {
367    fn write_redis_args<W>(&self, out: &mut W)
368    where
369        W: ?Sized + RedisWrite,
370    {
371        let policy = match self {
372            TsDuplicatePolicy::Block => "BLOCK",
373            TsDuplicatePolicy::First => "FIRST",
374            TsDuplicatePolicy::Last => "LAST",
375            TsDuplicatePolicy::Min => "MIN",
376            TsDuplicatePolicy::Max => "MAX",
377            TsDuplicatePolicy::Other(v) => v.as_str(),
378        };
379        out.write_arg(b"DUPLICATE_POLICY");
380        out.write_arg(policy.as_bytes());
381    }
382}
383
384impl FromRedisValue for TsDuplicatePolicy {
385    fn from_redis_value(v: &Value) -> RedisResult<Self> {
386        let string: String = from_redis_value(v)?;
387        let res = match string.as_str() {
388            "block" => TsDuplicatePolicy::Block,
389            "first" => TsDuplicatePolicy::First,
390            "last" => TsDuplicatePolicy::Last,
391            "min" => TsDuplicatePolicy::Min,
392            "max" => TsDuplicatePolicy::Max,
393            v => TsDuplicatePolicy::Other(v.to_string()),
394        };
395        Ok(res)
396    }
397}
398
399/// Options for a redis time series key. Can be used in multiple redis
400/// time series calls (CREATE, ALTER, ADD, ...). The uncompressed option
401/// will only be respected in a TS.CREATE.
402#[derive(Default, Debug, Clone)]
403pub struct TsOptions {
404    retention_time: Option<u64>,
405    uncompressed: bool,
406    labels: Option<Vec<Vec<u8>>>,
407    duplicate_policy: Option<TsDuplicatePolicy>,
408    chunk_size: Option<u64>,
409}
410
411/// TsOptions allows you to build up your redis time series configuration. It
412/// supports default and a builder pattern so you can use it the following way:
413///
414/// ```rust
415/// use redis_ts::TsOptions;
416/// use redis_ts::TsDuplicatePolicy;
417///
418/// let opts:TsOptions = TsOptions::default()
419///     .retention_time(60000)
420///     .uncompressed(false)
421///     .chunk_size(16000)
422///     .duplicate_policy(TsDuplicatePolicy::Last)
423///     .label("label_1", "value_1")
424///     .label("label_2", "value_2");
425/// ```
426///
427impl TsOptions {
428    /// Specifies the retention time in millis for this time series options.
429    pub fn retention_time(mut self, time: u64) -> Self {
430        self.retention_time = Some(time);
431        self
432    }
433
434    /// Switches this time series into uncompressed mode. Note that
435    /// redis ts only respects this flag in TS.CREATE. All other options
436    /// usages will ignore this flag.
437    pub fn uncompressed(mut self, value: bool) -> Self {
438        self.uncompressed = value;
439        self
440    }
441
442    /// Resets all labels to the items in given labels. All labels that
443    /// where previously present will be removed. If the labels are empty
444    /// no labels will be used for the time series.
445    pub fn labels(mut self, labels: Vec<(&str, &str)>) -> Self {
446        if !labels.is_empty() {
447            self.labels = Some(ToRedisArgs::to_redis_args(&labels));
448        } else {
449            self.labels = None;
450        }
451        self
452    }
453
454    /// Adds a single label to this time series options.
455    pub fn label(mut self, name: &str, value: &str) -> Self {
456        let mut l = ToRedisArgs::to_redis_args(&vec![(name, value)]);
457        let mut res: Vec<Vec<u8>> = vec![];
458        if let Some(mut cur) = self.labels {
459            res.append(&mut cur);
460        }
461        res.append(&mut l);
462        self.labels = Some(res);
463        self
464    }
465
466    /// Overrides the servers default dplicatePoliciy.
467    pub fn duplicate_policy(mut self, policy: TsDuplicatePolicy) -> Self {
468        self.duplicate_policy = Some(policy);
469        self
470    }
471
472    /// Sets the allocation size for data in bytes.
473    pub fn chunk_size(mut self, size: u64) -> Self {
474        self.chunk_size = Some(size);
475        self
476    }
477}
478
479impl ToRedisArgs for TsOptions {
480    fn write_redis_args<W>(&self, out: &mut W)
481    where
482        W: ?Sized + RedisWrite,
483    {
484        if let Some(ref rt) = self.retention_time {
485            out.write_arg(b"RETENTION");
486            out.write_arg(format!("{rt}").as_bytes());
487        }
488
489        if self.uncompressed {
490            out.write_arg(b"UNCOMPRESSED");
491        }
492
493        if let Some(ref policy) = self.duplicate_policy {
494            policy.write_redis_args(out);
495        }
496
497        if let Some(ref alloc) = self.chunk_size {
498            out.write_arg(b"CHUNK_SIZE");
499            out.write_arg(format!("{alloc}").as_bytes());
500        }
501
502        if let Some(ref l) = self.labels {
503            out.write_arg(b"LABELS");
504            for arg in l {
505                out.write_arg(arg);
506            }
507        }
508    }
509}
510
511/// Let's you build redis time series filter query options via a builder pattern. Filters
512/// can be used in different commands like TS.MGET, TS.MRANGE and TS.QUERYINDEX.
513#[derive(Debug, Default, Clone)]
514pub struct TsFilterOptions {
515    with_labels: bool,
516    filters: Vec<TsFilter>,
517}
518
519/// TsFilterOptions allows you to build up your redis time series filter query. It
520/// supports default and a builder pattern so you can use it the following way:
521///
522/// ```rust
523/// use redis_ts::TsFilterOptions;
524///
525/// let filters = TsFilterOptions::default()
526///     .with_labels(true)
527///     .equals("label_1", "value_1")
528///     .not_equals("label_2", "hello")
529///     .in_set("label_3", vec!["a", "b", "c"])
530///     .not_in_set("label_3", vec!["d", "e"])
531///     .has_label("some_other")
532///     .not_has_label("unwanted");
533/// ```
534///
535impl TsFilterOptions {
536    /// Will add the WITHLABELS flag to the filter query. The query response will have
537    /// label information attached.
538    pub fn with_labels(mut self, value: bool) -> Self {
539        self.with_labels = value;
540        self
541    }
542
543    /// Select time series where the given label contains the the given value.
544    pub fn equals<L: Display + ToRedisArgs, V: Display + ToRedisArgs>(
545        mut self,
546        name: L,
547        value: V,
548    ) -> Self {
549        self.filters.push(TsFilter {
550            name: format!("{name}"),
551            value: format!("{value}"),
552            compare: TsCompare::Eq,
553        });
554        self
555    }
556
557    /// Select time series where given label does not contain the given value.
558    pub fn not_equals<L: Display + ToRedisArgs, V: Display + ToRedisArgs>(
559        mut self,
560        name: L,
561        value: V,
562    ) -> Self {
563        self.filters.push(TsFilter {
564            name: format!("{name}"),
565            value: format!("{value}"),
566            compare: TsCompare::NotEq,
567        });
568        self
569    }
570
571    /// Select time series where given label contains any of the given values.
572    pub fn in_set<L: Display + ToRedisArgs, V: Display + ToRedisArgs>(
573        mut self,
574        name: L,
575        values: Vec<V>,
576    ) -> Self {
577        let set = format!(
578            "({})",
579            values
580                .iter()
581                .map(|v| { format!("{v}") })
582                .collect::<Vec<String>>()
583                .join(",")
584        );
585        self.filters.push(TsFilter {
586            name: format!("{name}"),
587            value: set,
588            compare: TsCompare::Eq,
589        });
590        self
591    }
592
593    /// Select time series where given label does not contain any of the given values.
594    pub fn not_in_set<L: Display + ToRedisArgs, V: Display + ToRedisArgs>(
595        mut self,
596        name: L,
597        values: Vec<V>,
598    ) -> Self {
599        let set = format!(
600            "({})",
601            values
602                .iter()
603                .map(|v| { format!("{v}") })
604                .collect::<Vec<String>>()
605                .join(",")
606        );
607        self.filters.push(TsFilter {
608            name: format!("{name}"),
609            value: set,
610            compare: TsCompare::NotEq,
611        });
612        self
613    }
614
615    /// Select all time series that have the given label.
616    pub fn has_label<L: Display + ToRedisArgs>(mut self, name: L) -> Self {
617        self.filters.push(TsFilter {
618            name: format!("{name}"),
619            value: "".to_string(),
620            compare: TsCompare::NotEq,
621        });
622        self
623    }
624
625    /// Select all time series that do not have the given label.
626    pub fn not_has_label<L: Display + ToRedisArgs>(mut self, name: L) -> Self {
627        self.filters.push(TsFilter {
628            name: format!("{name}"),
629            value: "".to_string(),
630            compare: TsCompare::Eq,
631        });
632        self
633    }
634
635    pub fn get_filters(self) -> Vec<TsFilter> {
636        self.filters
637    }
638}
639
640impl ToRedisArgs for TsFilterOptions {
641    fn write_redis_args<W>(&self, out: &mut W)
642    where
643        W: ?Sized + RedisWrite,
644    {
645        if self.with_labels {
646            out.write_arg(b"WITHLABELS");
647        }
648        out.write_arg(b"FILTER");
649
650        for f in self.filters.iter() {
651            f.write_redis_args(out)
652        }
653    }
654}
655
656/// Provides information about a redis time series key.
657#[derive(Debug, Default)]
658pub struct TsInfo {
659    pub total_samples: u64,
660    pub memory_usage: u64,
661    pub first_timestamp: u64,
662    pub last_timestamp: u64,
663    pub retention_time: u64,
664    pub chunk_count: u64,
665    pub max_samples_per_chunk: u16,
666    pub chunk_size: u64,
667    pub duplicate_policy: Option<TsDuplicatePolicy>,
668    pub labels: Vec<(String, String)>,
669    pub source_key: Option<String>,
670    pub rules: Vec<(String, u64, String)>,
671}
672
673impl FromRedisValue for TsInfo {
674    fn from_redis_value(v: &Value) -> RedisResult<Self> {
675        match *v {
676            Value::Bulk(ref values) => {
677                let mut result = TsInfo::default();
678                let mut map: HashMap<String, Value> = HashMap::new();
679
680                for pair in values.chunks(2) {
681                    map.insert(from_redis_value(&pair[0])?, pair[1].clone());
682                }
683
684                //println!("{:?}", map);
685
686                if let Some(v) = map.get("totalSamples") {
687                    result.total_samples = from_redis_value(v)?;
688                }
689
690                if let Some(v) = map.get("memoryUsage") {
691                    result.memory_usage = from_redis_value(v)?;
692                }
693
694                if let Some(v) = map.get("firstTimestamp") {
695                    result.first_timestamp = from_redis_value(v)?;
696                }
697
698                if let Some(v) = map.get("lastTimestamp") {
699                    result.last_timestamp = from_redis_value(v)?;
700                }
701
702                if let Some(v) = map.get("retentionTime") {
703                    result.retention_time = from_redis_value(v)?;
704                }
705
706                if let Some(v) = map.get("chunkCount") {
707                    result.chunk_count = from_redis_value(v)?;
708                }
709
710                if let Some(v) = map.get("maxSamplesPerChunk") {
711                    result.max_samples_per_chunk = from_redis_value(v)?;
712                }
713
714                if let Some(v) = map.get("chunkSize") {
715                    result.chunk_size = from_redis_value(v)?;
716                }
717
718                if let Some(v) = map.get("sourceKey") {
719                    result.source_key = from_redis_value(v)?;
720                }
721
722                if let Some(v) = map.get("duplicatePolicy") {
723                    result.duplicate_policy = from_redis_value(v)?;
724                }
725
726                result.rules = match map.get("rules") {
727                    Some(Value::Bulk(ref values)) => values
728                        .iter()
729                        .flat_map(|value| match value {
730                            Value::Bulk(ref vs) => Some((
731                                from_redis_value(&vs[0]).unwrap(),
732                                from_redis_value(&vs[1]).unwrap(),
733                                from_redis_value(&vs[2]).unwrap(),
734                            )),
735                            _ => None,
736                        })
737                        .collect(),
738                    _ => vec![],
739                };
740
741                result.labels = match map.get("labels") {
742                    Some(Value::Bulk(ref values)) => values
743                        .iter()
744                        .flat_map(|value| match value {
745                            Value::Bulk(ref vs) => Some((
746                                from_redis_value(&vs[0]).unwrap(),
747                                from_redis_value(&vs[1]).unwrap(),
748                            )),
749                            _ => None,
750                        })
751                        .collect(),
752                    _ => vec![],
753                };
754
755                Ok(result)
756            }
757            _ => Err(RedisError::from(std::io::Error::new(
758                std::io::ErrorKind::Other,
759                "no_ts_info_data",
760            ))),
761        }
762    }
763}
764
765/// Represents a TS.MGET redis time series result. The concrete types for timestamp
766/// and value eg <u64,f64> can be provided from the call site.
767#[derive(Debug)]
768pub struct TsMget<TS: FromRedisValue, V: FromRedisValue> {
769    pub values: Vec<TsMgetEntry<TS, V>>,
770}
771
772impl<TS: Default + FromRedisValue, V: Default + FromRedisValue> FromRedisValue for TsMget<TS, V> {
773    fn from_redis_value(v: &Value) -> RedisResult<Self> {
774        let res = match *v {
775            Value::Bulk(ref values) => TsMget {
776                values: FromRedisValue::from_redis_values(values)?,
777            },
778            _ => TsMget { values: vec![] },
779        };
780        Ok(res)
781    }
782}
783
784/// Represents a TS.MGET redis time series entry. The concrete types for timestamp
785/// and value eg <u64,f64> can be provided from the call site.
786#[derive(Debug, Default)]
787pub struct TsMgetEntry<TS: FromRedisValue, V: FromRedisValue> {
788    pub key: String,
789    pub labels: Vec<(String, String)>,
790    pub value: Option<(TS, V)>,
791}
792
793impl<TS: Default + FromRedisValue, V: Default + FromRedisValue> FromRedisValue
794    for TsMgetEntry<TS, V>
795{
796    fn from_redis_value(v: &Value) -> RedisResult<Self> {
797        match *v {
798            Value::Bulk(ref values) => {
799                let result = TsMgetEntry::<TS, V> {
800                    key: from_redis_value(&values[0])?,
801                    labels: match values[1] {
802                        Value::Bulk(ref vs) => vs
803                            .iter()
804                            .flat_map(|value| match value {
805                                Value::Bulk(ref v) => Some((
806                                    from_redis_value(&v[0]).unwrap(),
807                                    from_redis_value(&v[1]).unwrap(),
808                                )),
809                                _ => None,
810                            })
811                            .collect(),
812                        _ => vec![],
813                    },
814                    value: match values[2] {
815                        Value::Bulk(ref vs) if !vs.is_empty() => Some((
816                            from_redis_value(&vs[0]).unwrap(),
817                            from_redis_value(&vs[1]).unwrap(),
818                        )),
819                        _ => None,
820                    },
821                };
822
823                Ok(result)
824            }
825            _ => Err(RedisError::from(std::io::Error::new(
826                std::io::ErrorKind::Other,
827                "no_mget_data",
828            ))),
829        }
830    }
831}
832
833/// Represents a TS.RANGE redis time series result. The concrete types for timestamp
834/// and value eg <u64,f64> can be provided from the call site.
835#[derive(Debug)]
836pub struct TsRange<TS: FromRedisValue + Copy, V: FromRedisValue + Copy> {
837    pub values: Vec<(TS, V)>,
838}
839
840impl<TS: FromRedisValue + Copy, V: FromRedisValue + Copy> FromRedisValue for TsRange<TS, V> {
841    fn from_redis_value(v: &Value) -> RedisResult<Self> {
842        match *v {
843            Value::Bulk(ref values) => {
844                let items: Vec<TsValueReply<TS, V>> = FromRedisValue::from_redis_values(values)?;
845                Ok(TsRange {
846                    values: items.iter().map(|i| (i.ts, i.value)).collect(),
847                })
848            }
849            _ => Err(RedisError::from(std::io::Error::new(
850                std::io::ErrorKind::Other,
851                "no_range_data",
852            ))),
853        }
854    }
855}
856
857/// Represents a TS.MRANGE redis time series result with multiple entries. The concrete types for timestamp
858/// and value eg <u64,f64> can be provided from the call site.
859#[derive(Debug)]
860pub struct TsMrange<TS: FromRedisValue + Copy, V: FromRedisValue + Copy> {
861    pub values: Vec<TsMrangeEntry<TS, V>>,
862}
863
864impl<TS: Default + FromRedisValue + Copy, V: Default + FromRedisValue + Copy> FromRedisValue
865    for TsMrange<TS, V>
866{
867    fn from_redis_value(v: &Value) -> RedisResult<Self> {
868        let res = match *v {
869            Value::Bulk(ref values) => TsMrange {
870                values: FromRedisValue::from_redis_values(values)?,
871            },
872            _ => TsMrange { values: vec![] },
873        };
874        Ok(res)
875    }
876}
877
878/// Represents a TS.MRANGE redis time series value. The concrete types for timestamp
879/// and value eg <u64,f64> can be provided from the call site.
880#[derive(Debug, Default)]
881pub struct TsMrangeEntry<TS: FromRedisValue + Copy, V: FromRedisValue + Copy> {
882    pub key: String,
883    pub labels: Vec<(String, String)>,
884    pub values: Vec<(TS, V)>,
885}
886
887impl<TS: Default + FromRedisValue + Copy, V: Default + FromRedisValue + Copy> FromRedisValue
888    for TsMrangeEntry<TS, V>
889{
890    fn from_redis_value(v: &Value) -> RedisResult<Self> {
891        match *v {
892            Value::Bulk(ref values) => {
893                let result = TsMrangeEntry::<TS, V> {
894                    key: from_redis_value(&values[0]).unwrap(),
895                    labels: match values[1] {
896                        Value::Bulk(ref vs) => vs
897                            .iter()
898                            .flat_map(|value| match value {
899                                Value::Bulk(ref v) => Some((
900                                    from_redis_value(&v[0]).unwrap(),
901                                    from_redis_value(&v[1]).unwrap(),
902                                )),
903                                _ => None,
904                            })
905                            .collect(),
906                        _ => vec![],
907                    },
908                    values: match values[2] {
909                        Value::Bulk(ref vs) => {
910                            let items: Vec<TsValueReply<TS, V>> =
911                                FromRedisValue::from_redis_values(vs)?;
912                            items.iter().map(|i| (i.ts, i.value)).collect()
913                        }
914                        _ => vec![],
915                    },
916                };
917
918                Ok(result)
919            }
920            _ => Err(RedisError::from(std::io::Error::new(
921                std::io::ErrorKind::Other,
922                "no_mget_data",
923            ))),
924        }
925    }
926}
927
928#[derive(Debug)]
929struct TsValueReply<TS: FromRedisValue, V: FromRedisValue> {
930    pub ts: TS,
931    pub value: V,
932}
933
934impl<TS: FromRedisValue, V: FromRedisValue> FromRedisValue for TsValueReply<TS, V> {
935    fn from_redis_value(v: &Value) -> RedisResult<Self> {
936        match *v {
937            Value::Bulk(ref values) => Ok(TsValueReply {
938                ts: from_redis_value(&values[0]).unwrap(),
939                value: from_redis_value(&values[1]).unwrap(),
940            }),
941            _ => Err(RedisError::from(std::io::Error::new(
942                std::io::ErrorKind::Other,
943                "no_value_data",
944            ))),
945        }
946    }
947}
948
/// Comparison operator used inside a filter expression.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TsCompare {
    /// Written as `=`.
    Eq,
    /// Written as `!=`.
    NotEq,
}
954
955impl ToRedisArgs for TsCompare {
956    fn write_redis_args<W>(&self, out: &mut W)
957    where
958        W: ?Sized + RedisWrite,
959    {
960        let val = match *self {
961            TsCompare::Eq => "=",
962            TsCompare::NotEq => "!=",
963        };
964
965        val.write_redis_args(out);
966    }
967}
968
969#[derive(Debug, Clone)]
970pub struct TsFilter {
971    name: String,
972    value: String,
973    compare: TsCompare,
974}
975
976impl ToRedisArgs for TsFilter {
977    fn write_redis_args<W>(&self, out: &mut W)
978    where
979        W: ?Sized + RedisWrite,
980    {
981        let comp = match self.compare {
982            TsCompare::Eq => "=",
983            TsCompare::NotEq => "!=",
984        };
985
986        let arg = format!("{}{}{}", self.name, comp, self.value);
987        out.write_arg(arg.as_bytes());
988    }
989}