dynamo_runtime/runtime.rs
1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! The [Runtime] module is the interface for [crate::component::Component]
17//! to access shared resources. These include thread pool, memory allocators and other shared resources.
18//!
19//! The [Runtime] holds the primary [`CancellationToken`] which can be used to terminate all attached
20//! [`crate::component::Component`].
21//!
22//! We expect in the future to offer topologically aware thread and memory resources, but for now the
23//! set of resources is limited to the thread pool and cancellation token.
24//!
25//! Notes: We will need to do an evaluation on what is fully public, what is pub(crate) and what is
26//! private; however, for now we are exposing most objects as fully public while the API is maturing.
27
28use super::{error, Result, Runtime, RuntimeType};
29use crate::config::{self, RuntimeConfig};
30
31use futures::Future;
32use once_cell::sync::OnceCell;
33use std::sync::{Arc, Mutex};
34use tokio::{signal, task::JoinHandle};
35
36pub use tokio_util::sync::CancellationToken;
37
38impl Runtime {
39 fn new(runtime: RuntimeType, secondary: Option<RuntimeType>) -> Result<Runtime> {
40 // worker id
41 let id = Arc::new(uuid::Uuid::new_v4().to_string());
42
43 // create a cancellation token
44 let cancellation_token = CancellationToken::new();
45
46 // secondary runtime for background ectd/nats tasks
47 let secondary = match secondary {
48 Some(secondary) => secondary,
49 None => {
50 RuntimeType::Shared(Arc::new(RuntimeConfig::single_threaded().create_runtime()?))
51 }
52 };
53
54 Ok(Runtime {
55 id,
56 primary: runtime,
57 secondary,
58 cancellation_token,
59 })
60 }
61
62 pub fn from_current() -> Result<Runtime> {
63 let handle = tokio::runtime::Handle::current();
64 let primary = RuntimeType::External(handle.clone());
65 let secondary = RuntimeType::External(handle);
66 Runtime::new(primary, Some(secondary))
67 }
68
69 pub fn from_handle(handle: tokio::runtime::Handle) -> Result<Runtime> {
70 let runtime = RuntimeType::External(handle);
71 Runtime::new(runtime, None)
72 }
73
74 /// Create a [`Runtime`] instance from the settings
75 /// See [`config::RuntimeConfig::from_settings`]
76 pub fn from_settings() -> Result<Runtime> {
77 let config = config::RuntimeConfig::from_settings()?;
78 let owned = RuntimeType::Shared(Arc::new(config.create_runtime()?));
79 Runtime::new(owned, None)
80 }
81
82 /// Create a [`Runtime`] with a single-threaded primary async tokio runtime
83 pub fn single_threaded() -> Result<Runtime> {
84 let config = config::RuntimeConfig::single_threaded();
85 let owned = RuntimeType::Shared(Arc::new(config.create_runtime()?));
86 Runtime::new(owned, None)
87 }
88
89 /// Returns the unique identifier for the [`Runtime`]
90 pub fn id(&self) -> &str {
91 &self.id
92 }
93
94 /// Returns a [`tokio::runtime::Handle`] for the primary/application thread pool
95 pub fn primary(&self) -> tokio::runtime::Handle {
96 self.primary.handle()
97 }
98
99 /// Returns a [`tokio::runtime::Handle`] for the secondary/background thread pool
100 pub fn secondary(&self) -> tokio::runtime::Handle {
101 self.secondary.handle()
102 }
103
104 /// Access the primary [`CancellationToken`] for the [`Runtime`]
105 pub fn primary_token(&self) -> CancellationToken {
106 self.cancellation_token.clone()
107 }
108
109 /// Creates a child [`CancellationToken`] tied to the life-cycle of the [`Runtime`]'s root [`CancellationToken::child_token`] method.
110 pub fn child_token(&self) -> CancellationToken {
111 self.cancellation_token.child_token()
112 }
113
114 /// Shuts down the [`Runtime`] instance
115 pub fn shutdown(&self) {
116 self.cancellation_token.cancel();
117 }
118}
119
120impl RuntimeType {
121 /// Get [`tokio::runtime::Handle`] to runtime
122 pub fn handle(&self) -> tokio::runtime::Handle {
123 match self {
124 RuntimeType::External(rt) => rt.clone(),
125 RuntimeType::Shared(rt) => rt.handle().clone(),
126 }
127 }
128}
129
130impl std::fmt::Debug for RuntimeType {
131 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132 match self {
133 RuntimeType::External(_) => write!(f, "RuntimeType::External"),
134 RuntimeType::Shared(_) => write!(f, "RuntimeType::Shared"),
135 }
136 }
137}