liminal_server/server/
shutdown.rs1use std::fmt;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::{Arc, Condvar, Mutex};
4use std::thread::{self, JoinHandle};
5use std::time::{Duration, Instant};
6
7use signal_hook::consts::signal::{SIGINT, SIGTERM};
8use signal_hook::iterator::{Handle as SignalIteratorHandle, Signals};
9
10use crate::ServerError;
11use crate::server::connection::ConnectionSupervisor;
12use crate::server::listener::ServerListener;
13
/// Minimum spacing between "waiting for active connections to drain" progress logs.
const DRAIN_PROGRESS_INTERVAL: Duration = Duration::from_millis(100);
/// How long connections are given to disappear after being force-closed.
const FORCE_CLOSE_SETTLE_TIMEOUT: Duration = Duration::from_millis(500);
/// Pause between connection-count polls while waiting on connections.
const FORCE_CLOSE_POLL_INTERVAL: Duration = Duration::from_millis(10);
17
18#[derive(Clone)]
20pub struct ShutdownHandle {
21 inner: Arc<ShutdownState>,
22}
23
24impl ShutdownHandle {
25 #[must_use]
27 pub fn new() -> Self {
28 Self {
29 inner: Arc::new(ShutdownState::new()),
30 }
31 }
32
33 pub fn initiate(&self) -> bool {
38 if self.inner.initiated.swap(true, Ordering::SeqCst) {
39 tracing::debug!("shutdown request ignored because shutdown is already active");
40 return false;
41 }
42
43 tracing::info!("shutdown requested");
44 self.inner.notify();
45 true
46 }
47
48 pub fn wait(&self) {
50 if self.is_initiated() {
51 return;
52 }
53 let Ok(mut guard) = self.inner.wait_lock.lock() else {
54 return;
55 };
56 while !self.is_initiated() {
57 match self.inner.waiter.wait(guard) {
58 Ok(next_guard) => guard = next_guard,
59 Err(_) => return,
60 }
61 }
62 }
63
64 #[must_use]
66 pub fn is_initiated(&self) -> bool {
67 self.inner.initiated.load(Ordering::SeqCst)
68 }
69}
70
71impl Default for ShutdownHandle {
72 fn default() -> Self {
73 Self::new()
74 }
75}
76
77impl fmt::Debug for ShutdownHandle {
78 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
79 formatter
80 .debug_struct("ShutdownHandle")
81 .field("initiated", &self.is_initiated())
82 .finish()
83 }
84}
85
/// State shared (through an `Arc`) by every clone of a `ShutdownHandle`.
#[derive(Debug)]
struct ShutdownState {
    /// `true` once shutdown has been requested.
    initiated: AtomicBool,
    /// Guards no data; it exists only so waiters can block on `waiter`.
    wait_lock: Mutex<()>,
    /// Notified after `initiated` has been set so blocked waiters re-check it.
    waiter: Condvar,
}

impl ShutdownState {
    /// Creates state in the "not initiated" position.
    const fn new() -> Self {
        Self {
            initiated: AtomicBool::new(false),
            wait_lock: Mutex::new(()),
            waiter: Condvar::new(),
        }
    }

