dynamo_runtime/
distributed.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::component::Component;
5use crate::pipeline::PipelineError;
6use crate::storage::key_value_store::{
7    EtcdStore, KeyValueStore, KeyValueStoreEnum, KeyValueStoreManager, KeyValueStoreSelect,
8    MemoryStore,
9};
10use crate::transports::nats::DRTNatsClientPrometheusMetrics;
11use crate::{
12    component::{self, ComponentBuilder, Endpoint, InstanceSource, Namespace},
13    discovery::Discovery,
14    metrics::PrometheusUpdateCallback,
15    metrics::{MetricsHierarchy, MetricsRegistry},
16    service::ServiceClient,
17    transports::{etcd, nats, tcp},
18};
19use crate::{discovery, system_status_server, transports};
20
21use super::utils::GracefulShutdownTracker;
22use crate::SystemHealth;
23use crate::runtime::Runtime;
24
25use async_once_cell::OnceCell;
26use std::sync::{Arc, OnceLock, Weak};
27
28use anyhow::Result;
29use derive_getters::Dissolve;
30use figment::error;
31use std::collections::HashMap;
32use tokio::sync::Mutex;
33use tokio_util::sync::CancellationToken;
34
/// Distributed [Runtime] which provides access to shared resources across the cluster, this includes
/// communication protocols and transports.
#[derive(Clone)]
pub struct DistributedRuntime {
    // local runtime (tokio executor, cancellation tokens, shutdown tracking)
    runtime: Runtime,

    // Unified transport manager.
    // `etcd_client` is only `Some` when the etcd KV backend was selected in
    // `DistributedConfig` and the runtime is not static (see `new`).
    etcd_client: Option<transports::etcd::Client>,
    nats_client: Option<transports::nats::Client>,
    store: KeyValueStoreManager,
    // Lazily initialized on first call to `tcp_server()`.
    tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
    // Lazily initialized on first call to `network_manager()`.
    network_manager: Arc<OnceCell<Arc<crate::pipeline::network::manager::NetworkManager>>>,
    // Set at most once during `new`, only if the system status server starts successfully.
    system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,

    // Service discovery client (KV-store backed or Kubernetes, chosen in `new`)
    discovery_client: Arc<dyn discovery::Discovery>,

    // Discovery metadata (only used for Kubernetes backend)
    // Shared with system status server to expose via /metadata endpoint
    discovery_metadata: Option<Arc<tokio::sync::RwLock<discovery::DiscoveryMetadata>>>,

    // local registry for components
    // the registry allows us to share runtime resources across instances of the same component object.
    // take for example two instances of a client to the same remote component: the registry allows us to use
    // a single endpoint watcher for both clients, this keeps the number of background tasks watching specific
    // paths in etcd to a minimum.
    component_registry: component::Registry,

    // Will only have static components that are not discoverable via etcd, they must be known at
    // startup. Will not start etcd.
    is_static: bool,

    // Weak references so the map itself never keeps an InstanceSource alive
    // once all clients using it are gone.
    instance_sources: Arc<tokio::sync::Mutex<HashMap<Endpoint, Weak<InstanceSource>>>>,

    // Health Status (liveness/readiness state plus uptime tracking)
    system_health: Arc<parking_lot::Mutex<SystemHealth>>,

