Skip to main content

dynamo_runtime/
distributed.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::component::{
5    self, Component, ComponentBuilder, Endpoint, EndpointDiscoverySource, Instance, Namespace,
6    RoutingOccupancyState,
7};
8use crate::config::environment_names::tcp_response_stream;
9use crate::pipeline::PipelineError;
10use crate::pipeline::network::manager::NetworkManager;
11use crate::service::{ServiceClient, ServiceSet};
12use crate::storage::kv;
13use crate::{discovery, system_status_server, transports};
14use crate::{
15    discovery::Discovery,
16    metrics::PrometheusUpdateCallback,
17    metrics::{MetricsHierarchy, MetricsRegistry},
18    transports::{etcd, nats, tcp},
19};
20
21use super::utils::GracefulShutdownTracker;
22use crate::SystemHealth;
23use crate::runtime::Runtime;
24
25// Used instead of std::cell::OnceCell because get_or_try_init there is nightly
26use async_once_cell::OnceCell;
27
28use std::fmt;
29use std::sync::{Arc, OnceLock, Weak};
30use std::time::Duration;
31use tokio::sync::watch::Receiver;
32
33use anyhow::Result;
34use derive_getters::Dissolve;
35use figment::error;
36use std::collections::HashMap;
37use tokio::sync::Mutex;
38use tokio_util::sync::CancellationToken;
39
/// Per-endpoint discovery sources. Values are `Weak`, so an entry in this map does not by
/// itself keep the `EndpointDiscoverySource` alive.
type EndpointDiscoverySourceMap = HashMap<Endpoint, Weak<EndpointDiscoverySource>>;
/// Per-endpoint routing occupancy state. Values are `Weak`, so an entry in this map does not
/// by itself keep the `RoutingOccupancyState` alive.
type RoutingOccupancyMap = HashMap<Endpoint, Weak<RoutingOccupancyState>>;
42
/// Distributed [Runtime] which provides access to shared resources across the cluster, this includes
/// communication protocols and transports.
///
/// Cloning is cheap-ish: shared state is held behind `Arc`s (or types presumably cloned by
/// handle — see each field), so clones observe the same registries, servers, and health state.
#[derive(Clone)]
pub struct DistributedRuntime {
    // local runtime
    runtime: Runtime,

    // NATS client; `None` when NATS was not enabled in the `DistributedConfig`.
    nats_client: Option<transports::nats::Client>,
    // Request-plane network manager (servers and clients for the configured request plane).
    network_manager: Arc<NetworkManager>,
    // TCP response-stream server, created lazily on first use by `tcp_server()`.
    tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
    // System status server info; set at most once during `new()` if the server starts.
    system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,
    // How the frontend talks to the backend (copied out by `request_plane()`).
    request_plane: RequestPlaneMode,

    // Service discovery client
    discovery_client: Arc<dyn discovery::Discovery>,

    // Discovery metadata (only used for Kubernetes backend)
    // Shared with system status server to expose via /metadata endpoint
    discovery_metadata: Option<Arc<tokio::sync::RwLock<discovery::DiscoveryMetadata>>>,

    // Local registry for components.
    // The registry lets multiple instances of the same component object share runtime resources.
    // For example, two clients of the same remote component can share a single endpoint watcher,
    // which keeps the number of background tasks watching specific paths in etcd to a minimum.
    component_registry: component::Registry,

    // Weakly-held per-endpoint state, guarded by async (tokio) mutexes.
    endpoint_discovery_sources: Arc<tokio::sync::Mutex<EndpointDiscoverySourceMap>>,
    routing_occupancy_states: Arc<tokio::sync::Mutex<RoutingOccupancyMap>>,

    // Health Status (synchronous parking_lot mutex; do not hold across `.await`)
    system_health: Arc<parking_lot::Mutex<SystemHealth>>,

    // Local endpoint registry for in-process calls
    local_endpoint_registry: crate::local_endpoint_registry::LocalEndpointRegistry,

    // This hierarchy's own metrics registry
    metrics_registry: MetricsRegistry,

    // Registry for /engine/* route callbacks
    engine_routes: crate::engine_routes::EngineRouteRegistry,

    // Backs `/v1/metadata/{model_slug}/{model_suffix}/{filename}`.
    metadata_artifacts: crate::metadata_registry::MetadataArtifactRegistry,

