irox_threading/
single.rs

1// SPDX-License-Identifier: MIT
2// Copyright 2025 IROX Contributors
3//
4
5//!
6//! Single-Thread Executor implementation
7//!
8
9use std::future::Future;
10use std::pin::Pin;
11use std::sync::mpsc::{Sender, TryRecvError};
12use std::sync::Arc;
13use std::task::{Context, Poll};
14use std::thread::JoinHandle;
15
16use crate::{CompletableTask, CurrentThreadExecutor, TaskError};
17
18///
19/// An executor implementation backed by a single thread.
20///
21/// Unfortunately, in order to maintain a 100% "safe" codebase, it can only accept futures with a
22/// lifetime of [`'static`].  The [`CurrentThreadExecutor`] does not have this limitation.
23///
24/// This actually uses a [`CurrentThreadExecutor`] wrapped in a single thread.
25///
26/// This executor will run all tasks to completion, even when dropped.
27pub struct SingleThreadExecutor {
28    sender: Option<Sender<TaskExchange>>,
29    handle: Option<JoinHandle<()>>,
30}
31
32pub(crate) type SingleThreadFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;
33
34impl SingleThreadExecutor {
35    ///
36    /// Creates a new [`SingleThreadExecutor`] and spawns a new thread to back it.  The thread
37    /// immediately starts attempting to execute jobs.
38    ///
39    /// The queue is an unlimited queue, and will happily accept as many jobs as you can pass to it.
40    #[must_use]
41    pub fn new() -> SingleThreadExecutor {
42        let (sender, receiver) = std::sync::mpsc::channel::<TaskExchange>();
43
44        let handle = std::thread::spawn(move || {
45            let mut current = CurrentThreadExecutor::new();
46            loop {
47                if let Some(task) = match receiver.try_recv() {
48                    Ok(e) => Some(e),
49                    Err(e) => {
50                        if e == TryRecvError::Disconnected {
51                            break;
52                        }
53                        None
54                    }
55                } {
56                    current.submit(task.inner);
57                };
58                current.run_some();
59            }
60            current.run_until_complete();
61        });
62        SingleThreadExecutor {
63            handle: Some(handle),
64            sender: Some(sender),
65        }
66    }
67
68    ///
69    /// Submits a new task to be run on this executor.  The task will start to be run as soon
70    /// as the executor has available capacity to run it.
71    ///
72    /// This function returns a [`TaskHandle`] that can be used to retrieve any return
73    /// result from the operation itself.
74    pub fn submit<T: Send + 'static, F: Future<Output = T> + Send + 'static>(
75        &mut self,
76        fut: F,
77    ) -> Result<TaskHandle<T>, TaskError> {
78        let complete = Arc::new(CompletableTask::new());
79        let task = SingleThreadTask::new(Box::pin(fut), complete.clone());
80        if let Some(sender) = &self.sender {
81            let _res = sender.send(TaskExchange {
82                inner: Box::pin(task),
83            });
84        }
85        Ok(TaskHandle {
86            completer: complete,
87        })
88    }
89
90    ///
91    /// Runs this executor until all tasks are complete.
92    pub fn run_until_complete(self) {
93        drop(self)
94    }
95}
96
97///
98/// Same as [`SingleThreadExecutor::new`]
99impl Default for SingleThreadExecutor {
100    ///
101    /// Same as [`SingleThreadExecutor::new`]
102    fn default() -> Self {
103        Self::new()
104    }
105}
106
107impl Drop for SingleThreadExecutor {
108    fn drop(&mut self) {
109        if let Some(sender) = self.sender.take() {
110            drop(sender);
111        }
112        if let Some(handle) = self.handle.take() {
113            let _res = handle.join();
114        }
115    }
116}
117
118///
119/// Simplifying struct to pass between the caller and the executor thread.
120pub(crate) struct TaskExchange {
121    pub(crate) inner: SingleThreadFuture<()>,
122}
123
124///
125/// A handle to the return result of the submitted task.
126pub struct TaskHandle<T> {
127    pub(crate) completer: Arc<CompletableTask<T>>,
128}
129
130impl<T> TaskHandle<T> {
131    ///
132    /// Returns true if the task has completed and a result is available.
133    #[must_use]
134    pub fn is_complete(&self) -> bool {
135        self.completer.is_complete().unwrap_or(false)
136    }
137
138    ///
139    /// Blocks until the task is complete, and returns [`Ok(T)`] if the task completed,
140    /// or [`Err`] if a mutex/locking (panic) occurred during execution.
141    pub fn get(&self) -> Option<T> {
142        self.completer.take_blocking().ok()
143    }
144}
145
146///
147/// A wrapper future task that actually executes the requested task, and completes the
148/// future to idnicate that the
149pub(crate) struct SingleThreadTask<T> {
150    future: SingleThreadFuture<T>,
151    complete: Arc<CompletableTask<T>>,
152}
153
154impl<T> SingleThreadTask<T> {
155    pub fn new(future: SingleThreadFuture<T>, complete: Arc<CompletableTask<T>>) -> Self {
156        SingleThreadTask { future, complete }
157    }
158}
159
160impl<T> Future for SingleThreadTask<T> {
161    type Output = ();
162
163    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
164        let mself = self.get_mut();
165        match mself.future.as_mut().poll(cx) {
166            Poll::Ready(e) => {
167                let _ign = mself.complete.try_complete(e);
168                Poll::Ready(())
169            }
170            Poll::Pending => Poll::Pending,
171        }
172    }
173}
174
175#[cfg(test)]
176mod tests {
177    use crate::{SingleThreadExecutor, TaskError};
178
179    #[test]
180    pub fn test() -> Result<(), TaskError> {
181        let mut exec = SingleThreadExecutor::new();
182        let borrowed = String::new();
183        let hnd = exec.submit(async move {
184            println!("Hello from thread! {borrowed}");
185        })?;
186
187        drop(exec);
188        assert!(hnd.is_complete());
189        assert_eq!(Some(()), hnd.get());
190
191        Ok(())
192    }
193}