irox_threading/
current.rs1use std::collections::VecDeque;
6use std::future::Future;
7use std::pin::Pin;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::Arc;
10use std::task::{Context, Poll, Wake, Waker};
11
12use crate::{LocalCompletableTask, LocalFuture};
13
14trait LocalFutureType<'a>: Future<Output = ()> + 'a + HasLocalWaker {}
15
16#[derive(Default)]
19pub struct CurrentThreadExecutor<'a> {
20 processing_queue: VecDeque<Pin<Box<dyn LocalFutureType<'a, Output = ()>>>>,
21}
22
23impl<'a> CurrentThreadExecutor<'a> {
24 pub fn new() -> Self {
26 CurrentThreadExecutor::default()
27 }
28
29 pub fn submit<T: 'a, F: Future<Output = T> + 'a>(&mut self, fut: F) -> LocalTaskHandle<T> {
34 let task = LocalTask {
35 future: Box::pin(fut),
36 waker: Arc::new(LocalWaker::default()),
37 complete: LocalCompletableTask::new(),
38 };
39 let handle = task.join_handle();
40 self.processing_queue.push_back(Box::pin(task));
41 handle
42 }
43
44 pub fn run_some(&mut self) {
48 let mut pinned = Pin::new(self);
49 let mut pending = VecDeque::new();
50 while let Some(mut task) = pinned.processing_queue.pop_front() {
51 if !task.needs_wake() {
52 pending.push_back(task);
53 continue;
54 }
55 let waker = Waker::from(task.get_waker());
56 let mut context = Context::from_waker(&waker);
57
58 task.get_waker()
61 .needs_running
62 .store(false, Ordering::Relaxed);
63
64 match task.as_mut().poll(&mut context) {
66 Poll::Ready(()) => {}
67 Poll::Pending => {
68 pending.push_back(task);
70 }
71 }
72 }
73 pinned.processing_queue.append(&mut pending);
74 }
75
76 pub fn run_until_complete(&mut self) {
79 while !self.processing_queue.is_empty() {
80 self.run_some();
81 }
82 }
83}
84
/// Flag-based waker used by the executor in this module.
///
/// Waking does nothing more than set `needs_running`; the executor reads the
/// flag to decide whether a task should be polled.
pub struct LocalWaker {
    /// `true` when the owning task should be polled on the next pass.
    needs_running: AtomicBool,
}
90
91impl Default for LocalWaker {
92 fn default() -> Self {
93 LocalWaker {
94 needs_running: AtomicBool::new(true),
95 }
96 }
97}
98
99impl Wake for LocalWaker {
100 fn wake(self: Arc<Self>) {
101 self.needs_running.store(true, Ordering::Relaxed);
102 }
103}
104
105trait HasLocalWaker {
106 fn needs_wake(&self) -> bool;
107 fn get_waker(&self) -> Arc<LocalWaker>;
109}
110pub struct LocalTask<'a, T> {
113 future: LocalFuture<'a, T>,
114 waker: Arc<LocalWaker>,
115 complete: LocalCompletableTask<T>,
116}
117impl<'a, T> HasLocalWaker for LocalTask<'a, T>
118where
119 T: 'a,
120{
121 fn needs_wake(&self) -> bool {
122 self.waker.needs_running.load(Ordering::Relaxed)
123 }
124
125 fn get_waker(&self) -> Arc<LocalWaker> {
130 self.waker.clone()
131 }
132}
133impl<'a, T> LocalTask<'a, T>
134where
135 T: 'a,
136{
137 pub fn join_handle(&self) -> LocalTaskHandle<T> {
138 LocalTaskHandle {
139 result: self.complete.clone(),
140 }
141 }
142}
143
144impl<T> Future for LocalTask<'_, T> {
145 type Output = ();
146
147 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
148 let mself = self.get_mut();
149 match mself.future.as_mut().poll(cx) {
150 Poll::Ready(e) => {
151 let _ign = mself.complete.try_complete(e);
152 Poll::Ready(())
153 }
154 Poll::Pending => Poll::Pending,
155 }
156 }
157}
158
159impl<'a, T: 'a> LocalFutureType<'a> for LocalTask<'a, T> {}
160
161pub struct LocalTaskHandle<T> {
164 result: LocalCompletableTask<T>,
165}
166
167impl<T> LocalTaskHandle<T> {
168 pub fn get(&mut self) -> Option<T> {
172 match self.result.get() {
173 Poll::Ready(e) => Some(e),
174 Poll::Pending => None,
175 }
176 }
177}
178
#[cfg(test)]
mod tests {
    use crate::CurrentThreadExecutor;

    /// Submitted futures yield nothing until the executor is run, and both
    /// have produced a value once `run_until_complete` returns.
    #[test]
    pub fn test() {
        let mut executor = CurrentThreadExecutor::new();

        let mut first = executor.submit(async { println!("Hello async") });
        let mut second = executor.submit(async { println!("Hello async2") });

        // Submitting alone must not run anything.
        assert!(first.get().is_none());
        assert!(second.get().is_none());

        executor.run_until_complete();

        // Both tasks ran to completion and delivered their output.
        assert!(first.get().is_some());
        assert!(second.get().is_some());
    }
}