Skip to main content

asupersync/signal/
shutdown.rs

//! Coordinated shutdown controller using sync primitives.
//!
//! Provides a centralized mechanism for initiating and propagating shutdown
//! signals throughout an application. Uses our sync primitives (Notify) to
//! coordinate without external dependencies.
6
7use std::sync::Arc;
8use std::sync::atomic::{AtomicBool, Ordering};
9
10use crate::sync::Notify;
11
12/// Internal state shared between controller and receivers.
13#[derive(Debug)]
14struct ShutdownState {
15    /// Tracks whether shutdown has been initiated.
16    initiated: AtomicBool,
17    /// Notifier for broadcast notifications.
18    notify: Notify,
19}
20
21/// Controller for coordinated graceful shutdown.
22///
23/// This provides a clean way to propagate shutdown signals through an application.
24/// Multiple receivers can subscribe to receive shutdown notifications.
25///
26/// # Example
27///
28/// ```ignore
29/// use asupersync::signal::ShutdownController;
30///
31/// async fn run_server() {
32///     let controller = ShutdownController::new();
33///     let mut receiver = controller.subscribe();
34///
35///     // Spawn a task that will receive the shutdown signal
36///     let handle = async move {
37///         receiver.wait().await;
38///         println!("Shutting down...");
39///     };
40///
41///     // Later, initiate shutdown
42///     controller.shutdown();
43/// }
44/// ```
45#[derive(Debug)]
46pub struct ShutdownController {
47    /// Shared state between controller and receivers.
48    state: Arc<ShutdownState>,
49}
50
51impl ShutdownController {
52    /// Creates a new shutdown controller.
53    #[must_use]
54    pub fn new() -> Self {
55        Self {
56            state: Arc::new(ShutdownState {
57                initiated: AtomicBool::new(false),
58                notify: Notify::new(),
59            }),
60        }
61    }
62
63    /// Gets a handle for receiving shutdown notifications.
64    ///
65    /// Multiple receivers can be created and they will all be notified
66    /// when shutdown is initiated.
67    #[must_use]
68    pub fn subscribe(&self) -> ShutdownReceiver {
69        ShutdownReceiver {
70            state: Arc::clone(&self.state),
71        }
72    }
73
74    /// Initiates shutdown.
75    ///
76    /// This wakes all receivers that are currently waiting for shutdown.
77    /// The shutdown state is persistent - once initiated, it cannot be reset.
78    pub fn shutdown(&self) {
79        // Only initiate once.
80        if self
81            .state
82            .initiated
83            .compare_exchange(false, true, Ordering::Release, Ordering::Relaxed)
84            .is_ok()
85        {
86            // Wake all waiters.
87            self.state.notify.notify_waiters();
88        }
89    }
90
91    /// Checks if shutdown has been initiated.
92    #[must_use]
93    pub fn is_shutting_down(&self) -> bool {
94        self.state.initiated.load(Ordering::Acquire)
95    }
96
97    /// Spawns a background task to listen for shutdown signals.
98    ///
99    /// This is a convenience method that sets up signal handling
100    /// (when available) to automatically trigger shutdown.
101    ///
102    /// # Note
103    ///
104    /// In Phase 0, signal handling is not available, so this method
105    /// only sets up the controller for manual shutdown calls.
106    pub fn listen_for_signals(self: &Arc<Self>) {
107        // Phase 0: Signal handling not available.
108        // In Phase 1, this will:
109        // - Register SIGTERM handler
110        // - Register SIGINT/Ctrl+C handler
111        // - Call self.shutdown() when signal received
112        //
113        // For now, this is a no-op. Applications should call
114        // shutdown() manually or use their own signal handling.
115    }
116}
117
118impl Default for ShutdownController {
119    fn default() -> Self {
120        Self::new()
121    }
122}
123
124impl Clone for ShutdownController {
125    fn clone(&self) -> Self {
126        Self {
127            state: Arc::clone(&self.state),
128        }
129    }
130}
131
132/// Receiver for shutdown notifications.
133///
134/// This is a handle that can wait for shutdown to be initiated.
135/// Multiple receivers can be created from a single controller.
136#[derive(Debug)]
137pub struct ShutdownReceiver {
138    /// Shared state with the controller.
139    state: Arc<ShutdownState>,
140}
141
142impl ShutdownReceiver {
143    /// Waits for shutdown to be initiated.
144    ///
145    /// This method returns immediately if shutdown has already been initiated.
146    /// Otherwise, it waits until the controller's `shutdown()` method is called.
147    pub async fn wait(&mut self) {
148        // Create the notification future first to avoid missing a shutdown
149        // that happens between the check and registration.
150        let notified = self.state.notify.notified();
151
152        // Check if already shut down.
153        if self.is_shutting_down() {
154            return;
155        }
156
157        // Wait for notification.
158        notified.await;
159    }
160
161    /// Checks if shutdown has been initiated.
162    #[must_use]
163    pub fn is_shutting_down(&self) -> bool {
164        self.state.initiated.load(Ordering::Acquire)
165    }
166}
167
168impl Clone for ShutdownReceiver {
169    fn clone(&self) -> Self {
170        Self {
171            state: Arc::clone(&self.state),
172        }
173    }
174}
175
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::task::{Context, Poll, Wake, Waker};
    use std::thread;

    /// Waker that ignores every wake-up; the tests poll futures by hand.
    struct NoopWaker;

    impl Wake for NoopWaker {
        fn wake(self: Arc<Self>) {}
        fn wake_by_ref(self: &Arc<Self>) {}
    }

    /// Builds a `Waker` backed by [`NoopWaker`].
    fn noop_waker() -> Waker {
        Arc::new(NoopWaker).into()
    }

    /// Polls `fut` exactly once with a no-op waker and returns the result.
    fn poll_once<F: std::future::Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        std::pin::Pin::new(fut).poll(&mut cx)
    }

    /// Shared per-test setup: initialises logging and records the test phase.
    fn init_test(name: &str) {
        crate::test_utils::init_test_logging();
        crate::test_phase!(name);
    }

    #[test]
    fn shutdown_controller_initial_state() {
        init_test("shutdown_controller_initial_state");
        let controller = ShutdownController::new();
        let shutting_down = controller.is_shutting_down();
        crate::assert_with_log!(
            !shutting_down,
            "controller not shutting down",
            false,
            shutting_down
        );

        let receiver = controller.subscribe();
        let rx_shutdown = receiver.is_shutting_down();
        crate::assert_with_log!(
            !rx_shutdown,
            "receiver not shutting down",
            false,
            rx_shutdown
        );
        crate::test_complete!("shutdown_controller_initial_state");
    }

    #[test]
    fn shutdown_controller_initiates() {
        init_test("shutdown_controller_initiates");
        let controller = ShutdownController::new();
        let receiver = controller.subscribe();

        controller.shutdown();

        let ctrl_shutdown = controller.is_shutting_down();
        crate::assert_with_log!(
            ctrl_shutdown,
            "controller shutting down",
            true,
            ctrl_shutdown
        );
        let rx_shutdown = receiver.is_shutting_down();
        crate::assert_with_log!(rx_shutdown, "receiver shutting down", true, rx_shutdown);
        crate::test_complete!("shutdown_controller_initiates");
    }

    #[test]
    fn shutdown_only_once() {
        init_test("shutdown_only_once");
        let controller = ShutdownController::new();

        // Multiple shutdown calls should be idempotent.
        controller.shutdown();
        controller.shutdown();
        controller.shutdown();

        let shutting_down = controller.is_shutting_down();
        crate::assert_with_log!(shutting_down, "shutting down", true, shutting_down);
        crate::test_complete!("shutdown_only_once");
    }

    #[test]
    fn multiple_receivers() {
        init_test("multiple_receivers");
        let controller = ShutdownController::new();
        let rx1 = controller.subscribe();
        let rx2 = controller.subscribe();
        let rx3 = controller.subscribe();

        let rx1_shutdown = rx1.is_shutting_down();
        crate::assert_with_log!(!rx1_shutdown, "rx1 not shutting down", false, rx1_shutdown);
        let rx2_shutdown = rx2.is_shutting_down();
        crate::assert_with_log!(!rx2_shutdown, "rx2 not shutting down", false, rx2_shutdown);
        let rx3_shutdown = rx3.is_shutting_down();
        crate::assert_with_log!(!rx3_shutdown, "rx3 not shutting down", false, rx3_shutdown);

        controller.shutdown();

        let rx1_shutdown = rx1.is_shutting_down();
        crate::assert_with_log!(rx1_shutdown, "rx1 shutting down", true, rx1_shutdown);
        let rx2_shutdown = rx2.is_shutting_down();
        crate::assert_with_log!(rx2_shutdown, "rx2 shutting down", true, rx2_shutdown);
        let rx3_shutdown = rx3.is_shutting_down();
        crate::assert_with_log!(rx3_shutdown, "rx3 shutting down", true, rx3_shutdown);
        crate::test_complete!("multiple_receivers");
    }

    #[test]
    fn receiver_wait_after_shutdown() {
        init_test("receiver_wait_after_shutdown");
        let controller = ShutdownController::new();
        let mut receiver = controller.subscribe();

        controller.shutdown();

        // Wait should return immediately.
        let mut fut = Box::pin(receiver.wait());
        let ready = poll_once(&mut fut).is_ready();
        crate::assert_with_log!(ready, "wait ready", true, ready);
        crate::test_complete!("receiver_wait_after_shutdown");
    }

    #[test]
    fn receiver_wait_before_shutdown() {
        init_test("receiver_wait_before_shutdown");
        let controller = Arc::new(ShutdownController::new());
        let controller2 = Arc::clone(&controller);
        let mut receiver = controller.subscribe();

        // Poll before any shutdown can have been requested, so the pending
        // assertion does not depend on thread scheduling or sleep timing.
        let mut fut = Box::pin(receiver.wait());
        let pending = poll_once(&mut fut).is_pending();
        crate::assert_with_log!(pending, "wait pending", true, pending);

        // Request shutdown from another thread and wait for it to finish.
        let handle = thread::spawn(move || {
            controller2.shutdown();
        });
        handle.join().expect("thread panicked");

        // Now should be ready.
        let ready = poll_once(&mut fut).is_ready();
        crate::assert_with_log!(ready, "wait ready", true, ready);
        crate::test_complete!("receiver_wait_before_shutdown");
    }

    #[test]
    fn receiver_clone() {
        init_test("receiver_clone");
        let controller = ShutdownController::new();
        let rx1 = controller.subscribe();
        let rx2 = rx1.clone();

        let rx1_shutdown = rx1.is_shutting_down();
        crate::assert_with_log!(!rx1_shutdown, "rx1 not shutting down", false, rx1_shutdown);
        let rx2_shutdown = rx2.is_shutting_down();
        crate::assert_with_log!(!rx2_shutdown, "rx2 not shutting down", false, rx2_shutdown);

        controller.shutdown();

        let rx1_shutdown = rx1.is_shutting_down();
        crate::assert_with_log!(rx1_shutdown, "rx1 shutting down", true, rx1_shutdown);
        let rx2_shutdown = rx2.is_shutting_down();
        crate::assert_with_log!(rx2_shutdown, "rx2 shutting down", true, rx2_shutdown);
        crate::test_complete!("receiver_clone");
    }

    #[test]
    fn receiver_clone_preserves_state() {
        init_test("receiver_clone_preserves_state");
        let controller = ShutdownController::new();
        controller.shutdown();

        let rx1 = controller.subscribe();
        let rx2 = rx1.clone();

        // Both should see shutdown already initiated.
        let rx1_shutdown = rx1.is_shutting_down();
        crate::assert_with_log!(rx1_shutdown, "rx1 shutting down", true, rx1_shutdown);
        let rx2_shutdown = rx2.is_shutting_down();
        crate::assert_with_log!(rx2_shutdown, "rx2 shutting down", true, rx2_shutdown);
        crate::test_complete!("receiver_clone_preserves_state");
    }

    #[test]
    fn controller_clone() {
        init_test("controller_clone");
        let controller1 = ShutdownController::new();
        let controller2 = controller1.clone();
        let receiver = controller1.subscribe();

        // Shutdown via clone.
        controller2.shutdown();

        // All should see it.
        let ctrl1 = controller1.is_shutting_down();
        crate::assert_with_log!(ctrl1, "controller1 shutting down", true, ctrl1);
        let ctrl2 = controller2.is_shutting_down();
        crate::assert_with_log!(ctrl2, "controller2 shutting down", true, ctrl2);
        let rx_shutdown = receiver.is_shutting_down();
        crate::assert_with_log!(rx_shutdown, "receiver shutting down", true, rx_shutdown);
        crate::test_complete!("controller_clone");
    }
}