    // This hierarchy's own metrics registry; the DRT is the root of the metrics hierarchy
    metrics_registry: MetricsRegistry,
}
76
77impl MetricsHierarchy for DistributedRuntime {
78    fn basename(&self) -> String {
79        "".to_string() // drt has no basename. Basename only begins with the Namespace.
80    }
81
82    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
83        vec![] // drt is the root, so no parent hierarchies
84    }
85
86    fn get_metrics_registry(&self) -> &MetricsRegistry {
87        &self.metrics_registry
88    }
89}
90
91impl std::fmt::Debug for DistributedRuntime {
92    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93        write!(f, "DistributedRuntime")
94    }
95}
96
impl DistributedRuntime {
    /// Construct a [`DistributedRuntime`] on top of a local [`Runtime`].
    ///
    /// Initialization order matters here:
    /// 1. Select the key-value store backend (etcd / file / memory) from `config`.
    /// 2. Connect NATS.
    /// 3. Build health state and the discovery client.
    /// 4. Assemble `Self`, then wire metrics, the system status server, and the
    ///    health check manager against the assembled value.
    ///
    /// # Errors
    /// Fails if etcd or NATS cannot be reached, if the Kubernetes discovery
    /// client cannot be created, or if metric registration fails. Failures of
    /// the system status server and health check manager are logged but do NOT
    /// fail construction.
    pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result<Self> {
        // NOTE: `dissolve()` (from derive_getters) returns fields positionally,
        // in `DistributedConfig` declaration order.
        let (selected_kv_store, nats_config, is_static) = config.dissolve();

        let runtime_clone = runtime.clone();

        // Static runtimes never start etcd; otherwise the backend is chosen by config.
        // Only the etcd branch yields a concrete etcd client handle.
        let (etcd_client, store) = match (is_static, selected_kv_store) {
            (false, KeyValueStoreSelect::Etcd(etcd_config)) => {
                let etcd_client = etcd::Client::new(*etcd_config, runtime_clone).await.inspect_err(|err|
                    // The returned error doesn't show because of a dropped runtime error, so
                    // log it first.
                    tracing::error!(%err, "Could not connect to etcd. Pass `--store-kv ..` to use a different backend or start etcd."))?;
                let store = KeyValueStoreManager::etcd(etcd_client.clone());
                (Some(etcd_client), store)
            }
            (false, KeyValueStoreSelect::File(root)) => (None, KeyValueStoreManager::file(root)),
            (true, _) | (false, KeyValueStoreSelect::Memory) => {
                (None, KeyValueStoreManager::memory())
            }
        };

        // NATS is always connected, regardless of store backend; a connection
        // failure aborts construction.
        let nats_client = Some(nats_config.clone().connect().await?);

        // Start system status server for health and metrics if enabled in configuration.
        // NOTE: this `config` (RuntimeConfig, read from environment settings)
        // deliberately shadows the `DistributedConfig` parameter, which was
        // fully consumed by `dissolve()` above.
        let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default();
        // IMPORTANT: We must extract cancel_token from runtime BEFORE moving runtime into the struct below.
        // This is because after moving, runtime is no longer accessible in this scope (ownership rules).
        let cancel_token = if config.system_server_enabled() {
            Some(runtime.clone().child_token())
        } else {
            None
        };
        let starting_health_status = config.starting_health_status.clone();
        let use_endpoint_health_status = config.use_endpoint_health_status.clone();
        let health_endpoint_path = config.system_health_path.clone();
        let live_endpoint_path = config.system_live_path.clone();
        let system_health = Arc::new(parking_lot::Mutex::new(SystemHealth::new(
            starting_health_status,
            use_endpoint_health_status,
            health_endpoint_path,
            live_endpoint_path,
        )));

        // Keep a second handle so metrics can be wired up after `nats_client`
        // is moved into the struct below.
        let nats_client_for_metrics = nats_client.clone();

        // Initialize discovery client based on backend configuration
        let discovery_backend =
            std::env::var("DYN_DISCOVERY_BACKEND").unwrap_or_else(|_| "kv_store".to_string());

        // The Kubernetes backend additionally produces shared metadata that the
        // system status server exposes via its /metadata endpoint; the KV-store
        // backend has no such metadata (None).
        let (discovery_client, discovery_metadata) = match discovery_backend.as_str() {
            "kubernetes" => {
                tracing::info!("Initializing Kubernetes discovery backend");
                let metadata = Arc::new(tokio::sync::RwLock::new(
                    crate::discovery::DiscoveryMetadata::new(),
                ));
                let client = crate::discovery::KubeDiscoveryClient::new(
                    metadata.clone(),
                    runtime.primary_token(),
                )
                .await
                .inspect_err(
                    |err| tracing::error!(%err, "Failed to initialize Kubernetes discovery client"),
                )?;
                (Arc::new(client) as Arc<dyn Discovery>, Some(metadata))
            }
            _ => {
                // Default: discovery backed by the selected key-value store.
                tracing::info!("Initializing KV store discovery backend");
                use crate::discovery::KVStoreDiscovery;
                (
                    Arc::new(KVStoreDiscovery::new(
                        store.clone(),
                        runtime.primary_token(),
                    )) as Arc<dyn Discovery>,
                    None,
                )
            }
        };

        let distributed_runtime = Self {
            runtime,
            etcd_client,
            store,
            nats_client,
            tcp_server: Arc::new(OnceCell::new()),
            network_manager: Arc::new(OnceCell::new()),
            system_status_server: Arc::new(OnceLock::new()),
            discovery_client,
            discovery_metadata,
            component_registry: component::Registry::new_with_static(is_static),
            is_static,
            instance_sources: Arc::new(Mutex::new(HashMap::new())),
            metrics_registry: crate::MetricsRegistry::new(),
            system_health,
        };

        // Wire NATS client statistics into the DRT metrics registry.
        // (`nats_client_for_metrics` is always Some today, since NATS connects
        // unconditionally above.)
        if let Some(nats_client_for_metrics) = nats_client_for_metrics {
            let nats_client_metrics = DRTNatsClientPrometheusMetrics::new(
                &distributed_runtime,
                nats_client_for_metrics.client().clone(),
            )?;
            // Register a callback to update NATS client metrics on the DRT's metrics registry
            let nats_client_callback = Arc::new({
                let nats_client_clone = nats_client_metrics.clone();
                move || {
                    nats_client_clone.set_from_client_stats();
                    Ok(())
                }
            });
            distributed_runtime
                .metrics_registry
                .add_update_callback(nats_client_callback);
        }

        // Initialize the uptime gauge in SystemHealth
        distributed_runtime
            .system_health
            .lock()
            .initialize_uptime_gauge(&distributed_runtime)?;

        // Handle system status server initialization.
        // A present `cancel_token` means the system server was enabled above.
        if let Some(cancel_token) = cancel_token {
            // System server is enabled - start both the state and HTTP server
            let host = config.system_host.clone();
            let port = config.system_port as u16;

            // Start system status server (it creates SystemStatusState internally)
            match crate::system_status_server::spawn_system_status_server(
                &host,
                port,
                cancel_token,
                Arc::new(distributed_runtime.clone()),
                distributed_runtime.discovery_metadata.clone(),
            )
            .await
            {
                Ok((addr, handle)) => {
                    tracing::info!("System status server started successfully on {}", addr);

                    // Store system status server information
                    let system_status_server_info =
                        crate::system_status_server::SystemStatusServerInfo::new(
                            addr,
                            Some(handle),
                        );

                    // Initialize the system_status_server field (OnceLock: set exactly once)
                    distributed_runtime
                        .system_status_server
                        .set(Arc::new(system_status_server_info))
                        .expect("System status server info should only be set once");
                }
                Err(e) => {
                    // Startup failure is logged but does not abort DRT construction.
                    tracing::error!("System status server startup failed: {}", e);
                }
            }
        } else {
            // System server HTTP is disabled, but uptime metrics are still being tracked via SystemHealth
            tracing::debug!(
                "System status server HTTP endpoints disabled, but uptime metrics are being tracked"
            );
        }

        // Start health check manager if enabled
        if config.health_check_enabled {
            let health_check_config = crate::health_check::HealthCheckConfig {
                canary_wait_time: std::time::Duration::from_secs(config.canary_wait_time_secs),
                request_timeout: std::time::Duration::from_secs(
                    config.health_check_request_timeout_secs,
                ),
            };

            // Start the health check manager (spawns per-endpoint monitoring tasks).
            // Like the status server, a failure here is logged, not propagated.
            match crate::health_check::start_health_check_manager(
                distributed_runtime.clone(),
                Some(health_check_config),
            )
            .await
            {
                Ok(()) => tracing::info!(
                    "Health check manager started (canary_wait_time: {}s, request_timeout: {}s)",
                    config.canary_wait_time_secs,
                    config.health_check_request_timeout_secs
                ),
                Err(e) => tracing::error!("Health check manager failed to start: {}", e),
            }
        }

        Ok(distributed_runtime)
    }

