actm/executor/
threads.rs

1//! Thread-per-task support
2use std::future::Future;
3
4use async_trait::async_trait;
5use flume::Receiver;
6
7use super::{AsyncJoin, Executor, SyncJoin};
8use crate::types::Either;
9
/// [`Executor`] implementation on top of native threads
///
/// Every task is handed to its own freshly spawned thread. Async tasks are driven to
/// completion inside that thread with the blocking executor from the `futures` crate.
///
/// The type carries no state, so it is freely copyable and default-constructible.
#[derive(Debug, Clone, Copy, Default)]
pub struct Threads;
14
/// Error type indicating that the background thread was closed before joining
///
/// Returned by the join handles in this module when receiving the task's output fails.
/// Being a stateless marker, it is comparable so callers can match on or assert against it.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Closed;

impl std::error::Error for Closed {}

impl std::fmt::Display for Closed {
    /// Writes a fixed, human-readable description of the failure
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The message takes no arguments, so write it directly rather than through `write!`
        f.write_str("Background task or thread closed before join")
    }
}
26
27/// Join handle for a thread
28pub struct ThreadJoin<T>(Receiver<T>);
29
30#[async_trait]
31impl<T> AsyncJoin for ThreadJoin<T>
32where
33    T: Send,
34{
35    type Item = T;
36    type Error = Closed;
37    async fn async_join(self) -> Result<Self::Item, Self::Error> {
38        self.0.recv_async().await.map_err(|_| Closed)
39    }
40}
41
42impl<T> SyncJoin for ThreadJoin<T>
43where
44    T: Send + 'static,
45{
46    type Item = T;
47    type Error = Closed;
48    fn sync_join(self) -> Result<Self::Item, Self::Error> {
49        self.0.recv().map_err(|_| Closed)
50    }
51
52    fn to_async(self) -> Either<Box<dyn AsyncJoin<Item = Self::Item, Error = Self::Error>>, Self>
53    where
54        Self: Sized,
55    {
56        Either::Happy(Box::new(self))
57    }
58}
59
60impl Executor for Threads {
61    type AsyncJoin<T: Send + 'static> = ThreadJoin<T>;
62    type SyncJoin<T: Send + 'static> = ThreadJoin<T>;
63    fn spawn_async<T, F>(future: F) -> Self::AsyncJoin<T>
64    where
65        T: Send + Sync + 'static,
66        F: Future<Output = T> + Send + 'static,
67    {
68        let (i, o) = flume::bounded(1);
69        std::thread::spawn(move || {
70            let output = futures::executor::block_on(future);
71            let _result = i.send(output);
72        });
73        ThreadJoin(o)
74    }
75
76    fn spawn_sync<T, C>(closure: C) -> Self::SyncJoin<T>
77    where
78        T: Send + Sync + 'static,
79        C: FnOnce() -> T + Send + 'static,
80    {
81        let (i, o) = flume::bounded(1);
82        std::thread::spawn(move || {
83            let output = closure();
84            let _result = i.send(output);
85        });
86        ThreadJoin(o)
87    }
88}