// sparreal_kernel/os/async/executor.rs
use core::future::Future;
11use core::time::Duration;
12
13use alloc::boxed::Box;
14use alloc::collections::BTreeMap;
15use alloc::collections::BinaryHeap;
16use alloc::collections::VecDeque;
17use alloc::sync::Arc;
18
19use crate::os::sync::spinlock::IrqSpinlock;
20
21use super::task::{TaskHandle, TaskId, TaskMetadata, TaskPriority, TaskRef};
22
/// Task ids that have been woken (by `ExecutorWaker`, possibly from interrupt
/// context) but not yet transferred to an executor's priority queue; drained
/// by `SingleCpuExecutor::process_wake_queue` on every tick.
static GLOBAL_WAKEUP_QUEUE: IrqSpinlock<VecDeque<TaskId>> = IrqSpinlock::new(VecDeque::new());
25
26pub fn enqueue_task_wakeup(task_id: TaskId) {
28 let mut queue = GLOBAL_WAKEUP_QUEUE.lock();
29 if !queue.contains(&task_id) {
30 queue.push_back(task_id);
31 }
32}
33
34pub struct SingleCpuExecutor {
36 priority_task_queue: IrqSpinlock<BinaryHeap<PriorityTaskWrapper>>,
38 active_task_registry: IrqSpinlock<alloc::collections::BTreeMap<TaskId, Arc<TaskRef>>>,
40 executor_running: IrqSpinlock<bool>,
42 task_timeout_milliseconds: u64,
44}
45
/// Pairs a task reference with a snapshot of its priority so tasks can be
/// stored in the executor's `BinaryHeap`; ordering comes from the `Ord`
/// impl further down in this file.
#[derive(Debug)]
struct PriorityTaskWrapper {
    // Priority captured at enqueue time; a later priority change on the
    // task is not reflected in entries already sitting in the heap.
    task_priority: TaskPriority,
    task_reference: Arc<TaskRef>,
}
54
55impl PriorityTaskWrapper {
56 fn new(task_ref: Arc<TaskRef>) -> Self {
58 Self {
59 task_priority: task_ref.priority(),
60 task_reference: task_ref,
61 }
62 }
63}
64
// Equality considers ONLY the cached priority, not task identity: two
// wrappers for different tasks with equal priority compare equal. This is
// consistent with the Ord impl (as BinaryHeap requires), but these impls
// must not be used to test whether two wrappers refer to the same task.
impl PartialEq for PriorityTaskWrapper {
    fn eq(&self, other: &Self) -> bool {
        self.task_priority == other.task_priority
    }
}