    // Resolved event transport kind — set once at construction time from
    // DYN_EVENT_PLANE + discovery backend; returned by default_event_transport_kind().
    event_transport_kind: crate::discovery::EventTransportKind,
}
92
93impl MetricsHierarchy for DistributedRuntime {
94    fn basename(&self) -> String {
95        "".to_string() // drt has no basename. Basename only begins with the Namespace.
96    }
97
98    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
99        vec![] // drt is the root, so no parent hierarchies
100    }
101
102    fn get_metrics_registry(&self) -> &MetricsRegistry {
103        &self.metrics_registry
104    }
105
106    fn connection_id(&self) -> Option<u64> {
107        Some(self.discovery_client.instance_id())
108    }
109}
110
111impl std::fmt::Debug for DistributedRuntime {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        write!(f, "DistributedRuntime")
114    }
115}
116
117impl DistributedRuntime {
118    pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result<Self> {
119        let (discovery_backend, nats_config, request_plane, event_transport_kind) =
120            config.dissolve();
121
122        let nats_client = match nats_config {
123            Some(nc) => Some(nc.connect().await?),
124            None => None,
125        };
126
127        // Start system status server for health and metrics if enabled in configuration
128        let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default();
129        // IMPORTANT: We must extract cancel_token from runtime BEFORE moving runtime into the struct below.
130        // This is because after moving, runtime is no longer accessible in this scope (ownership rules).
131        let cancel_token = if config.system_server_enabled() {
132            Some(runtime.clone().child_token())
133        } else {
134            None
135        };
136        let starting_health_status = config.starting_health_status.clone();
137        let use_endpoint_health_status = config.use_endpoint_health_status.clone();
138        let health_endpoint_path = config.system_health_path.clone();
139        let live_endpoint_path = config.system_live_path.clone();
140        let system_health = Arc::new(parking_lot::Mutex::new(SystemHealth::new(
141            starting_health_status,
142            use_endpoint_health_status,
143            config.health_check_enabled,
144            health_endpoint_path,
145            live_endpoint_path,
146        )));
147
148        // Initialize discovery client based on backend configuration
149        let (discovery_client, discovery_metadata) = match discovery_backend {
150            DiscoveryBackend::Kubernetes => {
151                tracing::info!("Initializing Kubernetes discovery backend");
152                let metadata = Arc::new(tokio::sync::RwLock::new(
153                    crate::discovery::DiscoveryMetadata::new(),
154                ));
155                let client = crate::discovery::KubeDiscoveryClient::new(
156                    metadata.clone(),
157                    runtime.primary_token(),
158                )
159                .await
160                .inspect_err(
161                    |err| tracing::error!(%err, "Failed to initialize Kubernetes discovery client"),
162                )?;
163                (Arc::new(client) as Arc<dyn Discovery>, Some(metadata))
164            }
165            DiscoveryBackend::KvStore(kv_selector) => {
166                tracing::info!("Initializing KV store discovery backend: {kv_selector}");
167                let runtime_clone = runtime.clone();
168                let store = match kv_selector {
169                    kv::Selector::Etcd(etcd_config) => {
170                        let etcd_client = etcd::Client::new(*etcd_config, runtime_clone).await.inspect_err(|err|
171                            tracing::error!(%err, "Could not connect to etcd. Pass `--discovery-backend ..` to use a different backend or start etcd."))?;
172                        kv::Manager::etcd(etcd_client)
173                    }
174                    kv::Selector::File(root) => kv::Manager::file(runtime.primary_token(), root),
175                    kv::Selector::Memory => kv::Manager::memory(),
176                };
177                use crate::discovery::KVStoreDiscovery;
178                (
179                    Arc::new(KVStoreDiscovery::new(store, runtime.primary_token()))
180                        as Arc<dyn Discovery>,
181                    None,
182                )
183            }
184        };
185
186        let component_registry = component::Registry::new();
187
188        // NetworkManager for request plane
189        let network_manager = NetworkManager::new(
190            runtime.child_token(),
191            nats_client.clone().map(|c| c.client().clone()),
192            component_registry.clone(),
193            request_plane,
194        );
195
196        let distributed_runtime = Self {
197            runtime,
198            network_manager: Arc::new(network_manager),
199            nats_client,
200            tcp_server: Arc::new(OnceCell::new()),
201            system_status_server: Arc::new(OnceLock::new()),
202            discovery_client,
203            discovery_metadata,
204            component_registry,
205            endpoint_discovery_sources: Arc::new(Mutex::new(HashMap::new())),
206            routing_occupancy_states: Arc::new(Mutex::new(HashMap::new())),
207            metrics_registry: crate::MetricsRegistry::new(),
208            system_health,
209            request_plane,
210            local_endpoint_registry: crate::local_endpoint_registry::LocalEndpointRegistry::new(),
211            engine_routes: crate::engine_routes::EngineRouteRegistry::new(),
212            metadata_artifacts: crate::metadata_registry::MetadataArtifactRegistry::new(),
213            event_transport_kind,
214        };
215
216        // Initialize the uptime gauge in SystemHealth
217        distributed_runtime
218            .system_health
219            .lock()
220            .initialize_uptime_gauge(&distributed_runtime)?;
221
222        // Register an update callback so the uptime gauge is refreshed before
223        // every Prometheus scrape (both system status server and frontend).
224        {
225            let system_health = distributed_runtime.system_health.clone();
226            distributed_runtime
227                .metrics_registry
228                .add_update_callback(std::sync::Arc::new(move || {
229                    system_health.lock().update_uptime_gauge();
230                    Ok(())
231                }));
232        }
233
234        // Handle system status server initialization
235        if let Some(cancel_token) = cancel_token {
236            // System server is enabled - start both the state and HTTP server
237            let host = config.system_host.clone();
238            let port = config.system_port as u16;
239
240            // Start system status server (it creates SystemStatusState internally)
241            match crate::system_status_server::spawn_system_status_server(
242                &host,
243                port,
244                cancel_token,
245                Arc::new(distributed_runtime.clone()),
246                distributed_runtime.discovery_metadata.clone(),
247            )
248            .await
249            {
250                Ok((addr, handle)) => {
251                    tracing::info!("System status server started successfully on {addr}");
252
253                    // Store system status server information
254                    let system_status_server_info =
255                        crate::system_status_server::SystemStatusServerInfo::new(
256                            addr,
257                            Some(handle),
258                        );
259
260                    // Initialize the system_status_server field
261                    distributed_runtime
262                        .system_status_server
263                        .set(Arc::new(system_status_server_info))
264                        .expect("System status server info should only be set once");
265                }
266                Err(e) => {
267                    tracing::error!("System status server startup failed: {e}");
268                }
269            }
270        } else {
271            // System server HTTP is disabled, but uptime metrics are still being tracked via SystemHealth
272            tracing::debug!(
273                "System status server HTTP endpoints disabled, but uptime metrics are being tracked"
274            );
275        }
276
277        // Start health check manager if enabled
278        if config.health_check_enabled {
279            let health_check_config = crate::health_check::HealthCheckConfig {
280                canary_wait_time: std::time::Duration::from_secs(config.canary_wait_time_secs),
281                request_timeout: std::time::Duration::from_secs(
282                    config.health_check_request_timeout_secs,
283                ),
284            };
285
286            // Start the health check manager (spawns per-endpoint monitoring tasks)
287            match crate::health_check::start_health_check_manager(
288                distributed_runtime.clone(),
289                Some(health_check_config),
290            )
291            .await
292            {
293                Ok(()) => tracing::info!(
294                    "Health check manager started (canary_wait_time: {}s, request_timeout: {}s)",
295                    config.canary_wait_time_secs,
296                    config.health_check_request_timeout_secs
297                ),
298                Err(e) => tracing::error!("Health check manager failed to start: {e}"),
299            }
300        }
301
302        Ok(distributed_runtime)
303    }
304
305    pub async fn from_settings(runtime: Runtime) -> Result<Self> {
306        let config = DistributedConfig::from_settings();
307        Self::new(runtime, config).await
308    }
309
310    pub fn runtime(&self) -> &Runtime {
311        &self.runtime
312    }
313
314    pub fn primary_token(&self) -> CancellationToken {
315        self.runtime.primary_token()
316    }
317
318    // TODO: Don't hand out pointers, instead have methods to use the registry in friendly ways
319    // (without being aware of async locks and so on)
320    pub fn component_registry(&self) -> &component::Registry {
321        &self.component_registry
322    }
323
324    // TODO: Don't hand out pointers, instead provide system health related services.
325    pub fn system_health(&self) -> Arc<parking_lot::Mutex<SystemHealth>> {
326        self.system_health.clone()
327    }
328
329    /// Get the local endpoint registry for in-process endpoint calls
330    pub fn local_endpoint_registry(
331        &self,
332    ) -> &crate::local_endpoint_registry::LocalEndpointRegistry {
333        &self.local_endpoint_registry
334    }
335
336    /// Get the engine route registry for registering custom /engine/* routes
337    pub fn engine_routes(&self) -> &crate::engine_routes::EngineRouteRegistry {
338        &self.engine_routes
339    }
340
341    pub fn metadata_artifacts(&self) -> &crate::metadata_registry::MetadataArtifactRegistry {
342        &self.metadata_artifacts
343    }
344
345    pub fn connection_id(&self) -> u64 {
346        self.discovery_client.instance_id()
347    }
348
349    pub fn shutdown(&self) {
350        self.runtime.shutdown();
351        self.discovery_client.shutdown();
352    }
353
354    /// Create a [`Namespace`]
355    pub fn namespace(&self, name: impl Into<String>) -> Result<Namespace> {
356        Namespace::new(self.clone(), name.into())
357    }
358
359    /// Returns the discovery interface for service registration and discovery
360    pub fn discovery(&self) -> Arc<dyn Discovery> {
361        self.discovery_client.clone()
362    }
363
364    pub async fn tcp_server(&self) -> Result<Arc<tcp::server::TcpStreamServer>> {
365        Ok(self
366            .tcp_server
367            .get_or_try_init(async move {
368                let port = match std::env::var(tcp_response_stream::DYN_TCP_RESPONSE_STREAM_PORT) {
369                    Ok(p) => p.parse::<u16>().map_err(|_| {
370                        PipelineError::Generic(format!(
371                            "invalid {}: '{}' is not a valid port number",
372                            tcp_response_stream::DYN_TCP_RESPONSE_STREAM_PORT,
373                            p
374                        ))
375                    })?,
376                    Err(_) => 0,
377                };
378                let interface = std::env::var(tcp_response_stream::DYN_TCP_RESPONSE_STREAM_HOST)
379                    .ok()
380                    .filter(|h| !h.is_empty());
381
382                let host_suffix = interface
383                    .as_ref()
384                    .map_or(String::new(), |h| format!(" on host {h}"));
385                if port == 0 {
386                    tracing::info!(
387                        "TCP response stream server using OS-assigned port{host_suffix}"
388                    );
389                } else {
390                    tracing::info!(
391                        "TCP response stream server using fixed port {port}{host_suffix}"
392                    );
393                }
394
395                let options = tcp::server::ServerOptions { port, interface };
396                let server = tcp::server::TcpStreamServer::new(options).await?;
397                Ok::<_, PipelineError>(server)
398            })
399            .await?
400            .clone())
401    }
402
403    /// Get the network manager
404    ///
405    /// The network manager consolidates all network configuration and provides
406    /// unified access to request plane servers and clients.
407    pub fn network_manager(&self) -> Arc<NetworkManager> {
408        self.network_manager.clone()
409    }
410
411    /// Get the request plane server (convenience method)
412    ///
413    /// This is a shortcut for `network_manager().await?.server().await`.
414    pub async fn request_plane_server(
415        &self,
416    ) -> Result<Arc<dyn crate::pipeline::network::ingress::unified_server::RequestPlaneServer>>
417    {
418        self.network_manager().server().await
419    }
420
421    /// Get system status server information if available
422    pub fn system_status_server_info(
423        &self,
424    ) -> Option<Arc<crate::system_status_server::SystemStatusServerInfo>> {
425        self.system_status_server.get().cloned()
426    }
427
428    /// How the frontend should talk to the backend.
429    pub fn request_plane(&self) -> RequestPlaneMode {
430        self.request_plane
431    }
432
433    /// Returns the event transport kind this runtime was configured with.
434    ///
435    /// The value is resolved once at construction time by `DiscoveryBackend::resolve_event_transport_kind`:
436    /// if `DYN_EVENT_PLANE` is set explicitly that value wins; otherwise the discovery
437    /// backend drives the default (ZMQ for `file`/`mem`, NATS for `etcd`/`kubernetes`).
438    ///
439    /// Use this instead of [`EventTransportKind::from_env_or_default`] wherever you have
440    /// access to a `DistributedRuntime`, so that local-only workflows work without
441    /// setting `DYN_EVENT_PLANE` explicitly.
442    pub fn default_event_transport_kind(&self) -> crate::discovery::EventTransportKind {
443        self.event_transport_kind
444    }
445
446    pub fn child_token(&self) -> CancellationToken {
447        self.runtime.child_token()
448    }
449
450    pub(crate) fn graceful_shutdown_tracker(&self) -> Arc<GracefulShutdownTracker> {
451        self.runtime.graceful_shutdown_tracker()
452    }
453
454    pub(crate) fn endpoint_discovery_sources(&self) -> Arc<Mutex<EndpointDiscoverySourceMap>> {
455        self.endpoint_discovery_sources.clone()
456    }
457
458    /// Register an external long-running shutdown task with this runtime's
459    /// graceful-shutdown tracker. While the returned guard is alive,
460    /// `Runtime::shutdown` will keep waiting in Phase 2 (rather than
461    /// advancing to Phase 3 / NATS+etcd teardown). Drop the guard once
462    /// the task has finished.
463    pub fn register_graceful_task(&self) -> crate::utils::GracefulTaskGuard {
464        self.runtime.graceful_shutdown_tracker().register_task()
465    }
466
467    pub(crate) fn routing_occupancy_states(&self) -> Arc<Mutex<RoutingOccupancyMap>> {
468        self.routing_occupancy_states.clone()
469    }
470
471    /// TODO: This is a temporary KV router measure for component/component.rs EventPublisher impl for
472    /// Component, to allow it to publish to NATS. KV Router is the only user.
473    ///
474    /// When NATS is not available (e.g., running in approximate mode with --no-kv-events),
475    /// this function returns Ok(()) silently since publishing is optional in that mode.
476    pub async fn kv_router_nats_publish(
477        &self,
478        subject: String,
479        payload: bytes::Bytes,
480    ) -> anyhow::Result<()> {
481        let Some(nats_client) = self.nats_client.as_ref() else {
482            // NATS not available - this is expected in approximate mode (--no-kv-events)
483            tracing::trace!("Skipping NATS publish (NATS not configured): {subject}");
484            return Ok(());
485        };
486        Ok(nats_client.client().publish(subject, payload).await?)
487    }
488
489    /// TODO: This is a temporary KV router measure for component/component.rs EventSubscriber impl for
490    /// Component, to allow it to subscribe to NATS. KV Router is the only user.
491    pub(crate) async fn kv_router_nats_subscribe(
492        &self,
493        subject: String,
494    ) -> Result<async_nats::Subscriber> {
495        let Some(nats_client) = self.nats_client.as_ref() else {
496            anyhow::bail!("KV router's EventSubscriber requires NATS");
497        };
498        Ok(nats_client.client().subscribe(subject).await?)
499    }
500
501    /// TODO (karenc): This is a temporary KV router measure for worker query requests.
502    /// Allows KV Router to perform request/reply with workers. (versus the pub/sub pattern above)
503    /// KV Router is the only user, made public for use in dynamo-llm crate
504    pub async fn kv_router_nats_request(
505        &self,
506        subject: String,
507        payload: bytes::Bytes,
508        timeout: std::time::Duration,
509    ) -> anyhow::Result<async_nats::Message> {
510        let Some(nats_client) = self.nats_client.as_ref() else {
511            anyhow::bail!("KV router's request requires NATS");
512        };
513        let response =
514            tokio::time::timeout(timeout, nats_client.client().request(subject, payload))
515                .await
516                .map_err(|_| anyhow::anyhow!("Request timed out after {:?}", timeout))??;
517        Ok(response)
518    }
519
520    /// DEPRECATED: This method exists only for NATS request plane support.
521    /// Once everything uses the TCP request plane, this can be removed along with
522    /// the NATS service registration infrastructure.
523    ///
524    /// Returns a receiver that signals when the NATS service registration is complete.
525    /// The caller should use `blocking_recv()` to wait for completion.
526    pub fn register_nats_service(
527        &self,
528        component: Component,
529    ) -> tokio::sync::mpsc::Receiver<Result<(), String>> {
530        // Create a oneshot-style channel (capacity 1) to signal completion
531        let (tx, rx) = tokio::sync::mpsc::channel::<Result<(), String>>(1);
532
533        let drt = self.clone();
534        self.runtime().secondary().spawn(async move {
535            let service_name = component.service_name();
536
537            // Pre-check to save cost of creating the service, but don't hold the lock
538            if drt
539                .component_registry()
540                .inner
541                .lock()
542                .await
543                .services
544                .contains_key(&service_name)
545            {
546                // The NATS service is per component, but it is called from `serve_endpoint`, and there
547                // are often multiple endpoints for a component (e.g. `clear_kv_blocks` and `generate`).
548                tracing::trace!("Service {service_name} already exists");
549                // Signal success - service already exists
550                let _ = tx.send(Ok(())).await;
551                return;
552            }
553
554            let Some(nats_client) = drt.nats_client.as_ref() else {
555                tracing::error!("Cannot create NATS service without NATS.");
556                let _ = tx
557                    .send(Err("Cannot create NATS service without NATS".to_string()))
558                    .await;
559                return;
560            };
561            let description = None;
562            let nats_service = match crate::component::service::build_nats_service(
563                nats_client,
564                &component,
565                description,
566            )
567            .await
568            {
569                Ok(service) => service,
570                Err(err) => {
571                    tracing::error!(error = %err, component = service_name, "Failed to build NATS service");
572                    let _ = tx.send(Err(format!("Failed to build NATS service: {err}"))).await;
573                    return;
574                }
575            };
576
577            let mut guard = drt.component_registry().inner.lock().await;
578            if !guard.services.contains_key(&service_name) {
579                // Normal case
580                guard.services.insert(service_name.clone(), nats_service);
581
582                tracing::info!("Added NATS service {service_name}");
583
584                drop(guard);
585            } else {
586                drop(guard);
587                let _ = nats_service.stop().await;
588                // The NATS service is per component, but it is called from `serve_endpoint`, and there
589                // are often multiple endpoints for a component (e.g. `clear_kv_blocks` and `generate`).
590                // TODO: Is this still true?
591            }
592
593            // Signal completion - service registered successfully
594            let _ = tx.send(Ok(())).await;
595        });
596
597        rx
598    }
599}
600
/// Selects which discovery backend to use and, for KV store backends, which KV store.
///
/// Produced by `DistributedConfig::from_settings` from `DYN_DISCOVERY_BACKEND`
/// (`kubernetes`, or a string parsed as a `kv::Selector` such as `etcd`, `file`, `mem`).
#[derive(Clone, Debug)]
pub enum DiscoveryBackend {
    /// Use Kubernetes API for service discovery (no KV store needed)
    Kubernetes,
    /// Use a KV store (etcd, file, or memory) for service discovery
    KvStore(kv::Selector),
}
609
610impl DiscoveryBackend {
611    /// Returns true if this backend requires no external services (file or in-memory).
612    ///
613    /// Local backends do not need etcd, NATS, or any other infrastructure daemon.
614    /// This is used to drive smart defaults: for example, the event plane defaults to
615    /// ZMQ (not NATS) when a local backend is in use and `DYN_EVENT_PLANE` is not set.
616    pub fn is_local(&self) -> bool {
617        matches!(
618            self,
619            DiscoveryBackend::KvStore(kv::Selector::File(_))
620                | DiscoveryBackend::KvStore(kv::Selector::Memory)
621        )
622    }
623
624    /// Resolve the event transport kind for this backend.
625    ///
626    /// This is the single authoritative mapping of `(DYN_EVENT_PLANE, backend)` →
627    /// `EventTransportKind`. When `DYN_EVENT_PLANE` is unset or empty the backend
628    /// drives the default: local backends (`file`/`mem`) → ZMQ, distributed backends
629    /// (`etcd`/`kubernetes`) → NATS.
630    ///
631    /// Call this once at startup and store the result; do not call it repeatedly.
632    pub fn resolve_event_transport_kind(&self) -> crate::discovery::EventTransportKind {
633        use crate::config::environment_names::event_plane::DYN_EVENT_PLANE;
634        use crate::discovery::EventTransportKind;
635        match std::env::var(DYN_EVENT_PLANE).as_deref() {
636            Ok("nats") => EventTransportKind::Nats,
637            Ok("zmq") => EventTransportKind::Zmq,
638            // Unset or empty: derive from backend type.
639            Ok("") | Err(_) => {
640                if self.is_local() {
641                    EventTransportKind::Zmq
642                } else {
643                    EventTransportKind::Nats
644                }
645            }
646            Ok(other) => {
647                let default_kind = if self.is_local() {
648                    EventTransportKind::Zmq
649                } else {
650                    EventTransportKind::Nats
651                };
652                tracing::warn!(
653                    "Invalid DYN_EVENT_PLANE value '{}'. Valid values: 'nats', 'zmq'. \
654                     Defaulting to {:?}.",
655                    other,
656                    default_kind
657                );
658                default_kind
659            }
660        }
661    }
662}
663
/// Inputs for [`DistributedRuntime::new`].
///
/// `Dissolve` provides `dissolve()`, which `DistributedRuntime::new` uses to destructure the
/// config into its fields in declaration order.
#[derive(Dissolve)]
pub struct DistributedConfig {
    /// Which discovery backend to initialize.
    pub discovery_backend: DiscoveryBackend,
    /// NATS connection options; `None` means no NATS client is created.
    pub nats_config: Option<nats::ClientOptions>,
    /// Transport used for the request plane.
    pub request_plane: RequestPlaneMode,
    /// Resolved event transport kind — computed once at config time from
    /// `DYN_EVENT_PLANE` and the discovery backend, then stored on the runtime
    /// so callers always get the same answer regardless of which other services
    /// happen to be reachable.
    pub event_transport_kind: crate::discovery::EventTransportKind,
}
675
676impl DistributedConfig {
677    pub fn from_settings() -> DistributedConfig {
678        let request_plane = RequestPlaneMode::from_env();
679
680        // Determine the discovery backend first — we need it to compute the NATS default below.
681        // Valid values for DYN_DISCOVERY_BACKEND: "kubernetes", "etcd" (default), "file", "mem"
682        let backend_str =
683            std::env::var("DYN_DISCOVERY_BACKEND").unwrap_or_else(|_| "etcd".to_string());
684
685        let discovery_backend = match backend_str.as_str() {
686            "kubernetes" => {
687                tracing::info!("Using Kubernetes discovery backend");
688                DiscoveryBackend::Kubernetes
689            }
690            other => {
691                let selector: kv::Selector = other.parse().unwrap_or_else(|_| {
692                    panic!(
693                        "Unknown DYN_DISCOVERY_BACKEND value: '{other}'. \
694                         Valid options: kubernetes, etcd, file, mem"
695                    )
696                });
697                DiscoveryBackend::KvStore(selector)
698            }
699        };
700
701        // Resolve event transport kind once — the single source of truth used both to
702        // decide whether to open a NATS connection and to answer
703        // `DistributedRuntime::default_event_transport_kind()` later.
704        let event_transport_kind = discovery_backend.resolve_event_transport_kind();
705
706        // NATS is used for more than just NATS request-plane RPC:
707        // - KV router events (JetStream or NATS core + local indexer)
708        // - inter-router replica sync (NATS core)
709        //
710        // Enable the NATS client when any of these hold:
711        // 1. Request plane is NATS
712        // 2. NATS_SERVER is explicitly configured by the user
713        // 3. The resolved event transport kind is NATS
714        let nats_enabled = request_plane.is_nats()
715            || std::env::var(crate::config::environment_names::nats::NATS_SERVER).is_ok()
716            || matches!(
717                event_transport_kind,
718                crate::discovery::EventTransportKind::Nats
719            );
720
721        DistributedConfig {
722            discovery_backend,
723            nats_config: if nats_enabled {
724                Some(nats::ClientOptions::default())
725            } else {
726                None
727            },
728            request_plane,
729            event_transport_kind,
730        }
731    }
732
733    pub fn for_cli() -> DistributedConfig {
734        let etcd_config = etcd::ClientOptions {
735            attach_lease: false,
736            ..Default::default()
737        };
738        let request_plane = RequestPlaneMode::from_env();
739        let discovery_backend =
740            DiscoveryBackend::KvStore(kv::Selector::Etcd(Box::new(etcd_config)));
741        let event_transport_kind = discovery_backend.resolve_event_transport_kind();
742        let nats_enabled = request_plane.is_nats()
743            || std::env::var(crate::config::environment_names::nats::NATS_SERVER).is_ok()
744            || matches!(
745                event_transport_kind,
746                crate::discovery::EventTransportKind::Nats
747            );
748        DistributedConfig {
749            discovery_backend,
750            nats_config: if nats_enabled {
751                Some(nats::ClientOptions::default())
752            } else {
753                None
754            },
755            request_plane,
756            event_transport_kind,
757        }
758    }
759
760    /// A DistributedConfig that isn't distributed, for when the frontend and backend are in the
761    /// same process.
762    pub fn process_local() -> DistributedConfig {
763        DistributedConfig {
764            discovery_backend: DiscoveryBackend::KvStore(kv::Selector::Memory),
765            nats_config: None,
766            // This won't be used in process local, so we likely need a "none" option to
767            // communicate that and avoid opening the ports.
768            request_plane: RequestPlaneMode::Tcp,
769            event_transport_kind: crate::discovery::EventTransportKind::Zmq,
770        }
771    }
772}
773
/// Request plane transport mode configuration.
///
/// Selects how requests are distributed from routers to workers. The environment-driven
/// constructors read it from the `DYN_REQUEST_PLANE` variable (parsed case-insensitively via
/// the `FromStr` impl below). When the variable is unset or unparseable,
/// [`RequestPlaneMode::Tcp`] is used.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RequestPlaneMode {
    /// Use NATS for request distribution (legacy). The environment-driven
    /// `DistributedConfig` constructors also enable a NATS client when this mode is chosen.
    Nats,
    /// Use HTTP/2 for request distribution.
    Http,
    /// Use raw TCP with msgpack support for request distribution. This is the default.
    #[default]
    Tcp,
}
790
791impl fmt::Display for RequestPlaneMode {
792    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
793        match self {
794            Self::Nats => write!(f, "nats"),
795            Self::Http => write!(f, "http"),
796            Self::Tcp => write!(f, "tcp"),
797        }
798    }
799}
800
801impl std::str::FromStr for RequestPlaneMode {
802    type Err = anyhow::Error;
803
804    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
805        match s.to_lowercase().as_str() {
806            "nats" => Ok(Self::Nats),
807            "http" => Ok(Self::Http),
808            "tcp" => Ok(Self::Tcp),
809            _ => Err(anyhow::anyhow!(
810                "Invalid request plane mode: '{}'. Valid options are: 'nats', 'http', 'tcp'",
811                s
812            )),
813        }
814    }
815}
816
817impl RequestPlaneMode {
818    /// Get the request plane mode from environment variable (uncached)
819    /// Reads from `DYN_REQUEST_PLANE` environment variable.
820    fn from_env() -> Self {
821        std::env::var("DYN_REQUEST_PLANE")
822            .ok()
823            .and_then(|s| s.parse().ok())
824            .unwrap_or_default()
825    }
826
827    pub fn is_nats(&self) -> bool {
828        matches!(self, RequestPlaneMode::Nats)
829    }
830}
831
832pub mod distributed_test_utils {
833    //! Common test helper functions for DistributedRuntime tests
834
835    /// Helper function to create a DRT instance for integration-only tests.
836    /// Uses from_current to leverage existing tokio runtime
837    /// Note: Settings are read from environment variables inside DistributedRuntime::from_settings
838    #[cfg(feature = "integration")]
839    pub async fn create_test_drt_async() -> super::DistributedRuntime {
840        use crate::transports::nats;
841
842        let rt = crate::Runtime::from_current().unwrap();
843        let config = super::DistributedConfig {
844            discovery_backend: super::DiscoveryBackend::KvStore(
845                crate::storage::kv::Selector::Memory,
846            ),
847            nats_config: Some(nats::ClientOptions::default()),
848            request_plane: crate::distributed::RequestPlaneMode::default(),
849            event_transport_kind: crate::discovery::EventTransportKind::Nats,
850        };
851        super::DistributedRuntime::new(rt, config).await.unwrap()
852    }
853
854    /// Helper function to create a DRT instance which points at
855    /// a (shared) file-backed KV store and ephemeral NATS transport so that
856    /// multiple DRT instances may observe the same registration state.
857    /// NOTE: This gets around the fact that create_test_drt_async() is
858    /// hardcoded to spin up a memory-backed discovery store
859    /// which means we can't share discovery state across runtimes.
860    pub async fn create_test_shared_drt_async(
861        store_path: &std::path::Path,
862    ) -> super::DistributedRuntime {
863        use crate::transports::nats;
864
865        let rt = crate::Runtime::from_current().unwrap();
866        let config = super::DistributedConfig {
867            discovery_backend: super::DiscoveryBackend::KvStore(
868                crate::storage::kv::Selector::File(store_path.to_path_buf()),
869            ),
870            nats_config: Some(nats::ClientOptions::default()),
871            request_plane: crate::distributed::RequestPlaneMode::default(),
872            event_transport_kind: crate::discovery::EventTransportKind::Nats,
873        };
874        super::DistributedRuntime::new(rt, config).await.unwrap()
875    }
876}
877
#[cfg(test)]
mod tests {
    //! Unit tests for this module.
    //!
    //! The uptime tests build a real DRT (with a NATS client), so they are compiled only
    //! with the `integration` feature. The `RequestPlaneMode` tests are pure and
    //! deliberately ungated so they run in every `cargo test`.