    /// Build a DRT from environment settings with discovery enabled (non-static).
    pub async fn from_settings(runtime: Runtime) -> Result<Self> {
        let config = DistributedConfig::from_settings(false);
        Self::new(runtime, config).await
    }

    /// Call this if you are using static workers that do not need etcd-based discovery.
    pub async fn from_settings_without_discovery(runtime: Runtime) -> Result<Self> {
        let config = DistributedConfig::from_settings(true);
        Self::new(runtime, config).await
    }

    /// The underlying local [`Runtime`].
    pub fn runtime(&self) -> &Runtime {
        &self.runtime
    }

    /// The primary cancellation token of the underlying runtime.
    pub fn primary_token(&self) -> CancellationToken {
        self.runtime.primary_token()
    }

    // TODO: Don't hand out pointers, instead have methods to use the registry in friendly ways
    // (without being aware of async locks and so on)
    pub fn component_registry(&self) -> &component::Registry {
        &self.component_registry
    }

    // TODO: Don't hand out pointers, instead provide system health related services.
    pub fn system_health(&self) -> Arc<parking_lot::Mutex<SystemHealth>> {
        self.system_health.clone()
    }

    /// Unique instance id of this runtime, as assigned by the discovery backend.
    pub fn connection_id(&self) -> u64 {
        self.discovery_client.instance_id()
    }

