commonware_runtime/utils/signal.rs
1//! Mechanisms for coordinating actions across many tasks.
2
3use futures::{channel::oneshot, future::Shared, FutureExt};
4use std::{
5 future::Future,
6 pin::Pin,
7 sync::Arc,
8 task::{Context, Poll},
9};
10
11/// A one-time broadcast that can be awaited by many tasks. It is often used for
12/// coordinating shutdown across many tasks.
13///
14/// Each [Signal] tracks its lifecycle to enable proper shutdown coordination.
15/// To minimize overhead, it is recommended to wait on a reference to it
16/// (i.e. `&mut signal`) in loops rather than creating multiple `Signal`s.
17///
18/// # Example
19///
20/// ## Basic Usage
21///
22/// ```rust
23/// use commonware_runtime::{Spawner, Runner, deterministic, signal::Signaler};
24///
25/// let executor = deterministic::Runner::default();
26/// executor.start(|context| async move {
27/// // Setup signaler and get future
28/// let (signaler, signal) = Signaler::new();
29///
30/// // Signal shutdown
31/// signaler.signal(2);
32///
33/// // Wait for shutdown in task
34/// let sig = signal.await.unwrap();
35/// println!("Received signal: {}", sig);
36/// });
37/// ```
38///
39/// ## Advanced Usage
40///
41/// While `Futures::Shared` is efficient, there is still meaningful overhead
42/// to cloning it (i.e. in each iteration of a loop). To avoid
43/// a performance regression from introducing `Signaler`, it is recommended
44/// to wait on a reference to `Signal` (i.e. `&mut signal`).
45///
46/// _Note: Polling the same `Signal` after it has resolved will always panic.
47/// When waiting on a reference to a `Signal`, ensure it is either fused
48/// or not polled again after it has yielded a result._
49///
50/// ```rust
51/// use commonware_macros::select;
52/// use commonware_runtime::{Clock, Spawner, Runner, deterministic, Metrics, signal::Signaler};
53/// use futures::channel::oneshot;
54/// use std::time::Duration;
55///
56/// let executor = deterministic::Runner::default();
57/// executor.start(|context| async move {
58/// // Setup signaler and get future
59/// let (signaler, mut signal) = Signaler::new();
60///
61/// // Loop on the signal until resolved
62/// let (tx, rx) = oneshot::channel();
63/// context.with_label("waiter").spawn(|context| async move {
64/// loop {
65/// // Wait for signal or sleep
66/// select! {
67/// sig = &mut signal => {
68/// println!("Received signal: {}", sig.unwrap());
69/// break;
70/// },
71/// _ = context.sleep(Duration::from_secs(1)) => {},
72/// };
73/// }
74/// let _ = tx.send(());
75/// });
76///
77/// // Send signal
78/// signaler.signal(9);
79///
80/// // Wait for task
81/// rx.await.expect("shutdown signaled");
82/// });
83/// ```
84#[derive(Clone)]
85pub enum Signal {
86 /// A signal that will resolve when the signaler marks it as resolved.
87 Open(Receiver),
88 /// A signal that has been resolved with a known value.
89 Closed(i32),
90}
91
92impl Future for Signal {
93 type Output = Result<i32, oneshot::Canceled>;
94
95 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
96 match &mut *self {
97 Signal::Open(live) => Pin::new(&mut live.inner).poll(cx),
98 Signal::Closed(value) => Poll::Ready(Ok(*value)),
99 }
100 }
101}
102
103/// An open [Signal] with completion tracking.
104#[derive(Clone)]
105pub struct Receiver {
106 inner: Shared<oneshot::Receiver<i32>>,
107 _guard: Arc<Guard>,
108}
109
110/// A guard used to coordinate the resolution of a [Signal].
111struct Guard {
112 tx: Option<oneshot::Sender<()>>,
113}
114
115impl Guard {
116 /// Create a new [Guard] that will resolve when the [Signaler] marks it as resolved.
117 pub fn new(completion_tx: oneshot::Sender<()>) -> Self {
118 Self {
119 tx: Some(completion_tx),
120 }
121 }
122}
123
124impl Drop for Guard {
125 fn drop(&mut self) {
126 if let Some(tx) = self.tx.take() {
127 let _ = tx.send(());
128 }
129 }
130}
131
132/// Coordinates a one-time signal across many tasks.
133pub struct Signaler {
134 tx: oneshot::Sender<i32>,
135 completion_rx: oneshot::Receiver<()>,
136}
137
138impl Signaler {
139 /// Create a new [Signaler].
140 ///
141 /// Returns a [Signaler] and a [Signal] that will resolve when [Signaler::signal] is called.
142 pub fn new() -> (Self, Signal) {
143 let (tx, rx) = oneshot::channel();
144 let (completion_tx, completion_rx) = oneshot::channel();
145
146 let signaler = Self { tx, completion_rx };
147 let signal = Signal::Open(Receiver {
148 inner: rx.shared(),
149 _guard: Arc::new(Guard::new(completion_tx)),
150 });
151
152 (signaler, signal)
153 }
154
155 /// Resolve all [Signal]s associated with this [Signaler].
156 pub fn signal(self, value: i32) -> oneshot::Receiver<()> {
157 let _ = self.tx.send(value);
158 self.completion_rx
159 }
160}
161
162/// Employs [Signaler] to coordinate the graceful shutdown of many tasks.
163pub enum Stopper {
164 /// The stopper is running and stop has not been called yet.
165 Running {
166 // We must use an Option here because we need to move the signaler out of the
167 // Running state when stopping.
168 signaler: Option<Signaler>,
169 signal: Signal,
170 },
171 /// Stop has been called and completion is pending or resolved.
172 Stopped {
173 stop_value: i32,
174 completion: Shared<oneshot::Receiver<()>>,
175 },
176}
177
178impl Stopper {
179 /// Create a new stopper in running mode.
180 pub fn new() -> Self {
181 let (signaler, signal) = Signaler::new();
182 Self::Running {
183 signaler: Some(signaler),
184 signal,
185 }
186 }
187
188 /// Get the signal for runtime users to await.
189 pub fn stopped(&self) -> Signal {
190 match self {
191 Self::Running { signal, .. } => signal.clone(),
192 Self::Stopped { stop_value, .. } => Signal::Closed(*stop_value),
193 }
194 }
195
196 /// Initiate shutdown returning a completion future.
197 /// Always returns a completion future, even if stop was already called.
198 /// If stop was already called, returns the same shared completion future
199 /// that will resolve immediately if already completed.
200 pub fn stop(&mut self, value: i32) -> Shared<oneshot::Receiver<()>> {
201 match self {
202 Self::Running { signaler, .. } => {
203 // Take the signaler out of the Option (it is always populated in Running)
204 let sig = signaler.take().unwrap();
205
206 // Signal shutdown and get the completion receiver
207 let completion_rx = sig.signal(value);
208 let shared_completion = completion_rx.shared();
209
210 // Transition to Stopped state
211 *self = Self::Stopped {
212 stop_value: value,
213 completion: shared_completion.clone(),
214 };
215
216 shared_completion
217 }
218 Self::Stopped { completion, .. } => {
219 // Ignore the stop value (always return the first used)
220
221 // Return existing completion (may already be resolved)
222 completion.clone()
223 }
224 }
225 }
226}
227
228impl Default for Stopper {
229 fn default() -> Self {
230 Self::new()
231 }
232}