dynamo_runtime/
worker.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
//! The [Worker] struct is a convenience wrapper around the construction of the [Runtime]
//! and execution of the user's application.
6//!
7//! In the future, the [Worker] should probably be moved to a procedural macro similar
8//! to the `#[tokio::main]` attribute, where we might annotate an async main function with
9//! `#[dynamo::main]` or similar.
10//!
//! The [Worker::execute] method is designed to be called once from main and will block
//! the calling thread until the application completes or is canceled. The method initializes
//! the signal handler used to trap `SIGINT` and `SIGTERM` signals and trigger a graceful shutdown.
14//!
//! On termination, the user application is given a graceful shutdown period controlled by
//! the [DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT] environment variable. If the application does not
//! shut down in time, the worker terminates the process with exit code 911 (on Unix a parent
//! process only observes the low 8 bits of that code, i.e. 143).
18//!
19//! The default values of [DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT] differ between the development
20//! and release builds. In development, the default is [DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_DEBUG] and
21//! in release, the default is [DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_RELEASE].
22
23use super::{CancellationToken, Runtime, RuntimeConfig};
24
25use futures::Future;
26use once_cell::sync::OnceCell;
27use parking_lot::Mutex;
28use std::time::Duration;
29use tokio::{signal, task::JoinHandle};
30
31static RT: OnceCell<tokio::runtime::Runtime> = OnceCell::new();
32static RTHANDLE: OnceCell<tokio::runtime::Handle> = OnceCell::new();
33static INIT: OnceCell<Mutex<Option<tokio::task::JoinHandle<anyhow::Result<()>>>>> = OnceCell::new();
34
/// Debug-level log line emitted once a shutdown has been requested.
const SHUTDOWN_MESSAGE: &str =
    "Application received shutdown signal; attempting to gracefully shutdown";

/// Debug-level hint emitted next to the active timeout, pointing at the env var that tunes it.
const SHUTDOWN_TIMEOUT_MESSAGE: &str =
    "Use DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT to control the graceful shutdown timeout";
39
/// Name of the environment variable holding the graceful shutdown timeout, in whole seconds.
pub const DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT: &str = "DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT";

/// Graceful shutdown timeout (seconds) used in debug builds (`debug_assertions` enabled)
/// when [`DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT`] is unset or not a valid `u64`.
pub const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_DEBUG: u64 = 5;

