Skip to main content

dynamo_runtime/
worker.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
//! The [Worker] type is a convenience wrapper around constructing the [Runtime]
//! and executing the user's application.
6//!
7//! In the future, the [Worker] should probably be moved to a procedural macro similar
8//! to the `#[tokio::main]` attribute, where we might annotate an async main function with
9//! `#[dynamo::main]` or similar.
10//!
//! The [Worker::execute] method is designed to be called once from main and will block
//! the calling thread until the application completes or is canceled. The method installs
//! the signal handler used to trap `SIGINT` and `SIGTERM` signals and trigger a graceful shutdown.
14//!
//! On termination, the user application is given a graceful shutdown period (in seconds)
//! controlled by the `DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT` environment variable. If the
//! application does not shut down in time, the worker terminates the process with an exit code
//! of 911.
18//!
//! The default value of `DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT` differs between debug
//! and release builds. In debug builds, the default is [DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_DEBUG]
//! and in release builds, the default is [DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_RELEASE].
22
23use super::{CancellationToken, Runtime, RuntimeConfig};
24
25use futures::Future;
26use once_cell::sync::OnceCell;
27use parking_lot::Mutex;
28use std::time::Duration;
29use tokio::{signal, task::JoinHandle};
30
31static RT: OnceCell<tokio::runtime::Runtime> = OnceCell::new();
32static RTHANDLE: OnceCell<tokio::runtime::Handle> = OnceCell::new();
33static INIT: OnceCell<Mutex<Option<tokio::task::JoinHandle<anyhow::Result<()>>>>> = OnceCell::new();
34
35use crate::config::environment_names::worker as env_worker;
36
/// Default graceful shutdown timeout in seconds in debug mode
pub const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_DEBUG: u64 = 5;

/// Default graceful shutdown timeout in seconds in release mode
pub const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_RELEASE: u64 = 30;

/// Logged once the worker observes that a shutdown has been requested.
const SHUTDOWN_MESSAGE: &str =
    "Application received shutdown signal; attempting to gracefully shutdown";

/// Logged alongside [`SHUTDOWN_MESSAGE`] to point operators at the timeout knob.
const SHUTDOWN_TIMEOUT_MESSAGE: &str =
    "Use DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT to control the graceful shutdown timeout";
