1use super::{CancellationToken, Result, Runtime, RuntimeConfig, error};
24
25use futures::Future;
26use once_cell::sync::OnceCell;
27use std::{sync::Mutex, time::Duration};
28use tokio::{signal, task::JoinHandle};
29
/// Process-wide Tokio runtime owned by the worker.
///
/// Populated once by `Worker::from_config`; being a `static`, the stored
/// runtime is handed out as `&'static` by `Worker::tokio_runtime`.
static RT: OnceCell<tokio::runtime::Runtime> = OnceCell::new();

/// Handle to a Tokio runtime that this module does not own.
///
/// NOTE(review): nothing in this part of the file ever sets this cell; it is
/// only read (as an "already initialized" guard and as a fallback source in
/// `Worker::runtime_from_existing`). Confirm where, if anywhere, it is set.
static RTHANDLE: OnceCell<tokio::runtime::Handle> = OnceCell::new();

/// Slot holding the join handle of the supervisor task spawned by
/// `Worker::execute_internal`.
///
/// The cell can be set only once; the handle inside the `Mutex<Option<..>>`
/// is taken back out by the same call that stored it.
static INIT: OnceCell<Mutex<Option<tokio::task::JoinHandle<Result<()>>>>> = OnceCell::new();
33
/// Logged (at debug level) when the supervisor task observes cancellation.
const SHUTDOWN_MESSAGE: &str =
    "Application received shutdown signal; attempting to gracefully shutdown";
/// Logged together with the effective timeout (in seconds) right after
/// `SHUTDOWN_MESSAGE`.
const SHUTDOWN_TIMEOUT_MESSAGE: &str =
    "Use DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT to control the graceful shutdown timeout";

/// Name of the environment variable that overrides the graceful shutdown
/// timeout. Its value is parsed as a `u64` number of seconds.
pub const DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT: &str = "DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT";

/// Default graceful shutdown timeout, in seconds, for builds compiled with
/// `debug_assertions` enabled.
pub const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_DEBUG: u64 = 5;

