dynamo_runtime/
distributed.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4pub use crate::component::Component;
5use crate::storage::key_value_store::{
6    EtcdStore, KeyValueStore, KeyValueStoreEnum, KeyValueStoreManager, MemoryStore,
7};
8use crate::transports::nats::DRTNatsClientPrometheusMetrics;
9use crate::{
10    ErrorContext,
11    component::{self, ComponentBuilder, Endpoint, InstanceSource, Namespace},
12    discovery::DiscoveryClient,
13    metrics::PrometheusUpdateCallback,
14    metrics::{MetricsHierarchy, MetricsRegistry},
15    service::ServiceClient,
16    transports::{etcd, nats, tcp},
17};
18
19use super::utils::GracefulShutdownTracker;
20use super::{Arc, DistributedRuntime, OK, OnceCell, Result, Runtime, SystemHealth, Weak, error};
21use std::sync::OnceLock;
22
23use derive_getters::Dissolve;
24use figment::error;
25use std::collections::HashMap;
26use tokio::sync::Mutex;
27use tokio_util::sync::CancellationToken;
28
29impl MetricsHierarchy for DistributedRuntime {
30    fn basename(&self) -> String {
31        "".to_string() // drt has no basename. Basename only begins with the Namespace.
32    }
33
34    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
35        vec![] // drt is the root, so no parent hierarchies
36    }
37
38    fn get_metrics_registry(&self) -> &MetricsRegistry {
39        &self.metrics_registry
40    }
41}
42
43impl std::fmt::Debug for DistributedRuntime {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        write!(f, "DistributedRuntime")
46    }
47}
48
49impl DistributedRuntime {
50    pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result<Self> {
51        let (etcd_config, nats_config, is_static) = config.dissolve();
52
53        let runtime_clone = runtime.clone();
54
55        let (etcd_client, store) = if is_static {
56            (None, KeyValueStoreManager::memory())
57        } else {
58            let etcd_client = etcd::Client::new(etcd_config.clone(), runtime_clone).await?;
59            let store = KeyValueStoreManager::etcd(etcd_client.clone());
60            (Some(etcd_client), store)
61        };
62
63        let nats_client = Some(nats_config.clone().connect().await?);
64
65        // Start system status server for health and metrics if enabled in configuration
66        let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default();
67        // IMPORTANT: We must extract cancel_token from runtime BEFORE moving runtime into the struct below.
68        // This is because after moving, runtime is no longer accessible in this scope (ownership rules).
69        let cancel_token = if config.system_server_enabled() {
70            Some(runtime.clone().child_token())
71        } else {
72            None
73        };
74        let starting_health_status = config.starting_health_status.clone();
75        let use_endpoint_health_status = config.use_endpoint_health_status.clone();
76        let health_endpoint_path = config.system_health_path.clone();
77        let live_endpoint_path = config.system_live_path.clone();
78        let system_health = Arc::new(parking_lot::Mutex::new(SystemHealth::new(
79            starting_health_status,
80            use_endpoint_health_status,
81            health_endpoint_path,
82            live_endpoint_path,
83        )));
84
85        let nats_client_for_metrics = nats_client.clone();
86
87        let distributed_runtime = Self {
88            runtime,
89            etcd_client,
90            store,
91            nats_client,
92            tcp_server: Arc::new(OnceCell::new()),
93            system_status_server: Arc::new(OnceLock::new()),
94            component_registry: component::Registry::new(),
95            is_static,
96            instance_sources: Arc::new(Mutex::new(HashMap::new())),
97            metrics_registry: crate::MetricsRegistry::new(),
98            system_health,
99        };
100
101        if let Some(nats_client_for_metrics) = nats_client_for_metrics {
102            let nats_client_metrics = DRTNatsClientPrometheusMetrics::new(
103                &distributed_runtime,
104                nats_client_for_metrics.client().clone(),
105            )?;
106            // Register a callback to update NATS client metrics on the DRT's metrics registry
107            let nats_client_callback = Arc::new({
108                let nats_client_clone = nats_client_metrics.clone();
109                move || {
110                    nats_client_clone.set_from_client_stats();
111                    Ok(())
112                }
113            });
114            distributed_runtime
115                .metrics_registry
116                .add_update_callback(nats_client_callback);
117        }
118
119        // Initialize the uptime gauge in SystemHealth
120        distributed_runtime
121            .system_health
122            .lock()
123            .initialize_uptime_gauge(&distributed_runtime)?;
124
125        // Handle system status server initialization
126        if let Some(cancel_token) = cancel_token {
127            // System server is enabled - start both the state and HTTP server
128            let host = config.system_host.clone();
129            let port = config.system_port;
130
131            // Start system status server (it creates SystemStatusState internally)
132            match crate::system_status_server::spawn_system_status_server(
133                &host,
134                port,
135                cancel_token,
136                Arc::new(distributed_runtime.clone()),
137            )
138            .await
139            {
140                Ok((addr, handle)) => {
141                    tracing::info!("System status server started successfully on {}", addr);
142
143                    // Store system status server information
144                    let system_status_server_info =
145                        crate::system_status_server::SystemStatusServerInfo::new(
146                            addr,
147                            Some(handle),
148                        );
149
150                    // Initialize the system_status_server field
151                    distributed_runtime
152                        .system_status_server
153                        .set(Arc::new(system_status_server_info))
154                        .expect("System status server info should only be set once");
155                }
156                Err(e) => {
157                    tracing::error!("System status server startup failed: {}", e);
158                }
159            }
160        } else {
161            // System server HTTP is disabled, but uptime metrics are still being tracked via SystemHealth
162            tracing::debug!(
163                "System status server HTTP endpoints disabled, but uptime metrics are being tracked"
164            );
165        }
166
167        // Start health check manager if enabled
168        if config.health_check_enabled {
169            let health_check_config = crate::health_check::HealthCheckConfig {
170                canary_wait_time: std::time::Duration::from_secs(config.canary_wait_time_secs),
171                request_timeout: std::time::Duration::from_secs(
172                    config.health_check_request_timeout_secs,
173                ),
174            };
175
176            // Start the health check manager (spawns per-endpoint monitoring tasks)
177            match crate::health_check::start_health_check_manager(
178                distributed_runtime.clone(),
179                Some(health_check_config),
180            )
181            .await
182            {
183                Ok(()) => tracing::info!(
184                    "Health check manager started (canary_wait_time: {}s, request_timeout: {}s)",
185                    config.canary_wait_time_secs,
186                    config.health_check_request_timeout_secs
187                ),
188                Err(e) => tracing::error!("Health check manager failed to start: {}", e),
189            }
190        }
191
192        Ok(distributed_runtime)
193    }
194
195    pub async fn from_settings(runtime: Runtime) -> Result<Self> {
196        let config = DistributedConfig::from_settings(false);
197        Self::new(runtime, config).await
198    }
199
200    // Call this if you are using static workers that do not need etcd-based discovery.
201    pub async fn from_settings_without_discovery(runtime: Runtime) -> Result<Self> {
202        let config = DistributedConfig::from_settings(true);
203        Self::new(runtime, config).await
204    }
205
206    pub fn runtime(&self) -> &Runtime {
207        &self.runtime
208    }
209
210    pub fn primary_token(&self) -> CancellationToken {
211        self.runtime.primary_token()
212    }
213
214    /// The etcd lease all our components will be attached to.
215    /// Not available for static workers.
216    pub fn primary_lease(&self) -> Option<etcd::Lease> {
217        self.etcd_client.as_ref().map(|c| c.primary_lease())
218    }
219
220    pub fn shutdown(&self) {
221        self.runtime.shutdown();
222    }
223
224    /// Create a [`Namespace`]
225    pub fn namespace(&self, name: impl Into<String>) -> Result<Namespace> {
226        Namespace::new(self.clone(), name.into(), self.is_static)
227    }
228
229    // /// Create a [`Component`]
230    // pub fn component(
231    //     &self,
232    //     name: impl Into<String>,
233    //     namespace: impl Into<String>,
234    // ) -> Result<Component> {
235    //     Ok(ComponentBuilder::from_runtime(self.clone())
236    //         .name(name.into())
237    //         .namespace(namespace.into())
238    //         .build()?)
239    // }
240
241    pub(crate) fn discovery_client(&self, namespace: impl Into<String>) -> DiscoveryClient {
242        DiscoveryClient::new(
243            namespace.into(),
244            self.etcd_client
245                .clone()
246                .expect("Attempt to get discovery_client on static DistributedRuntime"),
247        )
248    }
249
250    pub(crate) fn service_client(&self) -> Option<ServiceClient> {
251        self.nats_client().map(|nc| ServiceClient::new(nc.clone()))
252    }
253
254    pub async fn tcp_server(&self) -> Result<Arc<tcp::server::TcpStreamServer>> {
255        Ok(self
256            .tcp_server
257            .get_or_try_init(async move {
258                let options = tcp::server::ServerOptions::default();
259                let server = tcp::server::TcpStreamServer::new(options).await?;
260                OK(server)
261            })
262            .await?
263            .clone())
264    }
265
266    pub fn nats_client(&self) -> Option<&nats::Client> {
267        self.nats_client.as_ref()
268    }
269
270    /// Get system status server information if available
271    pub fn system_status_server_info(
272        &self,
273    ) -> Option<Arc<crate::system_status_server::SystemStatusServerInfo>> {
274        self.system_status_server.get().cloned()
275    }
276
277    // todo(ryan): deprecate this as we move to Discovery traits and Component Identifiers
278    pub fn etcd_client(&self) -> Option<etcd::Client> {
279        self.etcd_client.clone()
280    }
281
282    // Deprecated but our CI blocks us using the feature currently.
283    //#[deprecated(note = "Use KeyValueStoreManager via store(); this will be removed")]
284    pub fn deprecated_etcd_client(&self) -> Option<etcd::Client> {
285        self.etcd_client.clone()
286    }
287
288    /// An interface to store things. Will eventually replace `etcd_client`.
289    /// Currently does key-value, but will grow to include whatever we need to store.
290    pub fn store(&self) -> &KeyValueStoreManager {
291        &self.store
292    }
293
294    pub fn child_token(&self) -> CancellationToken {
295        self.runtime.child_token()
296    }
297
298    pub(crate) fn graceful_shutdown_tracker(&self) -> Arc<GracefulShutdownTracker> {
299        self.runtime.graceful_shutdown_tracker()
300    }
301
302    pub fn instance_sources(&self) -> Arc<Mutex<HashMap<Endpoint, Weak<InstanceSource>>>> {
303        self.instance_sources.clone()
304    }
305}
306
307#[derive(Dissolve)]
308pub struct DistributedConfig {
309    pub etcd_config: etcd::ClientOptions,
310    pub nats_config: nats::ClientOptions,
311    pub is_static: bool,
312}
313
314impl DistributedConfig {
315    pub fn from_settings(is_static: bool) -> DistributedConfig {
316        DistributedConfig {
317            etcd_config: etcd::ClientOptions::default(),
318            nats_config: nats::ClientOptions::default(),
319            is_static,
320        }
321    }
322
323    pub fn for_cli() -> DistributedConfig {
324        let mut config = DistributedConfig {
325            etcd_config: etcd::ClientOptions::default(),
326            nats_config: nats::ClientOptions::default(),
327            is_static: false,
328        };
329
330        config.etcd_config.attach_lease = false;
331
332        config
333    }
334}
335
pub mod distributed_test_utils {
    //! Shared helpers for tests that need a `DistributedRuntime`.
    // TODO: Use in-memory DistributedRuntime for tests instead of full runtime when available.

