rx_rust/operators/utility/
time_interval.rs

1use crate::{
2    disposable::subscription::Subscription,
3    observable::Observable,
4    observer::{Observer, Termination},
5    utils::types::NecessarySendSync,
6};
7use educe::Educe;
8use std::time::{Duration, Instant};
9
/// Pairs every item emitted by the source Observable with the time elapsed since the
/// previous emission.
///
/// Each `value` coming from the source is re-emitted downstream as `(value, Duration)`.
/// For the first item the duration is measured from the moment of subscription.
/// Termination events are forwarded to the downstream observer unchanged.
///
/// See <https://reactivex.io/documentation/operators/timeinterval.html>
///
/// # Examples
/// ```rust
/// use rx_rust::{
///     observable::observable_ext::ObservableExt,
///     observer::{Observer, Termination},
///     operators::utility::time_interval::TimeInterval,
///     subject::publish_subject::PublishSubject,
/// };
/// use std::{convert::Infallible, thread::sleep, time::Duration};
///
/// let mut intervals = Vec::new();
/// let mut terminations = Vec::new();
/// let mut subject: PublishSubject<'_, i32, Infallible> = PublishSubject::default();
///
/// let subscription = TimeInterval::new(subject.clone()).subscribe_with_callback(
///     |(value, span)| intervals.push((value, span)),
///     |termination| terminations.push(termination),
/// );
///
/// subject.on_next(1);
/// sleep(Duration::from_millis(5));
/// subject.on_next(2);
/// subject.on_termination(Termination::Completed);
/// drop(subscription);
///
/// assert_eq!(intervals.len(), 2);
/// assert_eq!(intervals[0].0, 1);
/// assert_eq!(intervals[1].0, 2);
/// assert!(intervals[1].1 >= Duration::from_millis(5));
/// assert_eq!(terminations, vec![Termination::Completed]);
/// ```
#[derive(Educe)]
#[educe(Debug, Clone)]
pub struct TimeInterval<OE> {
    /// The upstream Observable whose emissions are timed.
    source: OE,
}
49
50impl<OE> TimeInterval<OE> {
51    pub fn new(source: OE) -> Self {
52        Self { source }
53    }
54}
55
56impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, (T, Duration), E> for TimeInterval<OE>
57where
58    OE: Observable<'or, 'sub, T, E>,
59{
60    fn subscribe(
61        self,
62        observer: impl Observer<(T, Duration), E> + NecessarySendSync + 'or,
63    ) -> Subscription<'sub> {
64        let observer = TimeIntervalObserver {
65            observer,
66            time_stamp: Instant::now(),
67        };
68        self.source.subscribe(observer)
69    }
70}
71
/// Observer adapter placed between the source and the downstream observer; it attaches to
/// each item the time elapsed since the previous one.
struct TimeIntervalObserver<OR> {
    /// Downstream observer that receives `(value, Duration)` pairs.
    observer: OR,
    /// Reference instant for the next measurement: initialised when the subscription is
    /// made and reset on every emission.
    time_stamp: Instant,
}
76
77impl<T, E, OR> Observer<T, E> for TimeIntervalObserver<OR>
78where
79    OR: Observer<(T, Duration), E>,
80{
81    fn on_next(&mut self, value: T) {
82        let time_span = self.time_stamp.elapsed();
83        self.time_stamp = Instant::now();
84        self.observer.on_next((value, time_span));
85    }
86
87    fn on_termination(self, termination: Termination<E>) {
88        self.observer.on_termination(termination);
89    }
90}