rtlola_interpreter/configuration/
time.rs

1//! This module contains the different time representations of the interpreter.
2//!
3//! ## Time Representations
4//! The RTLola interpreter supports multiple representations of time in its input and output.
5//! If run in offline mode, meaning the time for an event is parsed from the input source,
6//! the format in which the time is present in the input has to be set. Consider the following example CSV file:
7//!
8//! <pre>
9//! a,b,time
10//! 0,1,0.1
11//! 2,3,0.2
12//! 4,5,0.3
13//! </pre>
14//!
15//! The supported time representations are:
16//!
17//! #### Relative
//! Time is considered relative to a fixed point in time. If we call this point in time `x`, then in the example above
//! the first event gets the timestamp `x + 0.1`, the second one `x + 0.2`, and so forth.
20//!
21//! #### Incremental
22//! Time is considered relative to the preceding event. This induces the following timestamps for the above example:
23//!
24//! <pre>
25//! a,b, time
26//! 0,1, x + 0.1
27//! 2,3, x + 0.3
28//! 4,5, x + 0.6
29//! </pre>
30//!
31//! #### Absolute
32//! Time is parsed as absolute timestamps.
33//!
34//! **Note**: The evaluation of periodic streams depends on the time passed between events.
35//! Depending on the representation, determining the time that passed before the first event is not obvious.
36//! While the relative and incremental representations do not strictly need a point of reference to determine
37//! the time passed, the absolute representation requires such a point of reference.
38//! This point of time can either be directly supplied during configuration using the [start_time](crate::ConfigBuilder::start_time) method
39//! or inferred as the time of the first event.
40//! The latter consequently assumes that no time has passed before the first event in the input.
41
42use std::fmt::Debug;
43use std::ops::Sub;
44use std::str::FromStr;
45use std::sync::{Arc, RwLock};
46use std::time::{Duration, SystemTime};
47
48#[cfg(not(feature = "serde"))]
49use humantime::Rfc3339Timestamp;
50use rust_decimal::prelude::ToPrimitive;
51use rust_decimal::Decimal;
52#[cfg(feature = "serde")]
53use serde::{Deserialize, Serialize};
54
55use crate::{CondDeserialize, CondSerialize, Time};
56
// Implements `TimeConversion<String>` for the given output time representation.
// The inner time is rendered through the `to_string` method of a
// default-constructed instance of that representation.
macro_rules! time_conversion_string {
    ($repr:ty) => {
        impl TimeConversion<String> for $repr {
            fn into(from: <Self as TimeRepresentation>::InnerTime) -> String {
                let formatter = <$repr as Default>::default();
                TimeRepresentation::to_string(&formatter, from)
            }
        }
    };
}
66
// Implements `TimeConversion<()>` for the given output time representation.
// The inner time is simply discarded.
macro_rules! time_conversion_unit {
    ($repr:ty) => {
        impl TimeConversion<()> for $repr {
            fn into(_inner: <Self as TimeRepresentation>::InnerTime) {}
        }
    };
}
74
// Implements `TimeConversion<Duration>` for the given output time representation.
// The inner time is converted with `convert_from` on a default-constructed
// instance of that representation.
macro_rules! time_conversion_duration {
    ($repr:ty) => {
        impl TimeConversion<Duration> for $repr {
            fn into(from: <Self as TimeRepresentation>::InnerTime) -> Duration {
                let mut converter = <$repr as Default>::default();
                TimeRepresentation::convert_from(&mut converter, from)
            }
        }
    };
}
84
/// Number of nanoseconds in one second; used to scale the fractional part of a
/// timestamp given in seconds to nanoseconds.
const NANOS_IN_SECOND: u64 = 1_000_000_000;

