1use super::{CancellationToken, Runtime, RuntimeConfig};
24
25use futures::Future;
26use once_cell::sync::OnceCell;
27use parking_lot::Mutex;
28use std::time::Duration;
29use tokio::{signal, task::JoinHandle};
30
31static RT: OnceCell<tokio::runtime::Runtime> = OnceCell::new();
32static RTHANDLE: OnceCell<tokio::runtime::Handle> = OnceCell::new();
33static INIT: OnceCell<Mutex<Option<tokio::task::JoinHandle<anyhow::Result<()>>>>> = OnceCell::new();
34
35use crate::config::environment_names::worker as env_worker;
36
37const SHUTDOWN_MESSAGE: &str =
38 "Application received shutdown signal; attempting to gracefully shutdown";
39const SHUTDOWN_TIMEOUT_MESSAGE: &str =
40 "Use DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT to control the graceful shutdown timeout";
41
42pub const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_DEBUG: u64 = 5;
44
45pub const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_RELEASE: u64 = 30;
47
48#[derive(Debug, Clone)]
49pub struct Worker {
50 runtime: Runtime,
51 config: RuntimeConfig,
52}
53
54impl Worker {
55 pub fn from_settings() -> anyhow::Result<Worker> {
57 let config = RuntimeConfig::from_settings()?;
58 Worker::from_config(config)
59 }
60
61 pub fn from_config(config: RuntimeConfig) -> anyhow::Result<Worker> {
63 if RT.get().is_some() || RTHANDLE.get().is_some() {
65 return Err(anyhow::anyhow!("Worker already initialized"));
66 }
67
68 let rt = RT.try_insert(config.create_runtime()?).map_err(|_| {
72 anyhow::anyhow!("Failed to create worker; Only a single Worker should ever be created")
73 })?;
74
75 let runtime = Runtime::from_handle(rt.handle().clone())?;
76 Ok(Worker { runtime, config })
77 }
78
79 pub fn runtime_from_existing() -> anyhow::Result<Runtime> {
80 if let Some(rt) = RT.get() {
81 Ok(Runtime::from_handle(rt.handle().clone())?)
82 } else if let Some(rt) = RTHANDLE.get() {
83 Ok(Runtime::from_handle(rt.clone())?)
84 } else {
85 let runtime = Runtime::from_settings()?;
93 let _ = RTHANDLE.set(runtime.primary());
94 Ok(runtime)
95 }
96 }
97
98 pub fn has_existing_runtime() -> bool {
103 RT.get().is_some() || RTHANDLE.get().is_some()
104 }
105
106 pub fn tokio_runtime(&self) -> anyhow::Result<&'static tokio::runtime::Runtime> {
107 RT.get()
108 .ok_or_else(|| anyhow::anyhow!("Worker not initialized"))
109 }
110
111 pub fn runtime(&self) -> &Runtime {
112 &self.runtime
113 }
114
115 pub fn execute<F, Fut>(self, f: F) -> anyhow::Result<()>
116 where
117 F: FnOnce(Runtime) -> Fut + Send + 'static,
118 Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
119 {
120 let runtime = self.runtime.clone();
121 runtime.secondary().block_on(self.execute_internal(f))??;
122 runtime.shutdown();
123 Ok(())
124 }
125
126 pub async fn execute_async<F, Fut>(self, f: F) -> anyhow::Result<()>
127 where
128 F: FnOnce(Runtime) -> Fut + Send + 'static,
129 Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
130 {
131 let runtime = self.runtime.clone();
132 let task = self.execute_internal(f);
133 task.await??;
134 runtime.shutdown();
135 Ok(())
136 }
137
138 fn execute_internal<F, Fut>(self, f: F) -> JoinHandle<anyhow::Result<()>>
141 where
142 F: FnOnce(Runtime) -> Fut + Send + 'static,
143 Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
144 {
145 let runtime = self.runtime.clone();
146 let primary = runtime.primary();
147 let secondary = runtime.secondary();
148
149 let timeout = std::env::var(env_worker::DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT)
150 .ok()
151 .and_then(|s| s.parse::<u64>().ok())
152 .unwrap_or({
153 if cfg!(debug_assertions) {
154 DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_DEBUG
155 } else {
156 DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_RELEASE
157 }
158 });
159
160 INIT.set(Mutex::new(Some(secondary.spawn(async move {
161 tokio::spawn(signal_handler(runtime.primary_token().clone()));
163
164 let cancel_token = runtime.child_token();
165 let (mut app_tx, app_rx) = tokio::sync::oneshot::channel::<()>();
166
167 let task: JoinHandle<anyhow::Result<()>> = primary.spawn(async move {
169 let _rx = app_rx;
170 f(runtime).await
171 });
172
173 tokio::select! {
174 _ = cancel_token.cancelled() => {
175 tracing::debug!("{SHUTDOWN_MESSAGE}");
176 tracing::debug!("{} {} seconds", SHUTDOWN_TIMEOUT_MESSAGE, timeout);
177 }
178
179 _ = app_tx.closed() => {
180 }
181 };
182
183 let result = tokio::select! {
184 result = task => {
185 result
186 }
187
188 _ = tokio::time::sleep(tokio::time::Duration::from_secs(timeout)) => {
189 tracing::debug!("Application did not shutdown in time; terminating");
190 std::process::exit(911);
191 }
192 }?;
193
194 match &result {
195 Ok(_) => {
196 tracing::debug!("Application shutdown successfully");
197 }
198 Err(e) => {
199 tracing::error!("Application shutdown with error: {:?}", e);
200 }
201 }
202
203 result
204 }))))
205 .expect("Failed to spawn application task");
206
207 INIT
208 .get()
209 .expect("Application task not initialized")
210 .lock()
211 .take()
212 .expect("Application initialized; but another thread is awaiting it; Worker.execute() can only be called once")
213 }
214
215 pub fn from_current() -> anyhow::Result<Worker> {
216 if RT.get().is_some() || RTHANDLE.get().is_some() {
217 return Err(anyhow::anyhow!("Worker already initialized"));
218 }
219 let runtime = Runtime::from_current()?;
220 let config = RuntimeConfig::from_settings()?;
221 Ok(Worker { runtime, config })
222 }
223}
224
225async fn signal_handler(cancel_token: CancellationToken) -> anyhow::Result<()> {
227 let ctrl_c = async {
228 signal::ctrl_c().await?;
229 anyhow::Ok(())
230 };
231
232 let sigterm = async {
233 signal::unix::signal(signal::unix::SignalKind::terminate())?
234 .recv()
235 .await;
236 anyhow::Ok(())
237 };
238
239 tokio::select! {
240 _ = ctrl_c => {
241 tracing::info!("Ctrl+C received, starting graceful shutdown");
242 },
243 _ = sigterm => {
244 tracing::info!("SIGTERM received, starting graceful shutdown");
245 },
246 _ = cancel_token.cancelled() => {
247 tracing::debug!("CancellationToken triggered; shutting down");
248 },
249 }
250
251 cancel_token.cancel();
253
254 Ok(())
255}