/// Default graceful shutdown timeout, in seconds, for builds compiled without
/// `debug_assertions`.
pub const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_RELEASE: u64 = 30;
47
/// Entry point that pairs a [`Runtime`] with the [`RuntimeConfig`] it was
/// built from and drives an application future to completion.
///
/// Instances are created through `from_settings`, `from_config` or
/// `from_current`, and consumed by `execute` / `execute_async`.
#[derive(Debug, Clone)]
pub struct Worker {
    /// Runtime handed to the application closure and used to spawn tasks.
    runtime: Runtime,
    /// Configuration captured at construction time; it is stored here but not
    /// read by any method visible in this part of the file.
    config: RuntimeConfig,
}
53
54impl Worker {
55 pub fn from_settings() -> Result<Worker> {
57 let config = RuntimeConfig::from_settings()?;
58 Worker::from_config(config)
59 }
60
61 pub fn from_config(config: RuntimeConfig) -> Result<Worker> {
63 if RT.get().is_some() || RTHANDLE.get().is_some() {
65 return Err(error!("Worker already initialized"));
66 }
67
68 let rt = RT.try_insert(config.create_runtime()?).map_err(|_| {
72 error!("Failed to create worker; Only a single Worker should ever be created")
73 })?;
74
75 let runtime = Runtime::from_handle(rt.handle().clone())?;
76 Ok(Worker { runtime, config })
77 }
78
79 pub fn runtime_from_existing() -> Result<Runtime> {
80 if let Some(rt) = RT.get() {
81 Ok(Runtime::from_handle(rt.handle().clone())?)
82 } else if let Some(rt) = RTHANDLE.get() {
83 Ok(Runtime::from_handle(rt.clone())?)
84 } else {
85 Runtime::from_settings()
86 }
87 }
88
89 pub fn tokio_runtime(&self) -> Result<&'static tokio::runtime::Runtime> {
90 RT.get().ok_or_else(|| error!("Worker not initialized"))
91 }
92
93 pub fn runtime(&self) -> &Runtime {
94 &self.runtime
95 }
96
97 pub fn execute<F, Fut>(self, f: F) -> Result<()>
98 where
99 F: FnOnce(Runtime) -> Fut + Send + 'static,
100 Fut: Future<Output = Result<()>> + Send + 'static,
101 {
102 let runtime = self.runtime.clone();
103 runtime.secondary().block_on(self.execute_internal(f))??;
104 runtime.shutdown();
105 Ok(())
106 }
107
108 pub async fn execute_async<F, Fut>(self, f: F) -> Result<()>
109 where
110 F: FnOnce(Runtime) -> Fut + Send + 'static,
111 Fut: Future<Output = Result<()>> + Send + 'static,
112 {
113 let runtime = self.runtime.clone();
114 let task = self.execute_internal(f);
115 task.await??;
116 runtime.shutdown();
117 Ok(())
118 }
119
120 fn execute_internal<F, Fut>(self, f: F) -> JoinHandle<Result<()>>
123 where
124 F: FnOnce(Runtime) -> Fut + Send + 'static,
125 Fut: Future<Output = Result<()>> + Send + 'static,
126 {
127 let runtime = self.runtime.clone();
128 let primary = runtime.primary();
129 let secondary = runtime.secondary();
130
131 let timeout = std::env::var(DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT)
132 .ok()
133 .and_then(|s| s.parse::<u64>().ok())
134 .unwrap_or({
135 if cfg!(debug_assertions) {
136 DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_DEBUG
137 } else {
138 DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_RELEASE
139 }
140 });
141
142 INIT.set(Mutex::new(Some(secondary.spawn(async move {
143 tokio::spawn(signal_handler(runtime.cancellation_token.clone()));
145
146 let cancel_token = runtime.child_token();
147 let (mut app_tx, app_rx) = tokio::sync::oneshot::channel::<()>();
148
149 let task: JoinHandle<Result<()>> = primary.spawn(async move {
151 let _rx = app_rx;
152 f(runtime).await
153 });
154
155 tokio::select! {
156 _ = cancel_token.cancelled() => {
157 tracing::debug!("{}", SHUTDOWN_MESSAGE);
158 tracing::debug!("{} {} seconds", SHUTDOWN_TIMEOUT_MESSAGE, timeout);
159 }
160
161 _ = app_tx.closed() => {
162 }
163 };
164
165 let result = tokio::select! {
166 result = task => {
167 result
168 }
169
170 _ = tokio::time::sleep(tokio::time::Duration::from_secs(timeout)) => {
171 tracing::debug!("Application did not shutdown in time; terminating");
172 std::process::exit(911);
173 }
174 }?;
175
176 match &result {
177 Ok(_) => {
178 tracing::debug!("Application shutdown successfully");
179 }
180 Err(e) => {
181 tracing::error!("Application shutdown with error: {:?}", e);
182 }
183 }
184
185 result
186 }))))
187 .expect("Failed to spawn application task");
188
189 INIT
190 .get()
191 .expect("Application task not initialized")
192 .lock()
193 .unwrap()
194 .take()
195 .expect("Application initialized; but another thread is awaiting it; Worker.execute() can only be called once")
196 }
197
198 pub fn from_current() -> Result<Worker> {
199 if RT.get().is_some() || RTHANDLE.get().is_some() {
200 return Err(error!("Worker already initialized"));
201 }
202 let runtime = Runtime::from_current()?;
203 let config = RuntimeConfig::from_settings()?;
204 Ok(Worker { runtime, config })
205 }
206}
207
208async fn signal_handler(cancel_token: CancellationToken) -> Result<()> {
210 let ctrl_c = async {
211 signal::ctrl_c().await?;
212 anyhow::Ok(())
213 };
214
215 let sigterm = async {
216 signal::unix::signal(signal::unix::SignalKind::terminate())?
217 .recv()
218 .await;
219 anyhow::Ok(())
220 };
221
222 tokio::select! {
223 _ = ctrl_c => {
224 tracing::info!("Ctrl+C received, starting graceful shutdown");
225 },
226 _ = sigterm => {
227 tracing::info!("SIGTERM received, starting graceful shutdown");
228 },
229 _ = cancel_token.cancelled() => {
230 tracing::debug!("CancellationToken triggered; shutting down");
231 },
232 }
233
234 cancel_token.cancel();
236
237 Ok(())
238}