1use std::future::Future;
10use std::pin::Pin;
11use std::sync::mpsc::{Sender, TryRecvError};
12use std::sync::Arc;
13use std::task::{Context, Poll};
14use std::thread::JoinHandle;
15
16use crate::{CompletableTask, CurrentThreadExecutor, TaskError};
17
18pub struct SingleThreadExecutor {
28 sender: Option<Sender<TaskExchange>>,
29 handle: Option<JoinHandle<()>>,
30}
31
/// Heap-allocated, pinned, type-erased future that can be moved to another
/// thread and resolves to a `T`.
pub(crate) type SingleThreadFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;
33
34impl SingleThreadExecutor {
35 #[must_use]
41 pub fn new() -> SingleThreadExecutor {
42 let (sender, receiver) = std::sync::mpsc::channel::<TaskExchange>();
43
44 let handle = std::thread::spawn(move || {
45 let mut current = CurrentThreadExecutor::new();
46 loop {
47 if let Some(task) = match receiver.try_recv() {
48 Ok(e) => Some(e),
49 Err(e) => {
50 if e == TryRecvError::Disconnected {
51 break;
52 }
53 None
54 }
55 } {
56 current.submit(task.inner);
57 };
58 current.run_some();
59 }
60 current.run_until_complete();
61 });
62 SingleThreadExecutor {
63 handle: Some(handle),
64 sender: Some(sender),
65 }
66 }
67
68 pub fn submit<T: Send + 'static, F: Future<Output = T> + Send + 'static>(
75 &mut self,
76 fut: F,
77 ) -> Result<TaskHandle<T>, TaskError> {
78 let complete = Arc::new(CompletableTask::new());
79 let task = SingleThreadTask::new(Box::pin(fut), complete.clone());
80 if let Some(sender) = &self.sender {
81 let _res = sender.send(TaskExchange {
82 inner: Box::pin(task),
83 });
84 }
85 Ok(TaskHandle {
86 completer: complete,
87 })
88 }
89
90 pub fn run_until_complete(self) {
93 drop(self)
94 }
95}
96
97impl Default for SingleThreadExecutor {
100 fn default() -> Self {
103 Self::new()
104 }
105}
106
107impl Drop for SingleThreadExecutor {
108 fn drop(&mut self) {
109 if let Some(sender) = self.sender.take() {
110 drop(sender);
111 }
112 if let Some(handle) = self.handle.take() {
113 let _res = handle.join();
114 }
115 }
116}
117
118pub(crate) struct TaskExchange {
121 pub(crate) inner: SingleThreadFuture<()>,
122}
123
124pub struct TaskHandle<T> {
127 pub(crate) completer: Arc<CompletableTask<T>>,
128}
129
130impl<T> TaskHandle<T> {
131 #[must_use]
134 pub fn is_complete(&self) -> bool {
135 self.completer.is_complete().unwrap_or(false)
136 }
137
138 pub fn get(&self) -> Option<T> {
142 self.completer.take_blocking().ok()
143 }
144}
145
146pub(crate) struct SingleThreadTask<T> {
150 future: SingleThreadFuture<T>,
151 complete: Arc<CompletableTask<T>>,
152}
153
154impl<T> SingleThreadTask<T> {
155 pub fn new(future: SingleThreadFuture<T>, complete: Arc<CompletableTask<T>>) -> Self {
156 SingleThreadTask { future, complete }
157 }
158}
159
160impl<T> Future for SingleThreadTask<T> {
161 type Output = ();
162
163 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
164 let mself = self.get_mut();
165 match mself.future.as_mut().poll(cx) {
166 Poll::Ready(e) => {
167 let _ign = mself.complete.try_complete(e);
168 Poll::Ready(())
169 }
170 Poll::Pending => Poll::Pending,
171 }
172 }
173}
174
#[cfg(test)]
mod tests {
    use crate::{SingleThreadExecutor, TaskError};

    /// Submits one task, drops the executor, then checks that the handle
    /// reports completion and yields the task's `()` output.
    #[test]
    pub fn test() -> Result<(), TaskError> {
        let mut executor = SingleThreadExecutor::new();
        let borrowed = String::new();
        let handle = executor.submit(async move {
            println!("Hello from thread! {borrowed}");
        })?;

        // Dropping the executor joins the worker thread before the checks run.
        drop(executor);
        assert!(handle.is_complete());
        assert_eq!(Some(()), handle.get());

        Ok(())
    }
}