dynamo_runtime/
lib.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Dynamo
5
6#![allow(dead_code)]
7#![allow(unused_imports)]
8
9use std::{
10    collections::HashMap,
11    sync::{Arc, OnceLock, Weak},
12};
13
14pub use anyhow::{
15    Context as ErrorContext, Error, Ok as OK, Result, anyhow as error, bail as raise,
16};
17
18use async_once_cell::OnceCell;
19
20mod config;
21pub use config::RuntimeConfig;
22
23pub mod component;
24pub mod discovery;
25pub mod engine;
26pub mod health_check;
27pub mod system_status_server;
28pub use system_status_server::SystemStatusServerInfo;
29pub mod instances;
30pub mod logging;
31pub mod metrics;
32pub mod pipeline;
33pub mod prelude;
34pub mod protocols;
35pub mod runnable;
36pub mod runtime;
37pub mod service;
38pub mod slug;
39pub mod storage;
40pub mod system_health;
41pub mod traits;
42pub mod transports;
43pub mod utils;
44pub mod worker;
45
46pub mod distributed;
47pub use distributed::distributed_test_utils;
48pub use futures::stream;
49pub use system_health::{HealthCheckTarget, SystemHealth};
50pub use tokio_util::sync::CancellationToken;
51pub use worker::Worker;
52
53use crate::metrics::prometheus_names::distributed_runtime;
54
55use component::{Endpoint, InstanceSource};
56use utils::GracefulShutdownTracker;
57
58use config::HealthStatus;
59
/// Types of Tokio runtimes that can be used to construct a Dynamo [Runtime].
///
/// The enum derives [`Clone`]; cloning a value clones the contained `Arc` or
/// `Handle`, not the Tokio runtime itself.
#[derive(Clone)]
enum RuntimeType {
    /// A Tokio runtime held behind an `Arc`, so every clone of this variant
    /// refers to the same runtime instance.
    Shared(Arc<tokio::runtime::Runtime>),
    /// A `tokio::runtime::Handle` to a runtime that is not stored in this value.
    // NOTE(review): presumably the runtime is created and owned by the embedding
    // application — confirm against the constructors.
    External(tokio::runtime::Handle),
}
66
/// Local [Runtime] which provides access to shared resources local to the physical node/machine.
///
/// The struct derives [`Clone`]; its fields are `Arc`s, [`RuntimeType`] values and
/// [`CancellationToken`]s, each of which is cloned field-by-field.
#[derive(Debug, Clone)]
pub struct Runtime {
    /// Identifier string for this runtime, stored behind an `Arc`.
    id: Arc<String>,
    /// First of the two Tokio runtimes held by this value.
    // NOTE(review): presumably the main async runtime — confirm against the constructors.
    primary: RuntimeType,
    /// Second of the two Tokio runtimes held by this value.
    // NOTE(review): presumably reserved for background work — confirm against the constructors.
    secondary: RuntimeType,
    /// Cancellation token held by the runtime.
    // NOTE(review): presumably the root token cancelled on shutdown — confirm.
    cancellation_token: CancellationToken,
    /// A second cancellation token, kept separately from `cancellation_token`.
    // NOTE(review): by its name this stops endpoints specifically; its relationship
    // to `cancellation_token` (child vs. independent) is not visible here — confirm.
    endpoint_shutdown_token: CancellationToken,
    /// Shared [`GracefulShutdownTracker`] (see `utils`).
    // NOTE(review): presumably counts in-flight work so shutdown can wait for it — confirm.
    graceful_shutdown_tracker: Arc<GracefulShutdownTracker>,
}
77
/// Type alias for runtime callback functions to reduce complexity
///
/// A callback takes no arguments and returns `anyhow::Result<()>`.
///
/// This type represents an Arc-wrapped callback function that can be:
/// - Shared efficiently across multiple threads and contexts (`Send + Sync`)
/// - Cloned without duplicating the underlying closure (cloning copies the `Arc`)
/// - Used in generic contexts requiring `'static` lifetime
///
/// The Arc wrapper is included in the type to make sharing explicit.
type RuntimeCallback = Arc<dyn Fn() -> anyhow::Result<()> + Send + Sync + 'static>;
87
/// Structure to hold a Prometheus registry and its associated callbacks for a given hierarchy
pub struct MetricsRegistryEntry {
    /// The Prometheus registry for this prefix
    pub prometheus_registry: prometheus::Registry,
    /// Callbacks registered for this entry, in registration order.
    ///
    /// Each callback is a [`RuntimeCallback`]: it takes no arguments and returns
    /// `anyhow::Result<()>`.
    pub runtime_callbacks: Vec<RuntimeCallback>,
}
95
96impl MetricsRegistryEntry {
97    /// Create a new metrics registry entry with an empty registry and no callbacks
98    pub fn new() -> Self {
99        Self {
100            prometheus_registry: prometheus::Registry::new(),
101            runtime_callbacks: Vec::new(),
102        }
103    }
104
105    /// Add a callback function that receives a reference to any MetricsRegistry
106    pub fn add_callback(&mut self, callback: RuntimeCallback) {
107        self.runtime_callbacks.push(callback);
108    }
109
110    /// Execute all runtime callbacks and return their results
111    pub fn execute_callbacks(&self) -> Vec<anyhow::Result<()>> {
112        self.runtime_callbacks
113            .iter()
114            .map(|callback| callback())
115            .collect()
116    }
117
118    /// Returns true if a metric with the given name already exists in the Prometheus registry
119    pub fn has_metric_named(&self, metric_name: &str) -> bool {
120        self.prometheus_registry
121            .gather()
122            .iter()
123            .any(|mf| mf.name() == metric_name)
124    }
125}
126
127impl Default for MetricsRegistryEntry {
128    fn default() -> Self {
129        Self::new()
130    }
131}
132
133impl Clone for MetricsRegistryEntry {
134    fn clone(&self) -> Self {
135        Self {
136            prometheus_registry: self.prometheus_registry.clone(),
137            runtime_callbacks: Vec::new(), // Callbacks cannot be cloned, so we start with an empty list
138        }
139    }
140}
141
/// Distributed [Runtime] which provides access to shared resources across the cluster, this includes
/// communication protocols and transports.
///
/// The struct derives [`Clone`]; the shared state below is held behind `Arc`s, so clones
/// refer to the same maps, servers and health state.
#[derive(Clone)]
pub struct DistributedRuntime {
    // local runtime
    runtime: Runtime,

