rxr/observable/
timestamp.rs

1//! Utilities for timestamping observable emissions.
2//!
3//! Provides the `TimestampedEmit` type for pairing values emitted by an `Observable`
4//! with their corresponding timestamps.
5
6use std::{
7    fmt::Display,
8    time::{Duration, SystemTime, SystemTimeError},
9};
10
11/// Represents a timestamped object created from an observable stream.
12///
13/// This type is emitted by the `Observable` created by the [`timestamp()`] operator,
14/// which maps each value emitted by the source observable to a corresponding
15/// timestamp. The `value` property holds the emitted value along with its type,
16/// while the `timestamp` is generated using `SystemTime::now()`, representing the
17/// duration since the `UNIX_EPOCH` in milliseconds.
18///
19/// [`timestamp()`]: ../trait.ObservableExt.html#method.timestamp
20#[derive(Clone, Debug)]
21pub struct TimestampedEmit<T> {
22    pub value: T,
23    pub timestamp: u128,
24    error: Option<&'static str>,
25    sys_time_error: Option<SystemTimeError>,
26}
27
28impl<T> TimestampedEmit<T> {
29    /// If `SystemTimeError` occurred during timestamping emitted values, get
30    /// duration which represents how far forward the time of the emit was from the
31    /// `UNIX_EPOCH`, otherwise return `None`.
32    pub fn error_duration(&self) -> Option<Duration> {
33        if let Some(ste) = &self.sys_time_error {
34            return Some(ste.duration());
35        }
36        None
37    }
38
39    /// Returns an error string if the system time moved backwards, such as from a
40    /// misconfigured system clock; otherwise, returns `None`.
41    pub fn error_str(&self) -> Option<&'static str> {
42        self.error
43    }
44
45    /// A convenience method to check if the emitted value is mapped with a proper timestamp.
46    pub fn is_error(&self) -> bool {
47        if self.error.is_none() {
48            return false;
49        }
50        true
51    }
52
53    pub(super) fn new(value: T) -> TimestampedEmit<T> {
54        let mut error = None;
55        let mut sys_time_error = None;
56
57        let timestamp = SystemTime::now()
58            .duration_since(SystemTime::UNIX_EPOCH)
59            .unwrap_or_else(|ste| {
60                error = Some("Error: Attempted to calculate a time before the Unix epoch. Your system clock might be misconfigured.");
61                sys_time_error = Some(ste);
62
63                // Using fallback timestamp of 0.
64                Duration::new(0, 0)
65            })
66            .as_millis();
67
68        TimestampedEmit {
69            value,
70            timestamp,
71            error,
72            sys_time_error,
73        }
74    }
75}
76
77impl<T: Display> Display for TimestampedEmit<T> {
78    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79        write!(f, "value: {}\ntimestamp: {}", self.value, self.timestamp)
80    }
81}