rx_rust/operators/utility/timestamp.rs
1use crate::{
2 disposable::subscription::Subscription,
3 observable::Observable,
4 observer::{Observer, Termination},
5 utils::types::NecessarySendSync,
6};
7use educe::Educe;
8use std::time::Instant;
9
10/// Attaches a timestamp to each item emitted by an Observable.
11/// See <https://reactivex.io/documentation/operators/timestamp.html>
12///
13/// # Examples
14/// ```rust
15/// use rx_rust::{
16/// observable::observable_ext::ObservableExt,
17/// observer::{Observer, Termination},
18/// operators::utility::timestamp::Timestamp,
19/// subject::publish_subject::PublishSubject,
20/// };
21/// use std::{convert::Infallible, time::{Duration, Instant}};
22///
23/// let mut timestamped = Vec::new();
24/// let mut terminations = Vec::new();
25/// let mut subject: PublishSubject<'_, i32, Infallible> = PublishSubject::default();
26/// let start = Instant::now();
27///
28/// let subscription = Timestamp::new(subject.clone()).subscribe_with_callback(
29/// |(value, instant)| timestamped.push((value, instant)),
30/// |termination| terminations.push(termination),
31/// );
32///
33/// subject.on_next(1);
34/// subject.on_next(2);
35/// subject.on_termination(Termination::Completed);
36/// drop(subscription);
37///
38/// assert_eq!(
39/// timestamped.iter().map(|(value, _)| *value).collect::<Vec<_>>(),
40/// vec![1, 2]
41/// );
42/// assert!(timestamped[0].1.duration_since(start) >= Duration::from_millis(0));
43/// assert!(timestamped[1].1.duration_since(timestamped[0].1) >= Duration::from_millis(0));
44/// assert_eq!(terminations, vec![Termination::Completed]);
45/// ```
46#[derive(Educe)]
47#[educe(Debug, Clone)]
48pub struct Timestamp<OE> {
49 source: OE,
50}
51
52impl<OE> Timestamp<OE> {
53 pub fn new(source: OE) -> Self {
54 Self { source }
55 }
56}
57
58impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, (T, Instant), E> for Timestamp<OE>
59where
60 OE: Observable<'or, 'sub, T, E>,
61{
62 fn subscribe(
63 self,
64 observer: impl Observer<(T, Instant), E> + NecessarySendSync + 'or,
65 ) -> Subscription<'sub> {
66 let observer = TimestampObserver { observer };
67 self.source.subscribe(observer)
68 }
69}
70
71struct TimestampObserver<OR> {
72 observer: OR,
73}
74
75impl<T, E, OR> Observer<T, E> for TimestampObserver<OR>
76where
77 OR: Observer<(T, Instant), E>,
78{
79 fn on_next(&mut self, value: T) {
80 self.observer.on_next((value, Instant::now()));
81 }
82
83 fn on_termination(self, termination: Termination<E>) {
84 self.observer.on_termination(termination);
85 }
86}