triton_distributed/
runtime.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
//! The [Runtime] module is the interface through which a [`crate::component::Component`]
//! accesses shared resources, such as the thread pool and memory allocators.
18//!
19//! The [Runtime] holds the primary [`CancellationToken`] which can be used to terminate all attached
20//! [crate::component::Component][crate::component::Component].
21//!
22//! We expect in the future to offer topologically aware thread and memory resources, but for now the
23//! set of resources is limited to the thread pool and cancellation token.
24//!
25//! Notes: We will need to do an evaluation on what is fully public, what is pub(crate) and what is
26//! private; however, for now we are exposing most objects as fully public while the API is maturing.
27
28use super::{error, log, Result, Runtime, RuntimeType};
29use crate::config::{self, RuntimeConfig};
30
31use futures::Future;
32use once_cell::sync::OnceCell;
33use std::sync::{Arc, Mutex};
34use tokio::{signal, task::JoinHandle};
35
36pub use tokio_util::sync::CancellationToken;
37
38impl Runtime {
39    fn new(runtime: RuntimeType) -> Result<Runtime> {
40        // worker id
41        let id = Arc::new(uuid::Uuid::new_v4().to_string());
42
43        // create a cancellation token
44        let cancellation_token = CancellationToken::new();
45
46        // secondary runtime for background ectd/nats tasks
47        let secondary = RuntimeConfig::single_threaded().create_runtime()?;
48
49        Ok(Runtime {
50            id,
51            primary: runtime,
52            secondary: Arc::new(secondary),
53            cancellation_token,
54        })
55    }
56
57    pub fn from_handle(handle: tokio::runtime::Handle) -> Result<Runtime> {
58        let runtime = RuntimeType::External(handle);
59        Runtime::new(runtime)
60    }
61
62    /// Create a [`Runtime`] instance from the settings
63    /// See [`config::RuntimeConfig::from_settings`]
64    pub fn from_settings() -> Result<Runtime> {
65        let config = config::RuntimeConfig::from_settings()?;
66        let owned = RuntimeType::Shared(Arc::new(config.create_runtime()?));
67        Runtime::new(owned)
68    }
69
70    /// Create a [`Runtime`] with a single-threaded primary async tokio runtime
71    pub fn single_threaded() -> Result<Runtime> {
72        let config = config::RuntimeConfig::single_threaded();
73        let owned = RuntimeType::Shared(Arc::new(config.create_runtime()?));
74        Runtime::new(owned)
75    }
76
77    /// Returns the unique identifier for the [`Runtime`]
78    pub fn id(&self) -> &str {
79        &self.id
80    }
81
82    /// Returns a [`tokio::runtime::Handle`] for the primary/application thread pool
83    pub fn primary(&self) -> tokio::runtime::Handle {
84        self.primary.handle()
85    }
86
87    /// Returns a [`tokio::runtime::Handle`] for the secondary/background thread pool
88    pub fn secondary(&self) -> &Arc<tokio::runtime::Runtime> {
89        &self.secondary
90    }
91
92    /// Access the primary [`CancellationToken`] for the [`Runtime`]
93    pub fn primary_token(&self) -> CancellationToken {
94        self.cancellation_token.clone()
95    }
96
97    /// Creates a child [`CancellationToken`] tied to the life-cycle of the [`Runtime`]'s root [`CancellationToken::child_token`] method.
98    pub fn child_token(&self) -> CancellationToken {
99        self.cancellation_token.child_token()
100    }
101
102    /// Shuts down the [`Runtime`] instance
103    pub fn shutdown(&self) {
104        self.cancellation_token.cancel();
105    }
106}
107
108impl RuntimeType {
109    /// Get [`tokio::runtime::Handle`] to runtime
110    pub fn handle(&self) -> tokio::runtime::Handle {
111        match self {
112            RuntimeType::External(rt) => rt.clone(),
113            RuntimeType::Shared(rt) => rt.handle().clone(),
114        }
115    }
116}
117
118impl std::fmt::Debug for RuntimeType {
119    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120        match self {
121            RuntimeType::External(_) => write!(f, "RuntimeType::External"),
122            RuntimeType::Shared(_) => write!(f, "RuntimeType::Shared"),
123        }
124    }
125}