    /// Creates a DRT instance for integration-only tests.
    ///
    /// The runtime is obtained with `Runtime::from_current`, so the caller's existing tokio
    /// runtime is reused, and the DRT is built via
    /// `DistributedRuntime::from_settings_without_discovery`.
    /// Note: settings are read from environment variables inside that constructor.
    ///
    /// # Panics
    /// Panics if either the runtime or the distributed runtime cannot be created.
    #[cfg(feature = "integration")]
    pub async fn create_test_drt_async() -> crate::DistributedRuntime {
        let runtime = crate::Runtime::from_current().unwrap();
        let drt = crate::DistributedRuntime::from_settings_without_discovery(runtime).await;
        drt.unwrap()
    }
}
351
#[cfg(all(test, feature = "integration"))]
mod tests {
    use super::distributed_test_utils::create_test_drt_async;
    use std::time::Duration;

    /// Pause between creating the runtime and reading its uptime; also the minimum uptime
    /// the tests expect to observe.
    const DELAY: Duration = Duration::from_millis(50);

    /// Uptime must keep advancing when the system status server is disabled.
    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_disabled() {
        let scenario = async {
            // Bring up a DRT, then let some time pass before sampling.
            let drt = create_test_drt_async().await;
            tokio::time::sleep(DELAY).await;

            // The reported uptime has to cover at least the pause.
            let uptime = drt.system_health.lock().uptime();
            assert!(
                uptime >= DELAY,
                "Expected uptime to be at least 50ms, but got {:?}",
                uptime
            );

            println!(
                "✓ DRT uptime test passed (system disabled): uptime = {:?}",
                uptime
            );
        };

        temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], scenario).await;
    }

    /// Uptime must keep advancing when the system status server is enabled.
    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_enabled() {
        let scenario = async {
            // Bring up a DRT, then let some time pass before sampling.
            let drt = create_test_drt_async().await;
            tokio::time::sleep(DELAY).await;

            // The reported uptime has to cover at least the pause.
            let uptime = drt.system_health.lock().uptime();
            assert!(
                uptime >= DELAY,
                "Expected uptime to be at least 50ms, but got {:?}",
                uptime
            );

            println!(
                "✓ DRT uptime test passed (system enabled): uptime = {:?}",
                uptime
            );
        };

        temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("true"))], scenario).await;
    }
}