rx_rust/scheduler/
mod.rs

1#[cfg(feature = "async-std-scheduler")]
2pub mod async_std_scheduler;
3#[cfg(feature = "local-pool-scheduler")]
4pub mod local_pool_scheduler;
5#[cfg(feature = "thread-pool-scheduler")]
6pub mod thread_pool_scheduler;
7#[cfg(feature = "tokio-scheduler")]
8pub mod tokio_scheduler;
9
10use crate::{disposable::Disposable, utils::types::NecessarySendSync};
11use educe::Educe;
12#[cfg(feature = "futures")]
13use futures::{Stream, stream::StreamExt};
14use std::time::{Duration, Instant};
15
16/// Indicates how a recursive scheduling step should continue.
17#[derive(Educe)]
18#[educe(Debug, Clone, PartialEq, Eq)]
19pub enum RecursionAction {
20    ContinueAt(Instant),
21    ContinueImmediately,
22    Stop,
23}
24
25/// Core abstraction for driving asynchronous work across runtimes.
26/// See <https://reactivex.io/documentation/scheduler.html>
27/// This is why the task must be 'static: <https://stackoverflow.com/a/65287449/9315497>
28pub trait Scheduler: Clone + NecessarySendSync + 'static {
29    fn schedule_future(
30        &self,
31        future: impl Future<Output = ()> + NecessarySendSync + 'static,
32    ) -> impl Disposable + NecessarySendSync + 'static;
33
34    fn sleep(&self, duration: Duration) -> impl Future + NecessarySendSync + 'static;
35
36    fn schedule(
37        &self,
38        task: impl FnOnce() + NecessarySendSync + 'static,
39        delay: Option<Duration>,
40    ) -> impl Disposable + NecessarySendSync + 'static {
41        let this = self.clone();
42        self.schedule_future(async move {
43            if let Some(delay) = delay {
44                this.sleep(delay).await;
45            }
46            task();
47        })
48    }
49
50    fn schedule_recursively(
51        &self,
52        mut task: impl FnMut(usize) -> RecursionAction + NecessarySendSync + 'static,
53        delay: Option<Duration>,
54    ) -> impl Disposable + NecessarySendSync + 'static {
55        let this = self.clone();
56        self.schedule_future(async move {
57            if let Some(delay) = delay {
58                this.clone().sleep(delay).await;
59            }
60            let mut count = 0;
61            loop {
62                match task(count) {
63                    RecursionAction::ContinueAt(at) => {
64                        if let Some(delay) = at.checked_duration_since(Instant::now()) {
65                            this.clone().sleep(delay).await;
66                        }
67                    }
68                    RecursionAction::ContinueImmediately => {}
69                    RecursionAction::Stop => break,
70                }
71                count += 1;
72            }
73        })
74    }
75
76    fn schedule_periodically(
77        &self,
78        mut task: impl FnMut(usize) -> bool + NecessarySendSync + 'static,
79        period: Duration,
80        delay: Option<Duration>,
81    ) -> impl Disposable + NecessarySendSync + 'static {
82        let first = Instant::now() + delay.unwrap_or_default();
83        self.schedule_recursively(
84            move |count| {
85                let stop = task(count);
86                if stop {
87                    RecursionAction::Stop
88                } else {
89                    RecursionAction::ContinueAt(first + period * (count as u32 + 1))
90                }
91            },
92            delay,
93        )
94    }
95
96    #[cfg(feature = "futures")]
97    fn schedule_stream<SM>(
98        &self,
99        mut stream: SM,
100        mut result_callback: impl FnMut(Option<SM::Item>) + NecessarySendSync + 'static,
101    ) -> impl Disposable + NecessarySendSync + 'static
102    where
103        SM: Stream + NecessarySendSync + Unpin + 'static,
104    {
105        self.schedule_future(async move {
106            while let Some(item) = stream.next().await {
107                result_callback(Some(item));
108            }
109            result_callback(None);
110        })
111    }
112}