triton_distributed/runtime.rs
1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
//! This module implements [`Runtime`], the interface through which each
//! [`Component`](crate::component::Component) accesses shared resources such as the thread pool;
//! memory allocators and other shared resources are planned (see below).
//!
//! The [`Runtime`] holds the primary [`CancellationToken`], which can be used to terminate every
//! attached [`Component`](crate::component::Component).
21//!
22//! We expect in the future to offer topologically aware thread and memory resources, but for now the
23//! set of resources is limited to the thread pool and cancellation token.
24//!
25//! Notes: We will need to do an evaluation on what is fully public, what is pub(crate) and what is
26//! private; however, for now we are exposing most objects as fully public while the API is maturing.
27
28use super::{error, log, Result, Runtime, RuntimeType};
29use crate::config::{self, RuntimeConfig};
30
31use futures::Future;
32use once_cell::sync::OnceCell;
33use std::sync::{Arc, Mutex};
34use tokio::{signal, task::JoinHandle};
35
36pub use tokio_util::sync::CancellationToken;
37
38impl Runtime {
39 fn new(runtime: RuntimeType) -> Result<Runtime> {
40 // worker id
41 let id = Arc::new(uuid::Uuid::new_v4().to_string());
42
43 // create a cancellation token
44 let cancellation_token = CancellationToken::new();
45
46 // secondary runtime for background ectd/nats tasks
47 let secondary = RuntimeConfig::single_threaded().create_runtime()?;
48
49 Ok(Runtime {
50 id,
51 primary: runtime,
52 secondary: Arc::new(secondary),
53 cancellation_token,
54 })
55 }
56
57 pub fn from_handle(handle: tokio::runtime::Handle) -> Result<Runtime> {
58 let runtime = RuntimeType::External(handle);
59 Runtime::new(runtime)
60 }
61
62 /// Create a [`Runtime`] instance from the settings
63 /// See [`config::RuntimeConfig::from_settings`]
64 pub fn from_settings() -> Result<Runtime> {
65 let config = config::RuntimeConfig::from_settings()?;
66 let owned = RuntimeType::Shared(Arc::new(config.create_runtime()?));
67 Runtime::new(owned)
68 }
69
70 /// Create a [`Runtime`] with a single-threaded primary async tokio runtime
71 pub fn single_threaded() -> Result<Runtime> {
72 let config = config::RuntimeConfig::single_threaded();
73 let owned = RuntimeType::Shared(Arc::new(config.create_runtime()?));
74 Runtime::new(owned)
75 }
76
77 /// Returns the unique identifier for the [`Runtime`]
78 pub fn id(&self) -> &str {
79 &self.id
80 }
81
82 /// Returns a [`tokio::runtime::Handle`] for the primary/application thread pool
83 pub fn primary(&self) -> tokio::runtime::Handle {
84 self.primary.handle()
85 }
86
87 /// Returns a [`tokio::runtime::Handle`] for the secondary/background thread pool
88 pub fn secondary(&self) -> &Arc<tokio::runtime::Runtime> {
89 &self.secondary
90 }
91
92 /// Access the primary [`CancellationToken`] for the [`Runtime`]
93 pub fn primary_token(&self) -> CancellationToken {
94 self.cancellation_token.clone()
95 }
96
97 /// Creates a child [`CancellationToken`] tied to the life-cycle of the [`Runtime`]'s root [`CancellationToken::child_token`] method.
98 pub fn child_token(&self) -> CancellationToken {
99 self.cancellation_token.child_token()
100 }
101
102 /// Shuts down the [`Runtime`] instance
103 pub fn shutdown(&self) {
104 self.cancellation_token.cancel();
105 }
106}
107
108impl RuntimeType {
109 /// Get [`tokio::runtime::Handle`] to runtime
110 pub fn handle(&self) -> tokio::runtime::Handle {
111 match self {
112 RuntimeType::External(rt) => rt.clone(),
113 RuntimeType::Shared(rt) => rt.handle().clone(),
114 }
115 }
116}
117
118impl std::fmt::Debug for RuntimeType {
119 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120 match self {
121 RuntimeType::External(_) => write!(f, "RuntimeType::External"),
122 RuntimeType::Shared(_) => write!(f, "RuntimeType::Shared"),
123 }
124 }
125}