dynamo_runtime/
lib.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Dynamo
5
6#![allow(dead_code)]
7#![allow(unused_imports)]
8
9use std::{
10    collections::HashMap,
11    sync::{Arc, OnceLock, Weak},
12};
13
14pub use anyhow::{
15    Context as ErrorContext, Error, Ok as OK, Result, anyhow as error, bail as raise,
16};
17
18use async_once_cell::OnceCell;
19
20mod config;
21pub use config::RuntimeConfig;
22
23pub mod component;
24pub mod compute;
25pub mod discovery;
26pub mod engine;
27pub mod health_check;
28pub mod system_status_server;
29pub use system_status_server::SystemStatusServerInfo;
30pub mod instances;
31pub mod logging;
32pub mod metrics;
33pub mod pipeline;
34pub mod prelude;
35pub mod protocols;
36pub mod runnable;
37pub mod runtime;
38pub mod service;
39pub mod slug;
40pub mod storage;
41pub mod system_health;
42pub mod traits;
43pub mod transports;
44pub mod utils;
45pub mod worker;
46
47pub mod distributed;
48pub use distributed::distributed_test_utils;
49pub use futures::stream;
50pub use system_health::{HealthCheckTarget, SystemHealth};
51pub use tokio_util::sync::CancellationToken;
52pub use worker::Worker;
53
54use crate::{
55    metrics::prometheus_names::distributed_runtime, storage::key_value_store::KeyValueStore,
56};
57
58use component::{Endpoint, InstanceSource};
59use utils::GracefulShutdownTracker;
60
61use config::HealthStatus;
62
/// Types of Tokio runtimes that can be used to construct a Dynamo [Runtime].
///
/// `Shared` keeps the Tokio runtime itself behind an `Arc`, while `External`
/// keeps only a handle to a Tokio runtime.
///
/// NOTE(review): presumably `Shared` is used for runtimes created by Dynamo and
/// `External` for a runtime owned by the embedding application - confirm
/// against the constructors.
#[derive(Clone)]
enum RuntimeType {
    /// A Tokio runtime held through a reference-counted pointer.
    Shared(Arc<tokio::runtime::Runtime>),
    /// A handle to a Tokio runtime; the runtime itself is not stored here.
    External(tokio::runtime::Handle),
}
69
/// Local [Runtime] which provides access to shared resources local to the physical node/machine.
///
/// The struct derives `Clone`; every field is either a handle-like type or
/// wrapped in an `Arc`/`Option<Arc<_>>`.
#[derive(Debug, Clone)]
pub struct Runtime {
    /// Identifier of this runtime, stored behind an `Arc`.
    id: Arc<String>,
    /// First of the two Tokio runtimes held by this struct.
    /// NOTE(review): presumably the main runtime for request handling - confirm.
    primary: RuntimeType,
    /// Second of the two Tokio runtimes held by this struct.
    /// NOTE(review): presumably used for background work - confirm.
    secondary: RuntimeType,
    /// Cancellation token for the runtime.
    /// NOTE(review): presumably the top-level shutdown signal - confirm.
    cancellation_token: CancellationToken,
    /// Separate cancellation token associated with endpoint shutdown.
    /// NOTE(review): relationship to `cancellation_token` (child vs. independent)
    /// is not visible here - confirm.
    endpoint_shutdown_token: CancellationToken,
    /// Shared tracker used for graceful shutdown bookkeeping.
    graceful_shutdown_tracker: Arc<GracefulShutdownTracker>,
    /// Optional compute pool; `None` when no pool is attached.
    compute_pool: Option<Arc<compute::ComputePool>>,
    /// Optional semaphore; `None` when no permit limit is attached.
    /// NOTE(review): by its name this presumably bounds concurrent
    /// `block_in_place` sections - confirm.
    block_in_place_permits: Option<Arc<tokio::sync::Semaphore>>,
}
82
/// Type alias for runtime callback functions to reduce complexity
///
/// The callback takes no arguments and returns `anyhow::Result<()>`.
///
/// This type represents an Arc-wrapped callback function that can be:
/// - Shared across multiple threads and contexts (the closure is `Send + Sync`)
/// - Cloned without duplicating the underlying closure (only the `Arc` is cloned)
/// - Used in generic contexts requiring 'static lifetime
///
/// The Arc wrapper is included in the type to make sharing explicit.
type PrometheusUpdateCallback = Arc<dyn Fn() -> anyhow::Result<()> + Send + Sync + 'static>;
92
/// Type alias for exposition text callback functions that return Prometheus text
///
/// The callback takes no arguments and returns the text as an owned `String`
/// wrapped in `anyhow::Result`. Like the update callback alias, it is
/// `Arc`-wrapped and `Send + Sync + 'static` so it can be shared across threads.
type PrometheusExpositionFormatCallback =
    Arc<dyn Fn() -> anyhow::Result<String> + Send + Sync + 'static>;