    use super::RequestPlaneMode;
    #[cfg(feature = "integration")]
    use super::distributed_test_utils::create_test_drt_async;

    #[cfg(feature = "integration")]
    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_disabled() {
        use crate::config::environment_names::runtime::system as env_system;
        // Test uptime with system status server disabled
        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
            // Start a DRT
            let drt = create_test_drt_async().await;

            // Wait 50ms
            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

            // Check that uptime is 50+ ms
            let uptime = drt.system_health.lock().uptime();
            assert!(
                uptime >= std::time::Duration::from_millis(50),
                "Expected uptime to be at least 50ms, but got {:?}",
                uptime
            );

            println!(
                "✓ DRT uptime test passed (system disabled): uptime = {:?}",
                uptime
            );
        })
        .await;
    }

    #[cfg(feature = "integration")]
    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_enabled() {
        use crate::config::environment_names::runtime::system as env_system;
        // Test uptime with system status server enabled
        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, Some("8081"))], async {
            // Start a DRT
            let drt = create_test_drt_async().await;

            // Wait 50ms
            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

            // Check that uptime is 50+ ms
            let uptime = drt.system_health.lock().uptime();
            assert!(
                uptime >= std::time::Duration::from_millis(50),
                "Expected uptime to be at least 50ms, but got {:?}",
                uptime
            );

            println!(
                "✓ DRT uptime test passed (system enabled): uptime = {:?}",
                uptime
            );
        })
        .await;
    }

    #[test]
    fn test_request_plane_mode_from_str() {
        assert_eq!(
            "nats".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Nats
        );
        assert_eq!(
            "http".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Http
        );
        assert_eq!(
            "tcp".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Tcp
        );
        assert_eq!(
            "NATS".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Nats
        );
        assert_eq!(
            "HTTP".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Http
        );
        assert_eq!(
            "TCP".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Tcp
        );
        assert!("invalid".parse::<RequestPlaneMode>().is_err());
    }

    #[test]
    fn test_request_plane_mode_from_str_error_mentions_input() {
        let err = "bogus".parse::<RequestPlaneMode>().unwrap_err();
        assert!(
            err.to_string().contains("'bogus'"),
            "error should quote the rejected input, got: {err}"
        );
    }

    #[test]
    fn test_request_plane_mode_display() {
        assert_eq!(RequestPlaneMode::Nats.to_string(), "nats");
        assert_eq!(RequestPlaneMode::Http.to_string(), "http");
        assert_eq!(RequestPlaneMode::Tcp.to_string(), "tcp");
    }

    #[test]
    fn test_request_plane_mode_display_round_trips() {
        for mode in [
            RequestPlaneMode::Nats,
            RequestPlaneMode::Http,
            RequestPlaneMode::Tcp,
        ] {
            assert_eq!(mode.to_string().parse::<RequestPlaneMode>().unwrap(), mode);
        }
    }

    #[test]
    fn test_request_plane_mode_default_is_tcp() {
        assert_eq!(RequestPlaneMode::default(), RequestPlaneMode::Tcp);
    }
}