blocking_permit/
dispatch.rs1use std::cell::RefCell;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6#[cfg(not(any(feature = "tokio-oneshot", feature = "futures-channel")))]
7compile_error!("One of tokio-oneshot or futures-channel (default) \
8 features is required for this crate!");
9
10#[cfg(feature="tokio-oneshot")]
11use tokio::sync::oneshot;
12
13#[cfg(not(feature="tokio-oneshot"))]
14use futures_channel::oneshot;
15
16use crate::{Canceled, DispatchPool};
17
18thread_local!(static POOL: RefCell<Option<DispatchPool>> = RefCell::new(None));
19
20pub fn register_dispatch_pool(pool: DispatchPool) -> Option<DispatchPool> {
25 POOL.with(|p| p.replace(Some(pool)))
26}
27
28pub fn deregister_dispatch_pool() -> Option<DispatchPool> {
30 POOL.with(|p| p.replace(None))
31}
32
33pub fn is_dispatch_pool_registered() -> bool {
35 POOL.with(|p| p.borrow().is_some())
36}
37
38pub fn dispatch<F>(f: F) -> Option<F>
47 where F: FnOnce() + Send + 'static
48{
49 POOL.with(|p| {
50 if let Some(pool) = p.borrow().as_ref() {
51 pool.spawn(Box::new(f));
52 None
53 } else {
54 Some(f)
55 }
56 })
57}
58
59#[must_use = "futures do nothing unless awaited or polled"]
61pub enum DispatchRx<F, T> {
62 Dispatch(Dispatched<T>),
63 NotRegistered(F),
64}
65
66impl<F, T> DispatchRx<F, T> {
67 pub fn unwrap(self) -> Dispatched<T> {
72 match self {
73 DispatchRx::Dispatch(disp) => disp,
74 DispatchRx::NotRegistered(_) => {
75 panic!("no BlockingPool was registered for this thread")
76 }
77 }
78 }
79}
80
81pub fn dispatch_rx<F, T>(f: F) -> DispatchRx<F, T>
89 where F: FnOnce() -> T + Send + 'static,
90 T: Send + 'static
91{
92 POOL.with(|p| {
93 if let Some(pool) = p.borrow().as_ref() {
94 let (tx, rx) = oneshot::channel();
95 pool.spawn(Box::new(|| {
96 tx.send(f()).ok();
97 }));
98 DispatchRx::Dispatch(Dispatched(rx))
99 } else {
100 DispatchRx::NotRegistered(f)
101 }
102 })
103}
104
105#[derive(Debug)]
107pub struct Dispatched<T>(oneshot::Receiver<T>);
108
109impl<T> Future for Dispatched<T> {
110 type Output = Result<T, Canceled>;
111
112 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
113 -> Poll<Self::Output>
114 {
115 match Future::poll(Pin::new(&mut self.0), cx) {
116 Poll::Pending => Poll::Pending,
117 Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)),
118 Poll::Ready(Err(_)) => Poll::Ready(Err(Canceled))
119 }
120 }
121}