47
/// Holds the [`Runtime`] the user's application runs on, together with the [`RuntimeConfig`]
/// the worker was created with, and drives the application via [`Worker::execute`] /
/// [`Worker::execute_async`].
#[derive(Debug, Clone)]
pub struct Worker {
    /// Runtime the application is executed on.
    runtime: Runtime,
    /// Configuration supplied to `from_config`, or loaded from settings by `from_current`.
    config: RuntimeConfig,
}
53
54impl Worker {
55    /// Create a new [`Worker`] instance from [`RuntimeConfig`] settings which is sourced from the environment
56    pub fn from_settings() -> anyhow::Result<Worker> {
57        let config = RuntimeConfig::from_settings()?;
58        Worker::from_config(config)
59    }
60
61    /// Create a new [`Worker`] instance from a provided [`RuntimeConfig`]
62    pub fn from_config(config: RuntimeConfig) -> anyhow::Result<Worker> {
63        // if the runtime is already initialized, return an error
64        if RT.get().is_some() || RTHANDLE.get().is_some() {
65            return Err(anyhow::anyhow!("Worker already initialized"));
66        }
67
68        // create a new runtime and insert it into the OnceCell
69        // there is still a potential race-condition here, two threads cou have passed the first check
70        // but only one will succeed in inserting the runtime
71        let rt = RT.try_insert(config.create_runtime()?).map_err(|_| {
72            anyhow::anyhow!("Failed to create worker; Only a single Worker should ever be created")
73        })?;
74
75        let runtime = Runtime::from_handle(rt.handle().clone())?;
76        Ok(Worker { runtime, config })
77    }
78
79    pub fn runtime_from_existing() -> anyhow::Result<Runtime> {
80        if let Some(rt) = RT.get() {
81            Ok(Runtime::from_handle(rt.handle().clone())?)
82        } else if let Some(rt) = RTHANDLE.get() {
83            Ok(Runtime::from_handle(rt.clone())?)
84        } else {
85            // Fallback: build a fresh Runtime and publish its handle so
86            // subsequent `has_existing_runtime()` callers (e.g. the
87            // backend Python `Worker`) can tell that a runtime now
88            // exists in the process — even though we didn't go through
89            // `Worker::from_*`. Without this the cell stays empty and
90            // a later `Worker` would falsely conclude it owns the
91            // runtime and create a second one.
92            let runtime = Runtime::from_settings()?;
93            let _ = RTHANDLE.set(runtime.primary());
94            Ok(runtime)
95        }
96    }
97
98    /// Whether a process-wide runtime has already been initialized
99    /// (`RT` populated by `Worker::from_*`, or `RTHANDLE` populated by
100    /// `runtime_from_existing`'s fallback / external callers).
101    /// Does *not* fall back to `Runtime::from_settings()`.
102    pub fn has_existing_runtime() -> bool {
103        RT.get().is_some() || RTHANDLE.get().is_some()
104    }
105
106    pub fn tokio_runtime(&self) -> anyhow::Result<&'static tokio::runtime::Runtime> {
107        RT.get()
108            .ok_or_else(|| anyhow::anyhow!("Worker not initialized"))
109    }
110
111    pub fn runtime(&self) -> &Runtime {
112        &self.runtime
113    }
114
115    pub fn execute<F, Fut>(self, f: F) -> anyhow::Result<()>
116    where
117        F: FnOnce(Runtime) -> Fut + Send + 'static,
118        Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
119    {
120        let runtime = self.runtime.clone();
121        runtime.secondary().block_on(self.execute_internal(f))??;
122        runtime.shutdown();
123        Ok(())
124    }
125
126    pub async fn execute_async<F, Fut>(self, f: F) -> anyhow::Result<()>
127    where
128        F: FnOnce(Runtime) -> Fut + Send + 'static,
129        Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
130    {
131        let runtime = self.runtime.clone();
132        let task = self.execute_internal(f);
133        task.await??;
134        runtime.shutdown();
135        Ok(())
136    }
137
138    /// Executes the provided application/closure on the [`Runtime`].
139    /// This is designed to be called once from main and will block the calling thread until the application completes.
140    fn execute_internal<F, Fut>(self, f: F) -> JoinHandle<anyhow::Result<()>>
141    where
142        F: FnOnce(Runtime) -> Fut + Send + 'static,
143        Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
144    {
145        let runtime = self.runtime.clone();
146        let primary = runtime.primary();
147        let secondary = runtime.secondary();
148
149        let timeout = std::env::var(env_worker::DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT)
150            .ok()
151            .and_then(|s| s.parse::<u64>().ok())
152            .unwrap_or({
153                if cfg!(debug_assertions) {
154                    DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_DEBUG
155                } else {
156                    DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_RELEASE
157                }
158            });
159
160        INIT.set(Mutex::new(Some(secondary.spawn(async move {
161            // start signal handler
162            tokio::spawn(signal_handler(runtime.primary_token().clone()));
163
164            let cancel_token = runtime.child_token();
165            let (mut app_tx, app_rx) = tokio::sync::oneshot::channel::<()>();
166
167            // spawn a task to run the application
168            let task: JoinHandle<anyhow::Result<()>> = primary.spawn(async move {
169                let _rx = app_rx;
170                f(runtime).await
171            });
172
173            tokio::select! {
174                _ = cancel_token.cancelled() => {
175                    tracing::debug!("{SHUTDOWN_MESSAGE}");
176                    tracing::debug!("{} {} seconds", SHUTDOWN_TIMEOUT_MESSAGE, timeout);
177                }
178
179                _ = app_tx.closed() => {
180                }
181            };
182
183            let result = tokio::select! {
184                result = task => {
185                    result
186                }
187
188                _ = tokio::time::sleep(tokio::time::Duration::from_secs(timeout)) => {
189                    tracing::debug!("Application did not shutdown in time; terminating");
190                    std::process::exit(911);
191                }
192            }?;
193
194            match &result {
195                Ok(_) => {
196                    tracing::debug!("Application shutdown successfully");
197                }
198                Err(e) => {
199                    tracing::error!("Application shutdown with error: {:?}", e);
200                }
201            }
202
203            result
204        }))))
205        .expect("Failed to spawn application task");
206
207        INIT
208            .get()
209            .expect("Application task not initialized")
210            .lock()
211            .take()
212            .expect("Application initialized; but another thread is awaiting it; Worker.execute() can only be called once")
213    }
214
215    pub fn from_current() -> anyhow::Result<Worker> {
216        if RT.get().is_some() || RTHANDLE.get().is_some() {
217            return Err(anyhow::anyhow!("Worker already initialized"));
218        }
219        let runtime = Runtime::from_current()?;
220        let config = RuntimeConfig::from_settings()?;
221        Ok(Worker { runtime, config })
222    }
223}
224
225/// Catch signals and trigger a shutdown
226async fn signal_handler(cancel_token: CancellationToken) -> anyhow::Result<()> {
227    let ctrl_c = async {
228        signal::ctrl_c().await?;
229        anyhow::Ok(())
230    };
231
232    let sigterm = async {
233        signal::unix::signal(signal::unix::SignalKind::terminate())?
234            .recv()
235            .await;
236        anyhow::Ok(())
237    };
238
239    tokio::select! {
240        _ = ctrl_c => {
241            tracing::info!("Ctrl+C received, starting graceful shutdown");
242        },
243        _ = sigterm => {
244            tracing::info!("SIGTERM received, starting graceful shutdown");
245        },
246        _ = cancel_token.cancelled() => {
247            tracing::debug!("CancellationToken triggered; shutting down");
248        },
249    }
250
251    // trigger a shutdown
252    cancel_token.cancel();
253
254    Ok(())
255}