dynamo_runtime/
runtime.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! The [Runtime] module is the interface for [crate::component::Component]
17//! to access shared resources. These include thread pool, memory allocators and other shared resources.
18//!
19//! The [Runtime] holds the primary [`CancellationToken`] which can be used to terminate all attached
20//! [`crate::component::Component`].
21//!
22//! We expect in the future to offer topologically aware thread and memory resources, but for now the
23//! set of resources is limited to the thread pool and cancellation token.
24//!
25//! Notes: We will need to do an evaluation on what is fully public, what is pub(crate) and what is
26//! private; however, for now we are exposing most objects as fully public while the API is maturing.
27
28use super::{error, Result, Runtime, RuntimeType};
29use crate::config::{self, RuntimeConfig};
30
31use futures::Future;
32use once_cell::sync::OnceCell;
33use std::sync::{Arc, Mutex};
34use tokio::{signal, task::JoinHandle};
35
36pub use tokio_util::sync::CancellationToken;
37
38impl Runtime {
39    fn new(runtime: RuntimeType, secondary: Option<RuntimeType>) -> Result<Runtime> {
40        // worker id
41        let id = Arc::new(uuid::Uuid::new_v4().to_string());
42
43        // create a cancellation token
44        let cancellation_token = CancellationToken::new();
45
46        // secondary runtime for background ectd/nats tasks
47        let secondary = match secondary {
48            Some(secondary) => secondary,
49            None => {
50                RuntimeType::Shared(Arc::new(RuntimeConfig::single_threaded().create_runtime()?))
51            }
52        };
53
54        Ok(Runtime {
55            id,
56            primary: runtime,
57            secondary,
58            cancellation_token,
59        })
60    }
61
62    pub fn from_current() -> Result<Runtime> {
63        let handle = tokio::runtime::Handle::current();
64        let primary = RuntimeType::External(handle.clone());
65        let secondary = RuntimeType::External(handle);
66        Runtime::new(primary, Some(secondary))
67    }
68
69    pub fn from_handle(handle: tokio::runtime::Handle) -> Result<Runtime> {
70        let runtime = RuntimeType::External(handle);
71        Runtime::new(runtime, None)
72    }
73
74    /// Create a [`Runtime`] instance from the settings
75    /// See [`config::RuntimeConfig::from_settings`]
76    pub fn from_settings() -> Result<Runtime> {
77        let config = config::RuntimeConfig::from_settings()?;
78        let owned = RuntimeType::Shared(Arc::new(config.create_runtime()?));
79        Runtime::new(owned, None)
80    }
81
82    /// Create a [`Runtime`] with a single-threaded primary async tokio runtime
83    pub fn single_threaded() -> Result<Runtime> {
84        let config = config::RuntimeConfig::single_threaded();
85        let owned = RuntimeType::Shared(Arc::new(config.create_runtime()?));
86        Runtime::new(owned, None)
87    }
88
89    /// Returns the unique identifier for the [`Runtime`]
90    pub fn id(&self) -> &str {
91        &self.id
92    }
93
94    /// Returns a [`tokio::runtime::Handle`] for the primary/application thread pool
95    pub fn primary(&self) -> tokio::runtime::Handle {
96        self.primary.handle()
97    }
98
99    /// Returns a [`tokio::runtime::Handle`] for the secondary/background thread pool
100    pub fn secondary(&self) -> tokio::runtime::Handle {
101        self.secondary.handle()
102    }
103
104    /// Access the primary [`CancellationToken`] for the [`Runtime`]
105    pub fn primary_token(&self) -> CancellationToken {
106        self.cancellation_token.clone()
107    }
108
109    /// Creates a child [`CancellationToken`] tied to the life-cycle of the [`Runtime`]'s root [`CancellationToken::child_token`] method.
110    pub fn child_token(&self) -> CancellationToken {
111        self.cancellation_token.child_token()
112    }
113
114    /// Shuts down the [`Runtime`] instance
115    pub fn shutdown(&self) {
116        self.cancellation_token.cancel();
117    }
118}
119
120impl RuntimeType {
121    /// Get [`tokio::runtime::Handle`] to runtime
122    pub fn handle(&self) -> tokio::runtime::Handle {
123        match self {
124            RuntimeType::External(rt) => rt.clone(),
125            RuntimeType::Shared(rt) => rt.handle().clone(),
126        }
127    }
128}
129
130impl std::fmt::Debug for RuntimeType {
131    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132        match self {
133            RuntimeType::External(_) => write!(f, "RuntimeType::External"),
134            RuntimeType::Shared(_) => write!(f, "RuntimeType::Shared"),
135        }
136    }
137}