tsuki_scheduler/async_scheduler/
mod.rs

1use std::{
2    collections::VecDeque,
3    future::{Future, Pending},
4    sync::{Arc, Mutex},
5};
6const DEFAULT_EXECUTE_DURATION: std::time::Duration = std::time::Duration::from_millis(100);
7use crate::{
8    Scheduler, Task, TaskUid, handle_manager::HandleManager, prelude::AsyncRuntime,
9    runtime::Runtime,
10};
11#[cfg(feature = "async-std")]
12mod async_std;
13#[cfg(feature = "tokio")]
14mod tokio;
15
16#[derive(Debug)]
17enum Event<R: Runtime> {
18    AddTask(TaskUid, Task<R>),
19    RemoveTask(TaskUid),
20}
21
22/// A implementation of async scheduler runner
23///
24/// ```
25/// # use tsuki_scheduler::prelude::*;
26/// // create runner
27/// let mut runner = AsyncSchedulerRunner::<Tokio>::default();
28/// // get client
29/// let client = runner.client();
30/// let task = async move {
31///     runner.run().await;
32/// };
33/// ```
34#[derive(Debug)]
35pub struct AsyncSchedulerRunner<R: AsyncRuntime, H = ()> {
36    /// inner scheduler
37    pub scheduler: Scheduler<R, H>,
38    /// execute duration
39    pub execute_duration: std::time::Duration,
40    event_queue: Arc<Mutex<VecDeque<Event<R>>>>,
41}
42
43impl<R, H> Default for AsyncSchedulerRunner<R, H>
44where
45    R: AsyncRuntime + Default,
46    H: Default,
47{
48    fn default() -> Self {
49        Self {
50            scheduler: Scheduler::default(),
51            execute_duration: DEFAULT_EXECUTE_DURATION,
52            event_queue: Default::default(),
53        }
54    }
55}
56
57impl<R: AsyncRuntime, H: HandleManager<R::Handle>> AsyncSchedulerRunner<R, H> {
58    /// create a new async scheduler runner
59    pub fn new(scheduler: Scheduler<R, H>) -> Self {
60        Self {
61            scheduler,
62            execute_duration: DEFAULT_EXECUTE_DURATION,
63            event_queue: Default::default(),
64        }
65    }
66    /// set execute duration
67    pub fn with_execute_duration(mut self, duration: std::time::Duration) -> Self {
68        self.execute_duration = duration;
69        self
70    }
71    /// get runner client
72    pub fn client(&self) -> AsyncSchedulerClient<R> {
73        AsyncSchedulerClient {
74            event_queue: self.event_queue.clone(),
75        }
76    }
77    /// start running
78    pub fn run(self) -> AsyncSchedulerRunning<R, H, Pending<()>> {
79        self.run_with_shutdown_signal(std::future::pending())
80    }
81
82    /// start running with shutdown signal
83    pub fn run_with_shutdown_signal<S>(self, shutdown_signal: S) -> AsyncSchedulerRunning<R, H, S>
84    where
85        S: Future<Output = ()> + Send,
86    {
87        AsyncSchedulerRunning {
88            runner: Some(self),
89            event_queue: VecDeque::new(),
90            shutdown_signal,
91        }
92    }
93}
94
95/// Client for [`AsyncSchedulerRunner`]
96///
97/// created by [`AsyncSchedulerRunner::client`].
98///
99/// # Clone
100/// this client is cheap to clone.
101#[derive(Debug)]
102pub struct AsyncSchedulerClient<R: AsyncRuntime> {
103    event_queue: Arc<Mutex<VecDeque<Event<R>>>>,
104}
105
106impl<R: AsyncRuntime> Clone for AsyncSchedulerClient<R> {
107    fn clone(&self) -> Self {
108        Self {
109            event_queue: self.event_queue.clone(),
110        }
111    }
112}
113
114impl<R: AsyncRuntime> AsyncSchedulerClient<R> {
115    /// add a new task
116    pub fn add_task(&self, key: TaskUid, task: Task<R>) {
117        let mut queue = self.event_queue.lock().expect("lock event queue failed");
118        queue.push_back(Event::AddTask(key, task));
119    }
120    /// remove a task by id
121    pub fn remove_task(&self, key: TaskUid) {
122        let mut queue = self.event_queue.lock().expect("lock event queue failed");
123        queue.push_back(Event::RemoveTask(key));
124    }
125}
126
127pub struct AsyncSchedulerRunning<R, H, S>
128where
129    R: AsyncRuntime + Send,
130    H: HandleManager<R::Handle>,
131    S: Future<Output = ()> + Send,
132{
133    runner: Option<AsyncSchedulerRunner<R, H>>,
134    event_queue: VecDeque<Event<R>>,
135    shutdown_signal: S,
136}
137
138impl<R, H, S> Unpin for AsyncSchedulerRunning<R, H, S>
139where
140    R: AsyncRuntime,
141    H: HandleManager<R::Handle>,
142    S: Future<Output = ()> + Unpin + Send,
143{
144}
145impl<R, H, S> Future for AsyncSchedulerRunning<R, H, S>
146where
147    R: AsyncRuntime,
148    H: HandleManager<R::Handle>,
149    S: Future<Output = ()> + Unpin + Send,
150{
151    type Output = AsyncSchedulerRunner<R, H>;
152    fn poll(
153        self: std::pin::Pin<&mut Self>,
154        cx: &mut std::task::Context<'_>,
155    ) -> std::task::Poll<Self::Output> {
156        let this = self.get_mut();
157        let shutdown_signal = std::pin::pin!(&mut this.shutdown_signal);
158        match shutdown_signal.poll(cx) {
159            std::task::Poll::Ready(_) => {
160                let runner = this.runner.take().expect("missing runner");
161                {
162                    let mut add_task_queue =
163                        runner.event_queue.lock().expect("lock event queue failed");
164                    while let Some(event) = this.event_queue.pop_back() {
165                        add_task_queue.push_front(event)
166                    }
167                }
168                std::task::Poll::Ready(runner)
169            }
170            std::task::Poll::Pending => {
171                let runner = this.runner.as_mut().expect("missing runner");
172                {
173                    let mut add_task_queue =
174                        runner.event_queue.lock().expect("lock event queue failed");
175                    std::mem::swap(&mut this.event_queue, &mut add_task_queue);
176                }
177                while let Some(evt) = this.event_queue.pop_front() {
178                    match evt {
179                        Event::AddTask(key, task) => {
180                            runner.scheduler.add_task(key, task);
181                        }
182                        Event::RemoveTask(key) => {
183                            runner.scheduler.delete_task(key);
184                        }
185                    }
186                }
187                runner.scheduler.execute_by_now();
188                runner
189                    .scheduler
190                    .runtime
191                    .wake_after(runner.execute_duration, cx);
192                std::task::Poll::Pending
193            }
194        }
195    }
196}