rx_rust/operators/utility/time_interval.rs
1use crate::{
2 disposable::subscription::Subscription,
3 observable::Observable,
4 observer::{Observer, Termination},
5 utils::types::NecessarySendSync,
6};
7use educe::Educe;
8use std::time::{Duration, Instant};
9
10/// Emits the time elapsed between consecutive emissions from the source Observable.
11/// See <https://reactivex.io/documentation/operators/timeinterval.html>
12///
13/// # Examples
14/// ```rust
15/// use rx_rust::{
16/// observable::observable_ext::ObservableExt,
17/// observer::{Observer, Termination},
18/// operators::utility::time_interval::TimeInterval,
19/// subject::publish_subject::PublishSubject,
20/// };
21/// use std::{convert::Infallible, thread::sleep, time::Duration};
22///
23/// let mut intervals = Vec::new();
24/// let mut terminations = Vec::new();
25/// let mut subject: PublishSubject<'_, i32, Infallible> = PublishSubject::default();
26///
27/// let subscription = TimeInterval::new(subject.clone()).subscribe_with_callback(
28/// |(value, span)| intervals.push((value, span)),
29/// |termination| terminations.push(termination),
30/// );
31///
32/// subject.on_next(1);
33/// sleep(Duration::from_millis(5));
34/// subject.on_next(2);
35/// subject.on_termination(Termination::Completed);
36/// drop(subscription);
37///
38/// assert_eq!(intervals.len(), 2);
39/// assert_eq!(intervals[0].0, 1);
40/// assert_eq!(intervals[1].0, 2);
41/// assert!(intervals[1].1 >= Duration::from_millis(5));
42/// assert_eq!(terminations, vec![Termination::Completed]);
43/// ```
44#[derive(Educe)]
45#[educe(Debug, Clone)]
46pub struct TimeInterval<OE> {
47 source: OE,
48}
49
50impl<OE> TimeInterval<OE> {
51 pub fn new(source: OE) -> Self {
52 Self { source }
53 }
54}
55
56impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, (T, Duration), E> for TimeInterval<OE>
57where
58 OE: Observable<'or, 'sub, T, E>,
59{
60 fn subscribe(
61 self,
62 observer: impl Observer<(T, Duration), E> + NecessarySendSync + 'or,
63 ) -> Subscription<'sub> {
64 let observer = TimeIntervalObserver {
65 observer,
66 time_stamp: Instant::now(),
67 };
68 self.source.subscribe(observer)
69 }
70}
71
72struct TimeIntervalObserver<OR> {
73 observer: OR,
74 time_stamp: Instant,
75}
76
77impl<T, E, OR> Observer<T, E> for TimeIntervalObserver<OR>
78where
79 OR: Observer<(T, Duration), E>,
80{
81 fn on_next(&mut self, value: T) {
82 let time_span = self.time_stamp.elapsed();
83 self.time_stamp = Instant::now();
84 self.observer.on_next((value, time_span));
85 }
86
87 fn on_termination(self, termination: Termination<E>) {
88 self.observer.on_termination(termination);
89 }
90}