tsuki_scheduler/async_scheduler/
mod.rs1use std::{
2 collections::VecDeque,
3 future::{Future, Pending},
4 sync::{Arc, Mutex},
5};
/// Value given to `execute_duration` when a runner is built without an explicit
/// override: 100 milliseconds.
const DEFAULT_EXECUTE_DURATION: std::time::Duration = std::time::Duration::from_millis(100);
7use crate::{
8 Scheduler, Task, TaskUid, handle_manager::HandleManager, prelude::AsyncRuntime,
9 runtime::Runtime,
10};
11#[cfg(feature = "async-std")]
12mod async_std;
13#[cfg(feature = "tokio")]
14mod tokio;
15
16#[derive(Debug)]
17enum Event<R: Runtime> {
18 AddTask(TaskUid, Task<R>),
19 RemoveTask(TaskUid),
20}
21
22#[derive(Debug)]
35pub struct AsyncSchedulerRunner<R: AsyncRuntime, H = ()> {
36 pub scheduler: Scheduler<R, H>,
38 pub execute_duration: std::time::Duration,
40 event_queue: Arc<Mutex<VecDeque<Event<R>>>>,
41}
42
43impl<R, H> Default for AsyncSchedulerRunner<R, H>
44where
45 R: AsyncRuntime + Default,
46 H: Default,
47{
48 fn default() -> Self {
49 Self {
50 scheduler: Scheduler::default(),
51 execute_duration: DEFAULT_EXECUTE_DURATION,
52 event_queue: Default::default(),
53 }
54 }
55}
56
57impl<R: AsyncRuntime, H: HandleManager<R::Handle>> AsyncSchedulerRunner<R, H> {
58 pub fn new(scheduler: Scheduler<R, H>) -> Self {
60 Self {
61 scheduler,
62 execute_duration: DEFAULT_EXECUTE_DURATION,
63 event_queue: Default::default(),
64 }
65 }
66 pub fn with_execute_duration(mut self, duration: std::time::Duration) -> Self {
68 self.execute_duration = duration;
69 self
70 }
71 pub fn client(&self) -> AsyncSchedulerClient<R> {
73 AsyncSchedulerClient {
74 event_queue: self.event_queue.clone(),
75 }
76 }
77 pub fn run(self) -> AsyncSchedulerRunning<R, H, Pending<()>> {
79 self.run_with_shutdown_signal(std::future::pending())
80 }
81
82 pub fn run_with_shutdown_signal<S>(self, shutdown_signal: S) -> AsyncSchedulerRunning<R, H, S>
84 where
85 S: Future<Output = ()> + Send,
86 {
87 AsyncSchedulerRunning {
88 runner: Some(self),
89 event_queue: VecDeque::new(),
90 shutdown_signal,
91 }
92 }
93}
94
95#[derive(Debug)]
102pub struct AsyncSchedulerClient<R: AsyncRuntime> {
103 event_queue: Arc<Mutex<VecDeque<Event<R>>>>,
104}
105
106impl<R: AsyncRuntime> Clone for AsyncSchedulerClient<R> {
107 fn clone(&self) -> Self {
108 Self {
109 event_queue: self.event_queue.clone(),
110 }
111 }
112}
113
114impl<R: AsyncRuntime> AsyncSchedulerClient<R> {
115 pub fn add_task(&self, key: TaskUid, task: Task<R>) {
117 let mut queue = self.event_queue.lock().expect("lock event queue failed");
118 queue.push_back(Event::AddTask(key, task));
119 }
120 pub fn remove_task(&self, key: TaskUid) {
122 let mut queue = self.event_queue.lock().expect("lock event queue failed");
123 queue.push_back(Event::RemoveTask(key));
124 }
125}
126
127pub struct AsyncSchedulerRunning<R, H, S>
128where
129 R: AsyncRuntime + Send,
130 H: HandleManager<R::Handle>,
131 S: Future<Output = ()> + Send,
132{
133 runner: Option<AsyncSchedulerRunner<R, H>>,
134 event_queue: VecDeque<Event<R>>,
135 shutdown_signal: S,
136}
137
138impl<R, H, S> Unpin for AsyncSchedulerRunning<R, H, S>
139where
140 R: AsyncRuntime,
141 H: HandleManager<R::Handle>,
142 S: Future<Output = ()> + Unpin + Send,
143{
144}
145impl<R, H, S> Future for AsyncSchedulerRunning<R, H, S>
146where
147 R: AsyncRuntime,
148 H: HandleManager<R::Handle>,
149 S: Future<Output = ()> + Unpin + Send,
150{
151 type Output = AsyncSchedulerRunner<R, H>;
152 fn poll(
153 self: std::pin::Pin<&mut Self>,
154 cx: &mut std::task::Context<'_>,
155 ) -> std::task::Poll<Self::Output> {
156 let this = self.get_mut();
157 let shutdown_signal = std::pin::pin!(&mut this.shutdown_signal);
158 match shutdown_signal.poll(cx) {
159 std::task::Poll::Ready(_) => {
160 let runner = this.runner.take().expect("missing runner");
161 {
162 let mut add_task_queue =
163 runner.event_queue.lock().expect("lock event queue failed");
164 while let Some(event) = this.event_queue.pop_back() {
165 add_task_queue.push_front(event)
166 }
167 }
168 std::task::Poll::Ready(runner)
169 }
170 std::task::Poll::Pending => {
171 let runner = this.runner.as_mut().expect("missing runner");
172 {
173 let mut add_task_queue =
174 runner.event_queue.lock().expect("lock event queue failed");
175 std::mem::swap(&mut this.event_queue, &mut add_task_queue);
176 }
177 while let Some(evt) = this.event_queue.pop_front() {
178 match evt {
179 Event::AddTask(key, task) => {
180 runner.scheduler.add_task(key, task);
181 }
182 Event::RemoveTask(key) => {
183 runner.scheduler.delete_task(key);
184 }
185 }
186 }
187 runner.scheduler.execute_by_now();
188 runner
189 .scheduler
190 .runtime
191 .wake_after(runner.execute_duration, cx);
192 std::task::Poll::Pending
193 }
194 }
195 }
196}