Skip to main content

commonware_runtime/utils/
signal.rs

//! Mechanisms for coordinating actions across many tasks.
2
3use commonware_utils::channel::oneshot::{self, error::RecvError};
4use futures::{future::Shared, FutureExt};
5use std::{
6    future::Future,
7    pin::Pin,
8    sync::Arc,
9    task::{Context, Poll},
10};
11
12/// A one-time broadcast that can be awaited by many tasks. It is often used for
13/// coordinating shutdown across many tasks.
14///
15/// Each [Signal] tracks its lifecycle to enable proper shutdown coordination.
16/// To minimize overhead, it is recommended to wait on a reference to it
17/// (i.e. `&mut signal`) in loops rather than creating multiple `Signal`s.
18///
19/// # Example
20///
21/// ## Basic Usage
22///
23/// ```rust
24/// use commonware_runtime::{Spawner, Runner, deterministic, signal::Signaler};
25///
26/// let executor = deterministic::Runner::default();
27/// executor.start(|context| async move {
28///     // Setup signaler and get future
29///     let (signaler, signal) = Signaler::new();
30///
31///     // Signal shutdown
32///     signaler.signal(2);
33///
34///     // Wait for shutdown in task
35///     let sig = signal.await.unwrap();
36///     println!("Received signal: {}", sig);
37/// });
38/// ```
39///
40/// ## Advanced Usage
41///
42/// While `Futures::Shared` is efficient, there is still meaningful overhead
43/// to cloning it (i.e. in each iteration of a loop). To avoid
44/// a performance regression from introducing `Signaler`, it is recommended
45/// to wait on a reference to `Signal` (i.e. `&mut signal`).
46///
47/// _Note: Polling the same `Signal` after it has resolved will always panic.
48/// When waiting on a reference to a `Signal`, ensure it is either fused
49/// or not polled again after it has yielded a result._
50///
51/// ```rust
52/// use commonware_macros::select;
53/// use commonware_runtime::{Clock, Spawner, Runner, deterministic, Metrics, signal::Signaler};
54/// use commonware_utils::channel::oneshot;
55/// use std::time::Duration;
56///
57/// let executor = deterministic::Runner::default();
58/// executor.start(|context| async move {
59///     // Setup signaler and get future
60///     let (signaler, mut signal) = Signaler::new();
61///
62///     // Loop on the signal until resolved
63///     let (tx, rx) = oneshot::channel();
64///     context.with_label("waiter").spawn(|context| async move {
65///         // Wait for signal or sleep
66///         loop {
67///             select! {
68///                 sig = &mut signal => {
69///                     println!("Received signal: {}", sig.unwrap());
70///                     break;
71///                 },
72///                 _ = context.sleep(Duration::from_secs(1)) => {},
73///             }
74///         }
75///         let _ = tx.send(());
76///     });
77///
78///     // Send signal
79///     signaler.signal(9);
80///
81///     // Wait for task
82///     rx.await.expect("shutdown signaled");
83/// });
84/// ```
85#[derive(Clone)]
86pub enum Signal {
87    /// A signal that will resolve when the signaler marks it as resolved.
88    Open(Receiver),
89    /// A signal that has been resolved with a known value.
90    Closed(i32),
91}
92
93impl Future for Signal {
94    type Output = Result<i32, RecvError>;
95
96    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
97        match &mut *self {
98            Self::Open(live) => Pin::new(&mut live.inner).poll(cx),
99            Self::Closed(value) => Poll::Ready(Ok(*value)),
100        }
101    }
102}
103
104/// An open [Signal] with completion tracking.
105#[derive(Clone)]
106pub struct Receiver {
107    inner: Shared<oneshot::Receiver<i32>>,
108    _guard: Arc<Guard>,
109}
110
111/// A guard used to coordinate the resolution of a [Signal].
112struct Guard {
113    tx: Option<oneshot::Sender<()>>,
114}
115
116impl Guard {
117    /// Create a new [Guard] that will resolve when the [Signaler] marks it as resolved.
118    pub const fn new(completion_tx: oneshot::Sender<()>) -> Self {
119        Self {
120            tx: Some(completion_tx),
121        }
122    }
123}
124
125impl Drop for Guard {
126    fn drop(&mut self) {
127        if let Some(tx) = self.tx.take() {
128            let _ = tx.send(());
129        }
130    }
131}
132
133/// Coordinates a one-time signal across many tasks.
134pub struct Signaler {
135    tx: oneshot::Sender<i32>,
136    completion_rx: oneshot::Receiver<()>,
137}
138
139impl Signaler {
140    /// Create a new [Signaler].
141    ///
142    /// Returns a [Signaler] and a [Signal] that will resolve when [Signaler::signal] is called.
143    pub fn new() -> (Self, Signal) {
144        let (tx, rx) = oneshot::channel();
145        let (completion_tx, completion_rx) = oneshot::channel();
146
147        let signaler = Self { tx, completion_rx };
148        let signal = Signal::Open(Receiver {
149            inner: rx.shared(),
150            _guard: Arc::new(Guard::new(completion_tx)),
151        });
152
153        (signaler, signal)
154    }
155
156    /// Resolve all [Signal]s associated with this [Signaler].
157    pub fn signal(self, value: i32) -> oneshot::Receiver<()> {
158        let _ = self.tx.send(value);
159        self.completion_rx
160    }
161}
162
163/// Employs [Signaler] to coordinate the graceful shutdown of many tasks.
164pub enum Stopper {
165    /// The stopper is running and stop has not been called yet.
166    Running {
167        // We must use an Option here because we need to move the signaler out of the
168        // Running state when stopping.
169        signaler: Option<Signaler>,
170        signal: Signal,
171    },
172    /// Stop has been called and completion is pending or resolved.
173    Stopped {
174        stop_value: i32,
175        completion: Shared<oneshot::Receiver<()>>,
176    },
177}
178
179impl Stopper {
180    /// Create a new stopper in running mode.
181    pub fn new() -> Self {
182        let (signaler, signal) = Signaler::new();
183        Self::Running {
184            signaler: Some(signaler),
185            signal,
186        }
187    }
188
189    /// Get the signal for runtime users to await.
190    pub fn stopped(&self) -> Signal {
191        match self {
192            Self::Running { signal, .. } => signal.clone(),
193            Self::Stopped { stop_value, .. } => Signal::Closed(*stop_value),
194        }
195    }
196
197    /// Initiate shutdown returning a completion future.
198    /// Always returns a completion future, even if stop was already called.
199    /// If stop was already called, returns the same shared completion future
200    /// that will resolve immediately if already completed.
201    pub fn stop(&mut self, value: i32) -> Shared<oneshot::Receiver<()>> {
202        match self {
203            Self::Running { signaler, .. } => {
204                // Take the signaler out of the Option (it is always populated in Running)
205                let sig = signaler.take().unwrap();
206
207                // Signal shutdown and get the completion receiver
208                let completion_rx = sig.signal(value);
209                let shared_completion = completion_rx.shared();
210
211                // Transition to Stopped state
212                *self = Self::Stopped {
213                    stop_value: value,
214                    completion: shared_completion.clone(),
215                };
216
217                shared_completion
218            }
219            Self::Stopped { completion, .. } => {
220                // Ignore the stop value (always return the first used)
221
222                // Return existing completion (may already be resolved)
223                completion.clone()
224            }
225        }
226    }
227}
228
229impl Default for Stopper {
230    fn default() -> Self {
231        Self::new()
232    }
233}