fred/commands/interfaces/
timeseries.rs

1use crate::{
2  commands,
3  interfaces::ClientLike,
4  prelude::{Error, FredResult, Key},
5  types::{
6    timeseries::{
7      Aggregator,
8      DuplicatePolicy,
9      Encoding,
10      GetLabels,
11      GetTimestamp,
12      GroupBy,
13      RangeAggregation,
14      Timestamp,
15    },
16    FromValue,
17    Map,
18  },
19};
20use bytes_utils::Str;
21use fred_macros::rm_send_if;
22use futures::Future;
23
24/// A [Redis Timeseries](https://github.com/RedisTimeSeries/RedisTimeSeries/) interface.
25#[cfg_attr(docsrs, doc(cfg(feature = "i-time-series")))]
26#[rm_send_if(feature = "glommio")]
27pub trait TimeSeriesInterface: ClientLike {
28  /// Append a sample to a time series.
29  ///
30  /// <https://redis.io/commands/ts.add/>
31  fn ts_add<R, K, T, L>(
32    &self,
33    key: K,
34    timestamp: T,
35    value: f64,
36    retention: Option<u64>,
37    encoding: Option<Encoding>,
38    chunk_size: Option<u64>,
39    on_duplicate: Option<DuplicatePolicy>,
40    labels: L,
41  ) -> impl Future<Output = FredResult<R>> + Send
42  where
43    R: FromValue,
44    K: Into<Key> + Send,
45    T: TryInto<Timestamp> + Send,
46    T::Error: Into<Error> + Send,
47    L: TryInto<Map> + Send,
48    L::Error: Into<Error>,
49  {
50    async move {
51      into!(key);
52      try_into!(timestamp, labels);
53      commands::timeseries::ts_add(
54        self,
55        key,
56        timestamp,
57        value,
58        retention,
59        encoding,
60        chunk_size,
61        on_duplicate,
62        labels,
63      )
64      .await?
65      .convert()
66    }
67  }
68
69  /// Update the retention, chunk size, duplicate policy, and labels of an existing time series.
70  ///
71  /// <https://redis.io/commands/ts.alter/>
72  fn ts_alter<R, K, L>(
73    &self,
74    key: K,
75    retention: Option<u64>,
76    chunk_size: Option<u64>,
77    duplicate_policy: Option<DuplicatePolicy>,
78    labels: L,
79  ) -> impl Future<Output = FredResult<R>> + Send
80  where
81    R: FromValue,
82    K: Into<Key> + Send,
83    L: TryInto<Map> + Send,
84    L::Error: Into<Error>,
85  {
86    async move {
87      into!(key);
88      try_into!(labels);
89      commands::timeseries::ts_alter(self, key, retention, chunk_size, duplicate_policy, labels)
90        .await?
91        .convert()
92    }
93  }
94
95  /// Create a new time series.
96  ///
97  /// <https://redis.io/commands/ts.create/>
98  fn ts_create<R, K, L>(
99    &self,
100    key: K,
101    retention: Option<u64>,
102    encoding: Option<Encoding>,
103    chunk_size: Option<u64>,
104    duplicate_policy: Option<DuplicatePolicy>,
105    labels: L,
106  ) -> impl Future<Output = FredResult<R>> + Send
107  where
108    R: FromValue,
109    K: Into<Key> + Send,
110    L: TryInto<Map> + Send,
111    L::Error: Into<Error>,
112  {
113    async move {
114      into!(key);
115      try_into!(labels);
116      commands::timeseries::ts_create(self, key, retention, encoding, chunk_size, duplicate_policy, labels)
117        .await?
118        .convert()
119    }
120  }
121
122  /// Create a compaction rule.
123  ///
124  /// <https://redis.io/commands/ts.createrule/>
125  fn ts_createrule<R, S, D>(
126    &self,
127    src: S,
128    dest: D,
129    aggregation: (Aggregator, u64),
130    align_timestamp: Option<u64>,
131  ) -> impl Future<Output = FredResult<R>> + Send
132  where
133    R: FromValue,
134    S: Into<Key> + Send,
135    D: Into<Key> + Send,
136  {
137    async move {
138      into!(src, dest);
139      commands::timeseries::ts_createrule(self, src, dest, aggregation, align_timestamp)
140        .await?
141        .convert()
142    }
143  }
144
145  /// Decrease the value of the sample with the maximum existing timestamp, or create a new sample with a value equal
146  /// to the value of the sample with the maximum existing timestamp with a given decrement.
147  ///
148  /// <https://redis.io/commands/ts.decrby/>
149  fn ts_decrby<R, K, L>(
150    &self,
151    key: K,
152    subtrahend: f64,
153    timestamp: Option<Timestamp>,
154    retention: Option<u64>,
155    uncompressed: bool,
156    chunk_size: Option<u64>,
157    labels: L,
158  ) -> impl Future<Output = FredResult<R>> + Send
159  where
160    R: FromValue,
161    K: Into<Key> + Send,
162    L: TryInto<Map> + Send,
163    L::Error: Into<Error> + Send,
164  {
165    async move {
166      into!(key);
167      try_into!(labels);
168      commands::timeseries::ts_decrby(
169        self,
170        key,
171        subtrahend,
172        timestamp,
173        retention,
174        uncompressed,
175        chunk_size,
176        labels,
177      )
178      .await?
179      .convert()
180    }
181  }
182
183  /// Delete all samples between two timestamps for a given time series.
184  ///
185  /// <https://redis.io/commands/ts.del/>
186  fn ts_del<R, K>(&self, key: K, from: i64, to: i64) -> impl Future<Output = FredResult<R>> + Send
187  where
188    R: FromValue,
189    K: Into<Key> + Send,
190  {
191    async move {
192      into!(key);
193      commands::timeseries::ts_del(self, key, from, to).await?.convert()
194    }
195  }
196
197  /// Delete a compaction rule.
198  ///
199  /// <https://redis.io/commands/ts.deleterule/>
200  fn ts_deleterule<R, S, D>(&self, src: S, dest: D) -> impl Future<Output = FredResult<R>> + Send
201  where
202    R: FromValue,
203    S: Into<Key> + Send,
204    D: Into<Key> + Send,
205  {
206    async move {
207      into!(src, dest);
208      commands::timeseries::ts_deleterule(self, src, dest).await?.convert()
209    }
210  }
211
212  /// Get the sample with the highest timestamp from a given time series.
213  ///
214  /// <https://redis.io/commands/ts.get/>
215  fn ts_get<R, K>(&self, key: K, latest: bool) -> impl Future<Output = FredResult<R>> + Send
216  where
217    R: FromValue,
218    K: Into<Key> + Send,
219  {
220    async move {
221      into!(key);
222      commands::timeseries::ts_get(self, key, latest).await?.convert()
223    }
224  }
225
226  /// Increase the value of the sample with the maximum existing timestamp, or create a new sample with a value equal
227  /// to the value of the sample with the maximum existing timestamp with a given increment.
228  ///
229  /// <https://redis.io/commands/ts.incrby/>
230  fn ts_incrby<R, K, L>(
231    &self,
232    key: K,
233    addend: f64,
234    timestamp: Option<Timestamp>,
235    retention: Option<u64>,
236    uncompressed: bool,
237    chunk_size: Option<u64>,
238    labels: L,
239  ) -> impl Future<Output = FredResult<R>> + Send
240  where
241    R: FromValue,
242    K: Into<Key> + Send,
243    L: TryInto<Map> + Send,
244    L::Error: Into<Error> + Send,
245  {
246    async move {
247      into!(key);
248      try_into!(labels);
249      commands::timeseries::ts_incrby(
250        self,
251        key,
252        addend,
253        timestamp,
254        retention,
255        uncompressed,
256        chunk_size,
257        labels,
258      )
259      .await?
260      .convert()
261    }
262  }
263
264  /// Return information and statistics for a time series.
265  ///
266  /// <https://redis.io/commands/ts.info/>
267  fn ts_info<R, K>(&self, key: K, debug: bool) -> impl Future<Output = FredResult<R>> + Send
268  where
269    R: FromValue,
270    K: Into<Key> + Send,
271  {
272    async move {
273      into!(key);
274      commands::timeseries::ts_info(self, key, debug).await?.convert()
275    }
276  }
277
278  /// Append new samples to one or more time series.
279  ///
280  /// <https://redis.io/commands/ts.madd/>
281  fn ts_madd<R, K, I>(&self, samples: I) -> impl Future<Output = FredResult<R>> + Send
282  where
283    R: FromValue,
284    K: Into<Key> + Send,
285    I: IntoIterator<Item = (K, Timestamp, f64)> + Send,
286  {
287    async move {
288      let samples: Vec<_> = samples
289        .into_iter()
290        .map(|(key, ts, val)| (key.into(), ts, val))
291        .collect();
292
293      commands::timeseries::ts_madd(self, samples).await?.convert()
294    }
295  }
296
297  /// Get the sample with the highest timestamp from each time series matching a specific filter.
298  ///
299  /// See [Resp2TimeSeriesValues](crate::types::timeseries::Resp2TimeSeriesValues) and
300  /// [Resp3TimeSeriesValues](crate::types::timeseries::Resp3TimeSeriesValues) for more information.
301  ///
302  /// <https://redis.io/commands/ts.mget/>
303  fn ts_mget<R, L, S, I>(
304    &self,
305    latest: bool,
306    labels: Option<L>,
307    filters: I,
308  ) -> impl Future<Output = FredResult<R>> + Send
309  where
310    R: FromValue,
311    L: Into<GetLabels> + Send,
312    S: Into<Str> + Send,
313    I: IntoIterator<Item = S> + Send,
314  {
315    async move {
316      let labels = labels.map(|l| l.into());
317      let filters = filters.into_iter().map(|s| s.into()).collect();
318
319      commands::timeseries::ts_mget(self, latest, labels, filters)
320        .await?
321        .convert()
322    }
323  }
324
325  /// Query a range across multiple time series by filters in the forward direction.
326  ///
327  /// See [Resp2TimeSeriesValues](crate::types::timeseries::Resp2TimeSeriesValues) and
328  /// [Resp3TimeSeriesValues](crate::types::timeseries::Resp3TimeSeriesValues) for more information.
329  ///
330  /// <https://redis.io/commands/ts.mrange/>
331  fn ts_mrange<R, F, T, I, S, J>(
332    &self,
333    from: F,
334    to: T,
335    latest: bool,
336    filter_by_ts: I,
337    filter_by_value: Option<(i64, i64)>,
338    labels: Option<GetLabels>,
339    count: Option<u64>,
340    aggregation: Option<RangeAggregation>,
341    filters: J,
342    group_by: Option<GroupBy>,
343  ) -> impl Future<Output = FredResult<R>> + Send
344  where
345    R: FromValue,
346    F: TryInto<GetTimestamp> + Send,
347    F::Error: Into<Error> + Send,
348    T: TryInto<GetTimestamp> + Send,
349    T::Error: Into<Error> + Send,
350    S: Into<Str> + Send,
351    I: IntoIterator<Item = i64> + Send,
352    J: IntoIterator<Item = S> + Send,
353  {
354    async move {
355      try_into!(from, to);
356      let filters = filters.into_iter().map(|s| s.into()).collect();
357      let filter_by_ts = filter_by_ts.into_iter().collect();
358
359      commands::timeseries::ts_mrange(
360        self,
361        from,
362        to,
363        latest,
364        filter_by_ts,
365        filter_by_value,
366        labels,
367        count,
368        aggregation,
369        filters,
370        group_by,
371      )
372      .await?
373      .convert()
374    }
375  }
376
377  /// Query a range across multiple time series by filters in the reverse direction.
378  ///
379  /// See [Resp2TimeSeriesValues](crate::types::timeseries::Resp2TimeSeriesValues) and
380  /// [Resp3TimeSeriesValues](crate::types::timeseries::Resp3TimeSeriesValues) for more information.
381  ///
382  /// <https://redis.io/commands/ts.mrevrange/>
383  fn ts_mrevrange<R, F, T, I, S, J>(
384    &self,
385    from: F,
386    to: T,
387    latest: bool,
388    filter_by_ts: I,
389    filter_by_value: Option<(i64, i64)>,
390    labels: Option<GetLabels>,
391    count: Option<u64>,
392    aggregation: Option<RangeAggregation>,
393    filters: J,
394    group_by: Option<GroupBy>,
395  ) -> impl Future<Output = FredResult<R>> + Send
396  where
397    R: FromValue,
398    F: TryInto<GetTimestamp> + Send,
399    F::Error: Into<Error> + Send,
400    T: TryInto<GetTimestamp> + Send,
401    T::Error: Into<Error> + Send,
402    S: Into<Str> + Send,
403    I: IntoIterator<Item = i64> + Send,
404    J: IntoIterator<Item = S> + Send,
405  {
406    async move {
407      try_into!(from, to);
408      let filters = filters.into_iter().map(|s| s.into()).collect();
409      let filter_by_ts = filter_by_ts.into_iter().collect();
410
411      commands::timeseries::ts_mrevrange(
412        self,
413        from,
414        to,
415        latest,
416        filter_by_ts,
417        filter_by_value,
418        labels,
419        count,
420        aggregation,
421        filters,
422        group_by,
423      )
424      .await?
425      .convert()
426    }
427  }
428
429  /// Get all time series keys matching a filter list.
430  ///
431  /// <https://redis.io/commands/ts.queryindex/>
432  fn ts_queryindex<R, S, I>(&self, filters: I) -> impl Future<Output = FredResult<R>> + Send
433  where
434    R: FromValue,
435    S: Into<Str> + Send,
436    I: IntoIterator<Item = S> + Send,
437  {
438    async move {
439      let filters = filters.into_iter().map(|s| s.into()).collect();
440      commands::timeseries::ts_queryindex(self, filters).await?.convert()
441    }
442  }
443
444  /// Query a range in forward direction.
445  ///
446  /// <https://redis.io/commands/ts.range/>
447  fn ts_range<R, K, F, T, I>(
448    &self,
449    key: K,
450    from: F,
451    to: T,
452    latest: bool,
453    filter_by_ts: I,
454    filter_by_value: Option<(i64, i64)>,
455    count: Option<u64>,
456    aggregation: Option<RangeAggregation>,
457  ) -> impl Future<Output = FredResult<R>> + Send
458  where
459    R: FromValue,
460    K: Into<Key> + Send,
461    F: TryInto<GetTimestamp> + Send,
462    F::Error: Into<Error> + Send,
463    T: TryInto<GetTimestamp> + Send,
464    T::Error: Into<Error> + Send,
465    I: IntoIterator<Item = i64> + Send,
466  {
467    async move {
468      into!(key);
469      try_into!(from, to);
470      let filter_by_ts = filter_by_ts.into_iter().collect();
471
472      commands::timeseries::ts_range(
473        self,
474        key,
475        from,
476        to,
477        latest,
478        filter_by_ts,
479        filter_by_value,
480        count,
481        aggregation,
482      )
483      .await?
484      .convert()
485    }
486  }
487
488  /// Query a range in reverse direction.
489  ///
490  /// <https://redis.io/commands/ts.revrange/>
491  fn ts_revrange<R, K, F, T, I>(
492    &self,
493    key: K,
494    from: F,
495    to: T,
496    latest: bool,
497    filter_by_ts: I,
498    filter_by_value: Option<(i64, i64)>,
499    count: Option<u64>,
500    aggregation: Option<RangeAggregation>,
501  ) -> impl Future<Output = FredResult<R>> + Send
502  where
503    R: FromValue,
504    K: Into<Key> + Send,
505    F: TryInto<GetTimestamp> + Send,
506    F::Error: Into<Error> + Send,
507    T: TryInto<GetTimestamp> + Send,
508    T::Error: Into<Error> + Send,
509    I: IntoIterator<Item = i64> + Send,
510  {
511    async move {
512      into!(key);
513      try_into!(from, to);
514      let filter_by_ts = filter_by_ts.into_iter().collect();
515
516      commands::timeseries::ts_revrange(
517        self,
518        key,
519        from,
520        to,
521        latest,
522        filter_by_ts,
523        filter_by_value,
524        count,
525        aggregation,
526      )
527      .await?
528      .convert()
529    }
530  }
531}