1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
// SPDX-License-Identifier: MIT
// Copyright 2023 IROX Contributors
//

//!
//! Single-Thread Executor implementation
//!

use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::{Sender, TryRecvError};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::thread::JoinHandle;

use crate::{CompletableTask, CurrentThreadExecutor, TaskError};

///
/// An executor implementation backed by a single thread.
///
/// Unfortunately, in order to maintain a 100% "safe" codebase, it can only accept futures with a
/// lifetime of `'static`.  The [`CurrentThreadExecutor`] does not have this limitation.
///
/// This actually uses a [`CurrentThreadExecutor`] wrapped in a single thread.
///
/// This executor will run all tasks to completion, even when dropped.
pub struct SingleThreadExecutor {
    /// Sending half of the channel used to hand tasks to the worker thread.  Kept in an
    /// `Option` so that `Drop` can take it out and drop it, which disconnects the channel.
    sender: Option<Sender<TaskExchange>>,
    /// Join handle of the worker thread.  Kept in an `Option` so that `Drop` can take it
    /// out and join on it.
    handle: Option<JoinHandle<()>>,
}

/// A pinned, boxed, type-erased future that is [`Send`] and `'static`.  This is the form in
/// which [`SingleThreadTask`] holds the submitted future and in which [`TaskExchange`]
/// carries work to the executor thread.
pub(crate) type SingleThreadFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;

impl SingleThreadExecutor {
    ///
    /// Builds a [`SingleThreadExecutor`] together with the worker thread that drives it.  The
    /// worker starts looking for submitted tasks as soon as it has been spawned.
    ///
    /// Tasks are handed over through an unbounded channel, so any number of them may be queued.
    #[must_use]
    pub fn new() -> SingleThreadExecutor {
        let (sender, receiver) = std::sync::mpsc::channel::<TaskExchange>();

        let worker = move || {
            let mut current = CurrentThreadExecutor::new();
            // NOTE(review): `try_recv` never blocks and nothing here sleeps, so unless
            // `run_some` itself blocks this loop spins while the executor is idle.
            loop {
                // Accept at most one newly submitted task per pass.
                match receiver.try_recv() {
                    Ok(exchange) => {
                        current.submit(exchange.inner);
                    }
                    // Every sender is gone, so nothing more can arrive: stop accepting.
                    Err(TryRecvError::Disconnected) => break,
                    // Nothing new right now; keep driving what is already queued.
                    Err(TryRecvError::Empty) => {}
                }
                current.run_some();
            }
            // Finish whatever is still outstanding before the thread exits.
            current.run_until_complete();
        };

        SingleThreadExecutor {
            handle: Some(std::thread::spawn(worker)),
            sender: Some(sender),
        }
    }

    ///
    /// Queues `fut` to be run on the worker thread and returns a [`TaskHandle`] through which
    /// the future's output can later be retrieved.
    ///
    /// The future is wrapped so that, once it finishes, its output is stored in a completion
    /// object shared with the returned handle.
    ///
    /// # Errors
    /// The current implementation always returns `Ok`.
    ///
    /// NOTE(review): a missing sender or a failed send is silently ignored, in which case the
    /// returned handle is presumably never completed - confirm whether this should be an `Err`.
    pub fn submit<T: Send + 'static, F: Future<Output = T> + Send + 'static>(
        &mut self,
        fut: F,
    ) -> Result<TaskHandle<T>, TaskError> {
        let completer = Arc::new(CompletableTask::new());
        let wrapped = SingleThreadTask::new(Box::pin(fut), Arc::clone(&completer));
        if let Some(sender) = self.sender.as_ref() {
            let exchange = TaskExchange {
                inner: Box::pin(wrapped),
            };
            // Best effort: the outcome of the send is deliberately discarded.
            let _unsent = sender.send(exchange);
        }
        Ok(TaskHandle { completer })
    }

