commonware_runtime/utils/
signal.rs

1//! Mechanisms for coordinating actions across many tasks.
2
3use futures::{channel::oneshot, future::Shared, FutureExt};
4use std::{
5    future::Future,
6    pin::Pin,
7    sync::Arc,
8    task::{Context, Poll},
9};
10
11/// A one-time broadcast that can be awaited by many tasks. It is often used for
12/// coordinating shutdown across many tasks.
13///
14/// Each [Signal] tracks its lifecycle to enable proper shutdown coordination.
15/// To minimize overhead, it is recommended to wait on a reference to it
16/// (i.e. `&mut signal`) in loops rather than creating multiple `Signal`s.
17///
18/// # Example
19///
20/// ## Basic Usage
21///
22/// ```rust
23/// use commonware_runtime::{Spawner, Runner, deterministic, signal::Signaler};
24///
25/// let executor = deterministic::Runner::default();
26/// executor.start(|context| async move {
27///     // Setup signaler and get future
28///     let (signaler, signal) = Signaler::new();
29///
30///     // Signal shutdown
31///     signaler.signal(2);
32///
33///     // Wait for shutdown in task
34///     let sig = signal.await.unwrap();
35///     println!("Received signal: {}", sig);
36/// });
37/// ```
38///
39/// ## Advanced Usage
40///
/// While `futures::future::Shared` is efficient, there is still meaningful overhead
42/// to cloning it (i.e. in each iteration of a loop). To avoid
43/// a performance regression from introducing `Signaler`, it is recommended
44/// to wait on a reference to `Signal` (i.e. `&mut signal`).
45///
46/// ```rust
47/// use commonware_macros::select;
48/// use commonware_runtime::{Clock, Spawner, Runner, deterministic, Metrics, signal::Signaler};
49/// use futures::channel::oneshot;
50/// use std::time::Duration;
51///
52/// let executor = deterministic::Runner::default();
53/// executor.start(|context| async move {
54///     // Setup signaler and get future
55///     let (signaler, mut signal) = Signaler::new();
56///
57///     // Loop on the signal until resolved
58///     let (tx, rx) = oneshot::channel();
59///     context.with_label("waiter").spawn(|context| async move {
60///         loop {
61///             // Wait for signal or sleep
62///             select! {
63///                  sig = &mut signal => {
64///                      println!("Received signal: {}", sig.unwrap());
65///                      break;
66///                  },
67///                  _ = context.sleep(Duration::from_secs(1)) => {},
68///             };
69///         }
70///         let _ = tx.send(());
71///     });
72///
73///     // Send signal
74///     signaler.signal(9);
75///
76///     // Wait for task
77///     rx.await.expect("shutdown signaled");
78/// });
79/// ```
80#[derive(Clone)]
81pub enum Signal {
82    /// A signal that will resolve when the signaler marks it as resolved.
83    Open(Receiver),
84    /// A signal that has been resolved with a known value.
85    Closed(i32),
86}
87
88impl Future for Signal {
89    type Output = Result<i32, oneshot::Canceled>;
90
91    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
92        match &mut *self {
93            Signal::Open(live) => Pin::new(&mut live.inner).poll(cx),
94            Signal::Closed(value) => Poll::Ready(Ok(*value)),
95        }
96    }
97}
98
/// An open [Signal] with completion tracking.
///
/// Cloning a [Receiver] clones both the shared value future and the
/// reference-counted [Guard], so every clone observes the same signaled value
/// and the guard is dropped only once the last clone is gone.
#[derive(Clone)]
pub struct Receiver {
    // Shared view of the oneshot receiver that carries the signaled value.
    inner: Shared<oneshot::Receiver<i32>>,
    // Never read; held only so the `Guard` is dropped together with the last clone.
    _guard: Arc<Guard>,
}
105
106/// A guard used to coordinate the resolution of a [Signal].
107struct Guard {
108    tx: Option<oneshot::Sender<()>>,
109}
110
111impl Guard {
112    /// Create a new [Guard] that will resolve when the [Signaler] marks it as resolved.
113    pub fn new(completion_tx: oneshot::Sender<()>) -> Self {
114        Self {
115            tx: Some(completion_tx),
116        }
117    }
118}
119
120impl Drop for Guard {
121    fn drop(&mut self) {
122        if let Some(tx) = self.tx.take() {
123            let _ = tx.send(());
124        }
125    }
126}
127
128/// Coordinates a one-time signal across many tasks.
129pub struct Signaler {
130    tx: oneshot::Sender<i32>,
131    completion_rx: oneshot::Receiver<()>,
132}
133
134impl Signaler {
135    /// Create a new [Signaler].
136    ///
137    /// Returns a [Signaler] and a [Signal] that will resolve when [Signaler::signal] is called.
138    pub fn new() -> (Self, Signal) {
139        let (tx, rx) = oneshot::channel();
140        let (completion_tx, completion_rx) = oneshot::channel();
141
142        let signaler = Self { tx, completion_rx };
143        let signal = Signal::Open(Receiver {
144            inner: rx.shared(),
145            _guard: Arc::new(Guard::new(completion_tx)),
146        });
147
148        (signaler, signal)
149    }
150
151    /// Resolve all [Signal]s associated with this [Signaler].
152    pub fn signal(self, value: i32) -> oneshot::Receiver<()> {
153        let _ = self.tx.send(value);
154        self.completion_rx
155    }
156}
157
158/// Employs [Signaler] to coordinate the graceful shutdown of many tasks.
159pub enum Stopper {
160    /// The stopper is running and stop has not been called yet.
161    Running {
162        // We must use an Option here because we need to move the signaler out of the
163        // Running state when stopping.
164        signaler: Option<Signaler>,
165        signal: Signal,
166    },
167    /// Stop has been called and completion is pending or resolved.
168    Stopped {
169        stop_value: i32,
170        completion: Shared<oneshot::Receiver<()>>,
171    },
172}
173
174impl Stopper {
175    /// Create a new stopper in running mode.
176    pub fn new() -> Self {
177        let (signaler, signal) = Signaler::new();
178        Self::Running {
179            signaler: Some(signaler),
180            signal,
181        }
182    }
183
184    /// Get the signal for runtime users to await.
185    pub fn stopped(&self) -> Signal {
186        match self {
187            Self::Running { signal, .. } => signal.clone(),
188            Self::Stopped { stop_value, .. } => Signal::Closed(*stop_value),
189        }
190    }
191
192    /// Initiate shutdown returning a completion future.
193    /// Always returns a completion future, even if stop was already called.
194    /// If stop was already called, returns the same shared completion future
195    /// that will resolve immediately if already completed.
196    pub fn stop(&mut self, value: i32) -> Shared<oneshot::Receiver<()>> {
197        match self {
198            Self::Running { signaler, .. } => {
199                // Take the signaler out of the Option (it is always populated in Running)
200                let sig = signaler.take().unwrap();
201
202                // Signal shutdown and get the completion receiver
203                let completion_rx = sig.signal(value);
204                let shared_completion = completion_rx.shared();
205
206                // Transition to Stopped state
207                *self = Self::Stopped {
208                    stop_value: value,
209                    completion: shared_completion.clone(),
210                };
211
212                shared_completion
213            }
214            Self::Stopped { completion, .. } => {
215                // Ignore the stop value (always return the first used)
216
217                // Return existing completion (may already be resolved)
218                completion.clone()
219            }
220        }
221    }
222}
223
224impl Default for Stopper {
225    fn default() -> Self {
226        Self::new()
227    }
228}