1use crate::{
2 commands,
3 interfaces::ClientLike,
4 prelude::{Error, FredResult, Key},
5 types::{
6 timeseries::{
7 Aggregator,
8 DuplicatePolicy,
9 Encoding,
10 GetLabels,
11 GetTimestamp,
12 GroupBy,
13 RangeAggregation,
14 Timestamp,
15 },
16 FromValue,
17 Map,
18 },
19};
20use bytes_utils::Str;
21use fred_macros::rm_send_if;
22use futures::Future;
23
24#[cfg_attr(docsrs, doc(cfg(feature = "i-time-series")))]
26#[rm_send_if(feature = "glommio")]
27pub trait TimeSeriesInterface: ClientLike {
28 fn ts_add<R, K, T, L>(
32 &self,
33 key: K,
34 timestamp: T,
35 value: f64,
36 retention: Option<u64>,
37 encoding: Option<Encoding>,
38 chunk_size: Option<u64>,
39 on_duplicate: Option<DuplicatePolicy>,
40 labels: L,
41 ) -> impl Future<Output = FredResult<R>> + Send
42 where
43 R: FromValue,
44 K: Into<Key> + Send,
45 T: TryInto<Timestamp> + Send,
46 T::Error: Into<Error> + Send,
47 L: TryInto<Map> + Send,
48 L::Error: Into<Error>,
49 {
50 async move {
51 into!(key);
52 try_into!(timestamp, labels);
53 commands::timeseries::ts_add(
54 self,
55 key,
56 timestamp,
57 value,
58 retention,
59 encoding,
60 chunk_size,
61 on_duplicate,
62 labels,
63 )
64 .await?
65 .convert()
66 }
67 }
68
69 fn ts_alter<R, K, L>(
73 &self,
74 key: K,
75 retention: Option<u64>,
76 chunk_size: Option<u64>,
77 duplicate_policy: Option<DuplicatePolicy>,
78 labels: L,
79 ) -> impl Future<Output = FredResult<R>> + Send
80 where
81 R: FromValue,
82 K: Into<Key> + Send,
83 L: TryInto<Map> + Send,
84 L::Error: Into<Error>,
85 {
86 async move {
87 into!(key);
88 try_into!(labels);
89 commands::timeseries::ts_alter(self, key, retention, chunk_size, duplicate_policy, labels)
90 .await?
91 .convert()
92 }
93 }
94
95 fn ts_create<R, K, L>(
99 &self,
100 key: K,
101 retention: Option<u64>,
102 encoding: Option<Encoding>,
103 chunk_size: Option<u64>,
104 duplicate_policy: Option<DuplicatePolicy>,
105 labels: L,
106 ) -> impl Future<Output = FredResult<R>> + Send
107 where
108 R: FromValue,
109 K: Into<Key> + Send,
110 L: TryInto<Map> + Send,
111 L::Error: Into<Error>,
112 {
113 async move {
114 into!(key);
115 try_into!(labels);
116 commands::timeseries::ts_create(self, key, retention, encoding, chunk_size, duplicate_policy, labels)
117 .await?
118 .convert()
119 }
120 }
121
122 fn ts_createrule<R, S, D>(
126 &self,
127 src: S,
128 dest: D,
129 aggregation: (Aggregator, u64),
130 align_timestamp: Option<u64>,
131 ) -> impl Future<Output = FredResult<R>> + Send
132 where
133 R: FromValue,
134 S: Into<Key> + Send,
135 D: Into<Key> + Send,
136 {
137 async move {
138 into!(src, dest);
139 commands::timeseries::ts_createrule(self, src, dest, aggregation, align_timestamp)
140 .await?
141 .convert()
142 }
143 }
144
145 fn ts_decrby<R, K, L>(
150 &self,
151 key: K,
152 subtrahend: f64,
153 timestamp: Option<Timestamp>,
154 retention: Option<u64>,
155 uncompressed: bool,
156 chunk_size: Option<u64>,
157 labels: L,
158 ) -> impl Future<Output = FredResult<R>> + Send
159 where
160 R: FromValue,
161 K: Into<Key> + Send,
162 L: TryInto<Map> + Send,
163 L::Error: Into<Error> + Send,
164 {
165 async move {
166 into!(key);
167 try_into!(labels);
168 commands::timeseries::ts_decrby(
169 self,
170 key,
171 subtrahend,
172 timestamp,
173 retention,
174 uncompressed,
175 chunk_size,
176 labels,
177 )
178 .await?
179 .convert()
180 }
181 }
182
183 fn ts_del<R, K>(&self, key: K, from: i64, to: i64) -> impl Future<Output = FredResult<R>> + Send
187 where
188 R: FromValue,
189 K: Into<Key> + Send,
190 {
191 async move {
192 into!(key);
193 commands::timeseries::ts_del(self, key, from, to).await?.convert()
194 }
195 }
196
197 fn ts_deleterule<R, S, D>(&self, src: S, dest: D) -> impl Future<Output = FredResult<R>> + Send
201 where
202 R: FromValue,
203 S: Into<Key> + Send,
204 D: Into<Key> + Send,
205 {
206 async move {
207 into!(src, dest);
208 commands::timeseries::ts_deleterule(self, src, dest).await?.convert()
209 }
210 }
211
212 fn ts_get<R, K>(&self, key: K, latest: bool) -> impl Future<Output = FredResult<R>> + Send
216 where
217 R: FromValue,
218 K: Into<Key> + Send,
219 {
220 async move {
221 into!(key);
222 commands::timeseries::ts_get(self, key, latest).await?.convert()
223 }
224 }
225
226 fn ts_incrby<R, K, L>(
231 &self,
232 key: K,
233 addend: f64,
234 timestamp: Option<Timestamp>,
235 retention: Option<u64>,
236 uncompressed: bool,
237 chunk_size: Option<u64>,
238 labels: L,
239 ) -> impl Future<Output = FredResult<R>> + Send
240 where
241 R: FromValue,
242 K: Into<Key> + Send,
243 L: TryInto<Map> + Send,
244 L::Error: Into<Error> + Send,
245 {
246 async move {
247 into!(key);
248 try_into!(labels);
249 commands::timeseries::ts_incrby(
250 self,
251 key,
252 addend,
253 timestamp,
254 retention,
255 uncompressed,
256 chunk_size,
257 labels,
258 )
259 .await?
260 .convert()
261 }
262 }
263
264 fn ts_info<R, K>(&self, key: K, debug: bool) -> impl Future<Output = FredResult<R>> + Send
268 where
269 R: FromValue,
270 K: Into<Key> + Send,
271 {
272 async move {
273 into!(key);
274 commands::timeseries::ts_info(self, key, debug).await?.convert()
275 }
276 }
277
278 fn ts_madd<R, K, I>(&self, samples: I) -> impl Future<Output = FredResult<R>> + Send
282 where
283 R: FromValue,
284 K: Into<Key> + Send,
285 I: IntoIterator<Item = (K, Timestamp, f64)> + Send,
286 {
287 async move {
288 let samples: Vec<_> = samples
289 .into_iter()
290 .map(|(key, ts, val)| (key.into(), ts, val))
291 .collect();
292
293 commands::timeseries::ts_madd(self, samples).await?.convert()
294 }
295 }
296
297 fn ts_mget<R, L, S, I>(
304 &self,
305 latest: bool,
306 labels: Option<L>,
307 filters: I,
308 ) -> impl Future<Output = FredResult<R>> + Send
309 where
310 R: FromValue,
311 L: Into<GetLabels> + Send,
312 S: Into<Str> + Send,
313 I: IntoIterator<Item = S> + Send,
314 {
315 async move {
316 let labels = labels.map(|l| l.into());
317 let filters = filters.into_iter().map(|s| s.into()).collect();
318
319 commands::timeseries::ts_mget(self, latest, labels, filters)
320 .await?
321 .convert()
322 }
323 }
324
325 fn ts_mrange<R, F, T, I, S, J>(
332 &self,
333 from: F,
334 to: T,
335 latest: bool,
336 filter_by_ts: I,
337 filter_by_value: Option<(i64, i64)>,
338 labels: Option<GetLabels>,
339 count: Option<u64>,
340 aggregation: Option<RangeAggregation>,
341 filters: J,
342 group_by: Option<GroupBy>,
343 ) -> impl Future<Output = FredResult<R>> + Send
344 where
345 R: FromValue,
346 F: TryInto<GetTimestamp> + Send,
347 F::Error: Into<Error> + Send,
348 T: TryInto<GetTimestamp> + Send,
349 T::Error: Into<Error> + Send,
350 S: Into<Str> + Send,
351 I: IntoIterator<Item = i64> + Send,
352 J: IntoIterator<Item = S> + Send,
353 {
354 async move {
355 try_into!(from, to);
356 let filters = filters.into_iter().map(|s| s.into()).collect();
357 let filter_by_ts = filter_by_ts.into_iter().collect();
358
359 commands::timeseries::ts_mrange(
360 self,
361 from,
362 to,
363 latest,
364 filter_by_ts,
365 filter_by_value,
366 labels,
367 count,
368 aggregation,
369 filters,
370 group_by,
371 )
372 .await?
373 .convert()
374 }
375 }
376
377 fn ts_mrevrange<R, F, T, I, S, J>(
384 &self,
385 from: F,
386 to: T,
387 latest: bool,
388 filter_by_ts: I,
389 filter_by_value: Option<(i64, i64)>,
390 labels: Option<GetLabels>,
391 count: Option<u64>,
392 aggregation: Option<RangeAggregation>,
393 filters: J,
394 group_by: Option<GroupBy>,
395 ) -> impl Future<Output = FredResult<R>> + Send
396 where
397 R: FromValue,
398 F: TryInto<GetTimestamp> + Send,
399 F::Error: Into<Error> + Send,
400 T: TryInto<GetTimestamp> + Send,
401 T::Error: Into<Error> + Send,
402 S: Into<Str> + Send,
403 I: IntoIterator<Item = i64> + Send,
404 J: IntoIterator<Item = S> + Send,
405 {
406 async move {
407 try_into!(from, to);
408 let filters = filters.into_iter().map(|s| s.into()).collect();
409 let filter_by_ts = filter_by_ts.into_iter().collect();
410
411 commands::timeseries::ts_mrevrange(
412 self,
413 from,
414 to,
415 latest,
416 filter_by_ts,
417 filter_by_value,
418 labels,
419 count,
420 aggregation,
421 filters,
422 group_by,
423 )
424 .await?
425 .convert()
426 }
427 }
428
429 fn ts_queryindex<R, S, I>(&self, filters: I) -> impl Future<Output = FredResult<R>> + Send
433 where
434 R: FromValue,
435 S: Into<Str> + Send,
436 I: IntoIterator<Item = S> + Send,
437 {
438 async move {
439 let filters = filters.into_iter().map(|s| s.into()).collect();
440 commands::timeseries::ts_queryindex(self, filters).await?.convert()
441 }
442 }
443
444 fn ts_range<R, K, F, T, I>(
448 &self,
449 key: K,
450 from: F,
451 to: T,
452 latest: bool,
453 filter_by_ts: I,
454 filter_by_value: Option<(i64, i64)>,
455 count: Option<u64>,
456 aggregation: Option<RangeAggregation>,
457 ) -> impl Future<Output = FredResult<R>> + Send
458 where
459 R: FromValue,
460 K: Into<Key> + Send,
461 F: TryInto<GetTimestamp> + Send,
462 F::Error: Into<Error> + Send,
463 T: TryInto<GetTimestamp> + Send,
464 T::Error: Into<Error> + Send,
465 I: IntoIterator<Item = i64> + Send,
466 {
467 async move {
468 into!(key);
469 try_into!(from, to);
470 let filter_by_ts = filter_by_ts.into_iter().collect();
471
472 commands::timeseries::ts_range(
473 self,
474 key,
475 from,
476 to,
477 latest,
478 filter_by_ts,
479 filter_by_value,
480 count,
481 aggregation,
482 )
483 .await?
484 .convert()
485 }
486 }
487
488 fn ts_revrange<R, K, F, T, I>(
492 &self,
493 key: K,
494 from: F,
495 to: T,
496 latest: bool,
497 filter_by_ts: I,
498 filter_by_value: Option<(i64, i64)>,
499 count: Option<u64>,
500 aggregation: Option<RangeAggregation>,
501 ) -> impl Future<Output = FredResult<R>> + Send
502 where
503 R: FromValue,
504 K: Into<Key> + Send,
505 F: TryInto<GetTimestamp> + Send,
506 F::Error: Into<Error> + Send,
507 T: TryInto<GetTimestamp> + Send,
508 T::Error: Into<Error> + Send,
509 I: IntoIterator<Item = i64> + Send,
510 {
511 async move {
512 into!(key);
513 try_into!(from, to);
514 let filter_by_ts = filter_by_ts.into_iter().collect();
515
516 commands::timeseries::ts_revrange(
517 self,
518 key,
519 from,
520 to,
521 latest,
522 filter_by_ts,
523 filter_by_value,
524 count,
525 aggregation,
526 )
527 .await?
528 .convert()
529 }
530 }
531}