redis_zset_ts/
time_series.rs

1// redis-zset-ts/src/time_series.rs
2//
3// Copyright (c) 2024, Frank Pagliughi <fpagliughi@mindspring.com>
4// All Rights Reserved
5//
6// Licensed under the MIT license:
7//   <LICENSE or http://opensource.org/licenses/MIT>
8// This file may not be copied, modified, or distributed except according
9// to those terms.
10//
11
12//! Simple time-series database functionality using Redis Sorted 'Z' Sets.
13//!
14
15use crate::Result;
16use redis::{Client, Commands, Connection};
17use rmp_serde as rmps;
18use serde::{de::DeserializeOwned, Serialize};
19use std::{
20    fmt,
21    marker::PhantomData,
22    ops::{Add, AddAssign, Sub, SubAssign},
23    time::{Duration, SystemTime, UNIX_EPOCH},
24};
25
26/////////////////////////////////////////////////////////////////////////////
27
28/// The timestamp for values in the database.
29///
30/// This is an absolute time point, represented as a floating point time_t
31/// value with arbitrary resolutions, typically in the microsecond range.
32#[derive(Default, Debug, Clone, Copy, PartialEq, PartialOrd)]
33pub struct Timestamp(f64);
34
35impl Timestamp {
36    /// Creates a timestamp for the current time.
37    pub fn now() -> Self {
38        Self::from(SystemTime::now())
39    }
40
41    /// Creates a timestamp with a specific resolution
42    pub fn with_resolution(st: SystemTime, res: Duration) -> Self {
43        Self::with_resolution_f64(st, res.as_secs_f64())
44    }
45
46    /// Creates a timestamp with a resolution specified as floating point
47    /// seconds.
48    ///
49    /// So, for microsecond resolution use 1.0e-6; for millisecond
50    /// resolution, 1.0e-3, etc.
51    pub fn with_resolution_f64(st: SystemTime, res: f64) -> Self {
52        let ts = st
53            .duration_since(UNIX_EPOCH)
54            .expect("Time went backwards")
55            .as_secs_f64();
56        Self(res * (ts / res).round())
57    }
58
59    /// Gets the time stamp as a floating-point time_t value.
60    #[inline]
61    pub fn as_f64(&self) -> f64 {
62        self.0
63    }
64
65    /// Converts the time stamp to a system time value.
66    pub fn into_system_time(self) -> SystemTime {
67        UNIX_EPOCH + Duration::from_secs_f64(self.0)
68    }
69}
70
71impl From<f64> for Timestamp {
72    fn from(val: f64) -> Self {
73        Self(val)
74    }
75}
76
77impl From<SystemTime> for Timestamp {
78    fn from(st: SystemTime) -> Self {
79        let ts = st
80            .duration_since(UNIX_EPOCH)
81            .expect("Time went backwards")
82            .as_secs_f64();
83        Self(1.0e-6 * (ts / 1.0e-6).round())
84    }
85}
86
87impl From<Timestamp> for SystemTime {
88    fn from(ts: Timestamp) -> Self {
89        UNIX_EPOCH + Duration::from_secs_f64(ts.0)
90    }
91}
92
93impl Add<f64> for Timestamp {
94    type Output = Self;
95
96    fn add(self, dur: f64) -> Self::Output {
97        Self(self.0 + dur)
98    }
99}
100
101impl AddAssign<f64> for Timestamp {
102    fn add_assign(&mut self, dur: f64) {
103        self.0 += dur
104    }
105}
106
107impl Sub<f64> for Timestamp {
108    type Output = Self;
109
110    fn sub(self, dur: f64) -> Self::Output {
111        Self(self.0 - dur)
112    }
113}
114
115impl SubAssign<f64> for Timestamp {
116    fn sub_assign(&mut self, dur: f64) {
117        self.0 -= dur
118    }
119}
120
121impl Add<Duration> for Timestamp {
122    type Output = Self;
123
124    fn add(self, dur: Duration) -> Self::Output {
125        Self(self.0 + dur.as_secs_f64())
126    }
127}
128
129impl AddAssign<Duration> for Timestamp {
130    fn add_assign(&mut self, dur: Duration) {
131        self.0 += dur.as_secs_f64()
132    }
133}
134
135impl Sub<Duration> for Timestamp {
136    type Output = Self;
137
138    fn sub(self, dur: Duration) -> Self::Output {
139        Self(self.0 - dur.as_secs_f64())
140    }
141}
142
143impl SubAssign<Duration> for Timestamp {
144    fn sub_assign(&mut self, dur: Duration) {
145        self.0 -= dur.as_secs_f64()
146    }
147}
148
149impl fmt::Display for Timestamp {
150    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
151        write!(f, "{}", self.0)
152    }
153}
154
155/////////////////////////////////////////////////////////////////////////////
156
157/// A timestamped value.
158///
159/// This is a data value along with the time at which it was
160/// created/collected.
161pub struct TimeValue<T> {
162    /// The timestamp at which the value was collected.
163    pub timestamp: Timestamp,
164    /// The value
165    pub value: T,
166}
167
168impl<T> TimeValue<T> {
169    /// Created a new value with the current time
170    pub fn new(val: T) -> Self {
171        Self::with_timestamp(Timestamp::now(), val)
172    }
173
174    /// Creates a value with the specified timestamp
175    pub fn with_timestamp<S>(ts: S, val: T) -> Self
176    where
177        S: Into<Timestamp>,
178    {
179        Self {
180            timestamp: ts.into(),
181            value: val,
182        }
183    }
184
185    /// Converts the value into a tuple
186    pub fn into_tuple(self) -> (Timestamp, T) {
187        (self.timestamp, self.value)
188    }
189
190    /// Converts the value into a tuple
191    pub fn into_tuple_f64(self) -> (f64, T) {
192        (self.timestamp.0, self.value)
193    }
194}
195
196impl<S: Into<Timestamp>, T> From<(S, T)> for TimeValue<T> {
197    fn from(v: (S, T)) -> Self {
198        Self {
199            timestamp: v.0.into(),
200            value: v.1,
201        }
202    }
203}
204
205impl<T: Clone> Clone for TimeValue<T> {
206    fn clone(&self) -> Self {
207        Self {
208            timestamp: self.timestamp,
209            value: self.value.clone(),
210        }
211    }
212}
213
214/// Time values that contain copyable data are also copyable.
215impl<T: Copy> Copy for TimeValue<T> {}
216
217/////////////////////////////////////////////////////////////////////////////
218
219/// Connection to a Redis Time Series
220///
221/// This contains a connection to the Redis database that can be used to
222/// create, delete, and interact with a single time series.
223///
224/// To be completely readable and writeable, the data type, `T` must
225/// implement the traits `Clone`, `serde::Serialize` and
226/// `serde::DeserializeOwned`.
227pub struct TimeSeries<T> {
228    /// The name/key of the Redis sorted set.
229    key: String,
230    /// The Redis client
231    #[allow(dead_code)]
232    cli: Client,
233    /// The Redis connection
234    conn: Connection,
235    /// Placeholder for data type
236    phantom: PhantomData<T>,
237}
238
239impl<T> TimeSeries<T> {
240    /// Creates a new connection to the named time series.
241    pub fn new(namespace: &str, name: &str) -> Result<Self> {
242        Self::with_host("localhost", namespace, name)
243    }
244
245    /// Creates a new connection to the named time series on the specified
246    /// host.
247    pub fn with_host(host: &str, namespace: &str, name: &str) -> Result<Self> {
248        Self::with_uri(&format!("redis://{}/", host), namespace, name)
249    }
250
251    /// Creates a new connection to the named time series on the host with
252    /// the specified URI.
253    pub fn with_uri(uri: &str, namespace: &str, name: &str) -> Result<Self> {
254        let key = if namespace.is_empty() {
255            name.into()
256        }
257        else {
258            format!("{}:{}", namespace, name)
259        };
260        let cli = Client::open(uri)?;
261        let conn = cli.get_connection()?;
262        Ok(Self {
263            key,
264            cli,
265            conn,
266            phantom: PhantomData,
267        })
268    }
269
270    /// Deletes all the values in the time series before the specified
271    /// time stamp, `ts`.
272    pub fn purge_before<S>(&mut self, ts: S) -> Result<()>
273    where
274        S: Into<Timestamp>,
275    {
276        let ts = format!("({}", ts.into());
277        self.conn.zrembyscore(&self.key, "-inf", ts)?;
278        Ok(())
279    }
280
281    /// Deletes all the values in the time series older than the specified
282    /// duration.
283    ///
284    /// This deletes all the data except points that fall in the most recent
285    /// time specified by the duration.
286    pub fn purge_older_than(&mut self, dur: Duration) -> Result<()> {
287        self.purge_before(SystemTime::now() - dur)
288    }
289
290    /// Removes the entire timeseries from the Redis DB.
291    pub fn delete(&mut self) -> Result<()> {
292        self.conn.del(&self.key)?;
293        Ok(())
294    }
295}
296
297impl<T: Serialize> TimeSeries<T> {
298    /// Adds a point to the time series.
299    pub fn add<S>(&mut self, ts: S, val: T) -> Result<()>
300    where
301        S: Into<Timestamp>,
302    {
303        let ts = ts.into();
304        let rval = rmps::encode::to_vec_named(&(ts.0, val))?;
305        self.conn.zadd(&self.key, rval, ts.0)?;
306        Ok(())
307    }
308
309    /// Adds a point to the time series, using the current time as the
310    /// timestamp.
311    #[inline]
312    pub fn add_now(&mut self, val: T) -> Result<()> {
313        self.add(Timestamp::now(), val)
314    }
315
316    /// Adds a point to the time series as a TimeValue.
317    pub fn add_value<V>(&mut self, val: V) -> Result<()>
318    where
319        V: Into<TimeValue<T>>,
320    {
321        let val = val.into();
322        self.add(val.timestamp, val.value)
323    }
324
325    /// Adds multiple points to the time series.
326    pub fn add_multiple(&mut self, vals: &[TimeValue<T>]) -> Result<()> {
327        let rvals: Vec<(f64, Vec<u8>)> = vals
328            .iter()
329            .map(|v| {
330                let rval = rmps::encode::to_vec_named(&(v.timestamp.0, &v.value)).unwrap();
331                (v.timestamp.0, rval)
332            })
333            .collect();
334
335        self.conn.zadd_multiple(&self.key, &rvals)?;
336        Ok(())
337    }
338
339    /// Adds multiple points to the time series from a slice of tuples.
340    pub fn add_multiple_values(&mut self, vals: &[(Timestamp, T)]) -> Result<()> {
341        let rvals: Vec<(f64, Vec<u8>)> = vals
342            .iter()
343            .map(|(ts, v)| (ts.0, rmps::encode::to_vec_named(&(ts.0, v)).unwrap()))
344            .collect();
345
346        self.conn.zadd_multiple(&self.key, &rvals)?;
347        Ok(())
348    }
349}
350
351impl<T: DeserializeOwned> TimeSeries<T> {
352    /// Gets values from an arbitrary time range.
353    /// The timestamps can be floating point values, or anything that can be
354    /// converted to a Redis argument, such as special strings like "-inf",
355    /// "+inf", or "2.0)" to indicate open ranges.
356    pub fn get_range_any<S, U>(&mut self, ts1: S, ts2: U) -> Result<Vec<TimeValue<T>>>
357    where
358        S: redis::ToRedisArgs,
359        U: redis::ToRedisArgs,
360    {
361        let v: Vec<Vec<u8>> = self.conn.zrangebyscore(&self.key, ts1, ts2)?;
362        let vret = v
363            .iter()
364            .map(|buf| rmps::decode::from_slice(buf).unwrap())
365            .map(|(ts, value): (f64, T)| TimeValue {
366                timestamp: Timestamp::from(ts),
367                value,
368            })
369            .collect();
370        Ok(vret)
371    }
372
373    /// Gets values from a time range.
374    /// This gets the values from `ts1` up to, but not including, `ts2`.
375    pub fn get_range<S, U>(&mut self, ts1: S, ts2: U) -> Result<Vec<TimeValue<T>>>
376    where
377        S: Into<Timestamp>,
378        U: Into<Timestamp>,
379    {
380        let ts2 = format!("({}", ts2.into());
381        self.get_range_any::<_, _>(ts1.into().0, ts2)
382    }
383
384    /// Gets values starting from the specified time up to the latest value.
385    pub fn get_from<S>(&mut self, ts: S) -> Result<Vec<TimeValue<T>>>
386    where
387        S: Into<Timestamp>,
388    {
389        self.get_range_any::<_, _>(ts.into().0, "+inf")
390    }
391
392    /// Gets the most recent points for the specified duration.
393    pub fn get_last(&mut self, dur: Duration) -> Result<Vec<TimeValue<T>>> {
394        self.get_from(SystemTime::now() - dur)
395    }
396
397    /// Gets all the values in the series.
398    /// This should be used with caution if the series is large.
399    pub fn get_all(&mut self) -> Result<Vec<TimeValue<T>>> {
400        self.get_range_any::<_, _>("-inf", "+inf")
401    }
402}
403
404/////////////////////////////////////////////////////////////////////////////
405
406#[cfg(test)]
407mod tests {
408    use super::*;
409
410    const NAMESPACE: &str = "redis-zset-ts";
411
412    #[test]
413    fn test_time_value() {
414        let tv = TimeValue::from((0.0, 42));
415        assert_eq!(42, tv.value);
416
417        let tv = tv.into_tuple();
418        assert_eq!(42, tv.1);
419    }
420
421    #[test]
422    fn test_conn() {
423        let mut series = TimeSeries::<i32>::new(NAMESPACE, "conn").unwrap();
424        let _ = series.delete();
425    }
426
427    #[test]
428    fn test_add() {
429        let mut series = TimeSeries::new(NAMESPACE, "add").unwrap();
430        let _ = series.delete();
431        series.add_now(&42).unwrap();
432    }
433
434    #[test]
435    fn test_get() {
436        let mut series = TimeSeries::new(NAMESPACE, "get").unwrap();
437        let _ = series.delete();
438
439        series.add(2.0, 42).unwrap();
440        series.add(3.0, 99).unwrap();
441        series.add_value((4.0, 13)).unwrap();
442
443        let v = series.get_range(2.0, 3.0).unwrap();
444        assert_eq!(1, v.len());
445        assert_eq!(42, v[0].value);
446
447        let v = series.get_range(2.0, 4.0).unwrap();
448        assert_eq!(2, v.len());
449        assert_eq!(42, v[0].value);
450        assert_eq!(99, v[1].value);
451
452        let v = series.get_from(3.0).unwrap();
453        assert_eq!(2, v.len());
454        assert_eq!(99, v[0].value);
455        assert_eq!(13, v[1].value);
456
457        let v = series.get_all().unwrap();
458        assert_eq!(3, v.len());
459        assert_eq!(42, v[0].value);
460        assert_eq!(99, v[1].value);
461        assert_eq!(13, v[2].value);
462    }
463
464    #[test]
465    fn test_purge() {
466        let mut series = TimeSeries::new(NAMESPACE, "purge").unwrap();
467        let _ = series.delete();
468
469        series.add(2.0, 42).unwrap();
470        series.add(3.0, 99).unwrap();
471        series.add_value((4.0, 13)).unwrap();
472
473        series.purge_before(3.0).unwrap();
474
475        let v = series.get_range(1.0, 5.0).unwrap();
476        assert_eq!(2, v.len());
477        assert_eq!(99, v[0].value);
478        assert_eq!(13, v[1].value);
479    }
480}