/// A start time shared behind a reference-counted read-write lock.
/// The inner value is `None` as long as no start time has been set.
pub(crate) type StartTime = Arc<RwLock<Option<SystemTime>>>;
88
89/// Precisely parses an duration from a string of the form '{secs}.{sub-secs}'
90pub fn parse_float_time(s: &str) -> Result<Duration, String> {
91    let num = Decimal::from_str(s).map_err(|e| e.to_string())?;
92    let nanos = (num.fract() * Decimal::from(NANOS_IN_SECOND))
93        .to_u32()
94        .ok_or("Could not convert nano seconds")?;
95    let secs = num.trunc().to_u64().ok_or("Could not convert seconds")?;
96    Ok(Duration::new(secs, nanos))
97}
98
99/// The functionality a time format has to provide.
100pub trait TimeRepresentation:
101    TimeMode + Clone + Send + Default + CondSerialize + CondDeserialize + 'static
102{
103    /// The internal representation of the time format.
104    type InnerTime: Debug + Clone + Send + CondSerialize + CondDeserialize;
105
106    /// Convert from the internal time representation to the monitor time.
107    fn convert_from(&mut self, inner: Self::InnerTime) -> Time;
108    /// Convert from monitor time to the internal representation.
109    fn convert_into(&self, ts: Time) -> Self::InnerTime;
110
111    /// Convert the internal representation into a string.
112    fn to_string(&self, ts: Self::InnerTime) -> String;
113    /// Parse the internal representation from a string and convert it into monitor time.
114    fn parse(s: &'_ str) -> Result<Self::InnerTime, String>;
115
116    /// Returns a default start time if applicable for the time representation.
117    fn default_start_time() -> Option<SystemTime> {
118        Some(SystemTime::now())
119    }
120
121    /// Initializes the start time of the time representation.
122    fn init_start_time(&mut self, start: Option<SystemTime>) -> StartTime {
123        Arc::new(RwLock::new(start.or_else(Self::default_start_time)))
124    }
125
126    /// Set an already initialized start time.
127    fn set_start_time(&mut self, _start_time: StartTime) {}
128}
129
/// Convert the InnerTime of an [OutputTimeRepresentation] to a generic type T.
///
/// The conversion is an associated function without a receiver, so it is selected
/// through the implementing type and the target type `T` alone.
pub trait TimeConversion<T>: OutputTimeRepresentation {
    /// Converts an InnerTime to `T`.
    fn into(from: <Self as TimeRepresentation>::InnerTime) -> T;
}
135
136impl<O: OutputTimeRepresentation> TimeConversion<O::InnerTime> for O {
137    fn into(from: <Self as TimeRepresentation>::InnerTime) -> O::InnerTime {
138        from
139    }
140}
141
/// This trait captures whether the time is given explicitly through a timestamp or is indirectly obtained through measurements.
pub trait TimeMode {
    /// Returns whether the [TimeRepresentation] requires an explicit timestamp.
    ///
    /// The default is `true`; representations that determine the time themselves
    /// override this to return `false`.
    fn requires_timestamp() -> bool {
        true
    }
}
149
/// This trait captures the subset of [TimeRepresentation]s suitable for output.
///
/// It is a marker trait without methods of its own; [TimeConversion] is only
/// available for types implementing it.
pub trait OutputTimeRepresentation: TimeRepresentation {}
152
153/// Time represented as the unsigned number of nanoseconds relative to a fixed start time.
154#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
155#[derive(Debug, Copy, Clone, Default)]
156pub struct RelativeNanos {}
157
158impl TimeRepresentation for RelativeNanos {
159    type InnerTime = u64;
160
161    fn convert_from(&mut self, nanos: Self::InnerTime) -> Time {
162        Duration::from_nanos(nanos)
163    }
164
165    fn convert_into(&self, ts: Time) -> Self::InnerTime {
166        ts.as_nanos() as u64
167    }
168
169    fn to_string(&self, ts: Self::InnerTime) -> String {
170        ts.to_string()
171    }
172
173    fn parse(s: &'_ str) -> Result<u64, String> {
174        u64::from_str(s).map_err(|e| e.to_string())
175    }
176}
177impl OutputTimeRepresentation for RelativeNanos {}
178impl TimeMode for RelativeNanos {}
179
180impl TimeConversion<f64> for RelativeNanos {
181    fn into(from: <Self as TimeRepresentation>::InnerTime) -> f64 {
182        from as f64
183    }
184}
185time_conversion_string!(RelativeNanos);
186time_conversion_duration!(RelativeNanos);
187time_conversion_unit!(RelativeNanos);
188
189/// Time represented as a positive real number representing seconds and sub-seconds relative to a fixed start time.
190/// ie. 5.2
191#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
192#[derive(Debug, Copy, Clone, Default)]
193pub struct RelativeFloat {}
194
195impl TimeRepresentation for RelativeFloat {
196    type InnerTime = Duration;
197
198    fn convert_from(&mut self, ts: Self::InnerTime) -> Time {
199        ts
200    }
201
202    fn convert_into(&self, ts: Time) -> Self::InnerTime {
203        ts
204    }
205
206    fn to_string(&self, ts: Self::InnerTime) -> String {
207        format! {"{}.{:09}", ts.as_secs(), ts.subsec_nanos()}
208    }
209
210    fn parse(s: &str) -> Result<Duration, String> {
211        parse_float_time(s)
212    }
213}
214impl OutputTimeRepresentation for RelativeFloat {}
215impl TimeMode for RelativeFloat {}
216
217time_conversion_string!(RelativeFloat);
218time_conversion_unit!(RelativeFloat);
219impl TimeConversion<f64> for RelativeFloat {
220    fn into(from: <Self as TimeRepresentation>::InnerTime) -> f64 {
221        from.as_secs_f64()
222    }
223}
224
225/// Time represented as the unsigned number in nanoseconds as the offset to the preceding event.
226#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
227#[derive(Debug, Copy, Clone, Default)]
228pub struct OffsetNanos {
229    current: Time,
230    last_time: Time,
231}
232
233impl TimeRepresentation for OffsetNanos {
234    type InnerTime = u64;
235
236    fn convert_from(&mut self, raw: Self::InnerTime) -> Time {
237        self.last_time = self.current;
238        self.current += Duration::from_nanos(raw);
239        self.current
240    }
241
242    fn convert_into(&self, ts: Time) -> Self::InnerTime {
243        ts.sub(self.last_time).as_nanos() as u64
244    }
245
246    fn to_string(&self, ts: Self::InnerTime) -> String {
247        ts.to_string()
248    }
249
250    fn parse(s: &'_ str) -> Result<u64, String> {
251        u64::from_str(s).map_err(|e| e.to_string())
252    }
253}
254impl TimeMode for OffsetNanos {}
255
256/// Time represented as a positive real number representing seconds and sub-seconds as the offset to the preceding event.
257#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
258#[derive(Debug, Copy, Clone, Default)]
259pub struct OffsetFloat {
260    current: Time,
261    last_time: Time,
262}
263
264impl TimeRepresentation for OffsetFloat {
265    type InnerTime = Duration;
266
267    fn convert_from(&mut self, ts: Duration) -> Time {
268        self.last_time = self.current;
269        self.current += ts;
270        self.current
271    }
272
273    fn convert_into(&self, ts: Time) -> Self::InnerTime {
274        ts - self.last_time
275    }
276
277    fn to_string(&self, ts: Self::InnerTime) -> String {
278        format! {"{}.{:09}", ts.as_secs(), ts.subsec_nanos()}
279    }
280
281    fn parse(s: &str) -> Result<Duration, String> {
282        parse_float_time(s)
283    }
284}
285
286impl TimeMode for OffsetFloat {}
287
288/// Time represented as wall clock time given as a positive real number representing seconds and sub-seconds since the start of the Unix Epoch.
289#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
290#[derive(Debug, Clone, Default)]
291pub struct AbsoluteFloat {
292    start: StartTime,
293}
294
295impl TimeRepresentation for AbsoluteFloat {
296    type InnerTime = Duration;
297
298    fn convert_from(&mut self, ts: Duration) -> Time {
299        let current = SystemTime::UNIX_EPOCH + ts;
300        let st_read = *self.start.read().unwrap();
301        if let Some(st) = st_read {
302            current
303                .duration_since(st)
304                .expect("Time did not behave monotonically!")
305        } else {
306            *self.start.write().unwrap() = Some(current);
307            Duration::ZERO
308        }
309    }
310
311    fn convert_into(&self, ts: Time) -> Self::InnerTime {
312        let ts = self.start.read().unwrap().unwrap() + ts;
313        ts.duration_since(SystemTime::UNIX_EPOCH)
314            .expect("Time did not behave monotonically!")
315    }
316
317    fn to_string(&self, ts: Time) -> String {
318        format! {"{}.{:09}", ts.as_secs(), ts.subsec_nanos()}
319    }
320
321    fn parse(s: &str) -> Result<Duration, String> {
322        parse_float_time(s)
323    }
324
325    fn init_start_time(&mut self, start: Option<SystemTime>) -> StartTime {
326        self.start = Arc::new(RwLock::new(start));
327        self.start.clone()
328    }
329
330    fn set_start_time(&mut self, start_time: StartTime) {
331        self.start = start_time;
332    }
333
334    fn default_start_time() -> Option<SystemTime> {
335        None
336    }
337}
338impl OutputTimeRepresentation for AbsoluteFloat {}
339impl TimeMode for AbsoluteFloat {}
340
341time_conversion_string!(AbsoluteFloat);
342time_conversion_unit!(AbsoluteFloat);
343impl TimeConversion<f64> for AbsoluteFloat {
344    fn into(from: <Self as TimeRepresentation>::InnerTime) -> f64 {
345        from.as_secs_f64()
346    }
347}
348
349/// Time represented as wall clock time in RFC3339 format.
350#[cfg(not(feature = "serde"))]
351#[derive(Debug, Clone, Default)]
352pub struct AbsoluteRfc {
353    start: StartTime,
354}
355
356#[cfg(not(feature = "serde"))]
357impl TimeRepresentation for AbsoluteRfc {
358    type InnerTime = Rfc3339Timestamp;
359
360    fn convert_from(&mut self, rfc: Self::InnerTime) -> Time {
361        let current = rfc.get_ref();
362        let st_read = *self.start.read().unwrap();
363        if let Some(st) = st_read {
364            current
365                .duration_since(st)
366                .expect("Time did not behave monotonically!")
367        } else {
368            *self.start.write().unwrap() = Some(*current);
369            Duration::ZERO
370        }
371    }
372
373    fn convert_into(&self, ts: Time) -> Self::InnerTime {
374        let ts = self.start.read().unwrap().unwrap() + ts;
375        humantime::format_rfc3339(ts)
376    }
377
378    fn to_string(&self, ts: Self::InnerTime) -> String {
379        ts.to_string()
380    }
381
382    fn parse(s: &'_ str) -> Result<Self::InnerTime, String> {
383        let ts = humantime::parse_rfc3339(s).map_err(|e| e.to_string())?;
384        Ok(humantime::format_rfc3339(ts))
385    }
386
387    fn init_start_time(&mut self, start: Option<SystemTime>) -> StartTime {
388        self.start = Arc::new(RwLock::new(start));
389        self.start.clone()
390    }
391
392    fn set_start_time(&mut self, start_time: StartTime) {
393        self.start = start_time;
394    }
395
396    fn default_start_time() -> Option<SystemTime> {
397        None
398    }
399}
400#[cfg(not(feature = "serde"))]
401impl OutputTimeRepresentation for AbsoluteRfc {}
402#[cfg(not(feature = "serde"))]
403impl TimeMode for AbsoluteRfc {}
404
405#[cfg(not(feature = "serde"))]
406time_conversion_string!(AbsoluteRfc);
407
408#[cfg(not(feature = "serde"))]
409time_conversion_unit!(AbsoluteRfc);
410
411#[cfg(not(feature = "serde"))]
412impl TimeConversion<f64> for AbsoluteRfc {
413    fn into(from: <Self as TimeRepresentation>::InnerTime) -> f64 {
414        from.get_ref()
415            .duration_since(SystemTime::UNIX_EPOCH)
416            .unwrap()
417            .as_secs_f64()
418    }
419}
420
421/// Time is set to be a fixed delay between input events.
422/// The time given is ignored, and the fixed delay is applied.
423#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
424#[derive(Debug, Clone, Copy, Default)]
425pub struct DelayTime {
426    current: Duration,
427    delay: Duration,
428}
429
430impl DelayTime {
431    /// Creates a new DelayTime with a given delay.
432    pub fn new(delay: Duration) -> Self {
433        DelayTime {
434            current: Default::default(),
435            delay,
436        }
437    }
438}
439
440impl TimeRepresentation for DelayTime {
441    type InnerTime = ();
442
443    fn convert_from(&mut self, _inner: Self::InnerTime) -> Time {
444        self.current += self.delay;
445        self.current
446    }
447
448    fn convert_into(&self, _ts: Time) -> Self::InnerTime {}
449
450    fn to_string(&self, _ts: Self::InnerTime) -> String {
451        format! {"{}.{:09}", self.current.as_secs(), self.current.subsec_nanos()}
452    }
453
454    fn parse(_s: &str) -> Result<(), String> {
455        Ok(())
456    }
457}
458
459impl TimeMode for DelayTime {
460    fn requires_timestamp() -> bool {
461        false
462    }
463}
464
465/// Time is set to be real-time. I.e. the input time is ignored and the current timestamp in rfc3339 format is taken instead.
466#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
467#[derive(Debug, Clone, Default)]
468pub struct RealTime {
469    last_ts: Time,
470    start: StartTime,
471}
472impl TimeRepresentation for RealTime {
473    type InnerTime = ();
474
475    fn convert_from(&mut self, _inner: Self::InnerTime) -> Time {
476        let current = SystemTime::now();
477        let st_read = *self.start.read().unwrap();
478        self.last_ts = if let Some(st) = st_read {
479            current
480                .duration_since(st)
481                .expect("Time did not behave monotonically!")
482        } else {
483            *self.start.write().unwrap() = Some(current);
484            Duration::ZERO
485        };
486        self.last_ts
487    }
488
489    fn convert_into(&self, _ts: Time) -> Self::InnerTime {}
490
491    fn to_string(&self, _ts: Self::InnerTime) -> String {
492        let ts = self.start.read().unwrap().unwrap() + self.last_ts;
493        humantime::format_rfc3339(ts).to_string()
494    }
495
496    fn init_start_time(&mut self, start: Option<SystemTime>) -> StartTime {
497        self.start = Arc::new(RwLock::new(start.or_else(Self::default_start_time)));
498        self.start.clone()
499    }
500
501    fn set_start_time(&mut self, start_time: StartTime) {
502        self.start = start_time;
503    }
504
505    fn parse(_s: &str) -> Result<(), String> {
506        Ok(())
507    }
508}
509
510impl TimeMode for RealTime {
511    fn requires_timestamp() -> bool {
512        false
513    }
514}