1use std::future::Future;
3
4use async_trait::async_trait;
5use flume::Receiver;
6
7use super::{AsyncJoin, Executor, SyncJoin};
8use crate::types::Either;
9
/// Marker type selecting the thread-backed [`Executor`] implementation.
///
/// Every task spawned through this executor runs on its own freshly spawned
/// `std::thread`; the type carries no state, so it is free to copy around.
#[derive(Debug, Clone, Copy, Default)]
pub struct Threads;
14
/// Error reported when a join handle can no longer receive a result.
///
/// The join implementations in this module map any channel receive failure to
/// this value, i.e. the worker side went away without delivering its output.
/// It carries no data, so it is cheap to copy and can be compared directly in
/// assertions.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct Closed;

impl std::error::Error for Closed {}

impl std::fmt::Display for Closed {
    /// Writes the fixed, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The message has no format arguments, so it is written directly
        // rather than going through the `write!` formatting machinery.
        f.write_str("Background task or thread closed before join")
    }
}
26
27pub struct ThreadJoin<T>(Receiver<T>);
29
30#[async_trait]
31impl<T> AsyncJoin for ThreadJoin<T>
32where
33 T: Send,
34{
35 type Item = T;
36 type Error = Closed;
37 async fn async_join(self) -> Result<Self::Item, Self::Error> {
38 self.0.recv_async().await.map_err(|_| Closed)
39 }
40}
41
42impl<T> SyncJoin for ThreadJoin<T>
43where
44 T: Send + 'static,
45{
46 type Item = T;
47 type Error = Closed;
48 fn sync_join(self) -> Result<Self::Item, Self::Error> {
49 self.0.recv().map_err(|_| Closed)
50 }
51
52 fn to_async(self) -> Either<Box<dyn AsyncJoin<Item = Self::Item, Error = Self::Error>>, Self>
53 where
54 Self: Sized,
55 {
56 Either::Happy(Box::new(self))
57 }
58}
59
60impl Executor for Threads {
61 type AsyncJoin<T: Send + 'static> = ThreadJoin<T>;
62 type SyncJoin<T: Send + 'static> = ThreadJoin<T>;
63 fn spawn_async<T, F>(future: F) -> Self::AsyncJoin<T>
64 where
65 T: Send + Sync + 'static,
66 F: Future<Output = T> + Send + 'static,
67 {
68 let (i, o) = flume::bounded(1);
69 std::thread::spawn(move || {
70 let output = futures::executor::block_on(future);
71 let _result = i.send(output);
72 });
73 ThreadJoin(o)
74 }
75
76 fn spawn_sync<T, C>(closure: C) -> Self::SyncJoin<T>
77 where
78 T: Send + Sync + 'static,
79 C: FnOnce() -> T + Send + 'static,
80 {
81 let (i, o) = flume::bounded(1);
82 std::thread::spawn(move || {
83 let output = closure();
84 let _result = i.send(output);
85 });
86 ThreadJoin(o)
87 }
88}