rx_rust/operators/utility/
timeout.rs

1use crate::{
2    disposable::{
3        Disposable, boxed_disposal::BoxedDisposal, callback_disposal::CallbackDisposal,
4        subscription::Subscription,
5    },
6    observable::Observable,
7    observer::{Observer, Termination},
8    safe_lock, safe_lock_option_observer,
9    scheduler::Scheduler,
10    utils::{
11        types::{Mutable, MutableHelper, NecessarySendSync, Shared},
12        unsub_after_termination::subscribe_unsub_after_termination,
13    },
14};
15use educe::Educe;
16use std::time::Duration;
17
18#[derive(Educe)]
19#[educe(Debug, Clone, PartialEq, Eq)]
20pub enum Error<E> {
21    Timeout,
22    SourceError(E),
23}
24
25/// Mirrors the source Observable, but issues an error if a specified duration elapses between emissions.
26/// See <https://reactivex.io/documentation/operators/timeout.html>
27///
28/// # Examples
29/// ```rust
30/// # #[cfg(not(feature = "tokio-scheduler"))]
31/// # fn main() {
32/// #     panic!("Use tokio-scheduler feature to run tests.");
33/// # }
34/// # #[cfg(feature = "tokio-scheduler")]
35/// #[tokio::main]
36/// async fn main() {
37///     use rx_rust::{
38///         observable::observable_ext::ObservableExt,
39///         observer::{Observer, Termination},
40///         operators::utility::timeout::{Error, Timeout},
41///         subject::publish_subject::PublishSubject,
42///     };
43///     use std::{convert::Infallible, sync::{Arc, Mutex}};
44///     use tokio::time::{sleep, Duration};
45///
46///     let handle = tokio::runtime::Handle::current();
47///     let values = Arc::new(Mutex::new(Vec::new()));
48///     let terminations = Arc::new(Mutex::new(Vec::new()));
49///     let values_observer = Arc::clone(&values);
50///     let terminations_observer = Arc::clone(&terminations);
51///     let mut subject: PublishSubject<'static, i32, Infallible> = PublishSubject::default();
52///
53///     let subscription = Timeout::new(subject.clone(), Duration::from_millis(5), handle.clone())
54///         .subscribe_with_callback(
55///             move |value| values_observer.lock().unwrap().push(value),
56///             move |termination| terminations_observer
57///                 .lock()
58///                 .unwrap()
59///                 .push(termination),
60///         );
61///
62///     subject.on_next(1);
63///     sleep(Duration::from_millis(10)).await;
64///     drop(subscription);
65///
66///     assert_eq!(&*values.lock().unwrap(), &[1]);
67///     assert_eq!(
68///         &*terminations.lock().unwrap(),
69///         &[Termination::Error(Error::Timeout)]
70///     );
71/// }
72/// ```
73#[derive(Educe)]
74#[educe(Debug, Clone)]
75pub struct Timeout<OE, S> {
76    source: OE,
77    duration: Duration,
78    scheduler: S,
79}
80
81impl<OE, S> Timeout<OE, S> {
82    pub fn new(source: OE, duration: Duration, scheduler: S) -> Self {
83        Self {
84            source,
85            duration,
86            scheduler,
87        }
88    }
89}
90
91impl<'or, 'sub, T, E, OE, S> Observable<'static, 'sub, T, Error<E>> for Timeout<OE, S>
92where
93    OE: Observable<'or, 'static, T, E>,
94    S: Scheduler,
95{
96    fn subscribe(
97        self,
98        observer: impl Observer<T, Error<E>> + NecessarySendSync + 'static,
99    ) -> Subscription<'static> {
100        subscribe_unsub_after_termination(observer, |observer| {
101            let context = Shared::new(Mutable::new(TimeoutContext {
102                timer_state: TimerState::Initialized,
103                version: 0,
104            }));
105            let observer = Shared::new(Mutable::new(Some(observer)));
106            let timeout_observer = TimeoutObserver {
107                observer: observer.clone(),
108                duration: self.duration,
109                scheduler: self.scheduler.clone(),
110                context: context.clone(),
111            };
112
113            let sub = self.source.subscribe(timeout_observer);
114            let timer = create_timer(
115                0,
116                observer.clone(),
117                self.duration,
118                self.scheduler.clone(),
119                context.clone(),
120            );
121            let timer_state =
122                safe_lock!(mem_replace: context, timer_state, TimerState::Scheduled(timer));
123            match timer_state {
124                TimerState::Initialized => {} // Normal case.
125                TimerState::Scheduled(_) => unreachable!(),
126                TimerState::DidTimeout => {} // Scheduled task is too fast.
127                TimerState::Disposed => unreachable!(),
128            }
129            sub + context
130        })
131    }
132}
133
134enum TimerState {
135    Initialized,
136    Scheduled(BoxedDisposal<'static>),
137    DidTimeout,
138    Disposed,
139}
140
141struct TimeoutContext {
142    timer_state: TimerState,
143    version: usize,
144}
145
146impl Disposable for Shared<Mutable<TimeoutContext>> {
147    fn dispose(self) {
148        let timer_state = safe_lock!(mem_replace: self, timer_state, TimerState::Disposed);
149        match timer_state {
150            TimerState::Initialized => unreachable!(),
151            TimerState::Scheduled(disposal) => disposal.dispose(), // Not timeout yet.
152            TimerState::DidTimeout => {}                           // Timeout
153            TimerState::Disposed => unreachable!(),
154        }
155    }
156}
157
158struct TimeoutObserver<OR, S> {
159    observer: Shared<Mutable<Option<OR>>>,
160    duration: Duration,
161    scheduler: S,
162    context: Shared<Mutable<TimeoutContext>>,
163}
164
165impl<T, E, OR, S> Observer<T, E> for TimeoutObserver<OR, S>
166where
167    OR: Observer<T, Error<E>> + NecessarySendSync + 'static,
168    S: Scheduler,
169{
170    fn on_next(&mut self, value: T) {
171        self.context
172            .lock_mut(|mut lock| match &mut lock.timer_state {
173                TimerState::Initialized => {
174                    drop(lock);
175                    safe_lock_option_observer!(on_next: self.observer, value);
176                }
177                TimerState::Scheduled(disposal) => {
178                    // dispose old timer
179                    let disposal = std::mem::replace(
180                        disposal,
181                        BoxedDisposal::new(CallbackDisposal::new(|| {})), // Plcaceholder
182                    );
183                    disposal.dispose();
184
185                    // schedule new timer
186                    lock.version += 1;
187                    let timer = create_timer(
188                        lock.version,
189                        self.observer.clone(),
190                        self.duration,
191                        self.scheduler.clone(),
192                        self.context.clone(),
193                    );
194                    lock.timer_state = TimerState::Scheduled(timer);
195
196                    // emit
197                    drop(lock);
198                    safe_lock_option_observer!(on_next: self.observer, value);
199                }
200                TimerState::DidTimeout => {}
201                TimerState::Disposed => {}
202            });
203    }
204
205    fn on_termination(self, termination: Termination<E>) {
206        match termination {
207            Termination::Completed => {
208                safe_lock_option_observer!(on_termination: self.observer, Termination::Completed);
209            }
210            Termination::Error(error) => {
211                safe_lock_option_observer!(on_termination: self.observer, Termination::Error(Error::SourceError(error)));
212            }
213        }
214    }
215}
216
217fn create_timer<T, E, OR, S>(
218    version: usize,
219    observer: Shared<Mutable<Option<OR>>>,
220    duration: Duration,
221    scheduler: S,
222    context: Shared<Mutable<TimeoutContext>>,
223) -> BoxedDisposal<'static>
224where
225    OR: Observer<T, Error<E>> + NecessarySendSync + 'static,
226    S: Scheduler,
227{
228    BoxedDisposal::new(scheduler.schedule(
229        move || {
230            context.lock_mut(|mut lock| {
231                if lock.version != version {
232                    // New version, should ignore.
233                    return;
234                }
235                // Same version, should do timeout.
236                let timer_state = std::mem::replace(&mut lock.timer_state, TimerState::DidTimeout);
237                drop(lock);
238                safe_lock_option_observer!(on_termination: observer, Termination::Error(Error::Timeout));
239                match timer_state {
240                    TimerState::Initialized => {} //  Scheduled task is too fast.
241                    TimerState::Scheduled(disposal) => disposal.dispose(), // Dispose the old timer as soon as possible to make the `EntryExitChecker` correct.
242                    TimerState::DidTimeout => unreachable!(),
243                    TimerState::Disposed => {},
244                }
245            });
246        },
247        Some(duration),
248    ))
249}