use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::{Sender, TryRecvError};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::thread::JoinHandle;
use crate::{CompletableTask, CurrentThreadExecutor, TaskError};
/// Executor that runs submitted futures on one dedicated worker thread.
///
/// Tasks reach the worker over an mpsc channel. Dropping the executor closes the
/// channel and joins the worker thread.
pub struct SingleThreadExecutor {
/// Sending half of the channel to the worker thread. Taken (left as `None`) on
/// drop so that the worker observes the channel as disconnected.
sender: Option<Sender<TaskExchange>>,
/// Join handle of the worker thread. Taken and joined on drop.
handle: Option<JoinHandle<()>>,
}
/// Heap-allocated, pinned, `Send + 'static` future producing `T`.
///
/// This is the type-erased form in which futures are stored in tasks and sent to
/// the worker thread.
pub(crate) type SingleThreadFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;
impl SingleThreadExecutor {
    /// Creates the executor and spawns its worker thread.
    ///
    /// The worker owns a `CurrentThreadExecutor`. On every loop iteration it makes
    /// one non-blocking attempt to receive a task from the channel, submits the
    /// task if one arrived, and then calls `run_some`. When the channel reports
    /// that it is disconnected, the loop ends and `run_until_complete` is called
    /// before the thread exits.
    #[must_use]
    pub fn new() -> SingleThreadExecutor {
        let (sender, receiver) = std::sync::mpsc::channel::<TaskExchange>();
        let handle = std::thread::spawn(move || {
            let mut current = CurrentThreadExecutor::new();
            // NOTE(review): `try_recv` never blocks, so unless `run_some` blocks
            // internally this loop keeps spinning while idle — confirm.
            loop {
                match receiver.try_recv() {
                    Ok(exchange) => {
                        current.submit(exchange.inner);
                    }
                    // Every sender is gone: stop accepting work and finish below.
                    Err(TryRecvError::Disconnected) => break,
                    // Nothing queued right now; still give running tasks a turn.
                    Err(TryRecvError::Empty) => {}
                }
                current.run_some();
            }
            current.run_until_complete();
        });
        Self {
            sender: Some(sender),
            handle: Some(handle),
        }
    }

    /// Wraps `fut` in a task, sends it to the worker thread, and returns a handle
    /// through which the future's output can be observed.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok`. If the sender is absent or
    /// the send fails, the task is dropped without being handed to the worker and
    /// the failure is ignored.
    /// NOTE(review): in that case the returned handle presumably never completes —
    /// confirm against `CompletableTask`.
    pub fn submit<T: Send + 'static, F: Future<Output = T> + Send + 'static>(
        &mut self,
        fut: F,
    ) -> Result<TaskHandle<T>, TaskError> {
        let completer = Arc::new(CompletableTask::new());
        let task = SingleThreadTask::new(Box::pin(fut), Arc::clone(&completer));
        if let Some(sender) = self.sender.as_ref() {
            let exchange = TaskExchange {
                inner: Box::pin(task),
            };
            // Send result is intentionally discarded (see `# Errors`).
            let _res = sender.send(exchange);
        }
        Ok(TaskHandle { completer })
    }

    /// Consumes the executor and waits for the worker thread to finish.
    ///
    /// This is simply an explicit drop: the `Drop` implementation closes the
    /// channel and joins the worker thread.
    pub fn run_until_complete(self) {
        drop(self);
    }
}
impl Default for SingleThreadExecutor {
fn default() -> Self {
Self::new()
}
}
impl Drop for SingleThreadExecutor {
    /// Closes the task channel, then waits for the worker thread to exit.
    fn drop(&mut self) {
        // Dropping the sender first is what lets the worker see the channel as
        // disconnected and leave its receive loop; joining before this would hang.
        drop(self.sender.take());
        if let Some(worker) = self.handle.take() {
            // A panic on the worker thread surfaces here as `Err`; it is
            // deliberately ignored rather than re-raised from a destructor.
            let _res = worker.join();
        }
    }
}
/// Message sent over the channel from `SingleThreadExecutor::submit` to the
/// worker thread.
pub(crate) struct TaskExchange {
/// Type-erased task future; the worker submits it to its `CurrentThreadExecutor`.
pub(crate) inner: SingleThreadFuture<()>,
}
/// Handle returned by `SingleThreadExecutor::submit` for observing the outcome of
/// a submitted future.
pub struct TaskHandle<T> {
/// Completion cell shared with the running task; the task passes the future's
/// output to it via `try_complete`.
pub(crate) completer: Arc<CompletableTask<T>>,
}
impl<T> TaskHandle<T> {
    /// Reports whether the shared completion cell says the task is complete.
    ///
    /// Falls back to `false` when the underlying `is_complete` query yields no
    /// usable answer.
    #[must_use]
    pub fn is_complete(&self) -> bool {
        let status = self.completer.is_complete();
        status.unwrap_or(false)
    }

    /// Takes the task's output out of the completion cell via `take_blocking`.
    ///
    /// Returns `None` when `take_blocking` reports an error.
    /// NOTE(review): the callee's name suggests this waits until a result is
    /// available — confirm against `CompletableTask`.
    pub fn get(&self) -> Option<T> {
        let taken = self.completer.take_blocking();
        taken.ok()
    }
}
/// Adapter that drives a future and forwards its output to a `CompletableTask`.
pub(crate) struct SingleThreadTask<T> {
/// The wrapped future whose output is being awaited.
future: SingleThreadFuture<T>,
/// Receives the future's output (via `try_complete`) once the future is ready.
complete: Arc<CompletableTask<T>>,
}
impl<T> SingleThreadTask<T> {
pub fn new(future: SingleThreadFuture<T>, complete: Arc<CompletableTask<T>>) -> Self {
SingleThreadTask { future, complete }
}
}
impl<T> Future for SingleThreadTask<T> {
    type Output = ();

    /// Polls the wrapped future once.
    ///
    /// While the inner future is pending this returns `Poll::Pending`. When it
    /// becomes ready, its output is passed to `try_complete` on the shared
    /// completion cell (the result of that call is ignored) and `Poll::Ready(())`
    /// is returned.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        let polled = this.future.as_mut().poll(cx);
        // `Poll::map` leaves `Pending` untouched and runs the closure only on `Ready`.
        polled.map(|output| {
            let _ign = this.complete.try_complete(output);
        })
    }
}
#[cfg(test)]
mod tests {
    use crate::{SingleThreadExecutor, TaskError};

    /// Submits one future, drops the executor, and checks that the handle then
    /// reports completion and yields the unit result.
    #[test]
    pub fn test() -> Result<(), TaskError> {
        let borrowed = String::new();
        let mut executor = SingleThreadExecutor::new();
        let handle = executor.submit(async move {
            println!("Hello from thread! {borrowed}");
        })?;
        // Dropping the executor joins the worker thread, so the task has had its
        // chance to run before the assertions below.
        drop(executor);
        assert!(handle.is_complete());
        assert_eq!(Some(()), handle.get());
        Ok(())
    }
}