commonware_runtime/utils/
signal.rs

1//! Mechanisms for coordinating actions across many tasks.
2
3use futures::{channel::oneshot, future::Shared, FutureExt};
4use std::{
5    future::Future,
6    pin::Pin,
7    sync::Arc,
8    task::{Context, Poll},
9};
10
11/// A one-time broadcast that can be awaited by many tasks. It is often used for
12/// coordinating shutdown across many tasks.
13///
14/// Each [Signal] tracks its lifecycle to enable proper shutdown coordination.
15/// To minimize overhead, it is recommended to wait on a reference to it
16/// (i.e. `&mut signal`) in loops rather than creating multiple `Signal`s.
17///
18/// # Example
19///
20/// ## Basic Usage
21///
22/// ```rust
23/// use commonware_runtime::{Spawner, Runner, deterministic, signal::Signaler};
24///
25/// let executor = deterministic::Runner::default();
26/// executor.start(|context| async move {
27///     // Setup signaler and get future
28///     let (signaler, signal) = Signaler::new();
29///
30///     // Signal shutdown
31///     signaler.signal(2);
32///
33///     // Wait for shutdown in task
34///     let sig = signal.await.unwrap();
35///     println!("Received signal: {}", sig);
36/// });
37/// ```
38///
39/// ## Advanced Usage
40///
41/// While `Futures::Shared` is efficient, there is still meaningful overhead
42/// to cloning it (i.e. in each iteration of a loop). To avoid
43/// a performance regression from introducing `Signaler`, it is recommended
44/// to wait on a reference to `Signal` (i.e. `&mut signal`).
45///
46/// _Note: Polling the same `Signal` after it has resolved will always panic.
47/// When waiting on a reference to a `Signal`, ensure it is either fused
48/// or not polled again after it has yielded a result._
49///
50/// ```rust
51/// use commonware_macros::select;
52/// use commonware_runtime::{Clock, Spawner, Runner, deterministic, Metrics, signal::Signaler};
53/// use futures::channel::oneshot;
54/// use std::time::Duration;
55///
56/// let executor = deterministic::Runner::default();
57/// executor.start(|context| async move {
58///     // Setup signaler and get future
59///     let (signaler, mut signal) = Signaler::new();
60///
61///     // Loop on the signal until resolved
62///     let (tx, rx) = oneshot::channel();
63///     context.with_label("waiter").spawn(|context| async move {
64///         loop {
65///             // Wait for signal or sleep
66///             select! {
67///                  sig = &mut signal => {
68///                      println!("Received signal: {}", sig.unwrap());
69///                      break;
70///                  },
71///                  _ = context.sleep(Duration::from_secs(1)) => {},
72///             };
73///         }
74///         let _ = tx.send(());
75///     });
76///
77///     // Send signal
78///     signaler.signal(9);
79///
80///     // Wait for task
81///     rx.await.expect("shutdown signaled");
82/// });
83/// ```
84#[derive(Clone)]
85pub enum Signal {
86    /// A signal that will resolve when the signaler marks it as resolved.
87    Open(Receiver),
88    /// A signal that has been resolved with a known value.
89    Closed(i32),
90}
91
92impl Future for Signal {
93    type Output = Result<i32, oneshot::Canceled>;
94
95    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
96        match &mut *self {
97            Signal::Open(live) => Pin::new(&mut live.inner).poll(cx),
98            Signal::Closed(value) => Poll::Ready(Ok(*value)),
99        }
100    }
101}
102
103/// An open [Signal] with completion tracking.
104#[derive(Clone)]
105pub struct Receiver {
106    inner: Shared<oneshot::Receiver<i32>>,
107    _guard: Arc<Guard>,
108}
109
110/// A guard used to coordinate the resolution of a [Signal].
111struct Guard {
112    tx: Option<oneshot::Sender<()>>,
113}
114
115impl Guard {
116    /// Create a new [Guard] that will resolve when the [Signaler] marks it as resolved.
117    pub fn new(completion_tx: oneshot::Sender<()>) -> Self {
118        Self {
119            tx: Some(completion_tx),
120        }
121    }
122}
123
124impl Drop for Guard {
125    fn drop(&mut self) {
126        if let Some(tx) = self.tx.take() {
127            let _ = tx.send(());
128        }
129    }
130}
131
132/// Coordinates a one-time signal across many tasks.
133pub struct Signaler {
134    tx: oneshot::Sender<i32>,
135    completion_rx: oneshot::Receiver<()>,
136}
137
138impl Signaler {
139    /// Create a new [Signaler].
140    ///
141    /// Returns a [Signaler] and a [Signal] that will resolve when [Signaler::signal] is called.
142    pub fn new() -> (Self, Signal) {
143        let (tx, rx) = oneshot::channel();
144        let (completion_tx, completion_rx) = oneshot::channel();
145
146        let signaler = Self { tx, completion_rx };
147        let signal = Signal::Open(Receiver {
148            inner: rx.shared(),
149            _guard: Arc::new(Guard::new(completion_tx)),
150        });
151
152        (signaler, signal)
153    }
154
155    /// Resolve all [Signal]s associated with this [Signaler].
156    pub fn signal(self, value: i32) -> oneshot::Receiver<()> {
157        let _ = self.tx.send(value);
158        self.completion_rx
159    }
160}
161
162/// Employs [Signaler] to coordinate the graceful shutdown of many tasks.
163pub enum Stopper {
164    /// The stopper is running and stop has not been called yet.
165    Running {
166        // We must use an Option here because we need to move the signaler out of the
167        // Running state when stopping.
168        signaler: Option<Signaler>,
169        signal: Signal,
170    },
171    /// Stop has been called and completion is pending or resolved.
172    Stopped {
173        stop_value: i32,
174        completion: Shared<oneshot::Receiver<()>>,
175    },
176}
177
178impl Stopper {
179    /// Create a new stopper in running mode.
180    pub fn new() -> Self {
181        let (signaler, signal) = Signaler::new();
182        Self::Running {
183            signaler: Some(signaler),
184            signal,
185        }
186    }
187
188    /// Get the signal for runtime users to await.
189    pub fn stopped(&self) -> Signal {
190        match self {
191            Self::Running { signal, .. } => signal.clone(),
192            Self::Stopped { stop_value, .. } => Signal::Closed(*stop_value),
193        }
194    }
195
196    /// Initiate shutdown returning a completion future.
197    /// Always returns a completion future, even if stop was already called.
198    /// If stop was already called, returns the same shared completion future
199    /// that will resolve immediately if already completed.
200    pub fn stop(&mut self, value: i32) -> Shared<oneshot::Receiver<()>> {
201        match self {
202            Self::Running { signaler, .. } => {
203                // Take the signaler out of the Option (it is always populated in Running)
204                let sig = signaler.take().unwrap();
205
206                // Signal shutdown and get the completion receiver
207                let completion_rx = sig.signal(value);
208                let shared_completion = completion_rx.shared();
209
210                // Transition to Stopped state
211                *self = Self::Stopped {
212                    stop_value: value,
213                    completion: shared_completion.clone(),
214                };
215
216                shared_completion
217            }
218            Self::Stopped { completion, .. } => {
219                // Ignore the stop value (always return the first used)
220
221                // Return existing completion (may already be resolved)
222                completion.clone()
223            }
224        }
225    }
226}
227
228impl Default for Stopper {
229    fn default() -> Self {
230        Self::new()
231    }
232}