1#[cfg(feature = "async-std-scheduler")]
2pub mod async_std_scheduler;
3#[cfg(feature = "local-pool-scheduler")]
4pub mod local_pool_scheduler;
5#[cfg(feature = "thread-pool-scheduler")]
6pub mod thread_pool_scheduler;
7#[cfg(feature = "tokio-scheduler")]
8pub mod tokio_scheduler;
9
10use crate::{disposable::Disposable, utils::types::NecessarySendSync};
11use educe::Educe;
12#[cfg(feature = "futures")]
13use futures::{Stream, stream::StreamExt};
14use std::time::{Duration, Instant};
15
16#[derive(Educe)]
18#[educe(Debug, Clone, PartialEq, Eq)]
19pub enum RecursionAction {
20 ContinueAt(Instant),
21 ContinueImmediately,
22 Stop,
23}
24
25pub trait Scheduler: Clone + NecessarySendSync + 'static {
29 fn schedule_future(
30 &self,
31 future: impl Future<Output = ()> + NecessarySendSync + 'static,
32 ) -> impl Disposable + NecessarySendSync + 'static;
33
34 fn sleep(&self, duration: Duration) -> impl Future + NecessarySendSync + 'static;
35
36 fn schedule(
37 &self,
38 task: impl FnOnce() + NecessarySendSync + 'static,
39 delay: Option<Duration>,
40 ) -> impl Disposable + NecessarySendSync + 'static {
41 let this = self.clone();
42 self.schedule_future(async move {
43 if let Some(delay) = delay {
44 this.sleep(delay).await;
45 }
46 task();
47 })
48 }
49
50 fn schedule_recursively(
51 &self,
52 mut task: impl FnMut(usize) -> RecursionAction + NecessarySendSync + 'static,
53 delay: Option<Duration>,
54 ) -> impl Disposable + NecessarySendSync + 'static {
55 let this = self.clone();
56 self.schedule_future(async move {
57 if let Some(delay) = delay {
58 this.clone().sleep(delay).await;
59 }
60 let mut count = 0;
61 loop {
62 match task(count) {
63 RecursionAction::ContinueAt(at) => {
64 if let Some(delay) = at.checked_duration_since(Instant::now()) {
65 this.clone().sleep(delay).await;
66 }
67 }
68 RecursionAction::ContinueImmediately => {}
69 RecursionAction::Stop => break,
70 }
71 count += 1;
72 }
73 })
74 }
75
76 fn schedule_periodically(
77 &self,
78 mut task: impl FnMut(usize) -> bool + NecessarySendSync + 'static,
79 period: Duration,
80 delay: Option<Duration>,
81 ) -> impl Disposable + NecessarySendSync + 'static {
82 let first = Instant::now() + delay.unwrap_or_default();
83 self.schedule_recursively(
84 move |count| {
85 let stop = task(count);
86 if stop {
87 RecursionAction::Stop
88 } else {
89 RecursionAction::ContinueAt(first + period * (count as u32 + 1))
90 }
91 },
92 delay,
93 )
94 }
95
96 #[cfg(feature = "futures")]
97 fn schedule_stream<SM>(
98 &self,
99 mut stream: SM,
100 mut result_callback: impl FnMut(Option<SM::Item>) + NecessarySendSync + 'static,
101 ) -> impl Disposable + NecessarySendSync + 'static
102 where
103 SM: Stream + NecessarySendSync + Unpin + 'static,
104 {
105 self.schedule_future(async move {
106 while let Some(item) = stream.next().await {
107 result_callback(Some(item));
108 }
109 result_callback(None);
110 })
111 }
112}