commonware_runtime/utils/signal.rs
1//! Mechanisms for coordinating actions across many tasks.
2
3use commonware_utils::channel::oneshot::{self, error::RecvError};
4use futures::{future::Shared, FutureExt};
5use std::{
6 future::Future,
7 pin::Pin,
8 sync::Arc,
9 task::{Context, Poll},
10};
11
12/// A one-time broadcast that can be awaited by many tasks. It is often used for
13/// coordinating shutdown across many tasks.
14///
15/// Each [Signal] tracks its lifecycle to enable proper shutdown coordination.
16/// To minimize overhead, it is recommended to wait on a reference to it
17/// (i.e. `&mut signal`) in loops rather than creating multiple `Signal`s.
18///
19/// # Example
20///
21/// ## Basic Usage
22///
23/// ```rust
24/// use commonware_runtime::{Spawner, Runner, deterministic, signal::Signaler};
25///
26/// let executor = deterministic::Runner::default();
27/// executor.start(|context| async move {
28/// // Setup signaler and get future
29/// let (signaler, signal) = Signaler::new();
30///
31/// // Signal shutdown
32/// signaler.signal(2);
33///
34/// // Wait for shutdown in task
35/// let sig = signal.await.unwrap();
36/// println!("Received signal: {}", sig);
37/// });
38/// ```
39///
40/// ## Advanced Usage
41///
42/// While `Futures::Shared` is efficient, there is still meaningful overhead
43/// to cloning it (i.e. in each iteration of a loop). To avoid
44/// a performance regression from introducing `Signaler`, it is recommended
45/// to wait on a reference to `Signal` (i.e. `&mut signal`).
46///
47/// _Note: Polling the same `Signal` after it has resolved will always panic.
48/// When waiting on a reference to a `Signal`, ensure it is either fused
49/// or not polled again after it has yielded a result._
50///
51/// ```rust
52/// use commonware_macros::select;
53/// use commonware_runtime::{Clock, Spawner, Runner, deterministic, Metrics, signal::Signaler};
54/// use commonware_utils::channel::oneshot;
55/// use std::time::Duration;
56///
57/// let executor = deterministic::Runner::default();
58/// executor.start(|context| async move {
59/// // Setup signaler and get future
60/// let (signaler, mut signal) = Signaler::new();
61///
62/// // Loop on the signal until resolved
63/// let (tx, rx) = oneshot::channel();
64/// context.with_label("waiter").spawn(|context| async move {
65/// // Wait for signal or sleep
66/// loop {
67/// select! {
68/// sig = &mut signal => {
69/// println!("Received signal: {}", sig.unwrap());
70/// break;
71/// },
72/// _ = context.sleep(Duration::from_secs(1)) => {},
73/// }
74/// }
75/// let _ = tx.send(());
76/// });
77///
78/// // Send signal
79/// signaler.signal(9);
80///
81/// // Wait for task
82/// rx.await.expect("shutdown signaled");
83/// });
84/// ```
85#[derive(Clone)]
86pub enum Signal {
87 /// A signal that will resolve when the signaler marks it as resolved.
88 Open(Receiver),
89 /// A signal that has been resolved with a known value.
90 Closed(i32),
91}
92
93impl Future for Signal {
94 type Output = Result<i32, RecvError>;
95
96 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
97 match &mut *self {
98 Self::Open(live) => Pin::new(&mut live.inner).poll(cx),
99 Self::Closed(value) => Poll::Ready(Ok(*value)),
100 }
101 }
102}
103
104/// An open [Signal] with completion tracking.
105#[derive(Clone)]
106pub struct Receiver {
107 inner: Shared<oneshot::Receiver<i32>>,
108 _guard: Arc<Guard>,
109}
110
111/// A guard used to coordinate the resolution of a [Signal].
112struct Guard {
113 tx: Option<oneshot::Sender<()>>,
114}
115
116impl Guard {
117 /// Create a new [Guard] that will resolve when the [Signaler] marks it as resolved.
118 pub const fn new(completion_tx: oneshot::Sender<()>) -> Self {
119 Self {
120 tx: Some(completion_tx),
121 }
122 }
123}
124
125impl Drop for Guard {
126 fn drop(&mut self) {
127 if let Some(tx) = self.tx.take() {
128 let _ = tx.send(());
129 }
130 }
131}
132
133/// Coordinates a one-time signal across many tasks.
134pub struct Signaler {
135 tx: oneshot::Sender<i32>,
136 completion_rx: oneshot::Receiver<()>,
137}
138
139impl Signaler {
140 /// Create a new [Signaler].
141 ///
142 /// Returns a [Signaler] and a [Signal] that will resolve when [Signaler::signal] is called.
143 pub fn new() -> (Self, Signal) {
144 let (tx, rx) = oneshot::channel();
145 let (completion_tx, completion_rx) = oneshot::channel();
146
147 let signaler = Self { tx, completion_rx };
148 let signal = Signal::Open(Receiver {
149 inner: rx.shared(),
150 _guard: Arc::new(Guard::new(completion_tx)),
151 });
152
153 (signaler, signal)
154 }
155
156 /// Resolve all [Signal]s associated with this [Signaler].
157 pub fn signal(self, value: i32) -> oneshot::Receiver<()> {
158 let _ = self.tx.send(value);
159 self.completion_rx
160 }
161}
162
163/// Employs [Signaler] to coordinate the graceful shutdown of many tasks.
164pub enum Stopper {
165 /// The stopper is running and stop has not been called yet.
166 Running {
167 // We must use an Option here because we need to move the signaler out of the
168 // Running state when stopping.
169 signaler: Option<Signaler>,
170 signal: Signal,
171 },
172 /// Stop has been called and completion is pending or resolved.
173 Stopped {
174 stop_value: i32,
175 completion: Shared<oneshot::Receiver<()>>,
176 },
177}
178
179impl Stopper {
180 /// Create a new stopper in running mode.
181 pub fn new() -> Self {
182 let (signaler, signal) = Signaler::new();
183 Self::Running {
184 signaler: Some(signaler),
185 signal,
186 }
187 }
188
189 /// Get the signal for runtime users to await.
190 pub fn stopped(&self) -> Signal {
191 match self {
192 Self::Running { signal, .. } => signal.clone(),
193 Self::Stopped { stop_value, .. } => Signal::Closed(*stop_value),
194 }
195 }
196
197 /// Initiate shutdown returning a completion future.
198 /// Always returns a completion future, even if stop was already called.
199 /// If stop was already called, returns the same shared completion future
200 /// that will resolve immediately if already completed.
201 pub fn stop(&mut self, value: i32) -> Shared<oneshot::Receiver<()>> {
202 match self {
203 Self::Running { signaler, .. } => {
204 // Take the signaler out of the Option (it is always populated in Running)
205 let sig = signaler.take().unwrap();
206
207 // Signal shutdown and get the completion receiver
208 let completion_rx = sig.signal(value);
209 let shared_completion = completion_rx.shared();
210
211 // Transition to Stopped state
212 *self = Self::Stopped {
213 stop_value: value,
214 completion: shared_completion.clone(),
215 };
216
217 shared_completion
218 }
219 Self::Stopped { completion, .. } => {
220 // Ignore the stop value (always return the first used)
221
222 // Return existing completion (may already be resolved)
223 completion.clone()
224 }
225 }
226 }
227}
228
229impl Default for Stopper {
230 fn default() -> Self {
231 Self::new()
232 }
233}