rxr/observable/timestamp.rs
1//! Utilities for timestamping observable emissions.
2//!
3//! Provides the `TimestampedEmit` type for pairing values emitted by an `Observable`
4//! with their corresponding timestamps.
5
6use std::{
7 fmt::Display,
8 time::{Duration, SystemTime, SystemTimeError},
9};
10
11/// Represents a timestamped object created from an observable stream.
12///
13/// This type is emitted by the `Observable` created by the [`timestamp()`] operator,
14/// which maps each value emitted by the source observable to a corresponding
15/// timestamp. The `value` property holds the emitted value along with its type,
16/// while the `timestamp` is generated using `SystemTime::now()`, representing the
17/// duration since the `UNIX_EPOCH` in milliseconds.
18///
19/// [`timestamp()`]: ../trait.ObservableExt.html#method.timestamp
20#[derive(Clone, Debug)]
21pub struct TimestampedEmit<T> {
22 pub value: T,
23 pub timestamp: u128,
24 error: Option<&'static str>,
25 sys_time_error: Option<SystemTimeError>,
26}
27
28impl<T> TimestampedEmit<T> {
29 /// If `SystemTimeError` occurred during timestamping emitted values, get
30 /// duration which represents how far forward the time of the emit was from the
31 /// `UNIX_EPOCH`, otherwise return `None`.
32 pub fn error_duration(&self) -> Option<Duration> {
33 if let Some(ste) = &self.sys_time_error {
34 return Some(ste.duration());
35 }
36 None
37 }
38
39 /// Returns an error string if the system time moved backwards, such as from a
40 /// misconfigured system clock; otherwise, returns `None`.
41 pub fn error_str(&self) -> Option<&'static str> {
42 self.error
43 }
44
45 /// A convenience method to check if the emitted value is mapped with a proper timestamp.
46 pub fn is_error(&self) -> bool {
47 if self.error.is_none() {
48 return false;
49 }
50 true
51 }
52
53 pub(super) fn new(value: T) -> TimestampedEmit<T> {
54 let mut error = None;
55 let mut sys_time_error = None;
56
57 let timestamp = SystemTime::now()
58 .duration_since(SystemTime::UNIX_EPOCH)
59 .unwrap_or_else(|ste| {
60 error = Some("Error: Attempted to calculate a time before the Unix epoch. Your system clock might be misconfigured.");
61 sys_time_error = Some(ste);
62
63 // Using fallback timestamp of 0.
64 Duration::new(0, 0)
65 })
66 .as_millis();
67
68 TimestampedEmit {
69 value,
70 timestamp,
71 error,
72 sys_time_error,
73 }
74 }
75}
76
77impl<T: Display> Display for TimestampedEmit<T> {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 write!(f, "value: {}\ntimestamp: {}", self.value, self.timestamp)
80 }
81}