impl Eq for PriorityTaskWrapper {}
73
impl PartialOrd for PriorityTaskWrapper {
    // Standard delegation to the total order defined by `Ord::cmp`.
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
79
impl Ord for PriorityTaskWrapper {
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        // Deliberately reversed (`other` vs `self`): BinaryHeap is a
        // max-heap, so reversing makes it pop the wrapper with the SMALLEST
        // `TaskPriority` value first. NOTE(review): this is correct only if
        // lower numeric values mean higher scheduling priority — confirm
        // against `TaskPriority`'s definition in super::task.
        other.task_priority.cmp(&self.task_priority)
    }
}
86
87impl SingleCpuExecutor {
88 pub fn new() -> Self {
90 Self::with_timeout(Duration::from_secs(1))
91 }
92
93 pub fn with_timeout(timeout: Duration) -> Self {
95 Self {
96 priority_task_queue: IrqSpinlock::new(BinaryHeap::new()),
97 active_task_registry: IrqSpinlock::new(BTreeMap::new()),
98 executor_running: IrqSpinlock::new(false),
99 task_timeout_milliseconds: timeout.as_millis() as u64,
100 }
101 }
102
103 pub fn global() -> &'static Self {
105 use core::sync::atomic::{AtomicPtr, Ordering};
106
107 static EXECUTOR_PTR: AtomicPtr<SingleCpuExecutor> = AtomicPtr::new(core::ptr::null_mut());
108
109 let ptr = EXECUTOR_PTR.load(Ordering::Acquire);
110 if ptr.is_null() {
111 let executor = Box::leak(Box::new(SingleCpuExecutor::new()));
113 match EXECUTOR_PTR.compare_exchange(
114 core::ptr::null_mut(),
115 executor,
116 Ordering::AcqRel,
117 Ordering::Acquire,
118 ) {
119 Ok(_) => executor,
120 Err(existing) => {
121 unsafe {
123 let _ = Box::from_raw(executor);
125 }
126 unsafe { &*existing }
127 }
128 }
129 } else {
130 unsafe { &*ptr }
131 }
132 }
133
134 pub fn spawn<F, T>(&self, future: F) -> TaskHandle
136 where
137 F: Future<Output = T> + Send + 'static,
138 T: Send + 'static,
139 {
140 let task_id = TaskId::new();
141 let metadata = TaskMetadata::new(task_id);
142
143 let wrapped_future = async move {
145 let _ = future.await;
146 };
147
148 let task_ref = Arc::new(TaskRef::new(wrapped_future, metadata));
149 let handle = TaskHandle::with_ref(task_id, task_ref.clone());
150
151 {
153 let mut registry = self.active_task_registry.lock();
154 registry.insert(task_id, task_ref);
155 }
156
157 self.add_task_to_queue(task_id);
159
160 debug!("Spawned task {task_id:?}",);
161 handle
162 }
163
164 fn add_task_to_queue(&self, task_id: TaskId) {
166 let registry = self.active_task_registry.lock();
167 if let Some(task_ref) = registry.get(&task_id) {
168 let priority_task = PriorityTaskWrapper::new(task_ref.clone());
169 let mut queue = self.priority_task_queue.lock();
170 queue.push(priority_task);
171 }
172 }
173
174 pub fn wake_by_id(&self, task_id: TaskId) -> bool {
176 let registry = self.active_task_registry.lock();
177 if let Some(task_ref) = registry.get(&task_id) {
178 task_ref.metadata.lock().mark_woken();
179 self.add_task_to_queue(task_id);
180 true
181 } else {
182 false
183 }
184 }
185
186 fn process_one_task(&self) -> bool {
188 let priority_task = {
189 let mut queue = self.priority_task_queue.lock();
190 queue.pop()
191 };
192
193 if let Some(priority_task) = priority_task {
194 let task_ref = priority_task.task_reference.clone();
195 let task_id = task_ref.id();
196
197 if task_ref.is_completed() {
199 let mut registry = self.active_task_registry.lock();
201 registry.remove(&task_id);
202 debug!("Task {task_id:?} completed and cleaned up");
203 return true;
204 }
205
206 let should_execute = {
208 let mut metadata = task_ref.metadata.lock();
209
210 if metadata.state == super::task::TaskState::Woken {
212 true
213 } else if metadata.is_expired(self.task_timeout_milliseconds) {
214 metadata.mark_woken();
216 debug!("Task {task_id:?} expired, promoting priority");
217 true
218 } else {
219 false
221 }
222 };
223
224 if !should_execute {
225 return true;
228 }
229
230 let waker = ExecutorWaker::new(task_id);
232 let waker = waker.into_waker();
233
234 match task_ref.poll(&waker) {
235 core::task::Poll::Ready(()) => {
236 let mut registry = self.active_task_registry.lock();
238 registry.remove(&task_id);
239 debug!("Task {task_id:?} completed");
240 true
241 }
242 core::task::Poll::Pending => {
243 debug!("Task {task_id:?} pending, waiting for wake");
246 true
247 }
248 }
249 } else {
250 false }
252 }
253
254 pub fn tick(&self) {
256 self.process_wake_queue();
258
259 let mut processed = 0;
261 const MAX_TASKS_PER_TICK: usize = 10;
262
263 while processed < MAX_TASKS_PER_TICK && self.process_one_task() {
264 processed += 1;
265 }
266
267 if processed == 0 {
268 log::debug!("No tasks to process in this tick");
269 }
270 }
271
272 fn process_wake_queue(&self) {
274 loop {
275 let task_id = {
276 let mut queue = GLOBAL_WAKEUP_QUEUE.lock();
277 queue.pop_front()
278 };
279
280 if let Some(task_id) = task_id {
281 let registry = self.active_task_registry.lock();
283 if let Some(task_ref) = registry.get(&task_id) {
284 task_ref.metadata.lock().mark_woken();
285 let priority_task = PriorityTaskWrapper::new(task_ref.clone());
286 let mut queue = self.priority_task_queue.lock();
287 queue.push(priority_task);
288 }
289 } else {
290 break;
291 }
292 }
293 }
294
295 pub fn run_until_completion(&self) {
297 *self.executor_running.lock() = true;
298
299 debug!("Executor started, running until completion");
300
301 while self.has_pending_tasks() {
302 self.tick();
303
304 for _ in 0..1000 {
306 core::hint::spin_loop();
307 }
308 }
309
310 *self.executor_running.lock() = false;
311 debug!("Executor finished, all tasks completed");
312 }
313
314 pub fn has_pending_tasks(&self) -> bool {
316 if !GLOBAL_WAKEUP_QUEUE.lock().is_empty() {
318 return true;
319 }
320 if !self.priority_task_queue.lock().is_empty() {
322 return true;
323 }
324 let registry = self.active_task_registry.lock();
326 !registry.is_empty()
327 }
328
329 pub fn task_count(&self) -> usize {
331 self.active_task_registry.lock().len()
332 }
333
334 pub fn queued_task_count(&self) -> usize {
336 self.priority_task_queue.lock().len()
337 }
338
339 pub fn is_running(&self) -> bool {
341 *self.executor_running.lock()
342 }
343}
344
345impl Default for SingleCpuExecutor {
346 fn default() -> Self {
347 Self::new()
348 }
349}
350
351pub fn spawn<F, T>(future: F) -> TaskHandle
353where
354 F: Future<Output = T> + Send + 'static,
355 T: Send + 'static,
356{
357 SingleCpuExecutor::global().spawn(future)
358}
359
360pub fn block_on<F>(future: F)
363where
364 F: Future<Output = ()> + Send + 'static,
365{
366 let executor = SingleCpuExecutor::global();
367 executor.spawn(future);
368 executor.run_until_completion();
369}
370
371pub fn tick() {
373 SingleCpuExecutor::global().tick();
374}
375
376pub fn has_pending_tasks() -> bool {
378 SingleCpuExecutor::global().has_pending_tasks()
379}
380
381pub fn task_count() -> usize {
383 SingleCpuExecutor::global().task_count()
384}
385
/// Waker backing type: carries only the task id. Waking pushes the id onto
/// `GLOBAL_WAKEUP_QUEUE`; the executor picks it up on its next tick.
#[derive(Debug)]
struct ExecutorWaker {
    task_id: TaskId,
}
392
393impl ExecutorWaker {
394 fn new(task_id: TaskId) -> Self {
396 Self { task_id }
397 }
398
399 fn into_waker(self) -> core::task::Waker {
401 let arc = Arc::new(self);
402 core::task::Waker::from(arc)
403 }
404}
405
// Both wake paths only enqueue the task id into the global wakeup queue;
// actual polling happens on the executor's next tick. This keeps wakes
// cheap and safe to issue from interrupt context (assuming IrqSpinlock
// masks IRQs — confirm against its implementation).
impl alloc::task::Wake for ExecutorWaker {
    fn wake(self: Arc<Self>) {
        enqueue_task_wakeup(self.task_id);
    }

    fn wake_by_ref(self: &Arc<Self>) {
        enqueue_task_wakeup(self.task_id);
    }
}