chainrpc_core/
shutdown.rs1use std::sync::Arc;
4use std::time::Duration;
5
6use tokio::sync::watch;
7
8#[derive(Clone)]
10pub struct ShutdownSignal {
11 rx: watch::Receiver<bool>,
12}
13
14impl ShutdownSignal {
15 pub fn is_shutdown(&self) -> bool {
17 *self.rx.borrow()
18 }
19
20 pub async fn wait(&mut self) {
22 while !*self.rx.borrow() {
23 if self.rx.changed().await.is_err() {
24 return; }
26 }
27 }
28}
29
30pub struct ShutdownController {
32 tx: watch::Sender<bool>,
33}
34
35impl ShutdownController {
36 pub fn new() -> (Self, ShutdownSignal) {
38 let (tx, rx) = watch::channel(false);
39 (Self { tx }, ShutdownSignal { rx })
40 }
41
42 pub fn shutdown(&self) {
44 let _ = self.tx.send(true);
45 tracing::info!("shutdown signaled");
46 }
47
48 pub fn signal(&self) -> ShutdownSignal {
50 ShutdownSignal {
51 rx: self.tx.subscribe(),
52 }
53 }
54}
55
56impl Default for ShutdownController {
57 fn default() -> Self {
58 Self::new().0
59 }
60}
61
62pub fn install_signal_handler() -> (Arc<ShutdownController>, ShutdownSignal) {
67 let (controller, signal) = ShutdownController::new();
68 let controller = Arc::new(controller);
69 let ctrl = controller.clone();
70
71 tokio::spawn(async move {
72 let ctrl_c = tokio::signal::ctrl_c();
73
74 #[cfg(unix)]
75 {
76 use tokio::signal::unix::{signal, SignalKind};
77 let mut sigterm =
78 signal(SignalKind::terminate()).expect("failed to install SIGTERM handler");
79 tokio::select! {
80 _ = ctrl_c => {
81 tracing::info!("received SIGINT");
82 }
83 _ = sigterm.recv() => {
84 tracing::info!("received SIGTERM");
85 }
86 }
87 }
88
89 #[cfg(not(unix))]
90 {
91 let _ = ctrl_c.await;
92 tracing::info!("received SIGINT");
93 }
94
95 ctrl.shutdown();
96 });
97
98 (controller, signal)
99}
100
101pub async fn shutdown_with_timeout<F, Fut>(
105 signal: &mut ShutdownSignal,
106 timeout: Duration,
107 drain_fn: F,
108) where
109 F: FnOnce() -> Fut,
110 Fut: std::future::Future<Output = ()>,
111{
112 signal.wait().await;
113 tracing::info!(
114 "shutdown initiated, draining in-flight requests (timeout: {}s)...",
115 timeout.as_secs()
116 );
117
118 match tokio::time::timeout(timeout, drain_fn()).await {
119 Ok(()) => {
120 tracing::info!("graceful shutdown complete");
121 }
122 Err(_) => {
123 tracing::warn!("shutdown timeout exceeded, forcing exit");
124 }
125 }
126}
127
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created signal reports that shutdown has not been requested.
    #[test]
    fn signal_starts_false() {
        let (_ctrl, sig) = ShutdownController::new();
        let requested = sig.is_shutdown();
        assert!(!requested);
    }

    /// Calling `shutdown` flips the flag seen by the initial signal.
    #[test]
    fn signal_after_shutdown() {
        let (ctrl, sig) = ShutdownController::new();
        ctrl.shutdown();
        let requested = sig.is_shutdown();
        assert!(requested);
    }

    /// Every signal obtained from the controller observes the same flag.
    #[test]
    fn multiple_signals() {
        let (ctrl, _initial) = ShutdownController::new();
        let subscribers = [ctrl.signal(), ctrl.signal()];

        for sub in &subscribers {
            assert!(!sub.is_shutdown());
        }

        ctrl.shutdown();

        for sub in &subscribers {
            assert!(sub.is_shutdown());
        }
    }

    /// A task blocked in `wait` resumes once shutdown is triggered.
    #[tokio::test]
    async fn wait_for_shutdown() {
        let (ctrl, mut sig) = ShutdownController::new();

        let waiter = tokio::spawn(async move {
            sig.wait().await;
            true
        });

        // Give the spawned task a moment to start waiting before signaling.
        let pause = Duration::from_millis(10);
        tokio::time::sleep(pause).await;
        ctrl.shutdown();

        let woke = waiter.await.unwrap();
        assert!(woke);
    }

    /// With shutdown already signaled, a short drain finishes within the timeout.
    #[tokio::test]
    async fn shutdown_with_timeout_completes() {
        let (ctrl, mut sig) = ShutdownController::new();
        ctrl.shutdown();

        let drain = || async {
            tokio::time::sleep(Duration::from_millis(10)).await;
        };
        let limit = Duration::from_secs(5);

        shutdown_with_timeout(&mut sig, limit, drain).await;
    }
}