Skip to main content

ringkernel_core/
shutdown.rs

1//! Graceful shutdown and signal handling for persistent GPU kernels.
2//!
3//! Persistent GPU kernels run indefinitely and must be shut down cleanly to
4//! avoid GPU resource leaks, corrupted state, or orphaned device memory.
5//! This module provides infrastructure for capturing OS signals (SIGTERM,
6//! SIGINT / Ctrl-C) and propagating a shutdown request to all interested
7//! parties via a lightweight, clone-able [`ShutdownSignal`].
8//!
9//! # Overview
10//!
11//! - [`GracefulShutdown`] -- registers signal handlers and owns the shutdown
12//!   lifecycle (signal capture, grace period, force termination).
13//! - [`ShutdownSignal`] -- a cheap, clone-able handle that kernels and
14//!   background tasks can poll or `.await` to learn about a pending shutdown.
15//! - [`ShutdownGuard`] -- returned by [`GracefulShutdown::install`]; dropping
16//!   the guard cancels the signal listener.
17//!
18//! # Example
19//!
20//! ```rust,ignore
21//! use ringkernel_core::shutdown::GracefulShutdown;
22//! use std::time::Duration;
23//!
24//! #[tokio::main]
25//! async fn main() {
26//!     let shutdown = GracefulShutdown::new(Duration::from_secs(5));
27//!     let signal = shutdown.signal();
28//!
29//!     // Hand `signal` clones to kernel loops, background tasks, etc.
30//!     let guard = shutdown.install();
31//!
32//!     // In a kernel loop:
33//!     loop {
34//!         if signal.is_shutdown_requested() {
35//!             break;
36//!         }
37//!         // ... do work ...
38//!     }
39//!
40//!     // Or await the signal:
41//!     // signal.wait().await;
42//!
43//!     guard.wait().await;
44//! }
45//! ```
46
47use std::sync::atomic::{AtomicBool, Ordering};
48use std::sync::Arc;
49use std::time::Duration;
50
51use tokio::sync::watch;
52use tokio::task::JoinHandle;
53
54// ============================================================================
55// ShutdownSignal
56// ============================================================================
57
58/// A lightweight, clone-able handle for checking or awaiting shutdown.
59///
60/// Multiple kernels and background tasks can each hold a clone. Checking
61/// [`is_shutdown_requested`](ShutdownSignal::is_shutdown_requested) is a
62/// single atomic load and therefore safe to call on hot paths.
63#[derive(Clone)]
64pub struct ShutdownSignal {
65    /// Atomic flag -- fast, non-blocking check.
66    requested: Arc<AtomicBool>,
67    /// Watch receiver -- enables `.await`-based notification.
68    rx: watch::Receiver<bool>,
69}
70
71impl ShutdownSignal {
72    /// Returns `true` once shutdown has been requested.
73    ///
74    /// This is a single atomic load and can be called on hot paths without
75    /// any overhead.
76    pub fn is_shutdown_requested(&self) -> bool {
77        self.requested.load(Ordering::SeqCst)
78    }
79
80    /// Wait until a shutdown signal is received.
81    ///
82    /// This future completes as soon as any signal (SIGTERM, SIGINT, or a
83    /// manual trigger) fires. It is cancel-safe.
84    pub async fn wait(&self) {
85        // Fast path: already triggered.
86        if self.is_shutdown_requested() {
87            return;
88        }
89
90        let mut rx = self.rx.clone();
91        // `changed()` returns when the sender writes a new value.
92        // Ignore the error case (sender dropped) -- treat it as shutdown.
93        loop {
94            if *rx.borrow_and_update() {
95                return;
96            }
97            if rx.changed().await.is_err() {
98                // Sender dropped -- treat as shutdown.
99                return;
100            }
101        }
102    }
103}
104
105impl std::fmt::Debug for ShutdownSignal {
106    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107        f.debug_struct("ShutdownSignal")
108            .field("requested", &self.is_shutdown_requested())
109            .finish()
110    }
111}
112
113// ============================================================================
114// GracefulShutdown
115// ============================================================================
116
117/// Builder and coordinator for graceful shutdown of persistent GPU kernels.
118///
119/// Captures SIGTERM and SIGINT (Ctrl-C), notifies all holders of
120/// [`ShutdownSignal`], waits for a configurable grace period, then
121/// force-terminates.
122pub struct GracefulShutdown {
123    /// Shared flag.
124    requested: Arc<AtomicBool>,
125    /// Watch channel sender (triggers `ShutdownSignal::wait()`).
126    tx: watch::Sender<bool>,
127    /// Watch channel receiver (cloned into each `ShutdownSignal`).
128    rx: watch::Receiver<bool>,
129    /// How long to wait after the signal before force termination.
130    grace_period: Duration,
131}
132
133impl GracefulShutdown {
134    /// Create a new shutdown coordinator with the given grace period.
135    ///
136    /// The grace period determines how long [`ShutdownGuard::wait`] will
137    /// allow after the signal fires before it returns (allowing the caller
138    /// to force-terminate remaining work).
139    pub fn new(grace_period: Duration) -> Self {
140        let (tx, rx) = watch::channel(false);
141        Self {
142            requested: Arc::new(AtomicBool::new(false)),
143            tx,
144            rx,
145            grace_period,
146        }
147    }
148
149    /// Create a shutdown coordinator with the default grace period (5 seconds).
150    pub fn with_default_grace_period() -> Self {
151        Self::new(Duration::from_secs(5))
152    }
153
154    /// Get the configured grace period.
155    pub fn grace_period(&self) -> Duration {
156        self.grace_period
157    }
158
159    /// Check if shutdown has already been requested.
160    pub fn is_shutdown_requested(&self) -> bool {
161        self.requested.load(Ordering::SeqCst)
162    }
163
164    /// Obtain a [`ShutdownSignal`] that can be cloned and distributed to
165    /// kernels and background tasks.
166    pub fn signal(&self) -> ShutdownSignal {
167        ShutdownSignal {
168            requested: Arc::clone(&self.requested),
169            rx: self.rx.clone(),
170        }
171    }
172
173    /// Manually trigger shutdown (useful for programmatic shutdown or testing).
174    pub fn trigger(&self) {
175        self.requested.store(true, Ordering::SeqCst);
176        let _ = self.tx.send(true);
177    }
178
179    /// Install OS signal handlers and return a [`ShutdownGuard`].
180    ///
181    /// The guard spawns a background task that listens for SIGTERM and
182    /// SIGINT (Ctrl-C). When a signal is received the shutdown flag is set
183    /// and all [`ShutdownSignal`] holders are notified.
184    ///
185    /// Call [`ShutdownGuard::wait`] to block until the grace period elapses
186    /// after a signal (or until all work completes, whichever comes first).
187    pub fn install(self) -> ShutdownGuard {
188        let requested = Arc::clone(&self.requested);
189        let tx = self.tx;
190        let grace_period = self.grace_period;
191        let signal = ShutdownSignal {
192            requested: Arc::clone(&self.requested),
193            rx: self.rx.clone(),
194        };
195
196        let handle = tokio::spawn(async move {
197            // Wait for either SIGINT (Ctrl-C) or SIGTERM.
198            let sigint = tokio::signal::ctrl_c();
199
200            #[cfg(unix)]
201            {
202                let mut sigterm =
203                    tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
204                        .expect("failed to register SIGTERM handler");
205
206                tokio::select! {
207                    _ = sigint => {
208                        tracing::info!("Received SIGINT (Ctrl-C), initiating graceful shutdown");
209                    }
210                    _ = sigterm.recv() => {
211                        tracing::info!("Received SIGTERM, initiating graceful shutdown");
212                    }
213                }
214            }
215
216            #[cfg(not(unix))]
217            {
218                // On non-Unix platforms only Ctrl-C is available.
219                let _ = sigint.await;
220                tracing::info!("Received Ctrl-C, initiating graceful shutdown");
221            }
222
223            // Set the shutdown flag and notify all watchers.
224            requested.store(true, Ordering::SeqCst);
225            let _ = tx.send(true);
226
227            // Allow the grace period for cleanup.
228            tracing::info!(
229                grace_period_secs = grace_period.as_secs_f64(),
230                "Shutdown signal sent; grace period started"
231            );
232            tokio::time::sleep(grace_period).await;
233            tracing::warn!("Grace period elapsed; force termination may follow");
234        });
235
236        ShutdownGuard { handle, signal }
237    }
238}
239
240impl std::fmt::Debug for GracefulShutdown {
241    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
242        f.debug_struct("GracefulShutdown")
243            .field("requested", &self.is_shutdown_requested())
244            .field("grace_period", &self.grace_period)
245            .finish()
246    }
247}
248
249// ============================================================================
250// ShutdownGuard
251// ============================================================================
252
253/// Guard returned by [`GracefulShutdown::install`].
254///
255/// Holds the background signal-listening task. Call [`wait`](Self::wait) to
256/// block until the signal fires and the grace period elapses (or until the
257/// task is aborted on drop).
258pub struct ShutdownGuard {
259    /// Background task listening for OS signals.
260    handle: JoinHandle<()>,
261    /// A signal handle for callers to check / await.
262    signal: ShutdownSignal,
263}
264
265impl ShutdownGuard {
266    /// Get a [`ShutdownSignal`] from this guard.
267    pub fn signal(&self) -> ShutdownSignal {
268        self.signal.clone()
269    }
270
271    /// Wait for the signal listener task to complete.
272    ///
273    /// This resolves after a signal has been received **and** the grace
274    /// period has elapsed. If no signal is ever received this future
275    /// never completes (you can `select!` it against your main workload).
276    pub async fn wait(self) {
277        let _ = self.handle.await;
278    }
279
280    /// Cancel the signal listener without waiting.
281    pub fn cancel(self) {
282        self.handle.abort();
283    }
284}
285
286impl std::fmt::Debug for ShutdownGuard {
287    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
288        f.debug_struct("ShutdownGuard")
289            .field("signal", &self.signal)
290            .field("finished", &self.handle.is_finished())
291            .finish()
292    }
293}
294
295// ============================================================================
296// Tests
297// ============================================================================
298
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a coordinator whose grace period is `secs` seconds.
    fn with_secs(secs: u64) -> GracefulShutdown {
        GracefulShutdown::new(Duration::from_secs(secs))
    }

    #[test]
    fn test_default_grace_period() {
        let coordinator = GracefulShutdown::with_default_grace_period();
        assert_eq!(coordinator.grace_period(), Duration::from_secs(5));
    }

    #[test]
    fn test_custom_grace_period() {
        let expected = Duration::from_secs(30);
        assert_eq!(GracefulShutdown::new(expected).grace_period(), expected);
    }

    #[test]
    fn test_not_requested_initially() {
        let coordinator = with_secs(1);
        assert!(!coordinator.is_shutdown_requested());
        assert!(!coordinator.signal().is_shutdown_requested());
    }

    #[test]
    fn test_manual_trigger() {
        let coordinator = with_secs(1);
        let handle = coordinator.signal();
        assert!(!handle.is_shutdown_requested());

        coordinator.trigger();

        assert!(handle.is_shutdown_requested());
        assert!(coordinator.is_shutdown_requested());
    }

    #[test]
    fn test_signal_clone_shares_state() {
        let coordinator = with_secs(1);
        let first = coordinator.signal();
        let cloned = first.clone();
        let second = coordinator.signal();
        let handles = [first, cloned, second];

        // Neither clones nor independently obtained handles start out set.
        assert!(handles.iter().all(|h| !h.is_shutdown_requested()));

        coordinator.trigger();

        // One trigger is visible through every handle.
        assert!(handles.iter().all(|h| h.is_shutdown_requested()));
    }

    #[tokio::test]
    async fn test_signal_wait_resolves_on_trigger() {
        let coordinator = with_secs(1);
        let handle = coordinator.signal();

        // Fire the trigger from another task after a short delay.
        let trigger_task = tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            coordinator.trigger();
        });

        // The wait must finish once the trigger has fired.
        let waited = tokio::time::timeout(Duration::from_secs(2), handle.wait()).await;
        waited.expect("signal.wait() should have resolved within timeout");

        trigger_task.await.unwrap();
    }

    #[tokio::test]
    async fn test_signal_wait_returns_immediately_if_already_triggered() {
        let coordinator = with_secs(1);
        let handle = coordinator.signal();
        coordinator.trigger();

        // Already triggered, so the fast path applies.
        let waited = tokio::time::timeout(Duration::from_millis(100), handle.wait()).await;
        waited.expect("signal.wait() should return immediately when already triggered");
    }

    #[tokio::test]
    async fn test_guard_signal() {
        let coordinator = GracefulShutdown::new(Duration::from_millis(50));
        let before_install = coordinator.signal();
        let guard = coordinator.install();

        // The guard's handle shares state with handles taken earlier.
        let from_guard = guard.signal();
        assert!(!from_guard.is_shutdown_requested());
        assert!(!before_install.is_shutdown_requested());

        // Stop the listener so the test does not block.
        guard.cancel();
    }

    #[tokio::test]
    async fn test_guard_cancel() {
        // Cancelling must not panic and aborts the listener task.
        with_secs(60).install().cancel();
    }

    #[tokio::test]
    async fn test_multiple_signals_from_same_shutdown() {
        let coordinator = with_secs(1);
        let handles: Vec<ShutdownSignal> = (0..10).map(|_| coordinator.signal()).collect();

        assert!(handles.iter().all(|h| !h.is_shutdown_requested()));

        coordinator.trigger();

        assert!(handles.iter().all(|h| h.is_shutdown_requested()));
    }

    #[tokio::test]
    async fn test_wait_with_dropped_sender() {
        // The coordinator owns the sender; once it is gone,
        // ShutdownSignal::wait() must still resolve (treated as shutdown).
        let coordinator = with_secs(1);
        let orphan = coordinator.signal();
        drop(coordinator);

        let waited = tokio::time::timeout(Duration::from_millis(100), orphan.wait()).await;
        waited.expect("signal.wait() should resolve when sender is dropped");
    }

    #[test]
    fn test_debug_impls() {
        let coordinator = with_secs(5);

        let rendered = format!("{:?}", coordinator);
        assert!(rendered.contains("GracefulShutdown"));
        assert!(rendered.contains("requested"));

        let rendered = format!("{:?}", coordinator.signal());
        assert!(rendered.contains("ShutdownSignal"));
    }

    #[tokio::test]
    async fn test_guard_debug() {
        let guard = with_secs(1).install();
        let rendered = format!("{:?}", guard);
        assert!(rendered.contains("ShutdownGuard"));
        guard.cancel();
    }

    #[tokio::test]
    async fn test_concurrent_trigger_is_safe() {
        let coordinator = with_secs(1);
        let handle = coordinator.signal();

        // Trigger from several tasks at once -- must not panic.
        let shared = Arc::new(coordinator);
        let tasks: Vec<_> = (0..10)
            .map(|_| {
                let coordinator = Arc::clone(&shared);
                tokio::spawn(async move {
                    coordinator.trigger();
                })
            })
            .collect();

        for task in tasks {
            task.await.unwrap();
        }

        assert!(handle.is_shutdown_requested());
    }

    #[tokio::test]
    async fn test_signal_wait_multiple_waiters() {
        let coordinator = with_secs(1);

        let waiters: Vec<_> = (0..5)
            .map(|_| {
                let handle = coordinator.signal();
                tokio::spawn(async move {
                    handle.wait().await;
                })
            })
            .collect();

        // Give the waiters time to register, then trigger.
        tokio::time::sleep(Duration::from_millis(50)).await;
        coordinator.trigger();

        // Every waiter must resolve.
        for waiter in waiters {
            tokio::time::timeout(Duration::from_secs(2), waiter)
                .await
                .expect("waiter should complete")
                .expect("waiter task should not panic");
        }
    }
}