rx_rust/operators/utility/
delay.rs

1use crate::disposable::Disposable;
2use crate::disposable::boxed_disposal::BoxedDisposal;
3use crate::disposable::subscription::Subscription;
4use crate::scheduler::RecursionAction;
5use crate::utils::types::{Mutable, MutableHelper, NecessarySendSync, Shared};
6use crate::{
7    observable::Observable,
8    observer::{Observer, Termination},
9    scheduler::Scheduler,
10};
11use crate::{safe_lock_option, safe_lock_option_disposable, safe_lock_option_observer};
12use educe::Educe;
13use std::{
14    collections::VecDeque,
15    time::{Duration, Instant},
16};
17
18/// Shifts the emissions from an Observable forward in time by a specified duration.
19/// See <https://reactivex.io/documentation/operators/delay.html>
20///
21/// # Examples
22/// ```rust
23/// # #[cfg(not(feature = "tokio-scheduler"))]
24/// # fn main() {
25/// #     panic!("Use tokio-scheduler feature to run tests.");
26/// # }
27/// # #[cfg(feature = "tokio-scheduler")]
28/// #[tokio::main]
29/// async fn main() {
30///     use rx_rust::{
31///         observable::observable_ext::ObservableExt,
32///         observer::Termination,
33///         operators::{
34///             creating::from_iter::FromIter,
35///             utility::delay::Delay,
36///         },
37///     };
38///     use std::{
39///         sync::{Arc, Mutex},
40///         time::Duration,
41///     };
42///     use tokio::time::sleep;
43///
44///     let handle = tokio::runtime::Handle::current();
45///     let values = Arc::new(Mutex::new(Vec::new()));
46///     let terminations = Arc::new(Mutex::new(Vec::new()));
47///     let values_observer = Arc::clone(&values);
48///     let terminations_observer = Arc::clone(&terminations);
49///
50///     let subscription = Delay::new(
51///         FromIter::new(vec![1, 2, 3]),
52///         Duration::from_millis(5),
53///         handle.clone(),
54///     )
55///     .subscribe_with_callback(
56///         move |value| values_observer.lock().unwrap().push(value),
57///         move |termination| terminations_observer
58///             .lock()
59///             .unwrap()
60///             .push(termination),
61///     );
62///
63///     sleep(Duration::from_millis(10)).await;
64///     drop(subscription);
65///
66///     assert_eq!(&*values.lock().unwrap(), &[1, 2, 3]);
67///     assert_eq!(
68///         &*terminations.lock().unwrap(),
69///         &[Termination::Completed]
70///     );
71/// }
72/// ```
73#[derive(Educe)]
74#[educe(Debug, Clone)]
75pub struct Delay<OE, S> {
76    source: OE,
77    delay: Duration,
78    scheduler: S,
79}
80
81impl<OE, S> Delay<OE, S> {
82    pub fn new(source: OE, delay: Duration, scheduler: S) -> Self {
83        Self {
84            source,
85            delay,
86            scheduler,
87        }
88    }
89}
90
91impl<'or, 'sub, T, E, OE, S> Observable<'static, 'sub, T, E> for Delay<OE, S>
92where
93    T: NecessarySendSync + 'static,
94    OE: Observable<'or, 'sub, T, E>,
95    S: Scheduler,
96{
97    fn subscribe(
98        self,
99        observer: impl Observer<T, E> + NecessarySendSync + 'static,
100    ) -> Subscription<'sub> {
101        let context = Shared::new(Mutable::new(DelayContext {
102            values: VecDeque::new(),
103            timer: None,
104        }));
105        let delay_observer = DelayObserver {
106            delay: self.delay,
107            scheduler: self.scheduler,
108            context: context.clone(),
109            observer: Shared::new(Mutable::new(Some(observer))),
110        };
111        self.source.subscribe(delay_observer) + context
112    }
113}
114
115struct DelayContext<T> {
116    values: VecDeque<(Instant, Option<T>)>, // None means completed
117    timer: Option<BoxedDisposal<'static>>,
118}
119
120impl<T> Disposable for Shared<Mutable<DelayContext<T>>> {
121    fn dispose(self) {
122        safe_lock_option_disposable!(dispose: self, timer);
123    }
124}
125
126struct DelayObserver<T, OR, S> {
127    delay: Duration,
128    scheduler: S,
129    context: Shared<Mutable<DelayContext<T>>>,
130    observer: Shared<Mutable<Option<OR>>>, // None means terminated or disposed
131}
132
133impl<T, OR, S> DelayObserver<T, OR, S> {
134    fn emit_value_and_setup_timer_if_needed<E>(&self, value: Option<T>)
135    where
136        T: NecessarySendSync + 'static,
137        OR: Observer<T, E> + NecessarySendSync + 'static,
138        S: Scheduler,
139    {
140        self.context.lock_mut(|mut lock| {
141            lock.values.push_back((Instant::now() + self.delay, value));
142            if lock.timer.is_some() {
143                return;
144            }
145            let context = self.context.clone();
146            let observer = self.observer.clone();
147            lock.timer = Some(BoxedDisposal::new(self.scheduler.schedule_recursively(
148                move |_| {
149                    // Get values that should be sent
150                    let (values, completed) = context.lock_mut(|mut lock| {
151                        let mut values = Vec::new();
152                        let mut completed = false;
153                        let now = Instant::now();
154                        while let Some((instant, _)) = lock.values.front() {
155                            if now < *instant {
156                                break;
157                            }
158                            let value = lock.values.pop_front().unwrap().1;
159                            if let Some(value) = value {
160                                values.push(value);
161                            } else {
162                                completed = true;
163                                break;
164                            }
165                        }
166                        (values, completed)
167                    });
168
169                    if completed {
170                        safe_lock_option_observer!(on_next_and_termination: observer, values: values, Termination::Completed);
171                        RecursionAction::Stop
172                    } else {
173                        safe_lock_option_observer!(on_next: observer, values: values);
174                        context.lock_mut(|mut lock| {
175                            if let Some((next_instant, _)) = lock.values.front() {
176                                // Continue
177                                RecursionAction::ContinueAt(*next_instant)
178                            } else {
179                                // No more values. Stop timer. Set timer to None.
180                                if let Some(timer) = lock.timer.take() {
181                                    drop(lock);
182                                    timer.dispose();
183                                }
184                                RecursionAction::Stop
185                            }
186                        })
187                    }
188                },
189                Some(self.delay),
190            )));
191        });
192    }
193}
194
195impl<T, E, OR, S> Observer<T, E> for DelayObserver<T, OR, S>
196where
197    T: NecessarySendSync + 'static,
198    OR: Observer<T, E> + NecessarySendSync + 'static,
199    S: Scheduler,
200{
201    fn on_next(&mut self, value: T) {
202        self.emit_value_and_setup_timer_if_needed(Some(value));
203    }
204
205    fn on_termination(self, termination: Termination<E>) {
206        match termination {
207            Termination::Completed => {
208                self.emit_value_and_setup_timer_if_needed(None);
209            }
210            Termination::Error(_) => {
211                self.context.dispose();
212                safe_lock_option_observer!(on_termination: self.observer, termination);
213            }
214        }
215    }
216}