    /// Shut down the local runtime and the key-value store manager.
    pub fn shutdown(&self) {
        self.runtime.shutdown();
        self.store.shutdown();
    }

    /// Create a [`Namespace`]
    pub fn namespace(&self, name: impl Into<String>) -> Result<Namespace> {
        Namespace::new(self.clone(), name.into(), self.is_static)
    }

    /// Returns the discovery interface for service registration and discovery
    pub fn discovery(&self) -> Arc<dyn Discovery> {
        self.discovery_client.clone()
    }

    /// A NATS-backed service client, if a NATS connection exists.
    pub(crate) fn service_client(&self) -> Option<ServiceClient> {
        self.nats_client().map(|nc| ServiceClient::new(nc.clone()))
    }

    /// The shared TCP stream server, started lazily on first use with default
    /// options. Subsequent calls return the same instance.
    pub async fn tcp_server(&self) -> Result<Arc<tcp::server::TcpStreamServer>> {
        Ok(self
            .tcp_server
            .get_or_try_init(async move {
                let options = tcp::server::ServerOptions::default();
                let server = tcp::server::TcpStreamServer::new(options).await?;
                Ok::<_, PipelineError>(server)
            })
            .await?
            .clone())
    }

    /// Get the network manager (lazy initialization)
    ///
    /// The network manager consolidates all network configuration and provides
    /// unified access to request plane servers and clients.
    pub async fn network_manager(
        &self,
    ) -> Result<Arc<crate::pipeline::network::manager::NetworkManager>> {
        use crate::pipeline::network::manager::NetworkManager;

        let manager = self
            .network_manager
            .get_or_try_init(async {
                // Get NATS client if available
                let nats_client = self.nats_client().map(|c| c.client().clone());

                // NetworkManager handles all config reading and mode selection
                anyhow::Ok(NetworkManager::new(
                    self.child_token(),
                    nats_client,
                    self.component_registry.clone(),
                ))
            })
            .await?;

        Ok(manager.clone())
    }

    /// Get the request plane server (convenience method)
    ///
    /// This is a shortcut for `network_manager().await?.server().await`.
    pub async fn request_plane_server(
        &self,
    ) -> Result<Arc<dyn crate::pipeline::network::ingress::unified_server::RequestPlaneServer>>
    {
        let manager = self.network_manager().await?;
        manager.server().await
    }

    /// DEPRECATED: Use network_manager().server() instead
    #[deprecated(note = "Use request_plane_server() or network_manager().server() instead")]
    pub async fn http_server(
        &self,
    ) -> Result<Arc<crate::pipeline::network::ingress::http_endpoint::SharedHttpServer>> {
        // For backward compatibility, try to downcast
        let _server = self.request_plane_server().await?;
        // This will only work if we're actually in HTTP mode
        // For now, just return an error suggesting the new API
        // NOTE(review): this currently always errors after initializing the
        // request plane server; callers must migrate to request_plane_server().
        anyhow::bail!(
            "http_server() is deprecated. Use request_plane_server() instead, which returns a trait object that works with all transport types."
        )
    }

    /// DEPRECATED: Use network_manager().server() instead
    #[deprecated(note = "Use request_plane_server() or network_manager().server() instead")]
    pub async fn shared_tcp_server(
        &self,
    ) -> Result<Arc<crate::pipeline::network::ingress::shared_tcp_endpoint::SharedTcpServer>> {
        // For backward compatibility, try to downcast
        let _server = self.request_plane_server().await?;
        // This will only work if we're actually in TCP mode
        // For now, just return an error suggesting the new API
        // NOTE(review): like http_server(), this always errors today.
        anyhow::bail!(
            "shared_tcp_server() is deprecated. Use request_plane_server() instead, which returns a trait object that works with all transport types."
        )
    }

    /// Borrow the NATS client, if connected.
    pub fn nats_client(&self) -> Option<&nats::Client> {
        self.nats_client.as_ref()
    }

    /// Get system status server information if available
    pub fn system_status_server_info(
        &self,
    ) -> Option<Arc<crate::system_status_server::SystemStatusServerInfo>> {
        self.system_status_server.get().cloned()
    }

