1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::Mutex;
use std::task::Context;
use std::task::Poll;
use std::task::Waker;

struct State {
  completed: bool,
  waker: Option<Waker>,
}

#[derive(Clone)]
pub struct SignalFutureController {
  shared_state: Arc<Mutex<State>>,
}

impl SignalFutureController {
  pub fn signal(&self) {
    let mut shared_state = self.shared_state.lock().unwrap();
    shared_state.completed = true;
    if let Some(waker) = shared_state.waker.take() {
      waker.wake();
    };
  }
}

/// A simple future that can be programmatically resolved externally using the controller that is provided in tandem when creating a `SignalFuture`. This makes it useful as a way to signal to some consumer of the future that something has completed, using standard async syntax and semantics.
///
/// # Examples
///
/// ```
/// struct DelayedWriter { fd: File, pending: Mutex<Vec<(u64, Vec<u8>, SignalFutureController)>> }
/// impl DelayedWriter {
///   pub async fn write(&self, offset: u64, data: Vec<u8>) {
///     let (fut, fut_ctl) = SignalFuture::new();
///     self.pending.lock().await.push((offset, data, fut_ctl));
///     fut.await
///   }
///   pub async fn background_loop(&self) {
///     loop {
///       sleep(Duration::from_millis(500));
///       for (offset, data, fut_ctl) in self.pending.lock().await.drain(..) {
///         self.fd.write_at(offset, data).await;
///         fut_ctl.signal();
///       };
///     };
///   }
/// }
/// ```
pub struct SignalFuture {
  shared_state: Arc<Mutex<State>>,
}

impl SignalFuture {
  pub fn new() -> (SignalFuture, SignalFutureController) {
    let shared_state = Arc::new(Mutex::new(State {
      completed: false,
      waker: None,
    }));

    (
      SignalFuture {
        shared_state: shared_state.clone(),
      },
      SignalFutureController {
        shared_state: shared_state.clone(),
      },
    )
  }
}

impl Future for SignalFuture {
  type Output = ();

  fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    let mut shared_state = self.shared_state.lock().unwrap();
    if shared_state.completed {
      Poll::Ready(())
    } else {
      shared_state.waker = Some(cx.waker().clone());
      Poll::Pending
    }
  }
}