triton_distributed/
worker.rs1use super::{error, log, CancellationToken, Result, Runtime, RuntimeConfig};
36
37use futures::Future;
38use once_cell::sync::OnceCell;
39use std::{sync::Mutex, time::Duration};
40use tokio::{signal, task::JoinHandle};
41
42static RT: OnceCell<tokio::runtime::Runtime> = OnceCell::new();
43static INIT: OnceCell<Mutex<Option<tokio::task::JoinHandle<Result<()>>>>> = OnceCell::new();
44
45const SHUTDOWN_MESSAGE: &str =
46 "Application received shutdown signal; attempting to gracefully shutdown";
47const SHUTDOWN_TIMEOUT_MESSAGE: &str =
48 "Use TRITON_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT to control the graceful shutdown timeout";
49
50pub const TRITON_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT: &str = "TRITON_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT";
52
53pub const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_DEBUG: u64 = 5;
55
56pub const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_RELEASE: u64 = 30;
58
59pub struct Worker {
60 runtime: Runtime,
61}
62
63impl Worker {
64 pub fn from_settings() -> Result<Worker> {
66 let config = RuntimeConfig::from_settings()?;
67 Worker::from_config(config)
68 }
69
70 pub fn from_config(config: RuntimeConfig) -> Result<Worker> {
72 if RT.get().is_some() {
74 return Err(error!("Worker already initialized"));
75 }
76
77 let rt = RT.try_insert(config.create_runtime()?).map_err(|_| {
81 error!("Failed to create worker; Only a single Worker should ever be created")
82 })?;
83
84 let runtime = Runtime::from_handle(rt.handle().clone())?;
85 Ok(Worker { runtime })
86 }
87
88 pub fn tokio_runtime(&self) -> Result<&'static tokio::runtime::Runtime> {
89 RT.get().ok_or_else(|| error!("Worker not initialized"))
90 }
91
92 pub fn runtime(&self) -> &Runtime {
93 &self.runtime
94 }
95
96 pub fn execute<F, Fut>(self, f: F) -> Result<()>
99 where
100 F: FnOnce(Runtime) -> Fut + Send + 'static,
101 Fut: Future<Output = Result<()>> + Send + 'static,
102 {
103 let runtime = self.runtime;
104 let primary = runtime.primary();
105 let secondary = runtime.secondary.clone();
106
107 let timeout = std::env::var(TRITON_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT)
108 .ok()
109 .and_then(|s| s.parse::<u64>().ok())
110 .unwrap_or({
111 if cfg!(debug_assertions) {
112 DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_DEBUG
113 } else {
114 DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_RELEASE
115 }
116 });
117
118 INIT.set(Mutex::new(Some(secondary.spawn(async move {
119 tokio::spawn(signal_handler(runtime.cancellation_token.clone()));
121
122 let cancel_token = runtime.child_token();
123 let (mut app_tx, app_rx) = tokio::sync::oneshot::channel::<()>();
124
125 let task: JoinHandle<Result<()>> = primary.spawn(async move {
127 let _rx = app_rx;
128 f(runtime).await
129 });
130
131 tokio::select! {
132 _ = cancel_token.cancelled() => {
133 eprintln!("{}", SHUTDOWN_MESSAGE);
134 eprintln!("{} {} seconds", SHUTDOWN_TIMEOUT_MESSAGE, timeout);
135 }
136
137 _ = app_tx.closed() => {
138 }
139 };
140
141 let result = tokio::select! {
142 result = task => {
143 result
144 }
145
146 _ = tokio::time::sleep(tokio::time::Duration::from_secs(timeout)) => {
147 eprintln!("Application did not shutdown in time; terminating");
148 std::process::exit(911);
149 }
150 }?;
151
152 match &result {
153 Ok(_) => {
154 log::info!("Application shutdown successfully");
155 }
156 Err(e) => {
157 log::error!("Application shutdown with error: {:?}", e);
158 }
159 }
160
161 result
162 }))))
163 .map_err(|e| error!("Failed to spawn application task: {:?}", e))?;
164
165 let task = INIT
166 .get()
167 .expect("Application task not initialized")
168 .lock()
169 .unwrap()
170 .take()
171 .expect("Application initialized; but another thread is awaiting it; Worker.execute() can only be called once");
172
173 secondary.block_on(task)?
174 }
175}
176
177async fn signal_handler(cancel_token: CancellationToken) -> Result<()> {
179 let ctrl_c = async {
180 signal::ctrl_c().await?;
181 anyhow::Ok(())
182 };
183
184 let sigterm = async {
185 signal::unix::signal(signal::unix::SignalKind::terminate())?
186 .recv()
187 .await;
188 anyhow::Ok(())
189 };
190
191 tokio::select! {
192 _ = ctrl_c => {
193 tracing::info!("Ctrl+C received, starting graceful shutdown");
194 },
195 _ = sigterm => {
196 tracing::info!("SIGTERM received, starting graceful shutdown");
197 },
198 _ = cancel_token.cancelled() => {
199 tracing::info!("CancellationToken triggered; shutting down");
200 },
201 }
202
203 cancel_token.cancel();
205
206 Ok(())
207}