    // todo(ryan): deprecate this as we move to Discovery traits and Component Identifiers
    //
    // Try to use `store()` instead of this. Only use this if you have not been able to migrate
    // yet, or if you require etcd-specific features like distributed locking (rare).
    pub fn etcd_client(&self) -> Option<etcd::Client> {
        self.etcd_client.clone()
    }

    /// An interface to store things. Will eventually replace `etcd_client`.
    /// Currently does key-value, but will grow to include whatever we need to store.
    pub fn store(&self) -> &KeyValueStoreManager {
        &self.store
    }

    /// A child cancellation token tied to the runtime's lifetime.
    pub fn child_token(&self) -> CancellationToken {
        self.runtime.child_token()
    }

    /// Tracker used to wait for in-flight work during graceful shutdown.
    pub(crate) fn graceful_shutdown_tracker(&self) -> Arc<GracefulShutdownTracker> {
        self.runtime.graceful_shutdown_tracker()
    }

    /// Shared map of per-endpoint instance sources (weak entries; see struct docs).
    pub fn instance_sources(&self) -> Arc<Mutex<HashMap<Endpoint, Weak<InstanceSource>>>> {
        self.instance_sources.clone()
    }
}
455
/// Inputs for constructing a [`DistributedRuntime`].
///
/// NOTE: field order matters. `#[derive(Dissolve)]` generates a `dissolve()`
/// returning the fields as a tuple in declaration order, and
/// `DistributedRuntime::new` destructures that tuple positionally.
#[derive(Dissolve)]
pub struct DistributedConfig {
    // Which key-value store backend to use (etcd, file, or in-memory).
    pub store_backend: KeyValueStoreSelect,
    // Connection options for the NATS client (always connected).
    pub nats_config: nats::ClientOptions,
    // When true, components are fixed at startup and etcd is never started.
    pub is_static: bool,
}
462
463impl DistributedConfig {
464    pub fn from_settings(is_static: bool) -> DistributedConfig {
465        DistributedConfig {
466            store_backend: KeyValueStoreSelect::Etcd(Box::default()),
467            nats_config: nats::ClientOptions::default(),
468            is_static,
469        }
470    }
471
472    pub fn for_cli() -> DistributedConfig {
473        let etcd_config = etcd::ClientOptions {
474            attach_lease: false,
475            ..Default::default()
476        };
477        DistributedConfig {
478            store_backend: KeyValueStoreSelect::Etcd(Box::new(etcd_config)),
479            nats_config: nats::ClientOptions::default(),
480            is_static: false,
481        }
482    }
483}
484
pub mod distributed_test_utils {
    //! Shared helpers for DistributedRuntime tests.
    // TODO: Use in-memory DistributedRuntime for tests instead of full runtime when available.

    /// Build a DRT for integration-only tests.
    ///
    /// Uses `Runtime::from_current` to attach to the already-running tokio
    /// runtime. All other settings come from environment variables, read inside
    /// `DistributedRuntime::from_settings_without_discovery`.
    #[cfg(feature = "integration")]
    pub async fn create_test_drt_async() -> crate::DistributedRuntime {
        let runtime = crate::Runtime::from_current().unwrap();
        let drt = crate::DistributedRuntime::from_settings_without_discovery(runtime).await;
        drt.unwrap()
    }
}
500
#[cfg(all(test, feature = "integration"))]
mod tests {
    use super::distributed_test_utils::create_test_drt_async;

    /// Shared test body: build a DRT, sleep 50ms, then assert the tracked
    /// uptime is at least 50ms. `label` only appears in the success log line.
    async fn check_uptime_after_50ms(label: &str) {
        let drt = create_test_drt_async().await;

        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

        let uptime = drt.system_health.lock().uptime();
        assert!(
            uptime >= std::time::Duration::from_millis(50),
            "Expected uptime to be at least 50ms, but got {:?}",
            uptime
        );

        println!("✓ DRT uptime test passed ({}): uptime = {:?}", label, uptime);
    }

    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_disabled() {
        // Uptime must be tracked even with the system status server disabled
        // (DYN_SYSTEM_PORT unset).
        temp_env::async_with_vars([("DYN_SYSTEM_PORT", None::<&str>)], async {
            check_uptime_after_50ms("system disabled").await;
        })
        .await;
    }

    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_enabled() {
        // Same check with the system status server enabled on port 8081.
        temp_env::async_with_vars([("DYN_SYSTEM_PORT", Some("8081"))], async {
            check_uptime_after_50ms("system enabled").await;
        })
        .await;
    }
}