    // we might consider a unified transport manager here
    // NOTE(review): `etcd_client` is optional; presumably `None` when `is_static` is set — confirm.
    etcd_client: Option<transports::etcd::Client>,
    nats_client: transports::nats::Client,
    // Shared cell that holds the TCP stream server once it has been set.
    tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
    // Shared cell that holds the system status server info once it has been set.
    system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,

    // local registry for components
    // the registry allows us to share runtime resources across instances of the same component object.
    // take for example two instances of a client to the same remote component. The registry allows us to use
    // a single endpoint watcher for both clients, this keeps the number of background tasks watching specific
    // paths in etcd to a minimum.
    component_registry: component::Registry,

    // Will only have static components that are not discoverable via etcd, they must be known at
    // startup. Will not start etcd.
    is_static: bool,

    // Map from endpoint to its instance source. Entries are `Weak`, so the map itself does not
    // keep an `InstanceSource` alive.
    instance_sources: Arc<tokio::sync::Mutex<HashMap<Endpoint, Weak<InstanceSource>>>>,

    // Health Status
    system_health: Arc<std::sync::Mutex<SystemHealth>>,

    // This map associates metric prefixes with their corresponding Prometheus registries and callbacks.
    // Uses RwLock for better concurrency - multiple threads can read (execute callbacks) simultaneously.
    hierarchy_to_metricsregistry: Arc<std::sync::RwLock<HashMap<String, MetricsRegistryEntry>>>,
}