rx_rust/operators/creating/
interval.rs

1use crate::utils::types::NecessarySendSync;
2use crate::{
3    disposable::subscription::Subscription, observable::Observable, observer::Observer,
4    scheduler::Scheduler,
5};
6use educe::Educe;
7use std::{convert::Infallible, time::Duration};
8
9/// Creates an Observable that emits a sequence of integers spaced by a given time interval.
10/// See <https://reactivex.io/documentation/operators/interval.html>
11///
12/// # Examples
13/// ```rust
14/// # #[cfg(not(feature = "tokio-scheduler"))]
15/// # fn main() {
16/// #     panic!("Use tokio-scheduler feature to run tests.");
17/// # }
18/// # #[cfg(feature = "tokio-scheduler")]
19/// #[tokio::main]
20/// async fn main() {
21///     use rx_rust::{
22///         observable::observable_ext::ObservableExt,
23///         observer::Termination,
24///         operators::creating::interval::Interval,
25///     };
26///     use std::sync::{Arc, Mutex};
27///     use std::time::Duration;
28///     use tokio::time::sleep;
29///
30///     let handle = tokio::runtime::Handle::current();
31///     let values = Arc::new(Mutex::new(Vec::new()));
32///     let terminations = Arc::new(Mutex::new(Vec::new()));
33///     let values_observer = Arc::clone(&values);
34///     let terminations_observer = Arc::clone(&terminations);
35///     let subscription = Interval::new(Duration::from_millis(1), handle, None)
36///         .take(3)
37///         .subscribe_with_callback(
38///             move |value| values_observer.lock().unwrap().push(value),
39///             move |termination| terminations_observer
40///                 .lock()
41///                 .unwrap()
42///                 .push(termination),
43///         );
44///
45///     sleep(Duration::from_millis(10)).await;
46///     drop(subscription);
47///
48///     assert_eq!(&*values.lock().unwrap(), &[0, 1, 2]);
49///     assert_eq!(
50///         &*terminations.lock().unwrap(),
51///         &[Termination::Completed]
52///     );
53/// }
54/// ```
55#[derive(Educe)]
56#[educe(Debug, Clone)]
57pub struct Interval<S> {
58    period: Duration,
59    scheduler: S,
60    delay: Option<Duration>,
61}
62
63impl<S> Interval<S> {
64    pub fn new(period: Duration, scheduler: S, delay: Option<Duration>) -> Self {
65        Self {
66            period,
67            scheduler,
68            delay,
69        }
70    }
71}
72
73impl<'sub, S> Observable<'static, 'sub, usize, Infallible> for Interval<S>
74where
75    S: Scheduler,
76{
77    fn subscribe(
78        self,
79        mut observer: impl Observer<usize, Infallible> + NecessarySendSync + 'static,
80    ) -> Subscription<'sub> {
81        let disposal = self.scheduler.schedule_periodically(
82            move |count| {
83                observer.on_next(count);
84                false
85            },
86            self.period,
87            self.delay,
88        );
89        Subscription::new_with_disposal(disposal)
90    }
91}