1use crate::Result;
16use redis::{Client, Commands, Connection};
17use rmp_serde as rmps;
18use serde::{de::DeserializeOwned, Serialize};
19use std::{
20 fmt,
21 marker::PhantomData,
22 ops::{Add, AddAssign, Sub, SubAssign},
23 time::{Duration, SystemTime, UNIX_EPOCH},
24};
25
/// A point in time expressed as floating-point seconds relative to the UNIX
/// epoch.
///
/// The wrapped value doubles as the sorted-set score of a time series entry,
/// which is why the type is a thin, `Copy`-able wrapper around an `f64`.
#[derive(Debug, Default, Clone, Copy, PartialEq, PartialOrd)]
pub struct Timestamp(f64);
34
35impl Timestamp {
36 pub fn now() -> Self {
38 Self::from(SystemTime::now())
39 }
40
41 pub fn with_resolution(st: SystemTime, res: Duration) -> Self {
43 Self::with_resolution_f64(st, res.as_secs_f64())
44 }
45
46 pub fn with_resolution_f64(st: SystemTime, res: f64) -> Self {
52 let ts = st
53 .duration_since(UNIX_EPOCH)
54 .expect("Time went backwards")
55 .as_secs_f64();
56 Self(res * (ts / res).round())
57 }
58
59 #[inline]
61 pub fn as_f64(&self) -> f64 {
62 self.0
63 }
64
65 pub fn into_system_time(self) -> SystemTime {
67 UNIX_EPOCH + Duration::from_secs_f64(self.0)
68 }
69}
70
71impl From<f64> for Timestamp {
72 fn from(val: f64) -> Self {
73 Self(val)
74 }
75}
76
77impl From<SystemTime> for Timestamp {
78 fn from(st: SystemTime) -> Self {
79 let ts = st
80 .duration_since(UNIX_EPOCH)
81 .expect("Time went backwards")
82 .as_secs_f64();
83 Self(1.0e-6 * (ts / 1.0e-6).round())
84 }
85}
86
87impl From<Timestamp> for SystemTime {
88 fn from(ts: Timestamp) -> Self {
89 UNIX_EPOCH + Duration::from_secs_f64(ts.0)
90 }
91}
92
93impl Add<f64> for Timestamp {
94 type Output = Self;
95
96 fn add(self, dur: f64) -> Self::Output {
97 Self(self.0 + dur)
98 }
99}
100
101impl AddAssign<f64> for Timestamp {
102 fn add_assign(&mut self, dur: f64) {
103 self.0 += dur
104 }
105}
106
107impl Sub<f64> for Timestamp {
108 type Output = Self;
109
110 fn sub(self, dur: f64) -> Self::Output {
111 Self(self.0 - dur)
112 }
113}
114
115impl SubAssign<f64> for Timestamp {
116 fn sub_assign(&mut self, dur: f64) {
117 self.0 -= dur
118 }
119}
120
121impl Add<Duration> for Timestamp {
122 type Output = Self;
123
124 fn add(self, dur: Duration) -> Self::Output {
125 Self(self.0 + dur.as_secs_f64())
126 }
127}
128
129impl AddAssign<Duration> for Timestamp {
130 fn add_assign(&mut self, dur: Duration) {
131 self.0 += dur.as_secs_f64()
132 }
133}
134
135impl Sub<Duration> for Timestamp {
136 type Output = Self;
137
138 fn sub(self, dur: Duration) -> Self::Output {
139 Self(self.0 - dur.as_secs_f64())
140 }
141}
142
143impl SubAssign<Duration> for Timestamp {
144 fn sub_assign(&mut self, dur: Duration) {
145 self.0 -= dur.as_secs_f64()
146 }
147}
148
149impl fmt::Display for Timestamp {
150 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
151 write!(f, "{}", self.0)
152 }
153}
154
155pub struct TimeValue<T> {
162 pub timestamp: Timestamp,
164 pub value: T,
166}
167
168impl<T> TimeValue<T> {
169 pub fn new(val: T) -> Self {
171 Self::with_timestamp(Timestamp::now(), val)
172 }
173
174 pub fn with_timestamp<S>(ts: S, val: T) -> Self
176 where
177 S: Into<Timestamp>,
178 {
179 Self {
180 timestamp: ts.into(),
181 value: val,
182 }
183 }
184
185 pub fn into_tuple(self) -> (Timestamp, T) {
187 (self.timestamp, self.value)
188 }
189
190 pub fn into_tuple_f64(self) -> (f64, T) {
192 (self.timestamp.0, self.value)
193 }
194}
195
196impl<S: Into<Timestamp>, T> From<(S, T)> for TimeValue<T> {
197 fn from(v: (S, T)) -> Self {
198 Self {
199 timestamp: v.0.into(),
200 value: v.1,
201 }
202 }
203}
204
205impl<T: Clone> Clone for TimeValue<T> {
206 fn clone(&self) -> Self {
207 Self {
208 timestamp: self.timestamp,
209 value: self.value.clone(),
210 }
211 }
212}
213
214impl<T: Copy> Copy for TimeValue<T> {}
216
217pub struct TimeSeries<T> {
228 key: String,
230 #[allow(dead_code)]
232 cli: Client,
233 conn: Connection,
235 phantom: PhantomData<T>,
237}
238
239impl<T> TimeSeries<T> {
240 pub fn new(namespace: &str, name: &str) -> Result<Self> {
242 Self::with_host("localhost", namespace, name)
243 }
244
245 pub fn with_host(host: &str, namespace: &str, name: &str) -> Result<Self> {
248 Self::with_uri(&format!("redis://{}/", host), namespace, name)
249 }
250
251 pub fn with_uri(uri: &str, namespace: &str, name: &str) -> Result<Self> {
254 let key = if namespace.is_empty() {
255 name.into()
256 }
257 else {
258 format!("{}:{}", namespace, name)
259 };
260 let cli = Client::open(uri)?;
261 let conn = cli.get_connection()?;
262 Ok(Self {
263 key,
264 cli,
265 conn,
266 phantom: PhantomData,
267 })
268 }
269
270 pub fn purge_before<S>(&mut self, ts: S) -> Result<()>
273 where
274 S: Into<Timestamp>,
275 {
276 let ts = format!("({}", ts.into());
277 self.conn.zrembyscore(&self.key, "-inf", ts)?;
278 Ok(())
279 }
280
281 pub fn purge_older_than(&mut self, dur: Duration) -> Result<()> {
287 self.purge_before(SystemTime::now() - dur)
288 }
289
290 pub fn delete(&mut self) -> Result<()> {
292 self.conn.del(&self.key)?;
293 Ok(())
294 }
295}
296
297impl<T: Serialize> TimeSeries<T> {
298 pub fn add<S>(&mut self, ts: S, val: T) -> Result<()>
300 where
301 S: Into<Timestamp>,
302 {
303 let ts = ts.into();
304 let rval = rmps::encode::to_vec_named(&(ts.0, val))?;
305 self.conn.zadd(&self.key, rval, ts.0)?;
306 Ok(())
307 }
308
309 #[inline]
312 pub fn add_now(&mut self, val: T) -> Result<()> {
313 self.add(Timestamp::now(), val)
314 }
315
316 pub fn add_value<V>(&mut self, val: V) -> Result<()>
318 where
319 V: Into<TimeValue<T>>,
320 {
321 let val = val.into();
322 self.add(val.timestamp, val.value)
323 }
324
325 pub fn add_multiple(&mut self, vals: &[TimeValue<T>]) -> Result<()> {
327 let rvals: Vec<(f64, Vec<u8>)> = vals
328 .iter()
329 .map(|v| {
330 let rval = rmps::encode::to_vec_named(&(v.timestamp.0, &v.value)).unwrap();
331 (v.timestamp.0, rval)
332 })
333 .collect();
334
335 self.conn.zadd_multiple(&self.key, &rvals)?;
336 Ok(())
337 }
338
339 pub fn add_multiple_values(&mut self, vals: &[(Timestamp, T)]) -> Result<()> {
341 let rvals: Vec<(f64, Vec<u8>)> = vals
342 .iter()
343 .map(|(ts, v)| (ts.0, rmps::encode::to_vec_named(&(ts.0, v)).unwrap()))
344 .collect();
345
346 self.conn.zadd_multiple(&self.key, &rvals)?;
347 Ok(())
348 }
349}
350
351impl<T: DeserializeOwned> TimeSeries<T> {
352 pub fn get_range_any<S, U>(&mut self, ts1: S, ts2: U) -> Result<Vec<TimeValue<T>>>
357 where
358 S: redis::ToRedisArgs,
359 U: redis::ToRedisArgs,
360 {
361 let v: Vec<Vec<u8>> = self.conn.zrangebyscore(&self.key, ts1, ts2)?;
362 let vret = v
363 .iter()
364 .map(|buf| rmps::decode::from_slice(buf).unwrap())
365 .map(|(ts, value): (f64, T)| TimeValue {
366 timestamp: Timestamp::from(ts),
367 value,
368 })
369 .collect();
370 Ok(vret)
371 }
372
373 pub fn get_range<S, U>(&mut self, ts1: S, ts2: U) -> Result<Vec<TimeValue<T>>>
376 where
377 S: Into<Timestamp>,
378 U: Into<Timestamp>,
379 {
380 let ts2 = format!("({}", ts2.into());
381 self.get_range_any::<_, _>(ts1.into().0, ts2)
382 }
383
384 pub fn get_from<S>(&mut self, ts: S) -> Result<Vec<TimeValue<T>>>
386 where
387 S: Into<Timestamp>,
388 {
389 self.get_range_any::<_, _>(ts.into().0, "+inf")
390 }
391
392 pub fn get_last(&mut self, dur: Duration) -> Result<Vec<TimeValue<T>>> {
394 self.get_from(SystemTime::now() - dur)
395 }
396
397 pub fn get_all(&mut self) -> Result<Vec<TimeValue<T>>> {
400 self.get_range_any::<_, _>("-inf", "+inf")
401 }
402}
403
#[cfg(test)]
mod tests {
    use super::*;