    /// Wakes every thread blocked on `waiter`.
    ///
    /// Callers are expected to set `initiated` before calling this. Taking
    /// `wait_lock` first means any waiter that read `initiated == false` while
    /// holding the lock is already parked on the condvar when the notification
    /// is sent.
    ///
    /// `wait_lock` guards no data, so a poisoned lock is recovered rather than
    /// skipping the notification, which would leave waiters blocked forever.
    fn notify(&self) {
        let _guard = self
            .wait_lock
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        self.waiter.notify_all();
    }
}
108
109#[derive(Debug)]
111pub struct SignalShutdownRegistration {
112 signal_handle: SignalIteratorHandle,
113 worker: Option<JoinHandle<()>>,
114}
115
116impl SignalShutdownRegistration {
117 const fn new(signal_handle: SignalIteratorHandle, worker: JoinHandle<()>) -> Self {
118 Self {
119 signal_handle,
120 worker: Some(worker),
121 }
122 }
123}
124
125impl Drop for SignalShutdownRegistration {
126 fn drop(&mut self) {
127 self.signal_handle.close();
128 let Some(worker) = self.worker.take() else {
129 return;
130 };
131 if worker.join().is_err() {
132 tracing::debug!("shutdown signal worker terminated unexpectedly");
133 }
134 }
135}
136
137pub fn register_signal_handlers(
142 handle: ShutdownHandle,
143) -> Result<SignalShutdownRegistration, ServerError> {
144 let mut signals =
145 Signals::new([SIGTERM, SIGINT]).map_err(|error| ServerError::ListenerAccept {
146 message: format!("failed to register shutdown signal handlers: {error}"),
147 })?;
148 let signal_handle = signals.handle();
149 let worker = thread::spawn(move || {
150 for signal in signals.forever() {
151 tracing::info!(signal, "received shutdown signal");
152 handle.initiate();
153 }
154 });
155 Ok(SignalShutdownRegistration::new(signal_handle, worker))
156}
157
158pub fn run_shutdown_sequence(
163 listener: &mut ServerListener,
164 supervisor: &ConnectionSupervisor,
165 drain_timeout: Duration,
166) -> Result<(), ServerError> {
167 tracing::info!(?drain_timeout, "starting graceful shutdown sequence");
168 listener.stop_accepting()?;
171 supervisor.notify_shutdown_subscribers();
172
173 let drained = drain_connections(supervisor, drain_timeout);
174 if !drained {
175 supervisor.force_close_active_connections();
176 wait_after_force_close(supervisor);
177 }
178
179 flush_durable_state(supervisor)?;
180 supervisor.shutdown();
181 tracing::info!("graceful shutdown sequence complete");
182 Ok(())
183}
184
185fn drain_connections(supervisor: &ConnectionSupervisor, drain_timeout: Duration) -> bool {
186 let deadline = Instant::now() + drain_timeout;
187 let mut last_log = Instant::now()
188 .checked_sub(DRAIN_PROGRESS_INTERVAL)
189 .unwrap_or_else(Instant::now);
190
191 loop {
192 let reaped = supervisor.reap_crashed_connections();
193 if reaped > 0 {
194 tracing::debug!(
195 reaped_connections = reaped,
196 "reaped connections during drain"
197 );
198 }
199
200 let active = supervisor.active_connection_count();
201 if active == 0 {
202 tracing::info!("all connections drained before timeout");
203 return true;
204 }
205
206 let now = Instant::now();
207 if now >= deadline {
208 tracing::warn!(
209 active_connections = active,
210 ?drain_timeout,
211 "drain timeout expired with active connections"
212 );
213 return false;
214 }
215
216 if now.duration_since(last_log) >= DRAIN_PROGRESS_INTERVAL {
217 tracing::info!(
218 active_connections = active,
219 "waiting for active connections to drain"
220 );
221 last_log = now;
222 }
223
224 let remaining = deadline.saturating_duration_since(now);
225 thread::sleep(remaining.min(FORCE_CLOSE_POLL_INTERVAL));
226 }
227}
228
229fn wait_after_force_close(supervisor: &ConnectionSupervisor) {
230 let deadline = Instant::now() + FORCE_CLOSE_SETTLE_TIMEOUT;
231 while Instant::now() < deadline {
232 let reaped = supervisor.reap_crashed_connections();
233 let active = supervisor.active_connection_count();
234 if active == 0 {
235 return;
236 }
237 if reaped > 0 {
238 tracing::debug!(
239 reaped_connections = reaped,
240 active_connections = active,
241 "reaped connections after force close"
242 );
243 }
244 thread::sleep(FORCE_CLOSE_POLL_INTERVAL);
245 }
246
247 let remaining = supervisor.active_connection_count();
248 if remaining > 0 {
249 tracing::warn!(
250 active_connections = remaining,
251 "connections remained active after force-close settle window"
252 );
253 }
254}
255
256fn flush_durable_state(supervisor: &ConnectionSupervisor) -> Result<(), ServerError> {
257 tracing::info!("flushing durable channel state");
258 supervisor.flush_durable_state().map_err(|error| {
259 tracing::error!(%error, "durable state flush failed during shutdown");
260 match error {
261 ServerError::ShutdownFlush { .. } => error,
262 other => ServerError::ShutdownFlush {
263 message: other.to_string(),
264 },
265 }
266 })?;
267 tracing::info!("durable channel state flushed");
268 Ok(())
269}
270
#[cfg(test)]
mod tests {
    use std::thread;
    use std::time::Duration;

    use super::{ShutdownHandle, drain_connections};
    use crate::server::connection::ConnectionSupervisor;

    #[test]
    fn shutdown_handle_initiates_once() {
        let handle = ShutdownHandle::new();

        assert!(!handle.is_initiated());
        assert!(handle.initiate());
        assert!(handle.is_initiated());
        assert!(!handle.initiate());
    }

    #[test]
    fn shutdown_handle_clones_share_state() {
        let handle = ShutdownHandle::new();
        let clone = handle.clone();

        assert!(clone.initiate());
        assert!(handle.is_initiated());
        assert!(!handle.initiate());
    }

    #[test]
    fn default_handle_starts_uninitiated_and_debug_shows_flag() {
        let handle = ShutdownHandle::default();

        assert!(!handle.is_initiated());
        assert_eq!(format!("{handle:?}"), "ShutdownHandle { initiated: false }");
        assert!(handle.initiate());
        assert_eq!(format!("{handle:?}"), "ShutdownHandle { initiated: true }");
    }

    #[test]
    fn shutdown_handle_wait_returns_immediately_when_already_initiated() {
        let handle = ShutdownHandle::new();
        assert!(handle.initiate());

        // Must not block: shutdown was requested before `wait` was called.
        handle.wait();
        assert!(handle.is_initiated());
    }

    #[test]
    fn shutdown_handle_wait_unblocks_on_initiate() -> Result<(), Box<dyn std::error::Error>> {
        let handle = ShutdownHandle::new();
        let waiter = handle.clone();
        let worker = thread::spawn(move || {
            waiter.wait();
            waiter.is_initiated()
        });

        thread::sleep(Duration::from_millis(10));
        assert!(handle.initiate());
        let observed = worker.join().map_err(|_| "wait worker panicked")?;

        assert!(observed);
        Ok(())
    }

    #[test]
    fn drain_returns_immediately_when_no_connections_are_active()
    -> Result<(), Box<dyn std::error::Error>> {
        let supervisor = ConnectionSupervisor::new()?;

        let drained = drain_connections(&supervisor, Duration::from_secs(5));

        assert!(drained);
        supervisor.shutdown();
        Ok(())
    }
}