/// Graceful shutdown timeout (seconds) used in release builds when
/// [`DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT`] is unset or not a valid `u64`.
pub const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_RELEASE: u64 = 30;
48
49#[derive(Debug, Clone)]
50pub struct Worker {
51    runtime: Runtime,
52    config: RuntimeConfig,
53}
54
55impl Worker {
56    /// Create a new [`Worker`] instance from [`RuntimeConfig`] settings which is sourced from the environment
57    pub fn from_settings() -> anyhow::Result<Worker> {
58        let config = RuntimeConfig::from_settings()?;
59        Worker::from_config(config)
60    }
61
62    /// Create a new [`Worker`] instance from a provided [`RuntimeConfig`]
63    pub fn from_config(config: RuntimeConfig) -> anyhow::Result<Worker> {
64        // if the runtime is already initialized, return an error
65        if RT.get().is_some() || RTHANDLE.get().is_some() {
66            return Err(anyhow::anyhow!("Worker already initialized"));
67        }
68
69        // create a new runtime and insert it into the OnceCell
70        // there is still a potential race-condition here, two threads cou have passed the first check
71        // but only one will succeed in inserting the runtime
72        let rt = RT.try_insert(config.create_runtime()?).map_err(|_| {
73            anyhow::anyhow!("Failed to create worker; Only a single Worker should ever be created")
74        })?;
75
76        let runtime = Runtime::from_handle(rt.handle().clone())?;
77        Ok(Worker { runtime, config })
78    }
79
80    pub fn runtime_from_existing() -> anyhow::Result<Runtime> {
81        if let Some(rt) = RT.get() {
82            Ok(Runtime::from_handle(rt.handle().clone())?)
83        } else if let Some(rt) = RTHANDLE.get() {
84            Ok(Runtime::from_handle(rt.clone())?)
85        } else {
86            Runtime::from_settings()
87        }
88    }
89
90    pub fn tokio_runtime(&self) -> anyhow::Result<&'static tokio::runtime::Runtime> {
91        RT.get()
92            .ok_or_else(|| anyhow::anyhow!("Worker not initialized"))
93    }
94
95    pub fn runtime(&self) -> &Runtime {
96        &self.runtime
97    }
98
99    pub fn execute<F, Fut>(self, f: F) -> anyhow::Result<()>
100    where
101        F: FnOnce(Runtime) -> Fut + Send + 'static,
102        Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
103    {
104        let runtime = self.runtime.clone();
105        runtime.secondary().block_on(self.execute_internal(f))??;
106        runtime.shutdown();
107        Ok(())
108    }
109
110    pub async fn execute_async<F, Fut>(self, f: F) -> anyhow::Result<()>
111    where
112        F: FnOnce(Runtime) -> Fut + Send + 'static,
113        Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
114    {
115        let runtime = self.runtime.clone();
116        let task = self.execute_internal(f);
117        task.await??;
118        runtime.shutdown();
119        Ok(())
120    }
121
122    /// Executes the provided application/closure on the [`Runtime`].
123    /// This is designed to be called once from main and will block the calling thread until the application completes.
124    fn execute_internal<F, Fut>(self, f: F) -> JoinHandle<anyhow::Result<()>>
125    where
126        F: FnOnce(Runtime) -> Fut + Send + 'static,
127        Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
128    {
129        let runtime = self.runtime.clone();
130        let primary = runtime.primary();
131        let secondary = runtime.secondary();
132
133        let timeout = std::env::var(DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT)
134            .ok()
135            .and_then(|s| s.parse::<u64>().ok())
136            .unwrap_or({
137                if cfg!(debug_assertions) {
138                    DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_DEBUG
139                } else {
140                    DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_RELEASE
141                }
142            });
143
144        INIT.set(Mutex::new(Some(secondary.spawn(async move {
145            // start signal handler
146            tokio::spawn(signal_handler(runtime.primary_token().clone()));
147
148            let cancel_token = runtime.child_token();
149            let (mut app_tx, app_rx) = tokio::sync::oneshot::channel::<()>();
150
151            // spawn a task to run the application
152            let task: JoinHandle<anyhow::Result<()>> = primary.spawn(async move {
153                let _rx = app_rx;
154                f(runtime).await
155            });
156
157            tokio::select! {
158                _ = cancel_token.cancelled() => {
159                    tracing::debug!("{}", SHUTDOWN_MESSAGE);
160                    tracing::debug!("{} {} seconds", SHUTDOWN_TIMEOUT_MESSAGE, timeout);
161                }
162
163                _ = app_tx.closed() => {
164                }
165            };
166
167            let result = tokio::select! {
168                result = task => {
169                    result
170                }
171
172                _ = tokio::time::sleep(tokio::time::Duration::from_secs(timeout)) => {
173                    tracing::debug!("Application did not shutdown in time; terminating");
174                    std::process::exit(911);
175                }
176            }?;
177
178            match &result {
179                Ok(_) => {
180                    tracing::debug!("Application shutdown successfully");
181                }
182                Err(e) => {
183                    tracing::error!("Application shutdown with error: {:?}", e);
184                }
185            }
186
187            result
188        }))))
189        .expect("Failed to spawn application task");
190
191        INIT
192            .get()
193            .expect("Application task not initialized")
194            .lock()
195            .take()
196            .expect("Application initialized; but another thread is awaiting it; Worker.execute() can only be called once")
197    }
198
199    pub fn from_current() -> anyhow::Result<Worker> {
200        if RT.get().is_some() || RTHANDLE.get().is_some() {
201            return Err(anyhow::anyhow!("Worker already initialized"));
202        }
203        let runtime = Runtime::from_current()?;
204        let config = RuntimeConfig::from_settings()?;
205        Ok(Worker { runtime, config })
206    }
207}
208
209/// Catch signals and trigger a shutdown
210async fn signal_handler(cancel_token: CancellationToken) -> anyhow::Result<()> {
211    let ctrl_c = async {
212        signal::ctrl_c().await?;
213        anyhow::Ok(())
214    };
215
216    let sigterm = async {
217        signal::unix::signal(signal::unix::SignalKind::terminate())?
218            .recv()
219            .await;
220        anyhow::Ok(())
221    };
222
223    tokio::select! {
224        _ = ctrl_c => {
225            tracing::info!("Ctrl+C received, starting graceful shutdown");
226        },
227        _ = sigterm => {
228            tracing::info!("SIGTERM received, starting graceful shutdown");
229        },
230        _ = cancel_token.cancelled() => {
231            tracing::debug!("CancellationToken triggered; shutting down");
232        },
233    }
234
235    // trigger a shutdown
236    cancel_token.cancel();
237
238    Ok(())
239}