blocking_permit/
dispatch.rs

1use std::cell::RefCell;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6#[cfg(not(any(feature = "tokio-oneshot", feature = "futures-channel")))]
7compile_error!("One of tokio-oneshot or futures-channel (default) \
8                features is required for this crate!");
9
10#[cfg(feature="tokio-oneshot")]
11use tokio::sync::oneshot;
12
13#[cfg(not(feature="tokio-oneshot"))]
14use futures_channel::oneshot;
15
16use crate::{Canceled, DispatchPool};
17
18thread_local!(static POOL: RefCell<Option<DispatchPool>> = RefCell::new(None));
19
20/// Register a `DispatchPool` on the calling thread.
21///
22/// Any prior instance is returned. This consumes the pool by value, but it can
23/// be cloned beforehand to preserve an owned handle.
24pub fn register_dispatch_pool(pool: DispatchPool) -> Option<DispatchPool> {
25    POOL.with(|p| p.replace(Some(pool)))
26}
27
28/// Deregister and return any `DispatchPool` on the current thread.
29pub fn deregister_dispatch_pool() -> Option<DispatchPool> {
30    POOL.with(|p| p.replace(None))
31}
32
33/// Return true if a DispatchPool is registered to the current thread.
34pub fn is_dispatch_pool_registered() -> bool {
35    POOL.with(|p| p.borrow().is_some())
36}
37
38/// Dispatch a blocking operation closure to a pool, if registered.
39///
40/// If a pool has been registered via [`register_dispatch_pool`], then the
41/// closure is spawned on the pool and this returns `None`. Otherwise the
42/// original closure is returned.
43///
44/// Alternatively [`dispatch_rx`] may be used to await and obtain a return
45/// value from the closure.
46pub fn dispatch<F>(f: F) -> Option<F>
47    where F: FnOnce() + Send + 'static
48{
49    POOL.with(|p| {
50        if let Some(pool) = p.borrow().as_ref() {
51            pool.spawn(Box::new(f));
52            None
53        } else {
54            Some(f)
55        }
56    })
57}
58
59/// Value returned by [`dispatch_rx`].
60#[must_use = "futures do nothing unless awaited or polled"]
61pub enum DispatchRx<F, T> {
62    Dispatch(Dispatched<T>),
63    NotRegistered(F),
64}
65
66impl<F, T> DispatchRx<F, T> {
67    /// Unwrap to the contained `Dispatched` future.
68    ///
69    /// ## Panics
70    /// Panics if `NotRegistered`.
71    pub fn unwrap(self) -> Dispatched<T> {
72        match self {
73            DispatchRx::Dispatch(disp) => disp,
74            DispatchRx::NotRegistered(_) => {
75                panic!("no BlockingPool was registered for this thread")
76            }
77        }
78    }
79}
80
81/// Dispatch a blocking operation closure to a registered pool, returning
82/// a future for awaiting the result.
83///
84/// If a pool has been registered via [`register_dispatch_pool`], then the
85/// closure is spawned on the pool, and this returns a `Dispatched` future,
86/// which resolves to the result of the closure. Otherwise the original closure
87/// is returned.
88pub fn dispatch_rx<F, T>(f: F) -> DispatchRx<F, T>
89    where F: FnOnce() -> T + Send + 'static,
90          T: Send + 'static
91{
92    POOL.with(|p| {
93        if let Some(pool) = p.borrow().as_ref() {
94            let (tx, rx) = oneshot::channel();
95            pool.spawn(Box::new(|| {
96                tx.send(f()).ok();
97            }));
98            DispatchRx::Dispatch(Dispatched(rx))
99        } else {
100            DispatchRx::NotRegistered(f)
101        }
102    })
103}
104
105/// A future type created by [`dispatch_rx`].
106#[derive(Debug)]
107pub struct Dispatched<T>(oneshot::Receiver<T>);
108
109impl<T> Future for Dispatched<T> {
110    type Output = Result<T, Canceled>;
111
112    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
113        -> Poll<Self::Output>
114    {
115        match Future::poll(Pin::new(&mut self.0), cx) {
116            Poll::Pending => Poll::Pending,
117            Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)),
118            Poll::Ready(Err(_)) => Poll::Ready(Err(Canceled))
119        }
120    }
121}