1use super::{CancellationToken, Runtime, RuntimeConfig};
24
25use futures::Future;
26use once_cell::sync::OnceCell;
27use parking_lot::Mutex;
28use std::time::Duration;
29use tokio::{signal, task::JoinHandle};
30
31static RT: OnceCell<tokio::runtime::Runtime> = OnceCell::new();
32static RTHANDLE: OnceCell<tokio::runtime::Handle> = OnceCell::new();
33static INIT: OnceCell<Mutex<Option<tokio::task::JoinHandle<anyhow::Result<()>>>>> = OnceCell::new();
34
/// Logged when the worker observes cancellation and starts winding down.
const SHUTDOWN_MESSAGE: &str =
    "Application received shutdown signal; attempting to gracefully shutdown";

/// Logged together with the effective timeout (in seconds) when shutdown starts.
const SHUTDOWN_TIMEOUT_MESSAGE: &str =
    "Use DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT to control the graceful shutdown timeout";

/// Name of the environment variable that overrides the graceful shutdown
/// timeout. The value is parsed as a whole number of seconds (`u64`).
pub const DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT: &str = "DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT";

/// Graceful shutdown timeout, in seconds, used when `debug_assertions` are enabled.
pub const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_DEBUG: u64 = 5;

/// Graceful shutdown timeout, in seconds, used when `debug_assertions` are disabled.
pub const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_RELEASE: u64 = 30;
48
49#[derive(Debug, Clone)]
50pub struct Worker {
51 runtime: Runtime,
52 config: RuntimeConfig,
53}
54
55impl Worker {
56 pub fn from_settings() -> anyhow::Result<Worker> {
58 let config = RuntimeConfig::from_settings()?;
59 Worker::from_config(config)
60 }
61
62 pub fn from_config(config: RuntimeConfig) -> anyhow::Result<Worker> {
64 if RT.get().is_some() || RTHANDLE.get().is_some() {
66 return Err(anyhow::anyhow!("Worker already initialized"));
67 }
68
69 let rt = RT.try_insert(config.create_runtime()?).map_err(|_| {
73 anyhow::anyhow!("Failed to create worker; Only a single Worker should ever be created")
74 })?;
75
76 let runtime = Runtime::from_handle(rt.handle().clone())?;
77 Ok(Worker { runtime, config })
78 }
79
80 pub fn runtime_from_existing() -> anyhow::Result<Runtime> {
81 if let Some(rt) = RT.get() {
82 Ok(Runtime::from_handle(rt.handle().clone())?)
83 } else if let Some(rt) = RTHANDLE.get() {
84 Ok(Runtime::from_handle(rt.clone())?)
85 } else {
86 Runtime::from_settings()
87 }
88 }
89
90 pub fn tokio_runtime(&self) -> anyhow::Result<&'static tokio::runtime::Runtime> {
91 RT.get()
92 .ok_or_else(|| anyhow::anyhow!("Worker not initialized"))
93 }
94
95 pub fn runtime(&self) -> &Runtime {
96 &self.runtime
97 }
98
99 pub fn execute<F, Fut>(self, f: F) -> anyhow::Result<()>
100 where
101 F: FnOnce(Runtime) -> Fut + Send + 'static,
102 Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
103 {
104 let runtime = self.runtime.clone();
105 runtime.secondary().block_on(self.execute_internal(f))??;
106 runtime.shutdown();
107 Ok(())
108 }
109
110 pub async fn execute_async<F, Fut>(self, f: F) -> anyhow::Result<()>
111 where
112 F: FnOnce(Runtime) -> Fut + Send + 'static,
113 Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
114 {
115 let runtime = self.runtime.clone();
116 let task = self.execute_internal(f);
117 task.await??;
118 runtime.shutdown();
119 Ok(())
120 }
121
122 fn execute_internal<F, Fut>(self, f: F) -> JoinHandle<anyhow::Result<()>>
125 where
126 F: FnOnce(Runtime) -> Fut + Send + 'static,
127 Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
128 {
129 let runtime = self.runtime.clone();
130 let primary = runtime.primary();
131 let secondary = runtime.secondary();
132
133 let timeout = std::env::var(DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT)
134 .ok()
135 .and_then(|s| s.parse::<u64>().ok())
136 .unwrap_or({
137 if cfg!(debug_assertions) {
138 DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_DEBUG
139 } else {
140 DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_RELEASE
141 }
142 });
143
144 INIT.set(Mutex::new(Some(secondary.spawn(async move {
145 tokio::spawn(signal_handler(runtime.primary_token().clone()));
147
148 let cancel_token = runtime.child_token();
149 let (mut app_tx, app_rx) = tokio::sync::oneshot::channel::<()>();
150
151 let task: JoinHandle<anyhow::Result<()>> = primary.spawn(async move {
153 let _rx = app_rx;
154 f(runtime).await
155 });
156
157 tokio::select! {
158 _ = cancel_token.cancelled() => {
159 tracing::debug!("{}", SHUTDOWN_MESSAGE);
160 tracing::debug!("{} {} seconds", SHUTDOWN_TIMEOUT_MESSAGE, timeout);
161 }
162
163 _ = app_tx.closed() => {
164 }
165 };
166
167 let result = tokio::select! {
168 result = task => {
169 result
170 }
171
172 _ = tokio::time::sleep(tokio::time::Duration::from_secs(timeout)) => {
173 tracing::debug!("Application did not shutdown in time; terminating");
174 std::process::exit(911);
175 }
176 }?;
177
178 match &result {
179 Ok(_) => {
180 tracing::debug!("Application shutdown successfully");
181 }
182 Err(e) => {
183 tracing::error!("Application shutdown with error: {:?}", e);
184 }
185 }
186
187 result
188 }))))
189 .expect("Failed to spawn application task");
190
191 INIT
192 .get()
193 .expect("Application task not initialized")
194 .lock()
195 .take()
196 .expect("Application initialized; but another thread is awaiting it; Worker.execute() can only be called once")
197 }
198
199 pub fn from_current() -> anyhow::Result<Worker> {
200 if RT.get().is_some() || RTHANDLE.get().is_some() {
201 return Err(anyhow::anyhow!("Worker already initialized"));
202 }
203 let runtime = Runtime::from_current()?;
204 let config = RuntimeConfig::from_settings()?;
205 Ok(Worker { runtime, config })
206 }
207}
208
209async fn signal_handler(cancel_token: CancellationToken) -> anyhow::Result<()> {
211 let ctrl_c = async {
212 signal::ctrl_c().await?;
213 anyhow::Ok(())
214 };
215
216 let sigterm = async {
217 signal::unix::signal(signal::unix::SignalKind::terminate())?
218 .recv()
219 .await;
220 anyhow::Ok(())
221 };
222
223 tokio::select! {
224 _ = ctrl_c => {
225 tracing::info!("Ctrl+C received, starting graceful shutdown");
226 },
227 _ = sigterm => {
228 tracing::info!("SIGTERM received, starting graceful shutdown");
229 },
230 _ = cancel_token.cancelled() => {
231 tracing::debug!("CancellationToken triggered; shutting down");
232 },
233 }
234
235 cancel_token.cancel();
237
238 Ok(())
239}