rx_rust/operators/utility/
timestamp.rs

1use crate::{
2    disposable::subscription::Subscription,
3    observable::Observable,
4    observer::{Observer, Termination},
5    utils::types::NecessarySendSync,
6};
7use educe::Educe;
8use std::time::Instant;
9
10/// Attaches a timestamp to each item emitted by an Observable.
11/// See <https://reactivex.io/documentation/operators/timestamp.html>
12///
13/// # Examples
14/// ```rust
15/// use rx_rust::{
16///     observable::observable_ext::ObservableExt,
17///     observer::{Observer, Termination},
18///     operators::utility::timestamp::Timestamp,
19///     subject::publish_subject::PublishSubject,
20/// };
21/// use std::{convert::Infallible, time::{Duration, Instant}};
22///
23/// let mut timestamped = Vec::new();
24/// let mut terminations = Vec::new();
25/// let mut subject: PublishSubject<'_, i32, Infallible> = PublishSubject::default();
26/// let start = Instant::now();
27///
28/// let subscription = Timestamp::new(subject.clone()).subscribe_with_callback(
29///     |(value, instant)| timestamped.push((value, instant)),
30///     |termination| terminations.push(termination),
31/// );
32///
33/// subject.on_next(1);
34/// subject.on_next(2);
35/// subject.on_termination(Termination::Completed);
36/// drop(subscription);
37///
38/// assert_eq!(
39///     timestamped.iter().map(|(value, _)| *value).collect::<Vec<_>>(),
40///     vec![1, 2]
41/// );
42/// assert!(timestamped[0].1.duration_since(start) >= Duration::from_millis(0));
43/// assert!(timestamped[1].1.duration_since(timestamped[0].1) >= Duration::from_millis(0));
44/// assert_eq!(terminations, vec![Termination::Completed]);
45/// ```
46#[derive(Educe)]
47#[educe(Debug, Clone)]
48pub struct Timestamp<OE> {
49    source: OE,
50}
51
52impl<OE> Timestamp<OE> {
53    pub fn new(source: OE) -> Self {
54        Self { source }
55    }
56}
57
58impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, (T, Instant), E> for Timestamp<OE>
59where
60    OE: Observable<'or, 'sub, T, E>,
61{
62    fn subscribe(
63        self,
64        observer: impl Observer<(T, Instant), E> + NecessarySendSync + 'or,
65    ) -> Subscription<'sub> {
66        let observer = TimestampObserver { observer };
67        self.source.subscribe(observer)
68    }
69}
70
71struct TimestampObserver<OR> {
72    observer: OR,
73}
74
75impl<T, E, OR> Observer<T, E> for TimestampObserver<OR>
76where
77    OR: Observer<(T, Instant), E>,
78{
79    fn on_next(&mut self, value: T) {
80        self.observer.on_next((value, Instant::now()));
81    }
82
83    fn on_termination(self, termination: Termination<E>) {
84        self.observer.on_termination(termination);
85    }
86}