    ///
    /// Consumes the executor and waits for every submitted task to finish.
    ///
    /// This is simply an explicit drop: the `Drop` implementation disconnects the channel and
    /// joins the worker thread.
    pub fn run_until_complete(self) {
        drop(self);
    }
}

///
/// Same as [`SingleThreadExecutor::new`]
impl Default for SingleThreadExecutor {
    ///
    /// Same as [`SingleThreadExecutor::new`]
    fn default() -> Self {
        Self::new()
    }
}

impl Drop for SingleThreadExecutor {
    /// Shuts the executor down: disconnects the task channel, then waits for the worker
    /// thread to exit.
    fn drop(&mut self) {
        // Dropping the sender disconnects the channel, which is what makes the worker leave
        // its receive loop.
        drop(self.sender.take());

        if let Some(worker) = self.handle.take() {
            // The join result (including a panic on the worker) is deliberately ignored.
            let _joined = worker.join();
        }
    }
}

///
/// Simplifying struct to pass between the caller and the executor thread.
pub(crate) struct TaskExchange {
    /// The type-erased task to run.  Its output is `()`; `submit` places a
    /// [`SingleThreadTask`] here, which delivers the real result through its completer.
    pub(crate) inner: SingleThreadFuture<()>,
}

///
/// A handle to the return result of the submitted task.
///
/// Returned by [`SingleThreadExecutor::submit`].
pub struct TaskHandle<T> {
    /// Completion object shared with the [`SingleThreadTask`] wrapper, which stores the
    /// task's output in it once the wrapped future finishes.
    pub(crate) completer: Arc<CompletableTask<T>>,
}

impl<T> TaskHandle<T> {
    ///
    /// Returns true if the task has completed and a result is available.
    ///
    /// If the underlying completion check does not yield a value, this reports `false`.
    #[must_use]
    pub fn is_complete(&self) -> bool {
        // A failed or absent answer from the completer is treated as "not complete".
        self.completer.is_complete().unwrap_or(false)
    }

    ///
    /// Blocks until the task is complete, and returns `Some(T)` with the task's output, or
    /// `None` if the underlying `take_blocking` call returned an error (presumably a
    /// mutex/locking failure caused by a panic during execution - confirm against
    /// `CompletableTask`).
    pub fn get(&self) -> Option<T> {
        // `.ok()` discards the error detail and maps any failure to `None`.
        self.completer.take_blocking().ok()
    }
}

///
/// A wrapper future task that actually executes the requested task, and completes the
/// shared [`CompletableTask`] with the task's output to indicate that the task has finished.
pub(crate) struct SingleThreadTask<T> {
    /// The submitted future being driven.
    future: SingleThreadFuture<T>,
    /// Completion object that receives the future's output; shared with the [`TaskHandle`].
    complete: Arc<CompletableTask<T>>,
}

impl<T> SingleThreadTask<T> {
    pub fn new(future: SingleThreadFuture<T>, complete: Arc<CompletableTask<T>>) -> Self {
        SingleThreadTask { future, complete }
    }
}

impl<T> Future for SingleThreadTask<T> {
    type Output = ();

    /// Polls the wrapped future.  While it is pending this is pending too; once it is ready,
    /// its output is handed to the completion object and this task resolves to `()`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        this.future.as_mut().poll(cx).map(|output| {
            // The outcome of completing is deliberately ignored.
            let _ign = this.complete.try_complete(output);
        })
    }
}

#[cfg(test)]
mod tests {
    use crate::{SingleThreadExecutor, TaskError};

    /// Submits a single task, shuts the executor down, and checks that the task's result is
    /// available afterwards.
    #[test]
    pub fn test() -> Result<(), TaskError> {
        let mut executor = SingleThreadExecutor::new();
        let borrowed = String::new();
        let handle = executor.submit(async move {
            println!("Hello from thread! {borrowed}");
        })?;

        // Consuming the executor drops it, which joins the worker thread after it has run
        // every submitted task.
        executor.run_until_complete();
        assert!(handle.is_complete());
        assert_eq!(Some(()), handle.get());

        Ok(())
    }
}