commonware_runtime/utils/signal.rs
1//! Mechanisms for coordinating actions across many tasks.
2
3use futures::{channel::oneshot, future::Shared, FutureExt};
4use std::{
5 future::Future,
6 pin::Pin,
7 sync::Arc,
8 task::{Context, Poll},
9};
10
11/// A one-time broadcast that can be awaited by many tasks. It is often used for
12/// coordinating shutdown across many tasks.
13///
14/// Each [Signal] tracks its lifecycle to enable proper shutdown coordination.
15/// To minimize overhead, it is recommended to wait on a reference to it
16/// (i.e. `&mut signal`) in loops rather than creating multiple `Signal`s.
17///
18/// # Example
19///
20/// ## Basic Usage
21///
22/// ```rust
23/// use commonware_runtime::{Spawner, Runner, deterministic, signal::Signaler};
24///
25/// let executor = deterministic::Runner::default();
26/// executor.start(|context| async move {
27/// // Setup signaler and get future
28/// let (signaler, signal) = Signaler::new();
29///
30/// // Signal shutdown
31/// signaler.signal(2);
32///
33/// // Wait for shutdown in task
34/// let sig = signal.await.unwrap();
35/// println!("Received signal: {}", sig);
36/// });
37/// ```
38///
39/// ## Advanced Usage
40///
41/// While `futures::future::Shared` is efficient, there is still meaningful overhead
42/// to cloning it (i.e. in each iteration of a loop). To avoid
43/// a performance regression from introducing `Signaler`, it is recommended
44/// to wait on a reference to `Signal` (i.e. `&mut signal`).
45///
46/// ```rust
47/// use commonware_macros::select;
48/// use commonware_runtime::{Clock, Spawner, Runner, deterministic, Metrics, signal::Signaler};
49/// use futures::channel::oneshot;
50/// use std::time::Duration;
51///
52/// let executor = deterministic::Runner::default();
53/// executor.start(|context| async move {
54/// // Setup signaler and get future
55/// let (signaler, mut signal) = Signaler::new();
56///
57/// // Loop on the signal until resolved
58/// let (tx, rx) = oneshot::channel();
59/// context.with_label("waiter").spawn(|context| async move {
60/// loop {
61/// // Wait for signal or sleep
62/// select! {
63/// sig = &mut signal => {
64/// println!("Received signal: {}", sig.unwrap());
65/// break;
66/// },
67/// _ = context.sleep(Duration::from_secs(1)) => {},
68/// };
69/// }
70/// let _ = tx.send(());
71/// });
72///
73/// // Send signal
74/// signaler.signal(9);
75///
76/// // Wait for task
77/// rx.await.expect("shutdown signaled");
78/// });
79/// ```
80#[derive(Clone)]
81pub enum Signal {
82 /// A signal that will resolve when the signaler marks it as resolved.
83 Open(Receiver),
84 /// A signal that has been resolved with a known value.
85 Closed(i32),
86}
87
88impl Future for Signal {
89 type Output = Result<i32, oneshot::Canceled>;
90
91 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
92 match &mut *self {
93 Signal::Open(live) => Pin::new(&mut live.inner).poll(cx),
94 Signal::Closed(value) => Poll::Ready(Ok(*value)),
95 }
96 }
97}
98
99/// An open [Signal] with completion tracking.
100#[derive(Clone)]
101pub struct Receiver {
102 inner: Shared<oneshot::Receiver<i32>>,
103 _guard: Arc<Guard>,
104}
105
106/// A guard used to coordinate the resolution of a [Signal].
107struct Guard {
108 tx: Option<oneshot::Sender<()>>,
109}
110
111impl Guard {
112 /// Create a new [Guard] that will resolve when the [Signaler] marks it as resolved.
113 pub fn new(completion_tx: oneshot::Sender<()>) -> Self {
114 Self {
115 tx: Some(completion_tx),
116 }
117 }
118}
119
120impl Drop for Guard {
121 fn drop(&mut self) {
122 if let Some(tx) = self.tx.take() {
123 let _ = tx.send(());
124 }
125 }
126}
127
128/// Coordinates a one-time signal across many tasks.
129pub struct Signaler {
130 tx: oneshot::Sender<i32>,
131 completion_rx: oneshot::Receiver<()>,
132}
133
134impl Signaler {
135 /// Create a new [Signaler].
136 ///
137 /// Returns a [Signaler] and a [Signal] that will resolve when [Signaler::signal] is called.
138 pub fn new() -> (Self, Signal) {
139 let (tx, rx) = oneshot::channel();
140 let (completion_tx, completion_rx) = oneshot::channel();
141
142 let signaler = Self { tx, completion_rx };
143 let signal = Signal::Open(Receiver {
144 inner: rx.shared(),
145 _guard: Arc::new(Guard::new(completion_tx)),
146 });
147
148 (signaler, signal)
149 }
150
151 /// Resolve all [Signal]s associated with this [Signaler].
152 pub fn signal(self, value: i32) -> oneshot::Receiver<()> {
153 let _ = self.tx.send(value);
154 self.completion_rx
155 }
156}
157
158/// Employs [Signaler] to coordinate the graceful shutdown of many tasks.
159pub enum Stopper {
160 /// The stopper is running and stop has not been called yet.
161 Running {
162 // We must use an Option here because we need to move the signaler out of the
163 // Running state when stopping.
164 signaler: Option<Signaler>,
165 signal: Signal,
166 },
167 /// Stop has been called and completion is pending or resolved.
168 Stopped {
169 stop_value: i32,
170 completion: Shared<oneshot::Receiver<()>>,
171 },
172}
173
174impl Stopper {
175 /// Create a new stopper in running mode.
176 pub fn new() -> Self {
177 let (signaler, signal) = Signaler::new();
178 Self::Running {
179 signaler: Some(signaler),
180 signal,
181 }
182 }
183
184 /// Get the signal for runtime users to await.
185 pub fn stopped(&self) -> Signal {
186 match self {
187 Self::Running { signal, .. } => signal.clone(),
188 Self::Stopped { stop_value, .. } => Signal::Closed(*stop_value),
189 }
190 }
191
192 /// Initiate shutdown returning a completion future.
193 /// Always returns a completion future, even if stop was already called.
194 /// If stop was already called, returns the same shared completion future
195 /// that will resolve immediately if already completed.
196 pub fn stop(&mut self, value: i32) -> Shared<oneshot::Receiver<()>> {
197 match self {
198 Self::Running { signaler, .. } => {
199 // Take the signaler out of the Option (it is always populated in Running)
200 let sig = signaler.take().unwrap();
201
202 // Signal shutdown and get the completion receiver
203 let completion_rx = sig.signal(value);
204 let shared_completion = completion_rx.shared();
205
206 // Transition to Stopped state
207 *self = Self::Stopped {
208 stop_value: value,
209 completion: shared_completion.clone(),
210 };
211
212 shared_completion
213 }
214 Self::Stopped { completion, .. } => {
215 // Ignore the stop value (always return the first used)
216
217 // Return existing completion (may already be resolved)
218 completion.clone()
219 }
220 }
221 }
222}
223
224impl Default for Stopper {
225 fn default() -> Self {
226 Self::new()
227 }
228}