irox_threading/
current.rs

1// SPDX-License-Identifier: MIT
2// Copyright 2025 IROX Contributors
3//
4
5use std::collections::VecDeque;
6use std::future::Future;
7use std::pin::Pin;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::Arc;
10use std::task::{Context, Poll, Wake, Waker};
11
12use crate::{LocalCompletableTask, LocalFuture};
13
14trait LocalFutureType<'a>: Future<Output = ()> + 'a + HasLocalWaker {}
15
16///
17/// An Executor that doesn't spawn new threads, just runs on the current thread.
18#[derive(Default)]
19pub struct CurrentThreadExecutor<'a> {
20    processing_queue: VecDeque<Pin<Box<dyn LocalFutureType<'a, Output = ()>>>>,
21}
22
23impl<'a> CurrentThreadExecutor<'a> {
24    /// Create a new [`CurrentThreadExecutor`]
25    pub fn new() -> Self {
26        CurrentThreadExecutor::default()
27    }
28
29    ///
30    /// Submit a new task to this executor.  Note:  This does not immediately run the task, you
31    /// still need to call either [`CurrentThreadExecutor::run_some`] or
32    /// [`CurrentThreadExecutor::run_until_complete`]
33    pub fn submit<T: 'a, F: Future<Output = T> + 'a>(&mut self, fut: F) -> LocalTaskHandle<T> {
34        let task = LocalTask {
35            future: Box::pin(fut),
36            waker: Arc::new(LocalWaker::default()),
37            complete: LocalCompletableTask::new(),
38        };
39        let handle = task.join_handle();
40        self.processing_queue.push_back(Box::pin(task));
41        handle
42    }
43
44    ///
45    /// Runs a single loop through the processing queue, in order, letting each task attempt to do
46    /// work.
47    pub fn run_some(&mut self) {
48        let mut pinned = Pin::new(self);
49        let mut pending = VecDeque::new();
50        while let Some(mut task) = pinned.processing_queue.pop_front() {
51            if !task.needs_wake() {
52                pending.push_back(task);
53                continue;
54            }
55            let waker = Waker::from(task.get_waker());
56            let mut context = Context::from_waker(&waker);
57
58            // clear out the 'needs running' flag BEFORE we poll the future,
59            // this way the future can re-schedule itself if necessary
60            task.get_waker()
61                .needs_running
62                .store(false, Ordering::Relaxed);
63
64            // poll the future
65            match task.as_mut().poll(&mut context) {
66                Poll::Ready(()) => {}
67                Poll::Pending => {
68                    // reschedule task again.
69                    pending.push_back(task);
70                }
71            }
72        }
73        pinned.processing_queue.append(&mut pending);
74    }
75
76    ///
77    /// Runs this executor until all submitted tasks are complete.
78    pub fn run_until_complete(&mut self) {
79        while !self.processing_queue.is_empty() {
80            self.run_some();
81        }
82    }
83}
84
85///
86/// Local thread Waker struct
87pub struct LocalWaker {
88    needs_running: AtomicBool,
89}
90
91impl Default for LocalWaker {
92    fn default() -> Self {
93        LocalWaker {
94            needs_running: AtomicBool::new(true),
95        }
96    }
97}
98
99impl Wake for LocalWaker {
100    fn wake(self: Arc<Self>) {
101        self.needs_running.store(true, Ordering::Relaxed);
102    }
103}
104
105trait HasLocalWaker {
106    fn needs_wake(&self) -> bool;
107    // fn clear_wake(&self);
108    fn get_waker(&self) -> Arc<LocalWaker>;
109}
110///
111/// A task that can be run on the current thread.
112pub struct LocalTask<'a, T> {
113    future: LocalFuture<'a, T>,
114    waker: Arc<LocalWaker>,
115    complete: LocalCompletableTask<T>,
116}
117impl<'a, T> HasLocalWaker for LocalTask<'a, T>
118where
119    T: 'a,
120{
121    fn needs_wake(&self) -> bool {
122        self.waker.needs_running.load(Ordering::Relaxed)
123    }
124
125    // fn clear_wake(&self) {
126    //     self.waker.needs_running.store(false, Ordering::Relaxed);
127    // }
128
129    fn get_waker(&self) -> Arc<LocalWaker> {
130        self.waker.clone()
131    }
132}
133impl<'a, T> LocalTask<'a, T>
134where
135    T: 'a,
136{
137    pub fn join_handle(&self) -> LocalTaskHandle<T> {
138        LocalTaskHandle {
139            result: self.complete.clone(),
140        }
141    }
142}
143
144impl<T> Future for LocalTask<'_, T> {
145    type Output = ();
146
147    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
148        let mself = self.get_mut();
149        match mself.future.as_mut().poll(cx) {
150            Poll::Ready(e) => {
151                let _ign = mself.complete.try_complete(e);
152                Poll::Ready(())
153            }
154            Poll::Pending => Poll::Pending,
155        }
156    }
157}
158
159impl<'a, T: 'a> LocalFutureType<'a> for LocalTask<'a, T> {}
160
161///
162/// A handle to the submitted task, to retrieve the result of the operation
163pub struct LocalTaskHandle<T> {
164    result: LocalCompletableTask<T>,
165}
166
167impl<T> LocalTaskHandle<T> {
168    ///
169    /// Attempts to retrive the result of the operation.  If the operation isn't complete yet,
170    /// returns [`None`]
171    pub fn get(&mut self) -> Option<T> {
172        match self.result.get() {
173            Poll::Ready(e) => Some(e),
174            Poll::Pending => None,
175        }
176    }
177}
178
179#[cfg(test)]
180mod tests {
181    use crate::CurrentThreadExecutor;
182
183    #[test]
184    pub fn test() {
185        let mut executor = CurrentThreadExecutor::new();
186
187        let mut handle = executor.submit(async { println!("Hello async") });
188        let mut handle2 = executor.submit(async { println!("Hello async2") });
189
190        assert_eq!(None, handle.get());
191        assert_eq!(None, handle2.get());
192
193        executor.run_until_complete();
194
195        assert_ne!(None, handle.get());
196        assert_ne!(None, handle2.get());
197    }
198}