    // Namespace prefix shared by every series these tests create.
    const NAMESPACE: &str = "redis-zset-ts";

    /// Collects just the values of a query result, in order.
    fn values(entries: &[TimeValue<i32>]) -> Vec<i32> {
        entries.iter().map(|entry| entry.value).collect()
    }

    // Pure test: no server required.
    #[test]
    fn test_time_value() {
        let tv = TimeValue::from((0.0, 42));
        assert_eq!(42, tv.value);

        let (_, value) = tv.into_tuple();
        assert_eq!(42, value);
    }

    // The remaining tests open a connection through `TimeSeries::new`, which
    // targets a Redis server on localhost.

    #[test]
    fn test_conn() {
        let mut series = TimeSeries::<i32>::new(NAMESPACE, "conn").unwrap();
        let _ = series.delete();
    }

    #[test]
    fn test_add() {
        let mut series = TimeSeries::new(NAMESPACE, "add").unwrap();
        let _ = series.delete();
        series.add_now(&42).unwrap();
    }

    #[test]
    fn test_get() {
        let mut series = TimeSeries::new(NAMESPACE, "get").unwrap();
        let _ = series.delete();

        series.add(2.0, 42).unwrap();
        series.add(3.0, 99).unwrap();
        series.add_value((4.0, 13)).unwrap();

        // The upper bound of a range is exclusive.
        let found = series.get_range(2.0, 3.0).unwrap();
        assert_eq!(vec![42], values(&found));

        let found = series.get_range(2.0, 4.0).unwrap();
        assert_eq!(vec![42, 99], values(&found));

        let found = series.get_from(3.0).unwrap();
        assert_eq!(vec![99, 13], values(&found));

        let found = series.get_all().unwrap();
        assert_eq!(vec![42, 99, 13], values(&found));
    }

    #[test]
    fn test_purge() {
        let mut series = TimeSeries::new(NAMESPACE, "purge").unwrap();
        let _ = series.delete();

        series.add(2.0, 42).unwrap();
        series.add(3.0, 99).unwrap();
        series.add_value((4.0, 13)).unwrap();

        // Only entries strictly before the cut-off are removed.
        series.purge_before(3.0).unwrap();

        let remaining = series.get_range(1.0, 5.0).unwrap();
        assert_eq!(vec![99, 13], values(&remaining));
    }
}