1use super::{error, CancellationToken, Result, Runtime, RuntimeConfig};
36
37use futures::Future;
38use once_cell::sync::OnceCell;
39use std::{sync::Mutex, time::Duration};
40use tokio::{signal, task::JoinHandle};
41
42static RT: OnceCell<tokio::runtime::Runtime> = OnceCell::new();
43static INIT: OnceCell<Mutex<Option<tokio::task::JoinHandle<Result<()>>>>> = OnceCell::new();
44
45const SHUTDOWN_MESSAGE: &str =
46 "Application received shutdown signal; attempting to gracefully shutdown";
47const SHUTDOWN_TIMEOUT_MESSAGE: &str =
48 "Use DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT to control the graceful shutdown timeout";
49
50pub const DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT: &str = "DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT";
52
53pub const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_DEBUG: u64 = 5;
55
56pub const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_RELEASE: u64 = 30;
58
59#[derive(Debug, Clone)]
60pub struct Worker {
61 runtime: Runtime,
62}
63
64impl Worker {
65 pub fn from_settings() -> Result<Worker> {
67 let config = RuntimeConfig::from_settings()?;
68 Worker::from_config(config)
69 }
70
71 pub fn from_config(config: RuntimeConfig) -> Result<Worker> {
73 if RT.get().is_some() {
75 return Err(error!("Worker already initialized"));
76 }
77
78 let rt = RT.try_insert(config.create_runtime()?).map_err(|_| {
82 error!("Failed to create worker; Only a single Worker should ever be created")
83 })?;
84
85 let runtime = Runtime::from_handle(rt.handle().clone())?;
86 Ok(Worker { runtime })
87 }
88
89 pub fn tokio_runtime(&self) -> Result<&'static tokio::runtime::Runtime> {
90 RT.get().ok_or_else(|| error!("Worker not initialized"))
91 }
92
93 pub fn runtime(&self) -> &Runtime {
94 &self.runtime
95 }
96
97 pub fn execute<F, Fut>(self, f: F) -> Result<()>
100 where
101 F: FnOnce(Runtime) -> Fut + Send + 'static,
102 Fut: Future<Output = Result<()>> + Send + 'static,
103 {
104 let runtime = self.runtime;
105 let local_runtime = runtime.clone();
106 let primary = runtime.primary();
107 let secondary = runtime.secondary();
108
109 let timeout = std::env::var(DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT)
110 .ok()
111 .and_then(|s| s.parse::<u64>().ok())
112 .unwrap_or({
113 if cfg!(debug_assertions) {
114 DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_DEBUG
115 } else {
116 DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_RELEASE
117 }
118 });
119
120 INIT.set(Mutex::new(Some(secondary.spawn(async move {
121 tokio::spawn(signal_handler(runtime.cancellation_token.clone()));
123
124 let cancel_token = runtime.child_token();
125 let (mut app_tx, app_rx) = tokio::sync::oneshot::channel::<()>();
126
127 let task: JoinHandle<Result<()>> = primary.spawn(async move {
129 let _rx = app_rx;
130 f(runtime).await
131 });
132
133 tokio::select! {
134 _ = cancel_token.cancelled() => {
135 tracing::debug!("{}", SHUTDOWN_MESSAGE);
136 tracing::debug!("{} {} seconds", SHUTDOWN_TIMEOUT_MESSAGE, timeout);
137 }
138
139 _ = app_tx.closed() => {
140 }
141 };
142
143 let result = tokio::select! {
144 result = task => {
145 result
146 }
147
148 _ = tokio::time::sleep(tokio::time::Duration::from_secs(timeout)) => {
149 tracing::debug!("Application did not shutdown in time; terminating");
150 std::process::exit(911);
151 }
152 }?;
153
154 match &result {
155 Ok(_) => {
156 tracing::debug!("Application shutdown successfully");
157 }
158 Err(e) => {
159 tracing::error!("Application shutdown with error: {:?}", e);
160 }
161 }
162
163 result
164 }))))
165 .map_err(|e| error!("Failed to spawn application task: {:?}", e))?;
166
167 let task = INIT
168 .get()
169 .expect("Application task not initialized")
170 .lock()
171 .unwrap()
172 .take()
173 .expect("Application initialized; but another thread is awaiting it; Worker.execute() can only be called once");
174
175 secondary.block_on(task)??;
176 local_runtime.shutdown();
177 Ok(())
178 }
179}
180
181async fn signal_handler(cancel_token: CancellationToken) -> Result<()> {
183 let ctrl_c = async {
184 signal::ctrl_c().await?;
185 anyhow::Ok(())
186 };
187
188 let sigterm = async {
189 signal::unix::signal(signal::unix::SignalKind::terminate())?
190 .recv()
191 .await;
192 anyhow::Ok(())
193 };
194
195 tokio::select! {
196 _ = ctrl_c => {
197 tracing::info!("Ctrl+C received, starting graceful shutdown");
198 },
199 _ = sigterm => {
200 tracing::info!("SIGTERM received, starting graceful shutdown");
201 },
202 _ = cancel_token.cancelled() => {
203 tracing::debug!("CancellationToken triggered; shutting down");
204 },
205 }
206
207 cancel_token.cancel();
209
210 Ok(())
211}