rustis/commands/time_series_commands.rs
use crate::{
    client::{PreparedCommand, prepare_command},
    resp::{Response, Value, cmd, serialize_flag},
};
use serde::{
    Deserialize, Serialize,
    de::{self, value::SeqAccessDeserializer},
};
use smallvec::SmallVec;
use std::{collections::HashMap, fmt};

/// A group of Redis commands related to [`Time Series`](https://redis.io/docs/stack/timeseries/)
///
/// # See Also
/// [Time Series Commands](https://redis.io/commands/?group=timeseries)
pub trait TimeSeriesCommands<'a>: Sized {
    /// Append a sample to a time series
    ///
    /// # Arguments
    /// * `key` - key name for the time series.
    /// * `timestamp` - UNIX sample timestamp in milliseconds or `*` to set the timestamp according to the server clock.
    /// * `value` - numeric data value of the sample. The double number should follow [`RFC 7159`](https://tools.ietf.org/html/rfc7159)
    ///   (the JSON standard). In particular, the parser rejects overly large values that do not fit in binary64. It does not accept `NaN` or `infinite` values.
    ///
    /// # Return
    /// The UNIX timestamp of the added sample, in milliseconds
    ///
    /// # Notes
    /// * When the specified key does not exist, a new time series is created.
    /// * If a [`COMPACTION_POLICY`](https://redis.io/docs/stack/timeseries/configuration/#compaction_policy) configuration parameter is defined,
    ///   compacted time series are created as well.
    /// * If the timestamp is older than the retention period compared to the maximum existing timestamp, the sample is discarded and an error is returned.
    /// * When adding a sample to a time series for which compaction rules are defined:
    ///   * If all the original samples for an affected aggregated time bucket are available,
    ///     the compacted value is recalculated based on the reported sample and the original samples.
    ///   * If only a part of the original samples for an affected aggregated time bucket is available,
    ///     due to trimming caused by the time series RETENTION policy, the compacted value
    ///     is recalculated based on the reported sample and the available original samples.
    ///   * If the original samples for an affected aggregated time bucket are not available, due to trimming
    ///     caused by the time series RETENTION policy, the compacted value bucket is not updated.
    /// * Explicitly adding samples to a compacted time series (using [`ts_add`](TimeSeriesCommands::ts_add), [`ts_madd`](TimeSeriesCommands::ts_madd),
    ///   [`ts_incrby`](TimeSeriesCommands::ts_incrby), or [`ts_decrby`](TimeSeriesCommands::ts_decrby)) may result
    ///   in inconsistencies between the raw and the compacted data. The compaction process may override such samples.
    ///
    /// # Complexity
    /// If a compaction rule exists on a time series, the performance of `ts_add` can be reduced.
    /// The complexity of `ts_add` is always `O(M)`, where `M` is the number of compaction rules, or `O(1)` with no compaction.
    ///
    /// # See Also
    /// * [<https://redis.io/commands/ts.add/>](https://redis.io/commands/ts.add/)
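    ///
    /// # Example
    /// A minimal sketch, not taken from the crate's test suite; it assumes a Redis Stack
    /// server at `127.0.0.1:6379`, and the `TsTimestamp::Specific` constructor used below
    /// is a hypothetical name for an explicit-millisecond timestamp:
    /// ```no_run
    /// use rustis::{client::Client, commands::{TimeSeriesCommands, TsAddOptions, TsTimestamp}, Result};
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<()> {
    ///     let client = Client::connect("127.0.0.1:6379").await?;
    ///     // Append a sample at t = 1000 ms with value 26.5, using default options.
    ///     // `TsTimestamp::Specific` is a hypothetical constructor name; adapt to the actual API.
    ///     let ts: u64 = client
    ///         .ts_add("temperature:3:11", TsTimestamp::Specific(1000), 26.5, TsAddOptions::default())
    ///         .await?;
    ///     assert_eq!(1000, ts);
    ///     Ok(())
    /// }
    /// ```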
    #[must_use]
    fn ts_add(
        self,
        key: impl Serialize,
        timestamp: TsTimestamp,
        value: f64,
        options: TsAddOptions,
    ) -> PreparedCommand<'a, Self, u64> {
        prepare_command(
            self,
            cmd("TS.ADD")
                .key(key)
                .arg(timestamp)
                .arg(value)
                .arg(options),
        )
    }

    /// Update the retention, chunk size, duplicate policy, and labels of an existing time series
    ///
    /// # Arguments
    /// * `key` - key name for the time series.
    /// * `options` - options to alter the existing time series. [`encoding`](TsCreateOptions::encoding) cannot be used with this command.
    ///
    /// # Notes
    /// This command alters only the specified elements.
    /// For example, if you specify only retention and labels, the chunk size and the duplicate policy are not altered.
    ///
    /// # See Also
    /// * [<https://redis.io/commands/ts.alter/>](https://redis.io/commands/ts.alter/)
    #[must_use]
    fn ts_alter(
        self,
        key: impl Serialize,
        options: TsCreateOptions,
    ) -> PreparedCommand<'a, Self, ()> {
        prepare_command(self, cmd("TS.ALTER").key(key).arg(options))
    }

    /// Create a new time series
    ///
    /// # Arguments
    /// * `key` - key name for the time series.
    ///
    /// # Notes
    /// * If a key already exists, you get a Redis error reply, `TSDB: key already exists`.
    ///   You can check for the existence of a key with the [`exists`](crate::commands::GenericCommands::exists) command.
    /// * Other commands that also create a new time series when called with a key that does not exist are
    ///   [`ts_add`](TimeSeriesCommands::ts_add), [`ts_incrby`](TimeSeriesCommands::ts_incrby), and [`ts_decrby`](TimeSeriesCommands::ts_decrby).
    ///
    /// # See Also
    /// * [<https://redis.io/commands/ts.create/>](https://redis.io/commands/ts.create/)
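    ///
    /// # Example
    /// A minimal sketch, assuming a Redis Stack server at `127.0.0.1:6379`:
    /// ```no_run
    /// use rustis::{client::Client, commands::{TimeSeriesCommands, TsCreateOptions}, Result};
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<()> {
    ///     let client = Client::connect("127.0.0.1:6379").await?;
    ///     // One-hour retention and two metadata labels, via the builder methods below.
    ///     client
    ///         .ts_create(
    ///             "temperature:3:11",
    ///             TsCreateOptions::default()
    ///                 .retention(3_600_000)
    ///                 .labels([("sensor_id", "2"), ("area_id", "32")]),
    ///         )
    ///         .await?;
    ///     Ok(())
    /// }
    /// ```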
    #[must_use]
    fn ts_create(
        self,
        key: impl Serialize,
        options: TsCreateOptions,
    ) -> PreparedCommand<'a, Self, ()> {
        prepare_command(self, cmd("TS.CREATE").key(key).arg(options))
    }

    /// Create a compaction rule
    ///
    /// # Arguments
    /// * `src_key` - key name for the source time series.
    /// * `dst_key` - key name for the destination (compacted) time series.
    ///   It must be created before `ts_createrule` is called.
    /// * `aggregator` - aggregates results into time buckets by taking an aggregation type
    /// * `bucket_duration` - duration of each aggregation bucket, in milliseconds.
    /// * `options` - See [`TsCreateRuleOptions`](TsCreateRuleOptions)
    ///
    /// # Notes
    /// * Only new samples that are added into the source series after the creation of the rule will be aggregated.
    /// * Calling `ts_createrule` with a nonempty `dst_key` may result in inconsistencies between the raw and the compacted data.
    /// * Explicitly adding samples to a compacted time series (using [`ts_add`](TimeSeriesCommands::ts_add),
    ///   [`ts_madd`](TimeSeriesCommands::ts_madd), [`ts_incrby`](TimeSeriesCommands::ts_incrby), or [`ts_decrby`](TimeSeriesCommands::ts_decrby))
    ///   may result in inconsistencies between the raw and the compacted data. The compaction process may override such samples.
    /// * If no samples are added to the source time series during a bucket period, no compacted sample is added to the destination time series.
    /// * The timestamp of a compacted sample added to the destination time series is set to the start timestamp of the appropriate compaction bucket.
    ///   For example, for a 10-minute compaction bucket with no alignment, the compacted samples' timestamps are `x:00`, `x:10`, `x:20`, and so on.
    ///
    /// # See Also
    /// * [<https://redis.io/commands/ts.createrule/>](https://redis.io/commands/ts.createrule/)
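    ///
    /// # Example
    /// A minimal sketch, assuming a Redis Stack server at `127.0.0.1:6379`; both series
    /// must exist before the rule is created:
    /// ```no_run
    /// use rustis::{
    ///     client::Client,
    ///     commands::{TimeSeriesCommands, TsAggregationType, TsCreateOptions, TsCreateRuleOptions},
    ///     Result,
    /// };
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<()> {
    ///     let client = Client::connect("127.0.0.1:6379").await?;
    ///     client.ts_create("temp:raw", TsCreateOptions::default()).await?;
    ///     client.ts_create("temp:avg:10m", TsCreateOptions::default()).await?;
    ///     // Average the raw samples into 10-minute (600 000 ms) buckets.
    ///     client
    ///         .ts_createrule(
    ///             "temp:raw",
    ///             "temp:avg:10m",
    ///             TsAggregationType::Avg,
    ///             600_000,
    ///             TsCreateRuleOptions::default(),
    ///         )
    ///         .await?;
    ///     Ok(())
    /// }
    /// ```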
    #[must_use]
    fn ts_createrule(
        self,
        src_key: impl Serialize,
        dst_key: impl Serialize,
        aggregator: TsAggregationType,
        bucket_duration: u64,
        options: TsCreateRuleOptions,
    ) -> PreparedCommand<'a, Self, ()> {
        prepare_command(
            self,
            cmd("TS.CREATERULE")
                .key(src_key)
                .key(dst_key)
                .arg("AGGREGATION")
                .arg(aggregator)
                .arg(bucket_duration)
                .arg(options),
        )
    }

    /// Decrease the value of the sample with the maximum existing timestamp,
    /// or create a new sample whose value is the value of the sample with the maximum existing timestamp, decreased by the given value
    ///
    /// # Arguments
    /// * `key` - key name for the time series.
    /// * `value` - numeric data value of the sample
    /// * `options` - See [`TsIncrByDecrByOptions`](TsIncrByDecrByOptions)
    ///
    /// # Notes
    /// * When the specified key does not exist, a new time series is created.
    /// * You can use this command as a counter or gauge that automatically gets history as a time series.
    /// * Explicitly adding samples to a compacted time series (using [`ts_add`](TimeSeriesCommands::ts_add),
    ///   [`ts_madd`](TimeSeriesCommands::ts_madd), [`ts_incrby`](TimeSeriesCommands::ts_incrby),
    ///   or [`ts_decrby`](TimeSeriesCommands::ts_decrby)) may result in inconsistencies between the raw and the compacted data.
    ///   The compaction process may override such samples.
    ///
    /// # See Also
    /// * [<https://redis.io/commands/ts.decrby/>](https://redis.io/commands/ts.decrby/)
    #[must_use]
    fn ts_decrby(
        self,
        key: impl Serialize,
        value: f64,
        options: TsIncrByDecrByOptions,
    ) -> PreparedCommand<'a, Self, ()> {
        prepare_command(self, cmd("TS.DECRBY").key(key).arg(value).arg(options))
    }

    /// Delete all samples between two timestamps for a given time series
    ///
    /// # Arguments
    /// * `key` - key name for the time series.
    /// * `from_timestamp` - start timestamp for the range deletion.
    /// * `to_timestamp` - end timestamp for the range deletion.
    ///
    /// # Return
    /// The number of samples that were removed.
    ///
    /// # Notes
    /// The given timestamp interval is closed (inclusive),
    /// meaning that samples whose timestamps equal `from_timestamp` or `to_timestamp` are also deleted.
    ///
    /// # See Also
    /// * [<https://redis.io/commands/ts.del/>](https://redis.io/commands/ts.del/)
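    ///
    /// # Example
    /// A minimal sketch, assuming a Redis Stack server at `127.0.0.1:6379` and an existing
    /// series named `temperature:3:11`:
    /// ```no_run
    /// use rustis::{client::Client, commands::TimeSeriesCommands, Result};
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<()> {
    ///     let client = Client::connect("127.0.0.1:6379").await?;
    ///     // Delete all samples with 1000 <= timestamp <= 2000 (the interval is inclusive).
    ///     let deleted: usize = client.ts_del("temperature:3:11", 1000, 2000).await?;
    ///     println!("{deleted} samples removed");
    ///     Ok(())
    /// }
    /// ```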
    #[must_use]
    fn ts_del(
        self,
        key: impl Serialize,
        from_timestamp: u64,
        to_timestamp: u64,
    ) -> PreparedCommand<'a, Self, usize> {
        prepare_command(
            self,
            cmd("TS.DEL").key(key).arg(from_timestamp).arg(to_timestamp),
        )
    }

    /// Delete a compaction rule
    ///
    /// # Arguments
    /// * `src_key` - key name for the source time series.
    /// * `dst_key` - key name for the destination (compacted) time series.
    ///
    /// # Notes
    /// This command does not delete the compacted series.
    ///
    /// # See Also
    /// * [<https://redis.io/commands/ts.deleterule/>](https://redis.io/commands/ts.deleterule/)
    #[must_use]
    fn ts_deleterule(
        self,
        src_key: impl Serialize,
        dst_key: impl Serialize,
    ) -> PreparedCommand<'a, Self, ()> {
        prepare_command(self, cmd("TS.DELETERULE").key(src_key).key(dst_key))
    }

    /// Get the last sample
    ///
    /// # Arguments
    /// * `key` - key name for the time series.
    /// * `options` - See [`TsGetOptions`](TsGetOptions)
    ///
    /// # Return
    /// An optional tuple:
    /// * The last sample timestamp and last sample value, when the time series contains data.
    /// * `None`, when the time series is empty.
    ///
    /// # See Also
    /// * [<https://redis.io/commands/ts.get/>](https://redis.io/commands/ts.get/)
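    ///
    /// # Example
    /// A minimal sketch, assuming a Redis Stack server at `127.0.0.1:6379`:
    /// ```no_run
    /// use rustis::{client::Client, commands::{TimeSeriesCommands, TsGetOptions}, Result};
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<()> {
    ///     let client = Client::connect("127.0.0.1:6379").await?;
    ///     // `latest()` also reports the possibly partial latest bucket of a compaction.
    ///     if let Some((timestamp, value)) = client
    ///         .ts_get("temperature:3:11", TsGetOptions::default().latest())
    ///         .await?
    ///     {
    ///         println!("last sample: {timestamp} -> {value}");
    ///     }
    ///     Ok(())
    /// }
    /// ```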
    #[must_use]
    fn ts_get(
        self,
        key: impl Serialize,
        options: TsGetOptions,
    ) -> PreparedCommand<'a, Self, Option<(u64, f64)>> {
        prepare_command(self, cmd("TS.GET").key(key).arg(options))
    }

    /// Increase the value of the sample with the maximum existing timestamp,
    /// or create a new sample whose value is the value of the sample
    /// with the maximum existing timestamp, increased by the given value
    ///
    /// # Arguments
    /// * `key` - key name for the time series.
    /// * `value` - numeric data value of the sample
    /// * `options` - See [`TsIncrByDecrByOptions`](TsIncrByDecrByOptions)
    ///
    /// # Notes
    /// * When the specified key does not exist, a new time series is created.
    /// * You can use this command as a counter or gauge that automatically gets history as a time series.
    /// * Explicitly adding samples to a compacted time series (using [`ts_add`](TimeSeriesCommands::ts_add),
    ///   [`ts_madd`](TimeSeriesCommands::ts_madd), [`ts_incrby`](TimeSeriesCommands::ts_incrby),
    ///   or [`ts_decrby`](TimeSeriesCommands::ts_decrby)) may result in inconsistencies between the raw and the compacted data.
    ///   The compaction process may override such samples.
    ///
    /// # See Also
    /// * [<https://redis.io/commands/ts.incrby/>](https://redis.io/commands/ts.incrby/)
    #[must_use]
    fn ts_incrby(
        self,
        key: impl Serialize,
        value: f64,
        options: TsIncrByDecrByOptions,
    ) -> PreparedCommand<'a, Self, u64> {
        prepare_command(self, cmd("TS.INCRBY").key(key).arg(value).arg(options))
    }

    /// Return information and statistics for a time series.
    ///
    /// # Arguments
    /// * `key` - key name for the time series.
    /// * `debug` - an optional flag to get more detailed information about the chunks.
    ///
    /// # Return
    /// an instance of [`TsInfoResult`](TsInfoResult)
    ///
    /// # See Also
    /// * [<https://redis.io/commands/ts.info/>](https://redis.io/commands/ts.info/)
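    ///
    /// # Example
    /// A minimal sketch, assuming a Redis Stack server at `127.0.0.1:6379`:
    /// ```no_run
    /// use rustis::{client::Client, commands::TimeSeriesCommands, Result};
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<()> {
    ///     let client = Client::connect("127.0.0.1:6379").await?;
    ///     let info = client.ts_info("temperature:3:11", false).await?;
    ///     println!(
    ///         "{} samples in {} chunks, retention {} ms",
    ///         info.total_samples, info.chunk_count, info.retention_time
    ///     );
    ///     Ok(())
    /// }
    /// ```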
    #[must_use]
    fn ts_info(self, key: impl Serialize, debug: bool) -> PreparedCommand<'a, Self, TsInfoResult> {
        prepare_command(self, cmd("TS.INFO").key(key).arg_if(debug, "DEBUG"))
    }

    /// Append new samples to one or more time series
    ///
    /// # Arguments
    /// * `items` - one or more tuples with the following elements:
    ///   * `key` - the key name for the time series.
    ///   * `timestamp` - the UNIX sample timestamp in milliseconds or `*` to set the timestamp according to the server clock.
    ///   * `value` - numeric data value of the sample (double). \
    ///     The double number should follow [`RFC 7159`](https://tools.ietf.org/html/rfc7159) (the JSON standard).
    ///     The parser rejects overly large values that would not fit in binary64.
    ///     It does not accept `NaN` or `infinite` values.
    ///
    /// # Return
    /// a collection of the timestamps of added samples
    ///
    /// # Notes
    /// * If a timestamp is older than the retention period compared to the maximum existing timestamp,
    ///   the sample is discarded and an error is returned.
    /// * Explicitly adding samples to a compacted time series (using [`ts_add`](TimeSeriesCommands::ts_add),
    ///   [`ts_madd`](TimeSeriesCommands::ts_madd), [`ts_incrby`](TimeSeriesCommands::ts_incrby),
    ///   or [`ts_decrby`](TimeSeriesCommands::ts_decrby)) may result in inconsistencies between the raw and the compacted data.
    ///   The compaction process may override such samples.
    ///
    /// # See Also
    /// * [<https://redis.io/commands/ts.madd/>](https://redis.io/commands/ts.madd/)
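    ///
    /// # Example
    /// A minimal sketch, assuming a Redis Stack server at `127.0.0.1:6379` and requesting
    /// the reply as a `Vec<u64>` of sample timestamps:
    /// ```no_run
    /// use rustis::{client::Client, commands::TimeSeriesCommands, Result};
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<()> {
    ///     let client = Client::connect("127.0.0.1:6379").await?;
    ///     // Each item is a (key, timestamp, value) tuple.
    ///     let timestamps: Vec<u64> = client
    ///         .ts_madd([
    ///             ("temperature:3:11", 1000, 26.5),
    ///             ("temperature:3:11", 2000, 27.1),
    ///             ("temperature:3:12", 1000, 22.4),
    ///         ])
    ///         .await?;
    ///     assert_eq!(vec![1000, 2000, 1000], timestamps);
    ///     Ok(())
    /// }
    /// ```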
    #[must_use]
    fn ts_madd<R: Response>(self, items: impl Serialize) -> PreparedCommand<'a, Self, R> {
        prepare_command(
            self,
            cmd("TS.MADD")
                .key_with_step(items, 3)
                .cluster_info(None, None, 3),
        )
    }

    /// Get the last samples matching a specific filter
    ///
    /// # Arguments
    /// * `options` - See [`TsMGetOptions`](TsMGetOptions)
    /// * `filters` - filters time series based on their labels and label values, with these options:
    ///   * `label=value`, where `label` equals `value`
    ///   * `label!=value`, where `label` does not equal `value`
    ///   * `label=`, where `key` does not have label `label`
    ///   * `label!=`, where `key` has label `label`
    ///   * `label=(value1,value2,...)`, where `key` with label `label` equals one of the values in the list
    ///   * `label!=(value1,value2,...)`, where `key` with label `label` does not equal any of the values in the list
    ///
    /// # Return
    /// A collection of [`TsSample`](TsSample)
    ///
    /// # Notes
    /// * When using filters, apply a minimum of one `label=value` filter.
    /// * Filters are conjunctive. For example, the FILTER `type=temperature room=study`
    ///   means the time series is a temperature time series of a study room.
    ///
    /// # See Also
    /// * [<https://redis.io/commands/ts.mget/>](https://redis.io/commands/ts.mget/)
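    ///
    /// # Example
    /// A minimal sketch, assuming a Redis Stack server at `127.0.0.1:6379`. The reply type
    /// is generic; deserializing into `Vec<(String, TsSample)>` (key/sample pairs) is an
    /// assumption here, not a shape confirmed by this module:
    /// ```no_run
    /// use rustis::{client::Client, commands::{TimeSeriesCommands, TsMGetOptions, TsSample}, Result};
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<()> {
    ///     let client = Client::connect("127.0.0.1:6379").await?;
    ///     // Assumed reply shape: one (key, sample) pair per matching series.
    ///     let samples: Vec<(String, TsSample)> = client
    ///         .ts_mget(TsMGetOptions::default().withlabels(), "sensor_id=2")
    ///         .await?;
    ///     for (key, sample) in samples {
    ///         println!("{key}: {:?}", sample.timestamp_value);
    ///     }
    ///     Ok(())
    /// }
    /// ```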
    #[must_use]
    fn ts_mget<R: Response>(
        self,
        options: TsMGetOptions,
        filters: impl Serialize,
    ) -> PreparedCommand<'a, Self, R> {
        prepare_command(self, cmd("TS.MGET").arg(options).arg("FILTER").arg(filters))
    }

    /// Query a range across multiple time series by filters in forward direction
    ///
    /// # Arguments
    /// * `from_timestamp` - start timestamp for the range query (integer UNIX timestamp in milliseconds)
    ///   or `-` to denote the timestamp of the earliest sample in the time series.
    /// * `to_timestamp` - end timestamp for the range query (integer UNIX timestamp in milliseconds)
    ///   or `+` to denote the timestamp of the latest sample in the time series.
    /// * `options` - See [`TsMRangeOptions`](TsMRangeOptions)
    /// * `filters` - filters time series based on their labels and label values, with these options:
    ///   * `label=value`, where `label` equals `value`
    ///   * `label!=value`, where `label` does not equal `value`
    ///   * `label=`, where `key` does not have label `label`
    ///   * `label!=`, where `key` has label `label`
    ///   * `label=(value1,value2,...)`, where `key` with label `label` equals one of the values in the list
    ///   * `label!=(value1,value2,...)`, where `key` with label `label` does not equal any of the values in the list
    /// * `groupby_options` - See [`TsGroupByOptions`](TsGroupByOptions)
    ///
    /// # Return
    /// A collection of [`TsRangeSample`](TsRangeSample)
    ///
    /// # Notes
    /// * The `ts_mrange` command cannot be part of a transaction when running on a Redis cluster.
    ///
    /// # See Also
    /// * [<https://redis.io/commands/ts.mrange/>](https://redis.io/commands/ts.mrange/)
    #[must_use]
    fn ts_mrange<R: Response>(
        self,
        from_timestamp: impl Serialize,
        to_timestamp: impl Serialize,
        options: TsMRangeOptions,
        filters: impl Serialize,
        groupby_options: TsGroupByOptions,
    ) -> PreparedCommand<'a, Self, R> {
        prepare_command(
            self,
            cmd("TS.MRANGE")
                .arg(from_timestamp)
                .arg(to_timestamp)
                .arg(options)
                .arg("FILTER")
                .arg(filters)
                .arg(groupby_options),
        )
    }

    /// Query a range across multiple time series by filters in reverse direction
    ///
    /// # Arguments
    /// * `from_timestamp` - start timestamp for the range query (integer UNIX timestamp in milliseconds)
    ///   or `-` to denote the timestamp of the earliest sample in the time series.
    /// * `to_timestamp` - end timestamp for the range query (integer UNIX timestamp in milliseconds)
    ///   or `+` to denote the timestamp of the latest sample in the time series.
    /// * `options` - See [`TsMRangeOptions`](TsMRangeOptions)
    /// * `filters` - filters time series based on their labels and label values, with these options:
    ///   * `label=value`, where `label` equals `value`
    ///   * `label!=value`, where `label` does not equal `value`
    ///   * `label=`, where `key` does not have label `label`
    ///   * `label!=`, where `key` has label `label`
    ///   * `label=(value1,value2,...)`, where `key` with label `label` equals one of the values in the list
    ///   * `label!=(value1,value2,...)`, where `key` with label `label` does not equal any of the values in the list
    /// * `groupby_options` - See [`TsGroupByOptions`](TsGroupByOptions)
    ///
    /// # Return
    /// A collection of [`TsRangeSample`](TsRangeSample)
    ///
    /// # Notes
    /// * The `ts_mrevrange` command cannot be part of a transaction when running on a Redis cluster.
    ///
    /// # See Also
    /// * [<https://redis.io/commands/ts.mrevrange/>](https://redis.io/commands/ts.mrevrange/)
    #[must_use]
    fn ts_mrevrange<R: Response>(
        self,
        from_timestamp: impl Serialize,
        to_timestamp: impl Serialize,
        options: TsMRangeOptions,
        filters: impl Serialize,
        groupby_options: TsGroupByOptions,
    ) -> PreparedCommand<'a, Self, R> {
        prepare_command(
            self,
            cmd("TS.MREVRANGE")
                .arg(from_timestamp)
                .arg(to_timestamp)
                .arg(options)
                .arg("FILTER")
                .arg(filters)
                .arg(groupby_options),
        )
    }

    /// Get all time series keys matching a filter list
    ///
    /// # Arguments
    /// * `filters` - filters time series based on their labels and label values, with these options:
    ///   * `label=value`, where `label` equals `value`
    ///   * `label!=value`, where `label` does not equal `value`
    ///   * `label=`, where `key` does not have label `label`
    ///   * `label!=`, where `key` has label `label`
    ///   * `label=(value1,value2,...)`, where `key` with label `label` equals one of the values in the list
    ///   * `label!=(value1,value2,...)`, where `key` with label `label` does not equal any of the values in the list
    ///
    /// # Return
    /// A collection of keys
    ///
    /// # Notes
    /// * When using filters, apply a minimum of one `label=value` filter.
    /// * `ts_queryindex` cannot be part of a transaction that runs on a Redis cluster.
    /// * Filters are conjunctive. For example, the FILTER `type=temperature room=study`
    ///   means the time series is a temperature time series of a study room.
    ///
    /// # See Also
    /// * [<https://redis.io/commands/ts.queryindex/>](https://redis.io/commands/ts.queryindex/)
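    ///
    /// # Example
    /// A minimal sketch, assuming a Redis Stack server at `127.0.0.1:6379`:
    /// ```no_run
    /// use rustis::{client::Client, commands::TimeSeriesCommands, Result};
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<()> {
    ///     let client = Client::connect("127.0.0.1:6379").await?;
    ///     // All keys whose `sensor_id` label equals `2`.
    ///     let keys: Vec<String> = client.ts_queryindex("sensor_id=2").await?;
    ///     println!("{keys:?}");
    ///     Ok(())
    /// }
    /// ```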
    #[must_use]
    fn ts_queryindex<R: Response>(self, filters: impl Serialize) -> PreparedCommand<'a, Self, R> {
        prepare_command(self, cmd("TS.QUERYINDEX").arg(filters))
    }

    /// Query a range in forward direction
    ///
    /// # Arguments
    /// * `key` - the key name for the time series.
    /// * `from_timestamp` - start timestamp for the range query (integer UNIX timestamp in milliseconds)
    ///   or `-` to denote the timestamp of the earliest sample in the time series.
    /// * `to_timestamp` - end timestamp for the range query (integer UNIX timestamp in milliseconds)
    ///   or `+` to denote the timestamp of the latest sample in the time series.
    /// * `options` - See [`TsRangeOptions`](TsRangeOptions)
    ///
    /// # Return
    /// A collection of samples (timestamp-value pairs) matching the range
    ///
    /// # Notes
    /// * When the time series is a compaction,
    ///   the last compacted value may aggregate raw values with timestamps beyond `to_timestamp`.
    ///   That is because `to_timestamp` only limits the timestamp of the compacted value,
    ///   which is the start time of the raw bucket that was compacted.
    ///
    /// # See Also
    /// * [<https://redis.io/commands/ts.range/>](https://redis.io/commands/ts.range/)
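    ///
    /// # Example
    /// A minimal sketch, assuming a Redis Stack server at `127.0.0.1:6379` and that
    /// `TsRangeOptions` implements `Default` like the other option builders in this module:
    /// ```no_run
    /// use rustis::{client::Client, commands::{TimeSeriesCommands, TsRangeOptions}, Result};
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<()> {
    ///     let client = Client::connect("127.0.0.1:6379").await?;
    ///     // `-` and `+` select the full series.
    ///     let samples: Vec<(u64, f64)> = client
    ///         .ts_range("temperature:3:11", "-", "+", TsRangeOptions::default())
    ///         .await?;
    ///     for (timestamp, value) in samples {
    ///         println!("{timestamp} -> {value}");
    ///     }
    ///     Ok(())
    /// }
    /// ```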
    #[must_use]
    fn ts_range<R: Response>(
        self,
        key: impl Serialize,
        from_timestamp: impl Serialize,
        to_timestamp: impl Serialize,
        options: TsRangeOptions,
    ) -> PreparedCommand<'a, Self, R> {
        prepare_command(
            self,
            cmd("TS.RANGE")
                .key(key)
                .arg(from_timestamp)
                .arg(to_timestamp)
                .arg(options),
        )
    }

    /// Query a range in reverse direction
    ///
    /// # Arguments
    /// * `key` - the key name for the time series.
    /// * `from_timestamp` - start timestamp for the range query (integer UNIX timestamp in milliseconds)
    ///   or `-` to denote the timestamp of the earliest sample in the time series.
    /// * `to_timestamp` - end timestamp for the range query (integer UNIX timestamp in milliseconds)
    ///   or `+` to denote the timestamp of the latest sample in the time series.
    /// * `options` - See [`TsRangeOptions`](TsRangeOptions)
    ///
    /// # Return
    /// A collection of samples (timestamp-value pairs) matching the range
    ///
    /// # Notes
    /// * When the time series is a compaction,
    ///   the last compacted value may aggregate raw values with timestamps beyond `to_timestamp`.
    ///   That is because `to_timestamp` only limits the timestamp of the compacted value,
    ///   which is the start time of the raw bucket that was compacted.
    ///
    /// # See Also
    /// * [<https://redis.io/commands/ts.revrange/>](https://redis.io/commands/ts.revrange/)
    #[must_use]
    fn ts_revrange<R: Response>(
        self,
        key: impl Serialize,
        from_timestamp: impl Serialize,
        to_timestamp: impl Serialize,
        options: TsRangeOptions,
    ) -> PreparedCommand<'a, Self, R> {
        prepare_command(
            self,
            cmd("TS.REVRANGE")
                .key(key)
                .arg(from_timestamp)
                .arg(to_timestamp)
                .arg(options),
        )
    }
}

/// Options for the [`ts_add`](TimeSeriesCommands::ts_add) command.
///
/// # Notes
/// * You can use this command to add data to a nonexisting time series in a single command.
///   This is why [`retention`](TsAddOptions::retention), [`encoding`](TsAddOptions::encoding),
///   [`chunk_size`](TsAddOptions::chunk_size), [`on_duplicate`](TsAddOptions::on_duplicate),
///   and [`labels`](TsAddOptions::labels) are optional arguments.
/// * Setting [`retention`](TsAddOptions::retention) and [`labels`](TsAddOptions::labels) introduces additional time complexity.
#[derive(Default, Serialize)]
#[serde(rename_all = "UPPERCASE")]
pub struct TsAddOptions<'a> {
    #[serde(skip_serializing_if = "Option::is_none")]
    retention: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    encoding: Option<TsEncoding>,
    #[serde(skip_serializing_if = "Option::is_none")]
    chunk_size: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    on_duplicate: Option<TsDuplicatePolicy>,
    #[serde(skip_serializing_if = "SmallVec::is_empty")]
    labels: SmallVec<[(&'a str, &'a str); 10]>,
}

impl<'a> TsAddOptions<'a> {
    /// maximum retention period, compared to the maximum existing timestamp, in milliseconds.
    ///
    /// Use it only if you are creating a new time series.
    /// It is ignored if you are adding samples to an existing time series.
    /// See [`retention`](TsCreateOptions::retention).
    #[must_use]
    pub fn retention(mut self, retention_period: u64) -> Self {
        self.retention = Some(retention_period);
        self
    }

    /// specifies the series samples' encoding format.
    ///
    /// Use it only if you are creating a new time series.
    /// It is ignored if you are adding samples to an existing time series.
    /// See [`encoding`](TsCreateOptions::encoding).
    #[must_use]
    pub fn encoding(mut self, encoding: TsEncoding) -> Self {
        self.encoding = Some(encoding);
        self
    }

    /// memory size, in bytes, allocated for each data chunk.
    ///
    /// Use it only if you are creating a new time series.
    /// It is ignored if you are adding samples to an existing time series.
    /// See [`chunk_size`](TsCreateOptions::chunk_size).
    #[must_use]
    pub fn chunk_size(mut self, chunk_size: u32) -> Self {
        self.chunk_size = Some(chunk_size);
        self
    }

    /// overwrite key and database configuration for
    /// [`DUPLICATE_POLICY`](https://redis.io/docs/stack/timeseries/configuration/#duplicate_policy)
    #[must_use]
    pub fn on_duplicate(mut self, policy: TsDuplicatePolicy) -> Self {
        self.on_duplicate = Some(policy);
        self
    }

    /// set of label-value pairs that represent metadata labels of the time series.
    ///
    /// Use it only if you are creating a new time series.
    /// It is ignored if you are adding samples to an existing time series.
    /// See [`labels`](TsCreateOptions::labels).
    ///
    /// The [`ts_mget`](TimeSeriesCommands::ts_mget), [`ts_mrange`](TimeSeriesCommands::ts_mrange),
    /// and [`ts_mrevrange`](TimeSeriesCommands::ts_mrevrange) commands operate on multiple time series based on their labels.
    /// The [`ts_queryindex`](TimeSeriesCommands::ts_queryindex) command returns all time series keys matching a given filter based on their labels.
    #[must_use]
    pub fn labels(mut self, labels: impl IntoIterator<Item = (&'a str, &'a str)>) -> Self {
        self.labels.extend(labels);
        self
    }
}

/// specifies the series samples' encoding format.
///
/// `Compressed` is almost always the right choice.
/// Compression not only saves memory but usually improves performance due to a lower number of memory accesses.
/// It can result in about 90% memory reduction. The exception is highly irregular timestamps or values, which occur rarely.
///
/// When not specified, the option is set to `Compressed`.
#[derive(Serialize)]
#[serde(rename_all = "UPPERCASE")]
pub enum TsEncoding {
    /// applies compression to the series samples.
    Compressed,
    /// keeps the raw samples in memory.
    ///
    /// Adding this flag keeps data in an uncompressed form.
    Uncompressed,
}

/// [`Policy`](https://redis.io/docs/stack/timeseries/configuration/#duplicate_policy)
/// for handling samples with identical timestamps
///
/// It is used with one of the following values:
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum TsDuplicatePolicy {
    /// ignore any newly reported value and reply with an error
    Block,
    /// ignore any newly reported value
    First,
    /// override with the newly reported value
    Last,
    /// only override if the value is lower than the existing value
    Min,
    /// only override if the value is higher than the existing value
    Max,
    /// If a previous sample exists, add the new sample to it so that the updated value is equal to (previous + new).
    /// If no previous sample exists, set the updated value equal to the new value.
    Sum,
}

/// Options for the [`ts_create`](TimeSeriesCommands::ts_create) command.
#[derive(Default, Serialize)]
#[serde(rename_all = "UPPERCASE")]
pub struct TsCreateOptions<'a> {
    #[serde(skip_serializing_if = "Option::is_none")]
    retention: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    encoding: Option<TsEncoding>,
    #[serde(skip_serializing_if = "Option::is_none")]
    chunk_size: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    duplicate_policy: Option<TsDuplicatePolicy>,
    #[serde(skip_serializing_if = "SmallVec::is_empty")]
    labels: SmallVec<[(&'a str, &'a str); 10]>,
}

impl<'a> TsCreateOptions<'a> {
    /// maximum age for samples compared to the highest reported timestamp, in milliseconds.
    ///
    /// Samples are expired based solely on the difference between their timestamp
    /// and the timestamps passed to subsequent [`ts_add`](TimeSeriesCommands::ts_add),
    /// [`ts_madd`](TimeSeriesCommands::ts_madd), [`ts_incrby`](TimeSeriesCommands::ts_incrby),
    /// and [`ts_decrby`](TimeSeriesCommands::ts_decrby) calls.
    ///
    /// When set to 0, samples never expire. When not specified, the option is set to the global
    /// [`RETENTION_POLICY`](https://redis.io/docs/stack/timeseries/configuration/#retention_policy)
    /// configuration of the database, which by default is 0.
    #[must_use]
    pub fn retention(mut self, retention_period: u64) -> Self {
        self.retention = Some(retention_period);
        self
    }

    /// specifies the series samples' encoding format.
    #[must_use]
    pub fn encoding(mut self, encoding: TsEncoding) -> Self {
        self.encoding = Some(encoding);
        self
    }

    /// initial allocation size, in bytes, for the data part of each new chunk. Actual chunks may consume more memory.
    ///
    /// Changing the chunk size (using [`ts_alter`](TimeSeriesCommands::ts_alter)) does not affect existing chunks.
    ///
    /// Must be a multiple of 8 in the range [48 .. 1048576].
    /// When not specified, it is set to 4096 bytes (a single memory page).
    ///
    /// Note: Before v1.6.10 no minimum was enforced. Between v1.6.10 and v1.6.17 and in v1.8.0 the minimum value was 128.
    /// Since v1.8.1 the minimum value is 48.
    ///
    /// The data in each key is stored in chunks. Each chunk contains a header and data for a given timeframe.
    /// An index contains all chunks. Iterations occur inside each chunk. Depending on your use case, consider these tradeoffs for having smaller or larger chunk sizes:
    /// * Insert performance: Smaller chunks result in slower inserts (more chunks need to be created).
    /// * Query performance: Queries for a small subset when the chunks are very large are slower,
    ///   as we need to iterate over the chunk to find the data.
    /// * Larger chunks may take more memory when you have a very large number of keys and very few samples per key,
    ///   or less memory when you have many samples per key.
    ///
    /// If you are unsure about your use case, select the default.
    #[must_use]
    pub fn chunk_size(mut self, chunk_size: u32) -> Self {
        self.chunk_size = Some(chunk_size);
        self
    }

    /// policy for handling insertion ([`ts_add`](TimeSeriesCommands::ts_add) and [`ts_madd`](TimeSeriesCommands::ts_madd))
    /// of multiple samples with identical timestamps
    #[must_use]
    pub fn duplicate_policy(mut self, policy: TsDuplicatePolicy) -> Self {
        self.duplicate_policy = Some(policy);
        self
    }

    /// set of label-value pairs that represent metadata labels of the time series.
    ///
    /// The [`ts_mget`](TimeSeriesCommands::ts_mget), [`ts_mrange`](TimeSeriesCommands::ts_mrange),
    /// and [`ts_mrevrange`](TimeSeriesCommands::ts_mrevrange) commands operate on multiple time series based on their labels.
    /// The [`ts_queryindex`](TimeSeriesCommands::ts_queryindex) command returns all time series keys matching a given filter based on their labels.
    #[must_use]
    pub fn labels(mut self, labels: impl IntoIterator<Item = (&'a str, &'a str)>) -> Self {
        self.labels.extend(labels);
        self
    }
}

/// Aggregation type for the [`ts_createrule`](TimeSeriesCommands::ts_createrule)
/// and [`ts_mrange`](TimeSeriesCommands::ts_mrange) commands.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "UPPERCASE")]
pub enum TsAggregationType {
    /// Arithmetic mean of all values
    Avg,
    /// Sum of all values
    Sum,
    /// Minimum value
    Min,
    /// Maximum value
    Max,
    /// Difference between the highest and the lowest value
    Range,
    /// Number of values
    Count,
    /// Value with lowest timestamp in the bucket
    First,
    /// Value with highest timestamp in the bucket
    Last,
    /// Population standard deviation of the values
    #[serde(rename = "STD.P")]
    StdP,
    /// Sample standard deviation of the values
    #[serde(rename = "STD.S")]
    StdS,
    /// Population variance of the values
    #[serde(rename = "VAR.P")]
    VarP,
    /// Sample variance of the values
    #[serde(rename = "VAR.S")]
    VarS,
    /// Time-weighted average over the bucket's timeframe (since RedisTimeSeries v1.8)
    Twa,
}

/// Options for the [`ts_createrule`](TimeSeriesCommands::ts_createrule) command.
#[derive(Default, Serialize)]
#[serde(rename_all = "UPPERCASE")]
pub struct TsCreateRuleOptions {
    // `align_timestamp` is a bare positional value in `TS.CREATERULE`;
    // the empty rename makes serde emit no keyword before it.
    #[serde(rename = "", skip_serializing_if = "Option::is_none")]
    align_timestamp: Option<u64>,
}

impl TsCreateRuleOptions {
    /// ensures that there is a bucket that starts exactly at `align_timestamp`
    /// and aligns all other buckets accordingly. (since RedisTimeSeries v1.8)
    ///
    /// It is expressed in milliseconds.
    /// The default value is 0: aligned with the epoch.
    /// For example, if `bucket_duration` is 24 hours (`24 * 3600 * 1000`), setting `align_timestamp`
    /// to 6 hours after the epoch (`6 * 3600 * 1000`) ensures that each bucket's timeframe is `[06:00 .. 06:00)`.
    #[must_use]
    pub fn align_timestamp(mut self, align_timestamp: u64) -> Self {
        self.align_timestamp = Some(align_timestamp);
        self
    }
}

/// Options for the [`ts_incrby`](TimeSeriesCommands::ts_incrby)
/// and [`ts_decrby`](TimeSeriesCommands::ts_decrby) commands.
///
/// # Notes
/// * You can use these commands to add data to a nonexisting time series in a single command.
///   This is why `retention`, `uncompressed`, `chunk_size`, and `labels` are optional arguments.
/// * When specified and the key doesn't exist, a new time series is created.
///   Setting the `retention` and `labels` options introduces additional time complexity.
#[derive(Default, Serialize)]
#[serde(rename_all = "UPPERCASE")]
pub struct TsIncrByDecrByOptions<'a> {
    #[serde(skip_serializing_if = "Option::is_none")]
    timestamp: Option<TsTimestamp>,
    #[serde(skip_serializing_if = "Option::is_none")]
    retention: Option<u64>,
    #[serde(
        skip_serializing_if = "std::ops::Not::not",
        serialize_with = "serialize_flag"
    )]
    uncompressed: bool,
    #[serde(skip_serializing_if = "Option::is_none")]
    chunk_size: Option<u32>,
    #[serde(skip_serializing_if = "SmallVec::is_empty")]
    labels: SmallVec<[(&'a str, &'a str); 10]>,
}

impl<'a> TsIncrByDecrByOptions<'a> {
    /// the (integer) UNIX sample timestamp in milliseconds, or `*` to set the timestamp according to the server clock.
    ///
    /// `timestamp` must be equal to or higher than the maximum existing timestamp.
    /// When equal, the value of the sample with the maximum existing timestamp is increased or decreased.
    /// If it is higher, a new sample with a timestamp set to `timestamp` is created,
    /// and its value is set to the value of the sample with the maximum existing timestamp plus or minus `value`.
    ///
    /// If the time series is empty, the value is set to `value`.
    ///
    /// When not specified, the timestamp is set according to the server clock.
    #[must_use]
    pub fn timestamp(mut self, timestamp: TsTimestamp) -> Self {
        self.timestamp = Some(timestamp);
        self
    }

    /// maximum age for samples compared to the highest reported timestamp, in milliseconds.
    ///
    /// Use it only if you are creating a new time series.
    /// It is ignored if you are adding samples to an existing time series.
    ///
    /// See [`retention`](TsCreateOptions::retention).
    #[must_use]
    pub fn retention(mut self, retention_period: u64) -> Self {
        self.retention = Some(retention_period);
        self
    }

    /// changes data storage from compressed (default) to uncompressed.
    ///
    /// Use it only if you are creating a new time series.
    /// It is ignored if you are adding samples to an existing time series.
    /// See [`encoding`](TsCreateOptions::encoding).
    #[must_use]
    pub fn uncompressed(mut self) -> Self {
        self.uncompressed = true;
        self
    }

    /// memory size, in bytes, allocated for each data chunk.
    ///
    /// Use it only if you are creating a new time series.
    /// It is ignored if you are adding samples to an existing time series.
    /// See [`chunk_size`](TsCreateOptions::chunk_size).
    #[must_use]
    pub fn chunk_size(mut self, chunk_size: u32) -> Self {
        self.chunk_size = Some(chunk_size);
        self
    }

    /// set of label-value pairs that represent metadata labels of the time series.
    ///
    /// Use it only if you are creating a new time series.
    /// It is ignored if you are adding samples to an existing time series.
    /// See [`labels`](TsCreateOptions::labels).
    #[must_use]
    pub fn labels(mut self, label: &'a str, value: &'a str) -> Self {
        self.labels.push((label, value));
        self
    }
}

/// Options for the [`ts_get`](TimeSeriesCommands::ts_get) command.
#[derive(Default, Serialize)]
#[serde(rename_all = "UPPERCASE")]
pub struct TsGetOptions {
    #[serde(
        skip_serializing_if = "std::ops::Not::not",
        serialize_with = "serialize_flag"
    )]
    latest: bool,
}

impl TsGetOptions {
    /// Used when a time series is a compaction.
    ///
    /// With `latest`, [`ts_get`](TimeSeriesCommands::ts_get)
    /// also reports the compacted value of the latest, possibly partial, bucket.
    /// Without `latest`, [`ts_get`](TimeSeriesCommands::ts_get)
    /// does not report the latest, possibly partial, bucket.
    /// When a time series is not a compaction, `latest` is ignored.
    ///
    /// The data in the latest bucket of a compaction is possibly partial.
    /// A bucket is closed and compacted only upon the arrival of a new sample that opens a new latest bucket.
    /// There are cases, however, when the compacted value of the latest, possibly partial, bucket is also required.
    /// In such a case, use `latest`.
    #[must_use]
    pub fn latest(mut self) -> Self {
        self.latest = true;
        self
    }
}

/// Result for the [`ts_info`](TimeSeriesCommands::ts_info) command.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TsInfoResult {
    /// key name
    pub key_self_name: String,
    /// Total number of samples in this time series
    pub total_samples: usize,
    /// Total number of bytes allocated for this time series, which is the sum of
    /// * The memory used for storing the series' configuration parameters (retention period, duplication policy, etc.)
    /// * The memory used for storing the series' compaction rules
    /// * The memory used for storing the series' labels (key-value pairs)
    /// * The memory used for storing the chunks (chunk header + compressed/uncompressed data)
    pub memory_usage: usize,
    /// First timestamp present in this time series
    pub first_timestamp: u64,
    /// Last timestamp present in this time series
    pub last_timestamp: u64,
    /// The retention period, in milliseconds, for this time series
    pub retention_time: u64,
    /// Number of chunks used for this time series
    pub chunk_count: usize,
    /// The initial allocation size, in bytes, for the data part of each new chunk.
    /// Actual chunks may consume more memory.
    /// Changing the chunk size (using [`ts_alter`](TimeSeriesCommands::ts_alter)) does not affect existing chunks.
    pub chunk_size: usize,
    /// The chunks type: `compressed` or `uncompressed`
    pub chunk_type: String,
    /// The [`duplicate policy`](https://redis.io/docs/stack/timeseries/configuration/#duplicate_policy) of this time series
    pub duplicate_policy: Option<TsDuplicatePolicy>,
    /// A map of label-value pairs that represent the metadata labels of this time series
    pub labels: HashMap<String, String>,
    /// Key name for the source time series in case the current series is a target
    /// of a [`compaction rule`](https://redis.io/commands/ts.createrule/)
    pub source_key: String,
    /// A nested array of the [`compaction rules`](https://redis.io/commands/ts.createrule/)
    /// defined in this time series, with these elements for each rule:
    /// * The compaction key
    /// * The bucket duration
    /// * The aggregator
    /// * The alignment (since RedisTimeSeries v1.8)
    #[serde(deserialize_with = "deserialize_compaction_rules")]
    pub rules: Vec<TsCompactionRule>,
    /// Additional chunk information when the `debug` flag is specified in [`ts_info`](TimeSeriesCommands::ts_info)
    #[serde(rename = "Chunks")]
    pub chunks: Option<Vec<TsInfoChunkResult>>,
    /// Additional values for future versions of the command
    #[serde(flatten)]
    pub additional_values: HashMap<String, Value>,
}

/// Additional debug result for the [`ts_info`](TimeSeriesCommands::ts_info) command.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TsInfoChunkResult {
    /// First timestamp present in the chunk
    pub start_timestamp: i64,
    /// Last timestamp present in the chunk
    pub end_timestamp: i64,
    /// Total number of samples in the chunk
    pub samples: usize,
    /// The chunk data size in bytes.
    /// This is the exact size used for the data inside the chunk;
    /// it does not include other overheads.
    pub size: usize,
    /// Ratio of `size` to `samples`
    pub bytes_per_sample: f64,
}

/// Information about the [`compaction rules`](https://redis.io/commands/ts.createrule/)
/// of a time series collection, in the context of the [`ts_info`](TimeSeriesCommands::ts_info) command.
#[derive(Debug)]
pub struct TsCompactionRule {
    /// The compaction key
    pub compaction_key: String,
    /// The bucket duration
    pub bucket_duration: u64,
    /// The aggregator
    pub aggregator: TsAggregationType,
    /// The alignment (since RedisTimeSeries v1.8)
    pub alignment: u64,
}

fn deserialize_compaction_rules<'de, D>(deserializer: D) -> Result<Vec<TsCompactionRule>, D::Error>
where
    D: de::Deserializer<'de>,
{
    struct Visitor;

    impl<'de> de::Visitor<'de> for Visitor {
        type Value = Vec<TsCompactionRule>;

        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
            formatter.write_str("an array of TsCompactionRule")
        }

        fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
        where
            A: de::MapAccess<'de>,
        {
            // The server replies with a map of
            // compaction key -> (bucket duration, aggregator, alignment).
            let mut rules = Vec::with_capacity(map.size_hint().unwrap_or_default());
            while let Some(compaction_key) = map.next_key()? {
                let (bucket_duration, aggregator, alignment) =
                    map.next_value::<(u64, TsAggregationType, u64)>()?;
                rules.push(TsCompactionRule {
                    compaction_key,
                    bucket_duration,
                    aggregator,
                    alignment,
                });
            }

            Ok(rules)
        }
    }

    deserializer.deserialize_map(Visitor)
}

/// Options for the [`ts_mget`](TimeSeriesCommands::ts_mget) command.
#[derive(Default, Serialize)]
#[serde(rename_all = "UPPERCASE")]
pub struct TsMGetOptions<'a> {
    #[serde(
        skip_serializing_if = "std::ops::Not::not",
        serialize_with = "serialize_flag"
    )]
    latest: bool,
    #[serde(
        skip_serializing_if = "std::ops::Not::not",
        serialize_with = "serialize_flag"
    )]
    withlabels: bool,
    #[serde(skip_serializing_if = "SmallVec::is_empty")]
    selected_labels: SmallVec<[&'a str; 10]>,
}

impl<'a> TsMGetOptions<'a> {
    /// Used when a time series is a compaction.
    ///
    /// With `latest`, [`ts_mget`](TimeSeriesCommands::ts_mget)
    /// also reports the compacted value of the latest, possibly partial, bucket.
    /// Without `latest`, [`ts_mget`](TimeSeriesCommands::ts_mget)
    /// does not report the latest, possibly partial, bucket.
    /// When a time series is not a compaction, `latest` is ignored.
    ///
    /// The data in the latest bucket of a compaction is possibly partial.
    /// A bucket is closed and compacted only upon the arrival of a new sample that opens a new latest bucket.
    /// There are cases, however, when the compacted value of the latest, possibly partial, bucket is also required.
    /// In such a case, use `latest`.
    #[must_use]
    pub fn latest(mut self) -> Self {
        self.latest = true;
        self
    }

    /// Includes in the reply all label-value pairs representing metadata labels of the time series.
    ///
    /// If `withlabels` or `selected_labels` are not specified, by default, an empty list is reported as label-value pairs.
    #[must_use]
    pub fn withlabels(mut self) -> Self {
        self.withlabels = true;
        self
    }

    /// returns a subset of the label-value pairs that represent metadata labels of the time series.
    ///
    /// Use when a large number of labels exists per series, but only the values of some of the labels are required.
    /// If `withlabels` or `selected_labels` are not specified, by default, an empty list is reported as label-value pairs.
    #[must_use]
    pub fn selected_label(mut self, label: &'a str) -> Self {
        self.selected_labels.push(label);
        self
    }
}

/// Result for the [`ts_mget`](TimeSeriesCommands::ts_mget) command.
#[derive(Debug, Deserialize)]
pub struct TsSample {
    /// Label-value pairs
    ///
    /// * By default, an empty list is reported
    /// * If [`withlabels`](TsMGetOptions::withlabels) is specified, all labels associated with this time series are reported
    /// * If [`selected_label`](TsMGetOptions::selected_label) is specified, the selected labels are reported
    pub labels: HashMap<String, String>,
    /// The timestamp-value pair of the last sample
    pub timestamp_value: (u64, f64),
}

/// Options for the [`ts_mrange`](TimeSeriesCommands::ts_mrange) and
/// [`ts_mrevrange`](TimeSeriesCommands::ts_mrevrange) commands.
#[derive(Default, Serialize)]
#[serde(rename_all = "UPPERCASE")]
pub struct TsMRangeOptions<'a> {
    #[serde(
        skip_serializing_if = "std::ops::Not::not",
        serialize_with = "serialize_flag"
    )]
    latest: bool,
    #[serde(skip_serializing_if = "Option::is_none")]
    filter_by_ts: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    filter_by_value: Option<(f64, f64)>,
    #[serde(
        skip_serializing_if = "std::ops::Not::not",
        serialize_with = "serialize_flag"
    )]
    withlabels: bool,
    #[serde(skip_serializing_if = "SmallVec::is_empty")]
    selected_labels: SmallVec<[&'a str; 10]>,
    #[serde(skip_serializing_if = "Option::is_none")]
    count: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    align: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    aggregation: Option<(TsAggregationType, u64)>,
    // The documented values (`-`/`low`, `+`/`high`, `~`/`mid`) are strings,
    // so this field is a string rather than an integer.
    #[serde(skip_serializing_if = "Option::is_none")]
    buckettimestamp: Option<&'a str>,
    #[serde(
        skip_serializing_if = "std::ops::Not::not",
        serialize_with = "serialize_flag"
    )]
    empty: bool,
}

impl<'a> TsMRangeOptions<'a> {
    /// Used when a time series is a compaction.
    ///
    /// With `latest`, [`ts_mrange`](TimeSeriesCommands::ts_mrange)
    /// also reports the compacted value of the latest, possibly partial, bucket,
    /// given that this bucket's start time falls within [`from_timestamp`, `to_timestamp`].
    /// Without `latest`, [`ts_mrange`](TimeSeriesCommands::ts_mrange)
    /// does not report the latest, possibly partial, bucket.
    /// When a time series is not a compaction, `latest` is ignored.
    ///
    /// The data in the latest bucket of a compaction is possibly partial.
    /// A bucket is closed and compacted only upon the arrival of a new sample that opens a new latest bucket.
    /// There are cases, however, when the compacted value of the latest, possibly partial, bucket is also required.
    /// In such a case, use `latest`.
    #[must_use]
    pub fn latest(mut self) -> Self {
        self.latest = true;
        self
    }

    /// filters samples by a list of specific timestamps.
    ///
    /// A sample passes the filter if its exact timestamp is specified and falls within [`from_timestamp`, `to_timestamp`].
    #[must_use]
    pub fn filter_by_ts(mut self, ts: &'a str) -> Self {
        self.filter_by_ts = Some(ts);
        self
    }

    /// filters samples by minimum and maximum values.
    #[must_use]
    pub fn filter_by_value(mut self, min: f64, max: f64) -> Self {
        self.filter_by_value = Some((min, max));
        self
    }

    /// Includes in the reply all label-value pairs representing metadata labels of the time series.
    ///
    /// If `withlabels` or `selected_labels` are not specified, by default, an empty list is reported as label-value pairs.
    #[must_use]
    pub fn withlabels(mut self) -> Self {
        self.withlabels = true;
        self
    }

    /// returns a subset of the label-value pairs that represent metadata labels of the time series.
    ///
    /// Use when a large number of labels exists per series, but only the values of some of the labels are required.
    /// If `withlabels` or `selected_labels` are not specified, by default, an empty list is reported as label-value pairs.
    #[must_use]
    pub fn selected_label(mut self, label: &'a str) -> Self {
        self.selected_labels.push(label);
        self
    }

    /// limits the number of returned samples.
    #[must_use]
    pub fn count(mut self, count: u32) -> Self {
        self.count = Some(count);
        self
    }

    /// A time bucket alignment control for `aggregation`.
    ///
    /// It controls the time bucket timestamps by changing the reference timestamp on which a bucket is defined.
    ///
    /// Values include:
    /// * `start` or `-`: The reference timestamp will be the query start interval time (`from_timestamp`), which can't be `-`
    /// * `end` or `+`: The reference timestamp will be the query end interval time (`to_timestamp`), which can't be `+`
    /// * A specific timestamp: align the reference timestamp to a specific time
    ///
    /// # Note
    /// When not provided, alignment is set to 0.
    #[must_use]
    pub fn align(mut self, align: &'a str) -> Self {
        self.align = Some(align);
        self
    }

    /// Aggregates results into time buckets, where:
    /// * `aggregator` - takes a value of [`TsAggregationType`](TsAggregationType)
    /// * `bucket_duration` - is the duration of each bucket, in milliseconds.
    ///
    /// Without `align`, bucket start times are multiples of `bucket_duration`.
    ///
    /// With `align`, bucket start times are multiples of `bucket_duration` with remainder `align` % `bucket_duration`,
    /// as illustrated in the example below.
    ///
    /// The first bucket start time is less than or equal to `from_timestamp`.
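    ///
    /// # Example
    /// A small worked illustration of the bucket arithmetic above (not an API call):
    /// with `bucket_duration = 1000` and no `align`, buckets start at `0, 1000, 2000, ...`;
    /// with `align = "300"`, every start time satisfies `start % 1000 == 300`,
    /// so buckets start at `300, 1300, 2300, ...`, and a sample at timestamp `1250`
    /// falls into the bucket starting at `300`.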
    #[must_use]
    pub fn aggregation(mut self, aggregator: TsAggregationType, bucket_duration: u64) -> Self {
        self.aggregation = Some((aggregator, bucket_duration));
        self
    }

    /// controls how bucket timestamps are reported.
    ///
    /// `bucket_timestamp` values include:
    /// * `-` or `low` - Timestamp reported for each bucket is the bucket's start time (default)
    /// * `+` or `high` - Timestamp reported for each bucket is the bucket's end time
    /// * `~` or `mid` - Timestamp reported for each bucket is the bucket's mid time (rounded down if not an integer)
    #[must_use]
    pub fn bucket_timestamp(mut self, bucket_timestamp: &'a str) -> Self {
        self.buckettimestamp = Some(bucket_timestamp);
        self
    }

    /// A flag, which, when specified, reports aggregations also for empty buckets,
    /// when `aggregator` values are:
    /// * `sum`, `count` - the value reported for each empty bucket is `0`
    /// * `last` - the value reported for each empty bucket is the value
    ///   of the last sample before the bucket's start.
    ///   `NaN` when no such sample.
    /// * `twa` - the value reported for each empty bucket is the average value
    ///   over the bucket's timeframe based on linear interpolation
    ///   of the last sample before the bucket's start and the first sample after the bucket's end.
    ///   `NaN` when no such samples.
    /// * `min`, `max`, `range`, `avg`, `first`, `std.p`, `std.s` - the value reported for each empty bucket is `NaN`
    ///
    /// Regardless of the values of `from_timestamp` and `to_timestamp`,
    /// no data is reported for buckets that end before the earliest sample or begin after the latest sample in the time series.
    #[must_use]
    pub fn empty(mut self) -> Self {
        self.empty = true;
        self
    }
}

/// Result for the [`ts_mrange`](TimeSeriesCommands::ts_mrange) and
/// [`ts_mrevrange`](TimeSeriesCommands::ts_mrevrange) commands.
#[derive(Debug)]
pub struct TsRangeSample {
    /// Label-value pairs
    ///
    /// * By default, an empty list is reported
    /// * If [`withlabels`](TsMGetOptions::withlabels) is specified, all labels associated with this time series are reported
    /// * If [`selected_label`](TsMGetOptions::selected_label) is specified, the selected labels are reported
    pub labels: Vec<(String, String)>,
    /// Reducer types used to aggregate series that share the same label value
    pub reducers: Vec<String>,
    /// Keys of the source time series that were grouped together
    pub sources: Vec<String>,
    /// Aggregator types reported for the grouped series
    pub aggregators: Vec<String>,
    /// Timestamp-value pairs for all samples/aggregations matching the range
    pub values: Vec<(u64, f64)>,
}
1345
1346impl<'de> de::Deserialize<'de> for TsRangeSample {
1347 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1348 where
1349 D: de::Deserializer<'de>,
1350 {
1351 enum TsRangeSampleField {
1352 Aggregators(Vec<String>),
1353 Reducers(Vec<String>),
1354 Sources(Vec<String>),
1355 Values(Vec<(u64, f64)>),
1356 }
1357
1358 impl<'de> de::Deserialize<'de> for TsRangeSampleField {
1359 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1360 where
1361 D: de::Deserializer<'de>,
1362 {
1363 struct Visitor;
1364
1365 impl<'de> de::Visitor<'de> for Visitor {
1366 type Value = TsRangeSampleField;
1367
1368 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
1369 formatter.write_str("TsRangeSampleField")
1370 }
1371
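                    // A plain sequence is the samples payload:
                    // (timestamp, value) pairs deserialized as `Vec<(u64, f64)>`.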
                    fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
                    where
                        A: de::SeqAccess<'de>,
                    {
                        Ok(TsRangeSampleField::Values(Vec::<(u64, f64)>::deserialize(
                            SeqAccessDeserializer::new(seq),
                        )?))
                    }

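                    // A map carries exactly one metadata field (`reducers`,
                    // `sources`, or `aggregators`); a second entry is rejected.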
                    fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
                    where
                        A: de::MapAccess<'de>,
                    {
                        let (Some((field, value)), None) = (
                            map.next_entry::<&str, Vec<String>>()?,
                            map.next_entry::<&str, Vec<String>>()?,
                        ) else {
                            return Err(de::Error::invalid_length(0, &"1 in map"));
                        };

                        match field {
                            "reducers" => Ok(TsRangeSampleField::Reducers(value)),
                            "sources" => Ok(TsRangeSampleField::Sources(value)),
                            "aggregators" => Ok(TsRangeSampleField::Aggregators(value)),
                            _ => Err(de::Error::unknown_field(
                                field,
                                &["reducers", "sources", "aggregators"],
                            )),
                        }
                    }
                }

                deserializer.deserialize_any(Visitor)
            }
        }

        struct Visitor;

        impl<'de> de::Visitor<'de> for Visitor {
            type Value = TsRangeSample;

            fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
                formatter.write_str("TsRangeSample")
            }

            fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
            where
                A: de::SeqAccess<'de>,
            {
                let mut sample = TsRangeSample {
                    labels: Vec::new(),
                    reducers: Vec::new(),
                    sources: Vec::new(),
                    aggregators: Vec::new(),
                    values: Vec::new(),
                };

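                // The first element of the sequence is always the label array;
                // the remaining elements are self-describing metadata or samples.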
                let Some(labels) = seq.next_element::<Vec<(String, String)>>()? else {
                    return Err(de::Error::invalid_length(0, &"more elements in sequence"));
                };

                sample.labels = labels;

                while let Some(field) = seq.next_element::<TsRangeSampleField>()? {
                    match field {
                        TsRangeSampleField::Aggregators(aggregators) => {
                            sample.aggregators = aggregators
                        }
                        TsRangeSampleField::Reducers(reducers) => sample.reducers = reducers,
                        TsRangeSampleField::Sources(sources) => sample.sources = sources,
                        TsRangeSampleField::Values(values) => sample.values = values,
                    }
                }

                Ok(sample)
            }
        }

        deserializer.deserialize_seq(Visitor)
    }
}

/// Options for the [`ts_mrange`](TimeSeriesCommands::ts_mrange) and
/// [`ts_mrevrange`](TimeSeriesCommands::ts_mrevrange) commands.
#[derive(Serialize)]
#[serde(rename_all = "UPPERCASE")]
pub struct TsGroupByOptions<'a> {
    groupby: &'a str,
    reduce: TsAggregationType,
}

impl<'a> TsGroupByOptions<'a> {
    /// aggregates results across different time series, grouped by the provided label name.
    ///
    /// When combined with [`aggregation`](TsMRangeOptions::aggregation), the groupby/reduce is applied after the aggregation stage.
    ///
    /// # Arguments
    /// * `label` - is the label name to group a series by. A new series for each value is produced.
    /// * `reducer` - is the reducer type used to aggregate series that share the same label value.
    ///
    /// # Notes
    /// * The produced time series is named `<label>=<groupbyvalue>`
    /// * The produced time series contains two labels with these label array structures:
    ///   * `reducer`, the reducer used
    ///   * `source`, the time series keys used to compute the grouped series (key1,key2,key3,...)
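    ///
    /// # Example
    /// A minimal sketch (it assumes `TsAggregationType` has a `Max` variant):
    /// ```ignore
    /// // Group matched series by their `region` label and keep the maximum
    /// // value across all series sharing the same region.
    /// let group_by = TsGroupByOptions::new("region", TsAggregationType::Max);
    /// ```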
    #[must_use]
    pub fn new(label: &'a str, reducer: TsAggregationType) -> Self {
        Self {
            groupby: label,
            reduce: reducer,
        }
    }
}

/// Options for the [`ts_range`](TimeSeriesCommands::ts_range) and
/// [`ts_revrange`](TimeSeriesCommands::ts_revrange) commands.
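///
/// # Example
/// A minimal sketch of the builder (the `rustis::commands` import path and the
/// `TsAggregationType::Avg` variant are assumed):
/// ```ignore
/// use rustis::commands::{TsAggregationType, TsRangeOptions};
///
/// let options = TsRangeOptions::default()
///     .latest()
///     .filter_by_value(0., 100.)
///     .count(1000)
///     .align("start")
///     .aggregation(TsAggregationType::Avg, 60_000)
///     .bucket_timestamp("+")
///     .empty();
/// ```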
#[derive(Default, Serialize)]
#[serde(rename_all = "UPPERCASE")]
pub struct TsRangeOptions<'a> {
    #[serde(
        skip_serializing_if = "std::ops::Not::not",
        serialize_with = "serialize_flag"
    )]
    latest: bool,
    #[serde(skip_serializing_if = "Option::is_none")]
    filter_by_ts: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    filter_by_value: Option<(f64, f64)>,
    #[serde(skip_serializing_if = "Option::is_none")]
    count: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    align: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    aggregation: Option<(TsAggregationType, u64)>,
    #[serde(skip_serializing_if = "Option::is_none")]
    buckettimestamp: Option<&'a str>,
    #[serde(
        skip_serializing_if = "std::ops::Not::not",
        serialize_with = "serialize_flag"
    )]
    empty: bool,
}

impl<'a> TsRangeOptions<'a> {
    /// Used when a time series is a compaction.
    ///
    /// With `latest`, [`ts_range`](TimeSeriesCommands::ts_range)
    /// also reports the compacted value of the latest possibly partial bucket,
    /// given that this bucket's start time falls within [`from_timestamp`, `to_timestamp`].
    /// Without `latest`, [`ts_range`](TimeSeriesCommands::ts_range)
    /// does not report the latest possibly partial bucket.
    /// When a time series is not a compaction, `latest` is ignored.
    ///
    /// The data in the latest bucket of a compaction is possibly partial.
    /// A bucket is closed and compacted only upon arrival of a new sample that opens a new latest bucket.
    /// There are cases, however, when the compacted value of the latest possibly partial bucket is also required.
    /// In such a case, use `latest`.
    #[must_use]
    pub fn latest(mut self) -> Self {
        self.latest = true;
        self
    }

    /// filters samples by a list of specific timestamps.
    ///
    /// A sample passes the filter if its exact timestamp is specified and falls within [`from_timestamp`, `to_timestamp`].
    #[must_use]
    pub fn filter_by_ts(mut self, ts: &'a str) -> Self {
        self.filter_by_ts = Some(ts);
        self
    }

    /// filters samples by minimum and maximum values.
    #[must_use]
    pub fn filter_by_value(mut self, min: f64, max: f64) -> Self {
        self.filter_by_value = Some((min, max));
        self
    }

    /// limits the number of returned samples.
    #[must_use]
    pub fn count(mut self, count: u32) -> Self {
        self.count = Some(count);
        self
    }

    /// A time bucket alignment control for `aggregation`.
    ///
    /// It controls the time bucket timestamps by changing the reference timestamp on which a bucket is defined.
    ///
    /// Values include:
    /// * `start` or `-`: The reference timestamp will be the query start interval time (`from_timestamp`) which can't be `-`
    /// * `end` or `+`: The reference timestamp will be the query end interval time (`to_timestamp`) which can't be `+`
    /// * A specific timestamp: align the reference timestamp to a specific time
    ///
    /// # Note
    /// When not provided, alignment is set to 0.
    #[must_use]
    pub fn align(mut self, align: &'a str) -> Self {
        self.align = Some(align);
        self
    }

    /// Aggregates results into time buckets, where:
    /// * `aggregator` - takes a value of [`TsAggregationType`](TsAggregationType)
    /// * `bucket_duration` - is the duration of each bucket, in milliseconds.
    ///
    /// Without `align`, bucket start times are multiples of `bucket_duration`.
    ///
    /// With `align`, bucket start times are multiples of `bucket_duration` with remainder `align` % `bucket_duration`.
    ///
    /// The first bucket start time is less than or equal to `from_timestamp`.
    #[must_use]
    pub fn aggregation(mut self, aggregator: TsAggregationType, bucket_duration: u64) -> Self {
        self.aggregation = Some((aggregator, bucket_duration));
        self
    }

    /// controls how bucket timestamps are reported.
    /// `bucket_timestamp` values include:
    /// * `-` or `low` - Timestamp reported for each bucket is the bucket's start time (default)
    /// * `+` or `high` - Timestamp reported for each bucket is the bucket's end time
    /// * `~` or `mid` - Timestamp reported for each bucket is the bucket's mid time (rounded down if not an integer)
    #[must_use]
    pub fn bucket_timestamp(mut self, bucket_timestamp: &'a str) -> Self {
        self.buckettimestamp = Some(bucket_timestamp);
        self
    }

    /// A flag, which, when specified, reports aggregations also for empty buckets.
    /// The value reported for each empty bucket depends on the `aggregator`:
    /// * `sum`, `count` - the value reported for each empty bucket is `0`
    /// * `last` - the value reported for each empty bucket is the value
    /// of the last sample before the bucket's start.
    /// `NaN` when no such sample.
    /// * `twa` - the value reported for each empty bucket is the average value
    /// over the bucket's timeframe based on linear interpolation
    /// of the last sample before the bucket's start and the first sample after the bucket's end.
    /// `NaN` when no such samples.
    /// * `min`, `max`, `range`, `avg`, `first`, `std.p`, `std.s` - the value reported for each empty bucket is `NaN`
    ///
    /// Regardless of the values of `from_timestamp` and `to_timestamp`,
    /// no data is reported for buckets that end before the earliest sample or begin after the latest sample in the time series.
    #[must_use]
    pub fn empty(mut self) -> Self {
        self.empty = true;
        self
    }
}

/// Timeseries Timestamp
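///
/// # Example
/// A minimal sketch (the `rustis::commands` import path is assumed):
/// ```
/// # use rustis::commands::TsTimestamp;
/// // An explicit millisecond timestamp vs. the server clock (serialized as `*`):
/// let explicit = TsTimestamp::Value(1_589_636_967_832);
/// let server_clock = TsTimestamp::ServerClock;
/// ```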
pub enum TsTimestamp {
    /// User-specified timestamp
    Value(u64),
    /// Unix time of the server clock (`*`)
    ServerClock,
}

impl Serialize for TsTimestamp {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            TsTimestamp::Value(ts) => serializer.serialize_u64(*ts),
            TsTimestamp::ServerClock => serializer.serialize_str("*"),
        }
    }
}