dynamo_runtime/
runtime.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
//! The [Runtime] module is the interface through which a [crate::component::Component]
//! accesses shared resources. These include the thread pool, memory allocators, and other shared resources.
6//!
7//! The [Runtime] holds the primary [`CancellationToken`] which can be used to terminate all attached
8//! [`crate::component::Component`].
9//!
10//! We expect in the future to offer topologically aware thread and memory resources, but for now the
11//! set of resources is limited to the thread pool and cancellation token.
12//!
13//! Notes: We will need to do an evaluation on what is fully public, what is pub(crate) and what is
14//! private; however, for now we are exposing most objects as fully public while the API is maturing.
15
16use super::utils::GracefulShutdownTracker;
17use super::{Result, Runtime, RuntimeType, error};
18use crate::config::{self, RuntimeConfig};
19
20use futures::Future;
21use once_cell::sync::OnceCell;
22use std::sync::{Arc, atomic::Ordering};
23use tokio::{signal, sync::Mutex, task::JoinHandle};
24
25pub use tokio_util::sync::CancellationToken;
26
27impl Runtime {
28    fn new(runtime: RuntimeType, secondary: Option<RuntimeType>) -> Result<Runtime> {
29        // worker id
30        let id = Arc::new(uuid::Uuid::new_v4().to_string());
31
32        // create a cancellation token
33        let cancellation_token = CancellationToken::new();
34
35        // create endpoint shutdown token as a child of the main token
36        let endpoint_shutdown_token = cancellation_token.child_token();
37
38        // secondary runtime for background ectd/nats tasks
39        let secondary = match secondary {
40            Some(secondary) => secondary,
41            None => {
42                tracing::debug!("Created secondary runtime with single thread");
43                RuntimeType::Shared(Arc::new(RuntimeConfig::single_threaded().create_runtime()?))
44            }
45        };
46
47        Ok(Runtime {
48            id,
49            primary: runtime,
50            secondary,
51            cancellation_token,
52            endpoint_shutdown_token,
53            graceful_shutdown_tracker: Arc::new(GracefulShutdownTracker::new()),
54        })
55    }
56
57    pub fn from_current() -> Result<Runtime> {
58        Runtime::from_handle(tokio::runtime::Handle::current())
59    }
60
61    pub fn from_handle(handle: tokio::runtime::Handle) -> Result<Runtime> {
62        let primary = RuntimeType::External(handle.clone());
63        let secondary = RuntimeType::External(handle);
64        Runtime::new(primary, Some(secondary))
65    }
66
67    /// Create a [`Runtime`] instance from the settings
68    /// See [`config::RuntimeConfig::from_settings`]
69    pub fn from_settings() -> Result<Runtime> {
70        let config = config::RuntimeConfig::from_settings()?;
71        let runtime = Arc::new(config.create_runtime()?);
72        let primary = RuntimeType::Shared(runtime.clone());
73        let secondary = RuntimeType::External(runtime.handle().clone());
74        Runtime::new(primary, Some(secondary))
75    }
76
77    /// Create a [`Runtime`] with two single-threaded async tokio runtime
78    pub fn single_threaded() -> Result<Runtime> {
79        let config = config::RuntimeConfig::single_threaded();
80        let owned = RuntimeType::Shared(Arc::new(config.create_runtime()?));
81        Runtime::new(owned, None)
82    }
83
84    /// Returns the unique identifier for the [`Runtime`]
85    pub fn id(&self) -> &str {
86        &self.id
87    }
88
89    /// Returns a [`tokio::runtime::Handle`] for the primary/application thread pool
90    pub fn primary(&self) -> tokio::runtime::Handle {
91        self.primary.handle()
92    }
93
94    /// Returns a [`tokio::runtime::Handle`] for the secondary/background thread pool
95    pub fn secondary(&self) -> tokio::runtime::Handle {
96        self.secondary.handle()
97    }
98
99    /// Access the primary [`CancellationToken`] for the [`Runtime`]
100    pub fn primary_token(&self) -> CancellationToken {
101        self.cancellation_token.clone()
102    }
103
104    /// Creates a child [`CancellationToken`] tied to the life-cycle of the [`Runtime`]'s endpoint shutdown token.
105    pub fn child_token(&self) -> CancellationToken {
106        self.endpoint_shutdown_token.child_token()
107    }
108
109    /// Get access to the graceful shutdown tracker
110    pub(crate) fn graceful_shutdown_tracker(&self) -> Arc<GracefulShutdownTracker> {
111        self.graceful_shutdown_tracker.clone()
112    }
113
114    /// Shuts down the [`Runtime`] instance
115    pub fn shutdown(&self) {
116        tracing::info!("Runtime shutdown initiated");
117
118        // Spawn the shutdown coordination task BEFORE cancelling tokens
119        let tracker = self.graceful_shutdown_tracker.clone();
120        let main_token = self.cancellation_token.clone();
121        let endpoint_token = self.endpoint_shutdown_token.clone();
122
123        // Use the runtime handle to spawn the task
124        let handle = self.primary();
125        handle.spawn(async move {
126            // Phase 1: Cancel endpoint shutdown token to stop accepting new requests
127            tracing::info!("Phase 1: Cancelling endpoint shutdown token");
128            endpoint_token.cancel();
129
130            // Phase 2: Wait for all graceful endpoints to complete
131            tracing::info!("Phase 2: Waiting for graceful endpoints to complete");
132
133            let count = tracker.get_count();
134            tracing::info!("Active graceful endpoints: {}", count);
135
136            if count != 0 {
137                tracker.wait_for_completion().await;
138            }
139
140            // Phase 3: Now shutdown NATS/ETCD by cancelling the main token
141            tracing::info!(
142                "Phase 3: All graceful endpoints completed, shutting down NATS/ETCD connections"
143            );
144            main_token.cancel();
145        });
146    }
147}
148
149impl RuntimeType {
150    /// Get [`tokio::runtime::Handle`] to runtime
151    pub fn handle(&self) -> tokio::runtime::Handle {
152        match self {
153            RuntimeType::External(rt) => rt.clone(),
154            RuntimeType::Shared(rt) => rt.handle().clone(),
155        }
156    }
157}
158
159impl std::fmt::Debug for RuntimeType {
160    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
161        match self {
162            RuntimeType::External(_) => write!(f, "RuntimeType::External"),
163            RuntimeType::Shared(_) => write!(f, "RuntimeType::Shared"),
164        }
165    }
166}