Skip to main content

liminal_server/server/
shutdown.rs

1use std::fmt;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::{Arc, Condvar, Mutex};
4use std::thread::{self, JoinHandle};
5use std::time::{Duration, Instant};
6
7use signal_hook::consts::signal::{SIGINT, SIGTERM};
8use signal_hook::iterator::{Handle as SignalIteratorHandle, Signals};
9
10use crate::ServerError;
11use crate::server::connection::ConnectionSupervisor;
12use crate::server::listener::ServerListener;
13
/// Minimum time between "waiting for active connections to drain" progress
/// log lines emitted by `drain_connections`.
14const DRAIN_PROGRESS_INTERVAL: Duration = Duration::from_millis(100);
/// Upper bound on how long `wait_after_force_close` keeps polling for
/// connections to disappear after they have been force-closed.
15const FORCE_CLOSE_SETTLE_TIMEOUT: Duration = Duration::from_millis(500);
/// Sleep between polls in `wait_after_force_close`; `drain_connections` also
/// uses it as the cap on each of its sleeps.
16const FORCE_CLOSE_POLL_INTERVAL: Duration = Duration::from_millis(10);
17
18/// Idempotent shutdown activation handle shared by the runtime and signal thread.
19#[derive(Clone)]
20pub struct ShutdownHandle {
21    inner: Arc<ShutdownState>,
22}
23
24impl ShutdownHandle {
25    /// Creates a new inactive shutdown handle.
26    #[must_use]
27    pub fn new() -> Self {
28        Self {
29            inner: Arc::new(ShutdownState::new()),
30        }
31    }
32
33    /// Initiates shutdown exactly once.
34    ///
35    /// Returns `true` for the first caller that transitions the handle to active,
36    /// and `false` for subsequent calls.
37    pub fn initiate(&self) -> bool {
38        if self.inner.initiated.swap(true, Ordering::SeqCst) {
39            tracing::debug!("shutdown request ignored because shutdown is already active");
40            return false;
41        }
42
43        tracing::info!("shutdown requested");
44        self.inner.notify();
45        true
46    }
47
48    /// Blocks until shutdown is initiated.
49    pub fn wait(&self) {
50        if self.is_initiated() {
51            return;
52        }
53        let Ok(mut guard) = self.inner.wait_lock.lock() else {
54            return;
55        };
56        while !self.is_initiated() {
57            match self.inner.waiter.wait(guard) {
58                Ok(next_guard) => guard = next_guard,
59                Err(_) => return,
60            }
61        }
62    }
63
64    /// Returns whether shutdown has been initiated.
65    #[must_use]
66    pub fn is_initiated(&self) -> bool {
67        self.inner.initiated.load(Ordering::SeqCst)
68    }
69}
70
71impl Default for ShutdownHandle {
72    fn default() -> Self {
73        Self::new()
74    }
75}
76
77impl fmt::Debug for ShutdownHandle {
78    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
79        formatter
80            .debug_struct("ShutdownHandle")
81            .field("initiated", &self.is_initiated())
82            .finish()
83    }
84}
85
/// State shared by every clone of a shutdown handle.
#[derive(Debug)]
struct ShutdownState {
    /// `true` once shutdown has been requested.
    initiated: AtomicBool,
    /// Guards no data; it exists only to pair with `waiter` so that checking
    /// `initiated` and parking on the condition variable are ordered against
    /// `notify`.
    wait_lock: Mutex<()>,
    /// Condition variable that waiting threads park on.
    waiter: Condvar,
}

impl ShutdownState {
    /// Creates the state with shutdown not requested.
    const fn new() -> Self {
        Self {
            initiated: AtomicBool::new(false),
            wait_lock: Mutex::new(()),
            waiter: Condvar::new(),
        }
    }

