rx_rust/operators/creating/
timer.rs

1use crate::utils::types::NecessarySendSync;
2use crate::{
3    disposable::subscription::Subscription,
4    observable::Observable,
5    observer::{Observer, Termination},
6    scheduler::Scheduler,
7};
8use educe::Educe;
9use std::{convert::Infallible, time::Duration};
10
11/// Creates an Observable that emits a single item after a given delay.
12/// See <https://reactivex.io/documentation/operators/timer.html>
13///
14/// # Examples
15/// ```rust
16/// # #[cfg(not(feature = "tokio-scheduler"))]
17/// # fn main() {
18/// #     panic!("Use tokio-scheduler feature to run tests.");
19/// # }
20/// # #[cfg(feature = "tokio-scheduler")]
21/// #[tokio::main]
22/// async fn main() {
23///     use rx_rust::{
24///         observable::observable_ext::ObservableExt,
25///         observer::Termination,
26///         operators::creating::timer::Timer,
27///     };
28///     use std::sync::{Arc, Mutex};
29///     use std::time::Duration;
30///     use tokio::time::sleep;
31///
32///     let handle = tokio::runtime::Handle::current();
33///     let values = Arc::new(Mutex::new(Vec::new()));
34///     let terminations = Arc::new(Mutex::new(Vec::new()));
35///     let values_observer = Arc::clone(&values);
36///     let terminations_observer = Arc::clone(&terminations);
37///
38///     let subscription = Timer::new("tick", Duration::from_millis(5), handle)
39///         .subscribe_with_callback(
40///             move |value| values_observer.lock().unwrap().push(value),
41///             move |termination| terminations_observer
42///                 .lock()
43///                 .unwrap()
44///                 .push(termination),
45///         );
46///
47///     sleep(Duration::from_millis(10)).await;
48///     drop(subscription);
49///
50///     assert_eq!(&*values.lock().unwrap(), &["tick"]);
51///     assert_eq!(
52///         &*terminations.lock().unwrap(),
53///         &[Termination::Completed]
54///     );
55/// }
56/// ```
57#[derive(Educe)]
58#[educe(Debug, Clone)]
59pub struct Timer<T, S> {
60    value: T,
61    delay: Duration,
62    scheduler: S,
63}
64
65impl<T, S> Timer<T, S> {
66    pub fn new(value: T, delay: Duration, scheduler: S) -> Self {
67        Self {
68            value,
69            delay,
70            scheduler,
71        }
72    }
73}
74
75impl<'sub, T, S> Observable<'static, 'sub, T, Infallible> for Timer<T, S>
76where
77    T: NecessarySendSync + 'static,
78    S: Scheduler,
79{
80    fn subscribe(
81        self,
82        mut observer: impl Observer<T, Infallible> + NecessarySendSync + 'static,
83    ) -> Subscription<'sub> {
84        let disposal = self.scheduler.schedule(
85            || {
86                observer.on_next(self.value);
87                observer.on_termination(Termination::Completed);
88            },
89            Some(self.delay),
90        );
91        Subscription::new_with_disposal(disposal)
92    }
93}