fast_threadpool/
handler.rs

1use crate::*;
2
3#[cfg(feature = "async")]
4#[derive(Clone, Debug)]
5/// Asynchronous handler to execute jobs on the thread pool
6pub struct ThreadPoolAsyncHandler<Shared: 'static + Clone + Send> {
7    sender: FlumeSender<MsgForWorker<Shared>>,
8}
9
10#[cfg(feature = "async")]
11impl<Shared: 'static + Clone + Send> ThreadPoolAsyncHandler<Shared> {
12    pub(crate) fn new(sender: FlumeSender<MsgForWorker<Shared>>) -> ThreadPoolAsyncHandler<Shared> {
13        ThreadPoolAsyncHandler { sender }
14    }
15    /// Execute the given closure and return a Future that output closure return type
16    pub async fn execute<F, R>(&self, f: F) -> Result<R, ThreadPoolDisconnected>
17    where
18        F: 'static + Send + FnOnce(&Shared) -> R,
19        R: 'static + Send + Sync,
20    {
21        let (s, r) = async_oneshot::oneshot();
22        self.sender
23            .send_async(MsgForWorker::NewJob(Box::new(move |shared| {
24                let _ = s.send(f(shared));
25            })))
26            .await
27            .map_err(|_| ThreadPoolDisconnected)?;
28
29        r.await.map_err(|_| ThreadPoolDisconnected)
30    }
31}
32
33#[derive(Clone, Debug)]
34/// Synchronous handler to execute jobs on the thread pool
35pub struct ThreadPoolSyncHandler<Shared: 'static + Clone + Send> {
36    sender: FlumeSender<MsgForWorker<Shared>>,
37}
38
39impl<Shared: 'static + Clone + Send> ThreadPoolSyncHandler<Shared> {
40    pub(crate) fn new(sender: FlumeSender<MsgForWorker<Shared>>) -> ThreadPoolSyncHandler<Shared> {
41        ThreadPoolSyncHandler { sender }
42    }
43    /// Execute the given job and block the current thread until finished.
44    /// If you need a non blocking method, see `ThreadPoolAsyncHandler`.
45    pub fn execute<F, R>(&self, f: F) -> Result<R, ThreadPoolDisconnected>
46    where
47        F: 'static + Send + FnOnce(&Shared) -> R,
48        R: 'static + Send,
49    {
50        let (s, r) = flume::bounded(1);
51        self.sender
52            .send(MsgForWorker::NewJob(Box::new(move |shared| {
53                let _ = s.send(f(shared));
54            })))
55            .map_err(|_| ThreadPoolDisconnected)?;
56
57        r.recv().map_err(|_| ThreadPoolDisconnected)
58    }
59    /// Launch the given job and return a oneshot receiver that listen job result.
60    /// If you need a non blocking method, see `ThreadPoolAsyncHandler`.
61    pub fn launch<F, R>(&self, f: F) -> Result<JoinHandle<R>, ThreadPoolDisconnected>
62    where
63        F: 'static + Send + FnOnce(&Shared) -> R,
64        R: 'static + Send,
65    {
66        let (s, r) = flume::bounded(1);
67        let s_clone = s.clone();
68        self.sender
69            .send(MsgForWorker::NewJob(Box::new(move |shared| {
70                let _ = s_clone.send(f(shared));
71            })))
72            .map_err(|_| ThreadPoolDisconnected)?;
73
74        Ok(JoinHandle(s, r))
75    }
76}
77
78#[derive(Debug)]
79/// Join handle
80pub struct JoinHandle<R>(FlumeSender<R>, FlumeReceiver<R>);
81
82impl<R> JoinHandle<R> {
83    /// Block the current thread until job finished
84    pub fn join(self) -> Result<R, ThreadPoolDisconnected> {
85        self.1.recv().map_err(|_| ThreadPoolDisconnected)
86    }
87}