    /// Wakes every thread parked on `waiter`.
    ///
    /// The lock is taken first so the notification cannot slip in between a
    /// waiter's flag check and its park. A poisoned lock is recovered rather
    /// than treated as a reason to skip the wakeup: the mutex protects no
    /// data, so poisoning carries no meaning here, and skipping the
    /// notification would leave parked waiters asleep indefinitely.
    fn notify(&self) {
        let _guard = self
            .wait_lock
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        self.waiter.notify_all();
    }
}
108
109/// Process-global OS signal registration for graceful shutdown.
110#[derive(Debug)]
111pub struct SignalShutdownRegistration {
112    signal_handle: SignalIteratorHandle,
113    worker: Option<JoinHandle<()>>,
114}
115
116impl SignalShutdownRegistration {
117    const fn new(signal_handle: SignalIteratorHandle, worker: JoinHandle<()>) -> Self {
118        Self {
119            signal_handle,
120            worker: Some(worker),
121        }
122    }
123}
124
125impl Drop for SignalShutdownRegistration {
126    fn drop(&mut self) {
127        self.signal_handle.close();
128        let Some(worker) = self.worker.take() else {
129            return;
130        };
131        if worker.join().is_err() {
132            tracing::debug!("shutdown signal worker terminated unexpectedly");
133        }
134    }
135}
136
137/// Registers SIGTERM and SIGINT handlers that initiate the supplied handle.
138///
139/// # Errors
140/// Returns [`ServerError::ListenerAccept`] when the OS signal registration fails.
141pub fn register_signal_handlers(
142    handle: ShutdownHandle,
143) -> Result<SignalShutdownRegistration, ServerError> {
144    let mut signals =
145        Signals::new([SIGTERM, SIGINT]).map_err(|error| ServerError::ListenerAccept {
146            message: format!("failed to register shutdown signal handlers: {error}"),
147        })?;
148    let signal_handle = signals.handle();
149    let worker = thread::spawn(move || {
150        for signal in signals.forever() {
151            tracing::info!(signal, "received shutdown signal");
152            handle.initiate();
153        }
154    });
155    Ok(SignalShutdownRegistration::new(signal_handle, worker))
156}
157
158/// Runs the graceful shutdown sequence after the handle has been activated.
159///
160/// # Errors
161/// Returns [`ServerError`] when stop-accepting or durable flush fails.
162pub fn run_shutdown_sequence(
163    listener: &mut ServerListener,
164    supervisor: &ConnectionSupervisor,
165    drain_timeout: Duration,
166) -> Result<(), ServerError> {
167    tracing::info!(?drain_timeout, "starting graceful shutdown sequence");
168    // Stop accepting new connections first so none can slip into the accept
169    // window after shutdown begins and miss the notification broadcast below.
170    listener.stop_accepting()?;
171    supervisor.notify_shutdown_subscribers();
172
173    let drained = drain_connections(supervisor, drain_timeout);
174    if !drained {
175        supervisor.force_close_active_connections();
176        wait_after_force_close(supervisor);
177    }
178
179    flush_durable_state(supervisor)?;
180    supervisor.shutdown();
181    tracing::info!("graceful shutdown sequence complete");
182    Ok(())
183}
184
185fn drain_connections(supervisor: &ConnectionSupervisor, drain_timeout: Duration) -> bool {
186    let deadline = Instant::now() + drain_timeout;
187    let mut last_log = Instant::now()
188        .checked_sub(DRAIN_PROGRESS_INTERVAL)
189        .unwrap_or_else(Instant::now);
190
191    loop {
192        let reaped = supervisor.reap_crashed_connections();
193        if reaped > 0 {
194            tracing::debug!(
195                reaped_connections = reaped,
196                "reaped connections during drain"
197            );
198        }
199
200        let active = supervisor.active_connection_count();
201        if active == 0 {
202            tracing::info!("all connections drained before timeout");
203            return true;
204        }
205
206        let now = Instant::now();
207        if now >= deadline {
208            tracing::warn!(
209                active_connections = active,
210                ?drain_timeout,
211                "drain timeout expired with active connections"
212            );
213            return false;
214        }
215
216        if now.duration_since(last_log) >= DRAIN_PROGRESS_INTERVAL {
217            tracing::info!(
218                active_connections = active,
219                "waiting for active connections to drain"
220            );
221            last_log = now;
222        }
223
224        let remaining = deadline.saturating_duration_since(now);
225        thread::sleep(remaining.min(FORCE_CLOSE_POLL_INTERVAL));
226    }
227}
228
229fn wait_after_force_close(supervisor: &ConnectionSupervisor) {
230    let deadline = Instant::now() + FORCE_CLOSE_SETTLE_TIMEOUT;
231    while Instant::now() < deadline {
232        let reaped = supervisor.reap_crashed_connections();
233        let active = supervisor.active_connection_count();
234        if active == 0 {
235            return;
236        }
237        if reaped > 0 {
238            tracing::debug!(
239                reaped_connections = reaped,
240                active_connections = active,
241                "reaped connections after force close"
242            );
243        }
244        thread::sleep(FORCE_CLOSE_POLL_INTERVAL);
245    }
246
247    let remaining = supervisor.active_connection_count();
248    if remaining > 0 {
249        tracing::warn!(
250            active_connections = remaining,
251            "connections remained active after force-close settle window"
252        );
253    }
254}
255
256fn flush_durable_state(supervisor: &ConnectionSupervisor) -> Result<(), ServerError> {
257    tracing::info!("flushing durable channel state");
258    supervisor.flush_durable_state().map_err(|error| {
259        tracing::error!(%error, "durable state flush failed during shutdown");
260        match error {
261            ServerError::ShutdownFlush { .. } => error,
262            other => ServerError::ShutdownFlush {
263                message: other.to_string(),
264            },
265        }
266    })?;
267    tracing::info!("durable channel state flushed");
268    Ok(())
269}
270
#[cfg(test)]
mod tests {
    use std::thread;
    use std::time::Duration;

    use super::{ShutdownHandle, drain_connections};
    use crate::server::connection::ConnectionSupervisor;

    /// Only the first `initiate` call reports that it activated shutdown.
    #[test]
    fn shutdown_handle_initiates_once() {
        let shutdown = ShutdownHandle::new();
        assert!(!shutdown.is_initiated());

        let first_call = shutdown.initiate();
        assert!(first_call);
        assert!(shutdown.is_initiated());

        let second_call = shutdown.initiate();
        assert!(!second_call);
    }

    /// A thread blocked in `wait` resumes once another clone initiates shutdown.
    #[test]
    fn shutdown_handle_wait_unblocks_on_initiate() -> Result<(), Box<dyn std::error::Error>> {
        let initiator = ShutdownHandle::new();
        let blocked = initiator.clone();
        let waiting_thread = thread::spawn(move || {
            blocked.wait();
            blocked.is_initiated()
        });

        // Give the spawned thread a moment to reach `wait` before initiating.
        thread::sleep(Duration::from_millis(10));
        assert!(initiator.initiate());

        match waiting_thread.join() {
            Ok(observed) => {
                assert!(observed);
                Ok(())
            }
            Err(_) => Err("wait worker panicked".into()),
        }
    }

    /// With no active connections, draining succeeds without waiting for the timeout.
    #[test]
    fn drain_returns_immediately_when_no_connections_are_active()
    -> Result<(), Box<dyn std::error::Error>> {
        let idle_supervisor = ConnectionSupervisor::new()?;

        let timeout = Duration::from_secs(5);
        assert!(drain_connections(&idle_supervisor, timeout));

        idle_supervisor.shutdown();
        Ok(())
    }
}