1use std::sync::Arc;
8use std::sync::atomic::{AtomicBool, Ordering};
9
10use crate::sync::Notify;
11
12#[derive(Debug)]
14struct ShutdownState {
15 initiated: AtomicBool,
17 notify: Notify,
19}
20
21#[derive(Debug)]
46pub struct ShutdownController {
47 state: Arc<ShutdownState>,
49}
50
51impl ShutdownController {
52 #[must_use]
54 pub fn new() -> Self {
55 Self {
56 state: Arc::new(ShutdownState {
57 initiated: AtomicBool::new(false),
58 notify: Notify::new(),
59 }),
60 }
61 }
62
63 #[must_use]
68 pub fn subscribe(&self) -> ShutdownReceiver {
69 ShutdownReceiver {
70 state: Arc::clone(&self.state),
71 }
72 }
73
74 pub fn shutdown(&self) {
79 if self
81 .state
82 .initiated
83 .compare_exchange(false, true, Ordering::Release, Ordering::Relaxed)
84 .is_ok()
85 {
86 self.state.notify.notify_waiters();
88 }
89 }
90
91 #[must_use]
93 pub fn is_shutting_down(&self) -> bool {
94 self.state.initiated.load(Ordering::Acquire)
95 }
96
97 pub fn listen_for_signals(self: &Arc<Self>) {
107 }
116}
117
118impl Default for ShutdownController {
119 fn default() -> Self {
120 Self::new()
121 }
122}
123
124impl Clone for ShutdownController {
125 fn clone(&self) -> Self {
126 Self {
127 state: Arc::clone(&self.state),
128 }
129 }
130}
131
132#[derive(Debug)]
137pub struct ShutdownReceiver {
138 state: Arc<ShutdownState>,
140}
141
142impl ShutdownReceiver {
143 pub async fn wait(&mut self) {
148 let notified = self.state.notify.notified();
151
152 if self.is_shutting_down() {
154 return;
155 }
156
157 notified.await;
159 }
160
161 #[must_use]
163 pub fn is_shutting_down(&self) -> bool {
164 self.state.initiated.load(Ordering::Acquire)
165 }
166}
167
168impl Clone for ShutdownReceiver {
169 fn clone(&self) -> Self {
170 Self {
171 state: Arc::clone(&self.state),
172 }
173 }
174}
175
176#[cfg(test)]
177mod tests {
178 use super::*;
179 use std::sync::Arc;
180 use std::task::{Context, Poll, Wake, Waker};
181 use std::thread;
182 use std::time::Duration;
183
184 struct NoopWaker;
185
186 impl Wake for NoopWaker {
187 fn wake(self: Arc<Self>) {}
188 fn wake_by_ref(self: &Arc<Self>) {}
189 }
190
191 fn noop_waker() -> Waker {
192 Arc::new(NoopWaker).into()
193 }
194
195 fn poll_once<F: std::future::Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
196 let waker = noop_waker();
197 let mut cx = Context::from_waker(&waker);
198 std::pin::Pin::new(fut).poll(&mut cx)
199 }
200
201 fn init_test(name: &str) {
202 crate::test_utils::init_test_logging();
203 crate::test_phase!(name);
204 }
205
206 #[test]
207 fn shutdown_controller_initial_state() {
208 init_test("shutdown_controller_initial_state");
209 let controller = ShutdownController::new();
210 let shutting_down = controller.is_shutting_down();
211 crate::assert_with_log!(
212 !shutting_down,
213 "controller not shutting down",
214 false,
215 shutting_down
216 );
217
218 let receiver = controller.subscribe();
219 let rx_shutdown = receiver.is_shutting_down();
220 crate::assert_with_log!(
221 !rx_shutdown,
222 "receiver not shutting down",
223 false,
224 rx_shutdown
225 );
226 crate::test_complete!("shutdown_controller_initial_state");
227 }
228
229 #[test]
230 fn shutdown_controller_initiates() {
231 init_test("shutdown_controller_initiates");
232 let controller = ShutdownController::new();
233 let receiver = controller.subscribe();
234
235 controller.shutdown();
236
237 let ctrl_shutdown = controller.is_shutting_down();
238 crate::assert_with_log!(
239 ctrl_shutdown,
240 "controller shutting down",
241 true,
242 ctrl_shutdown
243 );
244 let rx_shutdown = receiver.is_shutting_down();
245 crate::assert_with_log!(rx_shutdown, "receiver shutting down", true, rx_shutdown);
246 crate::test_complete!("shutdown_controller_initiates");
247 }
248
249 #[test]
250 fn shutdown_only_once() {
251 init_test("shutdown_only_once");
252 let controller = ShutdownController::new();
253
254 controller.shutdown();
256 controller.shutdown();
257 controller.shutdown();
258
259 let shutting_down = controller.is_shutting_down();
260 crate::assert_with_log!(shutting_down, "shutting down", true, shutting_down);
261 crate::test_complete!("shutdown_only_once");
262 }
263
264 #[test]
265 fn multiple_receivers() {
266 init_test("multiple_receivers");
267 let controller = ShutdownController::new();
268 let rx1 = controller.subscribe();
269 let rx2 = controller.subscribe();
270 let rx3 = controller.subscribe();
271
272 let rx1_shutdown = rx1.is_shutting_down();
273 crate::assert_with_log!(!rx1_shutdown, "rx1 not shutting down", false, rx1_shutdown);
274 let rx2_shutdown = rx2.is_shutting_down();
275 crate::assert_with_log!(!rx2_shutdown, "rx2 not shutting down", false, rx2_shutdown);
276 let rx3_shutdown = rx3.is_shutting_down();
277 crate::assert_with_log!(!rx3_shutdown, "rx3 not shutting down", false, rx3_shutdown);
278
279 controller.shutdown();
280
281 let rx1_shutdown = rx1.is_shutting_down();
282 crate::assert_with_log!(rx1_shutdown, "rx1 shutting down", true, rx1_shutdown);
283 let rx2_shutdown = rx2.is_shutting_down();
284 crate::assert_with_log!(rx2_shutdown, "rx2 shutting down", true, rx2_shutdown);
285 let rx3_shutdown = rx3.is_shutting_down();
286 crate::assert_with_log!(rx3_shutdown, "rx3 shutting down", true, rx3_shutdown);
287 crate::test_complete!("multiple_receivers");
288 }
289
290 #[test]
291 fn receiver_wait_after_shutdown() {
292 init_test("receiver_wait_after_shutdown");
293 let controller = ShutdownController::new();
294 let mut receiver = controller.subscribe();
295
296 controller.shutdown();
297
298 let mut fut = Box::pin(receiver.wait());
300 let ready = poll_once(&mut fut).is_ready();
301 crate::assert_with_log!(ready, "wait ready", true, ready);
302 crate::test_complete!("receiver_wait_after_shutdown");
303 }
304
305 #[test]
306 fn receiver_wait_before_shutdown() {
307 init_test("receiver_wait_before_shutdown");
308 let controller = Arc::new(ShutdownController::new());
309 let controller2 = Arc::clone(&controller);
310 let mut receiver = controller.subscribe();
311
312 let handle = thread::spawn(move || {
313 thread::sleep(Duration::from_millis(50));
314 controller2.shutdown();
315 });
316
317 let mut fut = Box::pin(receiver.wait());
319 let pending = poll_once(&mut fut).is_pending();
320 crate::assert_with_log!(pending, "wait pending", true, pending);
321
322 handle.join().expect("thread panicked");
324
325 let ready = poll_once(&mut fut).is_ready();
327 crate::assert_with_log!(ready, "wait ready", true, ready);
328 crate::test_complete!("receiver_wait_before_shutdown");
329 }
330
331 #[test]
332 fn receiver_clone() {
333 init_test("receiver_clone");
334 let controller = ShutdownController::new();
335 let rx1 = controller.subscribe();
336 let rx2 = rx1.clone();
337
338 let rx1_shutdown = rx1.is_shutting_down();
339 crate::assert_with_log!(!rx1_shutdown, "rx1 not shutting down", false, rx1_shutdown);
340 let rx2_shutdown = rx2.is_shutting_down();
341 crate::assert_with_log!(!rx2_shutdown, "rx2 not shutting down", false, rx2_shutdown);
342
343 controller.shutdown();
344
345 let rx1_shutdown = rx1.is_shutting_down();
346 crate::assert_with_log!(rx1_shutdown, "rx1 shutting down", true, rx1_shutdown);
347 let rx2_shutdown = rx2.is_shutting_down();
348 crate::assert_with_log!(rx2_shutdown, "rx2 shutting down", true, rx2_shutdown);
349 crate::test_complete!("receiver_clone");
350 }
351
352 #[test]
353 fn receiver_clone_preserves_state() {
354 init_test("receiver_clone_preserves_state");
355 let controller = ShutdownController::new();
356 controller.shutdown();
357
358 let rx1 = controller.subscribe();
359 let rx2 = rx1.clone();
360
361 let rx1_shutdown = rx1.is_shutting_down();
363 crate::assert_with_log!(rx1_shutdown, "rx1 shutting down", true, rx1_shutdown);
364 let rx2_shutdown = rx2.is_shutting_down();
365 crate::assert_with_log!(rx2_shutdown, "rx2 shutting down", true, rx2_shutdown);
366 crate::test_complete!("receiver_clone_preserves_state");
367 }
368
369 #[test]
370 fn controller_clone() {
371 init_test("controller_clone");
372 let controller1 = ShutdownController::new();
373 let controller2 = controller1.clone();
374 let receiver = controller1.subscribe();
375
376 controller2.shutdown();
378
379 let ctrl1 = controller1.is_shutting_down();
381 crate::assert_with_log!(ctrl1, "controller1 shutting down", true, ctrl1);
382 let ctrl2 = controller2.is_shutting_down();
383 crate::assert_with_log!(ctrl2, "controller2 shutting down", true, ctrl2);
384 let rx_shutdown = receiver.is_shutting_down();
385 crate::assert_with_log!(rx_shutdown, "receiver shutting down", true, rx_shutdown);
386 crate::test_complete!("controller_clone");
387 }
388}