dynamo_runtime/
worker.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
//! The [Worker] type is a convenience wrapper around the construction of the [Runtime]
//! and the execution of the user's application.
//!
//! In the future, the [Worker] should probably be moved to a procedural macro similar
//! to the `#[tokio::main]` attribute, where we might annotate an async main function with
//! `#[dynamo::main]` or similar.
//!
//! The [Worker::execute] method is designed to be called once from main and will block
//! the calling thread until the application completes or is canceled. The method initializes
//! the signal handler used to trap `SIGINT` and `SIGTERM` signals and trigger a graceful shutdown.
//!
//! On termination, the user application is given a graceful shutdown period controlled by
//! the [DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT] environment variable. If the application does not
//! shut down in time, the worker terminates the process with exit code 911.
//!
//! The default values of [DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT] differ between development
//! and release builds. In development, the default is [DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_DEBUG];
//! in release, the default is [DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_RELEASE].
34
35use super::{error, CancellationToken, Result, Runtime, RuntimeConfig};
36
37use futures::Future;
38use once_cell::sync::OnceCell;
39use std::{sync::Mutex, time::Duration};
40use tokio::{signal, task::JoinHandle};
41
42static RT: OnceCell<tokio::runtime::Runtime> = OnceCell::new();
43static INIT: OnceCell<Mutex<Option<tokio::task::JoinHandle<Result<()>>>>> = OnceCell::new();
44
/// Name of the environment variable that overrides the graceful shutdown timeout (seconds).
pub const DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT: &str = "DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT";

/// Graceful shutdown timeout, in seconds, used by debug builds when the variable is unset.
pub const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_DEBUG: u64 = 5;

/// Graceful shutdown timeout, in seconds, used by release builds when the variable is unset.
pub const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_RELEASE: u64 = 30;

// Log text emitted once a shutdown has been requested.
const SHUTDOWN_MESSAGE: &str =
    "Application received shutdown signal; attempting to gracefully shutdown";

// Log text emitted next to the effective timeout so operators know how to tune it.
const SHUTDOWN_TIMEOUT_MESSAGE: &str =
    "Use DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT to control the graceful shutdown timeout";
58
59#[derive(Debug, Clone)]
60pub struct Worker {
61    runtime: Runtime,
62}
63
64impl Worker {
65    /// Create a new [`Worker`] instance from [`RuntimeConfig`] settings which is sourced from the environment
66    pub fn from_settings() -> Result<Worker> {
67        let config = RuntimeConfig::from_settings()?;
68        Worker::from_config(config)
69    }
70
71    /// Create a new [`Worker`] instance from a provided [`RuntimeConfig`]
72    pub fn from_config(config: RuntimeConfig) -> Result<Worker> {
73        // if the runtime is already initialized, return an error
74        if RT.get().is_some() {
75            return Err(error!("Worker already initialized"));
76        }
77
78        // create a new runtime and insert it into the OnceCell
79        // there is still a potential race-condition here, two threads cou have passed the first check
80        // but only one will succeed in inserting the runtime
81        let rt = RT.try_insert(config.create_runtime()?).map_err(|_| {
82            error!("Failed to create worker; Only a single Worker should ever be created")
83        })?;
84
85        let runtime = Runtime::from_handle(rt.handle().clone())?;
86        Ok(Worker { runtime })
87    }
88
89    pub fn tokio_runtime(&self) -> Result<&'static tokio::runtime::Runtime> {
90        RT.get().ok_or_else(|| error!("Worker not initialized"))
91    }
92
93    pub fn runtime(&self) -> &Runtime {
94        &self.runtime
95    }
96
97    /// Executes the provided application/closure on the [`Runtime`].
98    /// This is designed to be called once from main and will block the calling thread until the application completes.
99    pub fn execute<F, Fut>(self, f: F) -> Result<()>
100    where
101        F: FnOnce(Runtime) -> Fut + Send + 'static,
102        Fut: Future<Output = Result<()>> + Send + 'static,
103    {
104        let runtime = self.runtime;
105        let local_runtime = runtime.clone();
106        let primary = runtime.primary();
107        let secondary = runtime.secondary();
108
109        let timeout = std::env::var(DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT)
110            .ok()
111            .and_then(|s| s.parse::<u64>().ok())
112            .unwrap_or({
113                if cfg!(debug_assertions) {
114                    DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_DEBUG
115                } else {
116                    DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_RELEASE
117                }
118            });
119
120        INIT.set(Mutex::new(Some(secondary.spawn(async move {
121            // start signal handler
122            tokio::spawn(signal_handler(runtime.cancellation_token.clone()));
123
124            let cancel_token = runtime.child_token();
125            let (mut app_tx, app_rx) = tokio::sync::oneshot::channel::<()>();
126
127            // spawn a task to run the application
128            let task: JoinHandle<Result<()>> = primary.spawn(async move {
129                let _rx = app_rx;
130                f(runtime).await
131            });
132
133            tokio::select! {
134                _ = cancel_token.cancelled() => {
135                    tracing::debug!("{}", SHUTDOWN_MESSAGE);
136                    tracing::debug!("{} {} seconds", SHUTDOWN_TIMEOUT_MESSAGE, timeout);
137                }
138
139                _ = app_tx.closed() => {
140                }
141            };
142
143            let result = tokio::select! {
144                result = task => {
145                    result
146                }
147
148                _ = tokio::time::sleep(tokio::time::Duration::from_secs(timeout)) => {
149                    tracing::debug!("Application did not shutdown in time; terminating");
150                    std::process::exit(911);
151                }
152            }?;
153
154            match &result {
155                Ok(_) => {
156                    tracing::debug!("Application shutdown successfully");
157                }
158                Err(e) => {
159                    tracing::error!("Application shutdown with error: {:?}", e);
160                }
161            }
162
163            result
164        }))))
165        .map_err(|e| error!("Failed to spawn application task: {:?}", e))?;
166
167        let task = INIT
168            .get()
169            .expect("Application task not initialized")
170            .lock()
171            .unwrap()
172            .take()
173            .expect("Application initialized; but another thread is awaiting it; Worker.execute() can only be called once");
174
175        secondary.block_on(task)??;
176        local_runtime.shutdown();
177        Ok(())
178    }
179}
180
181/// Catch signals and trigger a shutdown
182async fn signal_handler(cancel_token: CancellationToken) -> Result<()> {
183    let ctrl_c = async {
184        signal::ctrl_c().await?;
185        anyhow::Ok(())
186    };
187
188    let sigterm = async {
189        signal::unix::signal(signal::unix::SignalKind::terminate())?
190            .recv()
191            .await;
192        anyhow::Ok(())
193    };
194
195    tokio::select! {
196        _ = ctrl_c => {
197            tracing::info!("Ctrl+C received, starting graceful shutdown");
198        },
199        _ = sigterm => {
200            tracing::info!("SIGTERM received, starting graceful shutdown");
201        },
202        _ = cancel_token.cancelled() => {
203            tracing::debug!("CancellationToken triggered; shutting down");
204        },
205    }
206
207    // trigger a shutdown
208    cancel_token.cancel();
209
210    Ok(())
211}