96
/// Structure to hold Prometheus registries and associated callbacks for a given hierarchy
///
/// All three fields are public, so callers may read or modify the registry and
/// the callback lists directly as well as through the methods on this type.
pub struct MetricsRegistryEntry {
    /// The Prometheus registry for this prefix
    pub prometheus_registry: prometheus::Registry,
    /// List of update callbacks invoked before metrics are scraped
    /// (each takes no arguments and returns `anyhow::Result<()>`)
    pub prometheus_update_callbacks: Vec<PrometheusUpdateCallback>,
    /// List of callbacks that return Prometheus exposition text to be appended to metrics output
    /// (each takes no arguments and returns `anyhow::Result<String>`)
    pub prometheus_expfmt_callbacks: Vec<PrometheusExpositionFormatCallback>,
}
106
107impl MetricsRegistryEntry {
108    /// Create a new metrics registry entry with an empty registry and no callbacks
109    pub fn new() -> Self {
110        Self {
111            prometheus_registry: prometheus::Registry::new(),
112            prometheus_update_callbacks: Vec::new(),
113            prometheus_expfmt_callbacks: Vec::new(),
114        }
115    }
116
117    /// Add a callback function that receives a reference to any MetricsRegistry
118    pub fn add_prometheus_update_callback(&mut self, callback: PrometheusUpdateCallback) {
119        self.prometheus_update_callbacks.push(callback);
120    }
121
122    /// Add an exposition text callback that returns Prometheus text
123    pub fn add_prometheus_expfmt_callback(&mut self, callback: PrometheusExpositionFormatCallback) {
124        self.prometheus_expfmt_callbacks.push(callback);
125    }
126
127    /// Execute all update callbacks and return their results
128    pub fn execute_prometheus_update_callbacks(&self) -> Vec<anyhow::Result<()>> {
129        self.prometheus_update_callbacks
130            .iter()
131            .map(|callback| callback())
132            .collect()
133    }
134
135    /// Execute all exposition text callbacks and return their concatenated text
136    pub fn execute_prometheus_expfmt_callbacks(&self) -> String {
137        let mut result = String::new();
138        for callback in &self.prometheus_expfmt_callbacks {
139            match callback() {
140                Ok(text) => {
141                    if !text.is_empty() {
142                        if !result.is_empty() && !result.ends_with('\n') {
143                            result.push('\n');
144                        }
145                        result.push_str(&text);
146                    }
147                }
148                Err(e) => {
149                    tracing::error!("Error executing exposition text callback: {}", e);
150                }
151            }
152        }
153        result
154    }
155
156    /// Returns true if a metric with the given name already exists in the Prometheus registry
157    pub fn has_metric_named(&self, metric_name: &str) -> bool {
158        self.prometheus_registry
159            .gather()
160            .iter()
161            .any(|mf| mf.name() == metric_name)
162    }
163}
164
165impl Default for MetricsRegistryEntry {
166    fn default() -> Self {
167        Self::new()
168    }
169}
170
171impl Clone for MetricsRegistryEntry {
172    fn clone(&self) -> Self {
173        Self {
174            prometheus_registry: self.prometheus_registry.clone(),
175            prometheus_update_callbacks: Vec::new(), // Callbacks cannot be cloned, so we start with an empty list
176            prometheus_expfmt_callbacks: Vec::new(), // Callbacks cannot be cloned, so we start with an empty list
177        }
178    }
179}
180
/// Distributed [Runtime] which provides access to shared resources across the cluster; this includes
/// communication protocols and transports.
///
/// The struct derives `Clone`; the lazily-initialised and mutable parts are
/// held behind `Arc`s.
#[derive(Clone)]
pub struct DistributedRuntime {
    // local runtime
    runtime: Runtime,

    // we might consider a unified transport manager here
    // NOTE(review): `etcd_client` is optional; presumably `None` when `is_static`
    // is set (see below) - confirm.
    etcd_client: Option<transports::etcd::Client>,
    nats_client: transports::nats::Client,
    // key-value store accessed through the `KeyValueStore` trait object
    store: Arc<dyn KeyValueStore>,
    // set-once cell (async `OnceCell`) holding the TCP stream server
    tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
    // set-once cell (`OnceLock`) holding the system status server info
    system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,

    // local registry for components
    // the registry allows us to share runtime resources across instances of the same component object.
    // take for example two instances of a client to the same remote component. The registry allows us to use
    // a single endpoint watcher for both clients; this keeps the number of background tasks watching specific
    // paths in etcd to a minimum.
    component_registry: component::Registry,

    // Will only have static components that are not discoverable via etcd; they must be known at
    // startup. Will not start etcd.
    is_static: bool,

    // Instance sources keyed by endpoint. Values are `Weak`, so this map does not by itself
    // keep an `InstanceSource` alive. Guarded by a tokio (async) mutex.
    instance_sources: Arc<tokio::sync::Mutex<HashMap<Endpoint, Weak<InstanceSource>>>>,

    // Health Status (guarded by a std, i.e. blocking, mutex)
    system_health: Arc<std::sync::Mutex<SystemHealth>>,

    // This map associates metric prefixes with their corresponding Prometheus registries and callbacks.
    // Uses RwLock for better concurrency - multiple threads can read (execute callbacks) simultaneously.
    hierarchy_to_metricsregistry: Arc<std::sync::RwLock<HashMap<String, MetricsRegistryEntry>>>,
}