signal_future/
lib.rs

1use parking_lot::Mutex;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::task::Context;
6use std::task::Poll;
7use std::task::Waker;
8
9struct State<T = ()> {
10  value: Option<T>,
11  waker: Option<Waker>,
12}
13
14#[derive(Clone)]
15pub struct SignalFutureController<T = ()> {
16  shared_state: Arc<Mutex<State<T>>>,
17}
18
19impl<T> SignalFutureController<T> {
20  pub fn signal(&self, value: T) {
21    let mut shared_state = self.shared_state.lock();
22    shared_state.value = Some(value);
23    if let Some(waker) = shared_state.waker.take() {
24      waker.wake();
25    };
26  }
27}
28
29/// A simple future that can be programmatically resolved externally using the controller that is provided in tandem when creating a `SignalFuture`. This makes it useful as a way to signal to some consumer of the future that something has completed, using standard async syntax and semantics.
30///
31/// # Examples
32///
33/// ```
34/// struct DelayedWriter { fd: File, pending: Mutex<Vec<(u64, Vec<u8>, SignalFutureController)>> }
35/// impl DelayedWriter {
36///   pub async fn write(&self, offset: u64, data: Vec<u8>) {
37///     let (fut, fut_ctl) = SignalFuture::new();
38///     self.pending.lock().await.push((offset, data, fut_ctl));
39///     fut.await
40///   }
41///   pub async fn background_loop(&self) {
42///     loop {
43///       sleep(Duration::from_millis(500));
44///       for (offset, data, fut_ctl) in self.pending.lock().await.drain(..) {
45///         self.fd.write_at(offset, data).await;
46///         fut_ctl.signal(());
47///       };
48///     };
49///   }
50/// }
51/// ```
52pub struct SignalFuture<T = ()> {
53  shared_state: Arc<Mutex<State<T>>>,
54}
55
56impl<T> SignalFuture<T> {
57  pub fn new() -> (SignalFuture<T>, SignalFutureController<T>) {
58    let shared_state = Arc::new(Mutex::new(State {
59      value: None,
60      waker: None,
61    }));
62
63    (
64      SignalFuture {
65        shared_state: shared_state.clone(),
66      },
67      SignalFutureController {
68        shared_state: shared_state.clone(),
69      },
70    )
71  }
72}
73
74impl<T> Future for SignalFuture<T> {
75  type Output = T;
76
77  fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
78    let mut shared_state = self.shared_state.lock();
79    if let Some(v) = shared_state.value.take() {
80      Poll::Ready(v)
81    } else {
82      shared_state.waker = Some(cx.waker().clone());
83      Poll::Pending
84    }
85  }
86}