Skip to main content

dynamo_runtime/
distributed.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::component::{
5    self, Component, ComponentBuilder, Endpoint, Instance, Namespace, RoutingOccupancyState,
6};
7use crate::config::environment_names::tcp_response_stream;
8use crate::pipeline::PipelineError;
9use crate::pipeline::network::manager::NetworkManager;
10use crate::service::{ServiceClient, ServiceSet};
11use crate::storage::kv;
12use crate::{discovery, system_status_server, transports};
13use crate::{
14    discovery::Discovery,
15    metrics::PrometheusUpdateCallback,
16    metrics::{MetricsHierarchy, MetricsRegistry},
17    transports::{etcd, nats, tcp},
18};
19
20use super::utils::GracefulShutdownTracker;
21use crate::SystemHealth;
22use crate::runtime::Runtime;
23
24// Used instead of std::cell::OnceCell because get_or_try_init there is nightly
25use async_once_cell::OnceCell;
26
27use std::fmt;
28use std::sync::{Arc, OnceLock, Weak};
29use std::time::Duration;
30use tokio::sync::watch::Receiver;
31
32use anyhow::Result;
33use derive_getters::Dissolve;
34use figment::error;
35use std::collections::HashMap;
36use tokio::sync::Mutex;
37use tokio_util::sync::CancellationToken;
38
/// Per-endpoint instance-list watch receivers, stored as `Weak` references so the map does
/// not by itself keep a receiver alive.
type InstanceMap = HashMap<Endpoint, Weak<Receiver<Vec<Instance>>>>;
/// Per-endpoint routing occupancy state, stored as `Weak` references.
type RoutingOccupancyMap = HashMap<Endpoint, Weak<RoutingOccupancyState>>;
41
/// Distributed [Runtime] which provides access to shared resources across the cluster, this includes
/// communication protocols and transports.
///
/// Most shared state is held behind `Arc`, so clones of this struct share it.
#[derive(Clone)]
pub struct DistributedRuntime {
    // Local runtime this distributed runtime is built on (executors and cancellation tokens).
    runtime: Runtime,

    // NATS connection; `None` when the config carried no NATS options. Used by the
    // `kv_router_nats_*` helpers and by `register_nats_service`.
    nats_client: Option<transports::nats::Client>,
    // Request-plane network manager.
    network_manager: Arc<NetworkManager>,
    // TCP response-stream server, created lazily on the first `tcp_server()` call.
    tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
    // Set by `new()` when the system status HTTP server starts successfully.
    system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,
    // Request plane transport selected at construction time.
    request_plane: RequestPlaneMode,

    // Service discovery client
    discovery_client: Arc<dyn discovery::Discovery>,

    // Discovery metadata (only used for Kubernetes backend)
    // Shared with system status server to expose via /metadata endpoint
    discovery_metadata: Option<Arc<tokio::sync::RwLock<discovery::DiscoveryMetadata>>>,

    // Local registry for components.
    // The registry lets instances of the same component object share runtime resources.
    // For example, two clients of the same remote component can use a single endpoint
    // watcher, which keeps the number of background tasks watching specific paths in etcd
    // to a minimum. It also holds the NATS services created by `register_nats_service`.
    component_registry: component::Registry,

    // Per-endpoint instance-list receivers, held weakly. NOTE(review): presumably this lets
    // clients of the same endpoint reuse one watcher while any of them is alive — confirm.
    instance_sources: Arc<tokio::sync::Mutex<InstanceMap>>,
    // Per-endpoint routing occupancy state, held weakly (same sharing pattern as above).
    routing_occupancy_states: Arc<tokio::sync::Mutex<RoutingOccupancyMap>>,

    // Health Status
    system_health: Arc<parking_lot::Mutex<SystemHealth>>,

    // Local endpoint registry for in-process calls
    local_endpoint_registry: crate::local_endpoint_registry::LocalEndpointRegistry,

    // This hierarchy's own metrics registry (the root of the metrics hierarchy)
    metrics_registry: MetricsRegistry,

    // Registry for /engine/* route callbacks
    engine_routes: crate::engine_routes::EngineRouteRegistry,

    // Resolved event transport kind — copied from `DistributedConfig` at construction time
    // and returned by default_event_transport_kind().
    event_transport_kind: crate::discovery::EventTransportKind,
}
88
89impl MetricsHierarchy for DistributedRuntime {
90    fn basename(&self) -> String {
91        "".to_string() // drt has no basename. Basename only begins with the Namespace.
92    }
93
94    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
95        vec![] // drt is the root, so no parent hierarchies
96    }
97
98    fn get_metrics_registry(&self) -> &MetricsRegistry {
99        &self.metrics_registry
100    }
101
102    fn connection_id(&self) -> Option<u64> {
103        Some(self.discovery_client.instance_id())
104    }
105}
106
107impl std::fmt::Debug for DistributedRuntime {
108    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109        write!(f, "DistributedRuntime")
110    }
111}
112
113impl DistributedRuntime {
114    pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result<Self> {
115        let (discovery_backend, nats_config, request_plane, event_transport_kind) =
116            config.dissolve();
117
118        let nats_client = match nats_config {
119            Some(nc) => Some(nc.connect().await?),
120            None => None,
121        };
122
123        // Start system status server for health and metrics if enabled in configuration
124        let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default();
125        // IMPORTANT: We must extract cancel_token from runtime BEFORE moving runtime into the struct below.
126        // This is because after moving, runtime is no longer accessible in this scope (ownership rules).
127        let cancel_token = if config.system_server_enabled() {
128            Some(runtime.clone().child_token())
129        } else {
130            None
131        };
132        let starting_health_status = config.starting_health_status.clone();
133        let use_endpoint_health_status = config.use_endpoint_health_status.clone();
134        let health_endpoint_path = config.system_health_path.clone();
135        let live_endpoint_path = config.system_live_path.clone();
136        let system_health = Arc::new(parking_lot::Mutex::new(SystemHealth::new(
137            starting_health_status,
138            use_endpoint_health_status,
139            health_endpoint_path,
140            live_endpoint_path,
141        )));
142
143        // Initialize discovery client based on backend configuration
144        let (discovery_client, discovery_metadata) = match discovery_backend {
145            DiscoveryBackend::Kubernetes => {
146                tracing::info!("Initializing Kubernetes discovery backend");
147                let metadata = Arc::new(tokio::sync::RwLock::new(
148                    crate::discovery::DiscoveryMetadata::new(),
149                ));
150                let client = crate::discovery::KubeDiscoveryClient::new(
151                    metadata.clone(),
152                    runtime.primary_token(),
153                )
154                .await
155                .inspect_err(
156                    |err| tracing::error!(%err, "Failed to initialize Kubernetes discovery client"),
157                )?;
158                (Arc::new(client) as Arc<dyn Discovery>, Some(metadata))
159            }
160            DiscoveryBackend::KvStore(kv_selector) => {
161                tracing::info!("Initializing KV store discovery backend: {kv_selector}");
162                let runtime_clone = runtime.clone();
163                let store = match kv_selector {
164                    kv::Selector::Etcd(etcd_config) => {
165                        let etcd_client = etcd::Client::new(*etcd_config, runtime_clone).await.inspect_err(|err|
166                            tracing::error!(%err, "Could not connect to etcd. Pass `--discovery-backend ..` to use a different backend or start etcd."))?;
167                        kv::Manager::etcd(etcd_client)
168                    }
169                    kv::Selector::File(root) => kv::Manager::file(runtime.primary_token(), root),
170                    kv::Selector::Memory => kv::Manager::memory(),
171                };
172                use crate::discovery::KVStoreDiscovery;
173                (
174                    Arc::new(KVStoreDiscovery::new(store, runtime.primary_token()))
175                        as Arc<dyn Discovery>,
176                    None,
177                )
178            }
179        };
180
181        let component_registry = component::Registry::new();
182
183        // NetworkManager for request plane
184        let network_manager = NetworkManager::new(
185            runtime.child_token(),
186            nats_client.clone().map(|c| c.client().clone()),
187            component_registry.clone(),
188            request_plane,
189        );
190
191        let distributed_runtime = Self {
192            runtime,
193            network_manager: Arc::new(network_manager),
194            nats_client,
195            tcp_server: Arc::new(OnceCell::new()),
196            system_status_server: Arc::new(OnceLock::new()),
197            discovery_client,
198            discovery_metadata,
199            component_registry,
200            instance_sources: Arc::new(Mutex::new(HashMap::new())),
201            routing_occupancy_states: Arc::new(Mutex::new(HashMap::new())),
202            metrics_registry: crate::MetricsRegistry::new(),
203            system_health,
204            request_plane,
205            local_endpoint_registry: crate::local_endpoint_registry::LocalEndpointRegistry::new(),
206            engine_routes: crate::engine_routes::EngineRouteRegistry::new(),
207            event_transport_kind,
208        };
209
210        // Initialize the uptime gauge in SystemHealth
211        distributed_runtime
212            .system_health
213            .lock()
214            .initialize_uptime_gauge(&distributed_runtime)?;
215
216        // Register an update callback so the uptime gauge is refreshed before
217        // every Prometheus scrape (both system status server and frontend).
218        {
219            let system_health = distributed_runtime.system_health.clone();
220            distributed_runtime
221                .metrics_registry
222                .add_update_callback(std::sync::Arc::new(move || {
223                    system_health.lock().update_uptime_gauge();
224                    Ok(())
225                }));
226        }
227
228        // Handle system status server initialization
229        if let Some(cancel_token) = cancel_token {
230            // System server is enabled - start both the state and HTTP server
231            let host = config.system_host.clone();
232            let port = config.system_port as u16;
233
234            // Start system status server (it creates SystemStatusState internally)
235            match crate::system_status_server::spawn_system_status_server(
236                &host,
237                port,
238                cancel_token,
239                Arc::new(distributed_runtime.clone()),
240                distributed_runtime.discovery_metadata.clone(),
241            )
242            .await
243            {
244                Ok((addr, handle)) => {
245                    tracing::info!("System status server started successfully on {addr}");
246
247                    // Store system status server information
248                    let system_status_server_info =
249                        crate::system_status_server::SystemStatusServerInfo::new(
250                            addr,
251                            Some(handle),
252                        );
253
254                    // Initialize the system_status_server field
255                    distributed_runtime
256                        .system_status_server
257                        .set(Arc::new(system_status_server_info))
258                        .expect("System status server info should only be set once");
259                }
260                Err(e) => {
261                    tracing::error!("System status server startup failed: {e}");
262                }
263            }
264        } else {
265            // System server HTTP is disabled, but uptime metrics are still being tracked via SystemHealth
266            tracing::debug!(
267                "System status server HTTP endpoints disabled, but uptime metrics are being tracked"
268            );
269        }
270
271        // Start health check manager if enabled
272        if config.health_check_enabled {
273            let health_check_config = crate::health_check::HealthCheckConfig {
274                canary_wait_time: std::time::Duration::from_secs(config.canary_wait_time_secs),
275                request_timeout: std::time::Duration::from_secs(
276                    config.health_check_request_timeout_secs,
277                ),
278            };
279
280            // Start the health check manager (spawns per-endpoint monitoring tasks)
281            match crate::health_check::start_health_check_manager(
282                distributed_runtime.clone(),
283                Some(health_check_config),
284            )
285            .await
286            {
287                Ok(()) => tracing::info!(
288                    "Health check manager started (canary_wait_time: {}s, request_timeout: {}s)",
289                    config.canary_wait_time_secs,
290                    config.health_check_request_timeout_secs
291                ),
292                Err(e) => tracing::error!("Health check manager failed to start: {e}"),
293            }
294        }
295
296        Ok(distributed_runtime)
297    }
298
299    pub async fn from_settings(runtime: Runtime) -> Result<Self> {
300        let config = DistributedConfig::from_settings();
301        Self::new(runtime, config).await
302    }
303
304    pub fn runtime(&self) -> &Runtime {
305        &self.runtime
306    }
307
308    pub fn primary_token(&self) -> CancellationToken {
309        self.runtime.primary_token()
310    }
311
312    // TODO: Don't hand out pointers, instead have methods to use the registry in friendly ways
313    // (without being aware of async locks and so on)
314    pub fn component_registry(&self) -> &component::Registry {
315        &self.component_registry
316    }
317
318    // TODO: Don't hand out pointers, instead provide system health related services.
319    pub fn system_health(&self) -> Arc<parking_lot::Mutex<SystemHealth>> {
320        self.system_health.clone()
321    }
322
323    /// Get the local endpoint registry for in-process endpoint calls
324    pub fn local_endpoint_registry(
325        &self,
326    ) -> &crate::local_endpoint_registry::LocalEndpointRegistry {
327        &self.local_endpoint_registry
328    }
329
330    /// Get the engine route registry for registering custom /engine/* routes
331    pub fn engine_routes(&self) -> &crate::engine_routes::EngineRouteRegistry {
332        &self.engine_routes
333    }
334
335    pub fn connection_id(&self) -> u64 {
336        self.discovery_client.instance_id()
337    }
338
339    pub fn shutdown(&self) {
340        self.runtime.shutdown();
341        self.discovery_client.shutdown();
342    }
343
344    /// Create a [`Namespace`]
345    pub fn namespace(&self, name: impl Into<String>) -> Result<Namespace> {
346        Namespace::new(self.clone(), name.into())
347    }
348
349    /// Returns the discovery interface for service registration and discovery
350    pub fn discovery(&self) -> Arc<dyn Discovery> {
351        self.discovery_client.clone()
352    }
353
354    pub async fn tcp_server(&self) -> Result<Arc<tcp::server::TcpStreamServer>> {
355        Ok(self
356            .tcp_server
357            .get_or_try_init(async move {
358                let port = match std::env::var(tcp_response_stream::DYN_TCP_RESPONSE_STREAM_PORT) {
359                    Ok(p) => p.parse::<u16>().map_err(|_| {
360                        PipelineError::Generic(format!(
361                            "invalid {}: '{}' is not a valid port number",
362                            tcp_response_stream::DYN_TCP_RESPONSE_STREAM_PORT,
363                            p
364                        ))
365                    })?,
366                    Err(_) => 0,
367                };
368                let interface = std::env::var(tcp_response_stream::DYN_TCP_RESPONSE_STREAM_HOST)
369                    .ok()
370                    .filter(|h| !h.is_empty());
371
372                let host_suffix = interface
373                    .as_ref()
374                    .map_or(String::new(), |h| format!(" on host {h}"));
375                if port == 0 {
376                    tracing::info!(
377                        "TCP response stream server using OS-assigned port{host_suffix}"
378                    );
379                } else {
380                    tracing::info!(
381                        "TCP response stream server using fixed port {port}{host_suffix}"
382                    );
383                }
384
385                let options = tcp::server::ServerOptions { port, interface };
386                let server = tcp::server::TcpStreamServer::new(options).await?;
387                Ok::<_, PipelineError>(server)
388            })
389            .await?
390            .clone())
391    }
392
393    /// Get the network manager
394    ///
395    /// The network manager consolidates all network configuration and provides
396    /// unified access to request plane servers and clients.
397    pub fn network_manager(&self) -> Arc<NetworkManager> {
398        self.network_manager.clone()
399    }
400
401    /// Get the request plane server (convenience method)
402    ///
403    /// This is a shortcut for `network_manager().await?.server().await`.
404    pub async fn request_plane_server(
405        &self,
406    ) -> Result<Arc<dyn crate::pipeline::network::ingress::unified_server::RequestPlaneServer>>
407    {
408        self.network_manager().server().await
409    }
410
411    /// Get system status server information if available
412    pub fn system_status_server_info(
413        &self,
414    ) -> Option<Arc<crate::system_status_server::SystemStatusServerInfo>> {
415        self.system_status_server.get().cloned()
416    }
417
418    /// How the frontend should talk to the backend.
419    pub fn request_plane(&self) -> RequestPlaneMode {
420        self.request_plane
421    }
422
423    /// Returns the event transport kind this runtime was configured with.
424    ///
425    /// The value is resolved once at construction time by `DiscoveryBackend::resolve_event_transport_kind`:
426    /// if `DYN_EVENT_PLANE` is set explicitly that value wins; otherwise the discovery
427    /// backend drives the default (ZMQ for `file`/`mem`, NATS for `etcd`/`kubernetes`).
428    ///
429    /// Use this instead of [`EventTransportKind::from_env_or_default`] wherever you have
430    /// access to a `DistributedRuntime`, so that local-only workflows work without
431    /// setting `DYN_EVENT_PLANE` explicitly.
432    pub fn default_event_transport_kind(&self) -> crate::discovery::EventTransportKind {
433        self.event_transport_kind
434    }
435
436    pub fn child_token(&self) -> CancellationToken {
437        self.runtime.child_token()
438    }
439
440    pub(crate) fn graceful_shutdown_tracker(&self) -> Arc<GracefulShutdownTracker> {
441        self.runtime.graceful_shutdown_tracker()
442    }
443
444    pub fn instance_sources(&self) -> Arc<Mutex<InstanceMap>> {
445        self.instance_sources.clone()
446    }
447
448    pub(crate) fn routing_occupancy_states(&self) -> Arc<Mutex<RoutingOccupancyMap>> {
449        self.routing_occupancy_states.clone()
450    }
451
452    /// TODO: This is a temporary KV router measure for component/component.rs EventPublisher impl for
453    /// Component, to allow it to publish to NATS. KV Router is the only user.
454    ///
455    /// When NATS is not available (e.g., running in approximate mode with --no-kv-events),
456    /// this function returns Ok(()) silently since publishing is optional in that mode.
457    pub async fn kv_router_nats_publish(
458        &self,
459        subject: String,
460        payload: bytes::Bytes,
461    ) -> anyhow::Result<()> {
462        let Some(nats_client) = self.nats_client.as_ref() else {
463            // NATS not available - this is expected in approximate mode (--no-kv-events)
464            tracing::trace!("Skipping NATS publish (NATS not configured): {subject}");
465            return Ok(());
466        };
467        Ok(nats_client.client().publish(subject, payload).await?)
468    }
469
470    /// TODO: This is a temporary KV router measure for component/component.rs EventSubscriber impl for
471    /// Component, to allow it to subscribe to NATS. KV Router is the only user.
472    pub(crate) async fn kv_router_nats_subscribe(
473        &self,
474        subject: String,
475    ) -> Result<async_nats::Subscriber> {
476        let Some(nats_client) = self.nats_client.as_ref() else {
477            anyhow::bail!("KV router's EventSubscriber requires NATS");
478        };
479        Ok(nats_client.client().subscribe(subject).await?)
480    }
481
482    /// TODO (karenc): This is a temporary KV router measure for worker query requests.
483    /// Allows KV Router to perform request/reply with workers. (versus the pub/sub pattern above)
484    /// KV Router is the only user, made public for use in dynamo-llm crate
485    pub async fn kv_router_nats_request(
486        &self,
487        subject: String,
488        payload: bytes::Bytes,
489        timeout: std::time::Duration,
490    ) -> anyhow::Result<async_nats::Message> {
491        let Some(nats_client) = self.nats_client.as_ref() else {
492            anyhow::bail!("KV router's request requires NATS");
493        };
494        let response =
495            tokio::time::timeout(timeout, nats_client.client().request(subject, payload))
496                .await
497                .map_err(|_| anyhow::anyhow!("Request timed out after {:?}", timeout))??;
498        Ok(response)
499    }
500
    /// DEPRECATED: This method exists only for NATS request plane support.
    /// Once everything uses the TCP request plane, this can be removed along with
    /// the NATS service registration infrastructure.
    ///
    /// Spawns the registration on the runtime's secondary executor and returns immediately.
    /// The NATS service is keyed by `component.service_name()` in the component registry, so
    /// repeated calls for the same component (one per served endpoint) register it once.
    ///
    /// Returns a receiver that signals when the NATS service registration is complete.
    /// Exactly one message is sent on every path: `Ok(())` on success or if the service
    /// already exists, `Err(..)` if NATS is not configured or the service cannot be built.
    /// The caller should use `blocking_recv()` to wait for completion. Note that tokio's
    /// `blocking_recv()` must not be called from within an async context; async callers
    /// should use `recv().await` instead.
    pub fn register_nats_service(
        &self,
        component: Component,
    ) -> tokio::sync::mpsc::Receiver<Result<(), String>> {
        // Create a oneshot-style channel (capacity 1) to signal completion
        let (tx, rx) = tokio::sync::mpsc::channel::<Result<(), String>>(1);

        let drt = self.clone();
        self.runtime().secondary().spawn(async move {
            let service_name = component.service_name();

            // Pre-check to save cost of creating the service, but don't hold the lock
            // (the lock guard is a temporary and is released at the end of this condition).
            if drt
                .component_registry()
                .inner
                .lock()
                .await
                .services
                .contains_key(&service_name)
            {
                // The NATS service is per component, but it is called from `serve_endpoint`, and there
                // are often multiple endpoints for a component (e.g. `clear_kv_blocks` and `generate`).
                tracing::trace!("Service {service_name} already exists");
                // Signal success - service already exists
                let _ = tx.send(Ok(())).await;
                return;
            }

            let Some(nats_client) = drt.nats_client.as_ref() else {
                tracing::error!("Cannot create NATS service without NATS.");
                let _ = tx
                    .send(Err("Cannot create NATS service without NATS".to_string()))
                    .await;
                return;
            };
            let description = None;
            // Built without holding the registry lock, so another task may register the
            // same service concurrently; that race is resolved below.
            let nats_service = match crate::component::service::build_nats_service(
                nats_client,
                &component,
                description,
            )
            .await
            {
                Ok(service) => service,
                Err(err) => {
                    tracing::error!(error = %err, component = service_name, "Failed to build NATS service");
                    let _ = tx.send(Err(format!("Failed to build NATS service: {err}"))).await;
                    return;
                }
            };

            // Re-check under the lock: only the first task to get here inserts its service.
            let mut guard = drt.component_registry().inner.lock().await;
            if !guard.services.contains_key(&service_name) {
                // Normal case
                guard.services.insert(service_name.clone(), nats_service);

                tracing::info!("Added NATS service {service_name}");

                drop(guard);
            } else {
                // Lost the race: release the lock before stopping the redundant service.
                // Stop errors are deliberately ignored (best-effort cleanup).
                drop(guard);
                let _ = nats_service.stop().await;
                // The NATS service is per component, but it is called from `serve_endpoint`, and there
                // are often multiple endpoints for a component (e.g. `clear_kv_blocks` and `generate`).
                // TODO: Is this still true?
            }

            // Signal completion - service registered successfully (or already registered by
            // a concurrent caller). Send errors mean the receiver was dropped and are ignored.
            let _ = tx.send(Ok(())).await;
        });

        rx
    }
580}
581
/// Selects which discovery backend to use and, for KV store backends, which KV store.
///
/// `DistributedConfig::from_settings` chooses it from `DYN_DISCOVERY_BACKEND`, and
/// `DistributedRuntime::new` uses it to build the discovery client.
#[derive(Clone, Debug)]
pub enum DiscoveryBackend {
    /// Use Kubernetes API for service discovery (no KV store needed)
    Kubernetes,
    /// Use a KV store (etcd, file, or memory) for service discovery
    KvStore(kv::Selector),
}
590
591impl DiscoveryBackend {
592    /// Returns true if this backend requires no external services (file or in-memory).
593    ///
594    /// Local backends do not need etcd, NATS, or any other infrastructure daemon.
595    /// This is used to drive smart defaults: for example, the event plane defaults to
596    /// ZMQ (not NATS) when a local backend is in use and `DYN_EVENT_PLANE` is not set.
597    pub fn is_local(&self) -> bool {
598        matches!(
599            self,
600            DiscoveryBackend::KvStore(kv::Selector::File(_))
601                | DiscoveryBackend::KvStore(kv::Selector::Memory)
602        )
603    }
604
605    /// Resolve the event transport kind for this backend.
606    ///
607    /// This is the single authoritative mapping of `(DYN_EVENT_PLANE, backend)` →
608    /// `EventTransportKind`. When `DYN_EVENT_PLANE` is unset or empty the backend
609    /// drives the default: local backends (`file`/`mem`) → ZMQ, distributed backends
610    /// (`etcd`/`kubernetes`) → NATS.
611    ///
612    /// Call this once at startup and store the result; do not call it repeatedly.
613    pub fn resolve_event_transport_kind(&self) -> crate::discovery::EventTransportKind {
614        use crate::config::environment_names::event_plane::DYN_EVENT_PLANE;
615        use crate::discovery::EventTransportKind;
616        match std::env::var(DYN_EVENT_PLANE).as_deref() {
617            Ok("nats") => EventTransportKind::Nats,
618            Ok("zmq") => EventTransportKind::Zmq,
619            // Unset or empty: derive from backend type.
620            Ok("") | Err(_) => {
621                if self.is_local() {
622                    EventTransportKind::Zmq
623                } else {
624                    EventTransportKind::Nats
625                }
626            }
627            Ok(other) => {
628                let default_kind = if self.is_local() {
629                    EventTransportKind::Zmq
630                } else {
631                    EventTransportKind::Nats
632                };
633                tracing::warn!(
634                    "Invalid DYN_EVENT_PLANE value '{}'. Valid values: 'nats', 'zmq'. \
635                     Defaulting to {:?}.",
636                    other,
637                    default_kind
638                );
639                default_kind
640            }
641        }
642    }
643}
644
/// Settings consumed by `DistributedRuntime::new`.
///
/// Build one with `DistributedConfig::from_settings`, `DistributedConfig::for_cli` or
/// `DistributedConfig::process_local`. `DistributedRuntime::new` destructures it with
/// `dissolve()`, which yields the fields as a tuple in declaration order, so keep the field
/// order in sync with that call.
#[derive(Dissolve)]
pub struct DistributedConfig {
    /// Discovery backend used to build the discovery client.
    pub discovery_backend: DiscoveryBackend,
    /// NATS client options; `None` means no NATS connection is opened.
    pub nats_config: Option<nats::ClientOptions>,
    /// Request plane transport between routers and workers.
    pub request_plane: RequestPlaneMode,
    /// Resolved event transport kind — computed once at config time from
    /// `DYN_EVENT_PLANE` and the discovery backend, then stored on the runtime
    /// so callers always get the same answer regardless of which other services
    /// happen to be reachable.
    pub event_transport_kind: crate::discovery::EventTransportKind,
}
656
657impl DistributedConfig {
658    pub fn from_settings() -> DistributedConfig {
659        let request_plane = RequestPlaneMode::from_env();
660
661        // Determine the discovery backend first — we need it to compute the NATS default below.
662        // Valid values for DYN_DISCOVERY_BACKEND: "kubernetes", "etcd" (default), "file", "mem"
663        let backend_str =
664            std::env::var("DYN_DISCOVERY_BACKEND").unwrap_or_else(|_| "etcd".to_string());
665
666        let discovery_backend = match backend_str.as_str() {
667            "kubernetes" => {
668                tracing::info!("Using Kubernetes discovery backend");
669                DiscoveryBackend::Kubernetes
670            }
671            other => {
672                let selector: kv::Selector = other.parse().unwrap_or_else(|_| {
673                    panic!(
674                        "Unknown DYN_DISCOVERY_BACKEND value: '{other}'. \
675                         Valid options: kubernetes, etcd, file, mem"
676                    )
677                });
678                DiscoveryBackend::KvStore(selector)
679            }
680        };
681
682        // Resolve event transport kind once — the single source of truth used both to
683        // decide whether to open a NATS connection and to answer
684        // `DistributedRuntime::default_event_transport_kind()` later.
685        let event_transport_kind = discovery_backend.resolve_event_transport_kind();
686
687        // NATS is used for more than just NATS request-plane RPC:
688        // - KV router events (JetStream or NATS core + local indexer)
689        // - inter-router replica sync (NATS core)
690        //
691        // Enable the NATS client when any of these hold:
692        // 1. Request plane is NATS
693        // 2. NATS_SERVER is explicitly configured by the user
694        // 3. The resolved event transport kind is NATS
695        let nats_enabled = request_plane.is_nats()
696            || std::env::var(crate::config::environment_names::nats::NATS_SERVER).is_ok()
697            || matches!(
698                event_transport_kind,
699                crate::discovery::EventTransportKind::Nats
700            );
701
702        DistributedConfig {
703            discovery_backend,
704            nats_config: if nats_enabled {
705                Some(nats::ClientOptions::default())
706            } else {
707                None
708            },
709            request_plane,
710            event_transport_kind,
711        }
712    }
713
714    pub fn for_cli() -> DistributedConfig {
715        let etcd_config = etcd::ClientOptions {
716            attach_lease: false,
717            ..Default::default()
718        };
719        let request_plane = RequestPlaneMode::from_env();
720        let discovery_backend =
721            DiscoveryBackend::KvStore(kv::Selector::Etcd(Box::new(etcd_config)));
722        let event_transport_kind = discovery_backend.resolve_event_transport_kind();
723        let nats_enabled = request_plane.is_nats()
724            || std::env::var(crate::config::environment_names::nats::NATS_SERVER).is_ok()
725            || matches!(
726                event_transport_kind,
727                crate::discovery::EventTransportKind::Nats
728            );
729        DistributedConfig {
730            discovery_backend,
731            nats_config: if nats_enabled {
732                Some(nats::ClientOptions::default())
733            } else {
734                None
735            },
736            request_plane,
737            event_transport_kind,
738        }
739    }
740
741    /// A DistributedConfig that isn't distributed, for when the frontend and backend are in the
742    /// same process.
743    pub fn process_local() -> DistributedConfig {
744        DistributedConfig {
745            discovery_backend: DiscoveryBackend::KvStore(kv::Selector::Memory),
746            nats_config: None,
747            // This won't be used in process local, so we likely need a "none" option to
748            // communicate that and avoid opening the ports.
749            request_plane: RequestPlaneMode::Tcp,
750            event_transport_kind: crate::discovery::EventTransportKind::Zmq,
751        }
752    }
753}
754
/// Transport used on the request plane, i.e. how routers deliver requests to workers.
///
/// - [`RequestPlaneMode::Nats`]: NATS-based request distribution (legacy).
/// - [`RequestPlaneMode::Http`]: HTTP/2-based request distribution.
/// - [`RequestPlaneMode::Tcp`]: raw TCP request distribution with msgpack support; this is the
///   [`Default`] mode.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum RequestPlaneMode {
    /// Requests are carried over NATS.
    Nats,
    /// Requests are carried over HTTP/2.
    Http,
    /// Requests are carried over raw TCP, with msgpack support.
    #[default]
    Tcp,
}
771
772impl fmt::Display for RequestPlaneMode {
773    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
774        match self {
775            Self::Nats => write!(f, "nats"),
776            Self::Http => write!(f, "http"),
777            Self::Tcp => write!(f, "tcp"),
778        }
779    }
780}
781
782impl std::str::FromStr for RequestPlaneMode {
783    type Err = anyhow::Error;
784
785    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
786        match s.to_lowercase().as_str() {
787            "nats" => Ok(Self::Nats),
788            "http" => Ok(Self::Http),
789            "tcp" => Ok(Self::Tcp),
790            _ => Err(anyhow::anyhow!(
791                "Invalid request plane mode: '{}'. Valid options are: 'nats', 'http', 'tcp'",
792                s
793            )),
794        }
795    }
796}
797
798impl RequestPlaneMode {
799    /// Get the request plane mode from environment variable (uncached)
800    /// Reads from `DYN_REQUEST_PLANE` environment variable.
801    fn from_env() -> Self {
802        std::env::var("DYN_REQUEST_PLANE")
803            .ok()
804            .and_then(|s| s.parse().ok())
805            .unwrap_or_default()
806    }
807
808    pub fn is_nats(&self) -> bool {
809        matches!(self, RequestPlaneMode::Nats)
810    }
811}
812
813pub mod distributed_test_utils {
814    //! Common test helper functions for DistributedRuntime tests
815
816    /// Helper function to create a DRT instance for integration-only tests.
817    /// Uses from_current to leverage existing tokio runtime
818    /// Note: Settings are read from environment variables inside DistributedRuntime::from_settings
819    #[cfg(feature = "integration")]
820    pub async fn create_test_drt_async() -> super::DistributedRuntime {
821        use crate::transports::nats;
822
823        let rt = crate::Runtime::from_current().unwrap();
824        let config = super::DistributedConfig {
825            discovery_backend: super::DiscoveryBackend::KvStore(
826                crate::storage::kv::Selector::Memory,
827            ),
828            nats_config: Some(nats::ClientOptions::default()),
829            request_plane: crate::distributed::RequestPlaneMode::default(),
830            event_transport_kind: crate::discovery::EventTransportKind::Nats,
831        };
832        super::DistributedRuntime::new(rt, config).await.unwrap()
833    }
834
835    /// Helper function to create a DRT instance which points at
836    /// a (shared) file-backed KV store and ephemeral NATS transport so that
837    /// multiple DRT instances may observe the same registration state.
838    /// NOTE: This gets around the fact that create_test_drt_async() is
839    /// hardcoded to spin up a memory-backed discovery store
840    /// which means we can't share discovery state across runtimes.
841    pub async fn create_test_shared_drt_async(
842        store_path: &std::path::Path,
843    ) -> super::DistributedRuntime {
844        use crate::transports::nats;
845
846        let rt = crate::Runtime::from_current().unwrap();
847        let config = super::DistributedConfig {
848            discovery_backend: super::DiscoveryBackend::KvStore(
849                crate::storage::kv::Selector::File(store_path.to_path_buf()),
850            ),
851            nats_config: Some(nats::ClientOptions::default()),
852            request_plane: crate::distributed::RequestPlaneMode::default(),
853            event_transport_kind: crate::discovery::EventTransportKind::Nats,
854        };
855        super::DistributedRuntime::new(rt, config).await.unwrap()
856    }
857}
858
// Only the DistributedRuntime tests need the `integration` feature. The RequestPlaneMode
// parsing/formatting tests are pure unit tests and run in every test build.
#[cfg(test)]
mod tests {
    use super::RequestPlaneMode;
    #[cfg(feature = "integration")]
    use super::distributed_test_utils::create_test_drt_async;

    #[cfg(feature = "integration")]
    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_disabled() {
        use crate::config::environment_names::runtime::system as env_system;
        // Test uptime with system status server disabled
        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
            // Start a DRT
            let drt = create_test_drt_async().await;

            // Wait 50ms
            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

            // Check that uptime is 50+ ms
            let uptime = drt.system_health.lock().uptime();
            assert!(
                uptime >= std::time::Duration::from_millis(50),
                "Expected uptime to be at least 50ms, but got {:?}",
                uptime
            );

            println!(
                "✓ DRT uptime test passed (system disabled): uptime = {:?}",
                uptime
            );
        })
        .await;
    }

    #[cfg(feature = "integration")]
    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_enabled() {
        use crate::config::environment_names::runtime::system as env_system;
        // Test uptime with system status server enabled
        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, Some("8081"))], async {
            // Start a DRT
            let drt = create_test_drt_async().await;

            // Wait 50ms
            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

            // Check that uptime is 50+ ms
            let uptime = drt.system_health.lock().uptime();
            assert!(
                uptime >= std::time::Duration::from_millis(50),
                "Expected uptime to be at least 50ms, but got {:?}",
                uptime
            );

            println!(
                "✓ DRT uptime test passed (system enabled): uptime = {:?}",
                uptime
            );
        })
        .await;
    }

    #[test]
    fn test_request_plane_mode_from_str() {
        assert_eq!(
            "nats".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Nats
        );
        assert_eq!(
            "http".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Http
        );
        assert_eq!(
            "tcp".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Tcp
        );
        assert_eq!(
            "NATS".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Nats
        );
        assert_eq!(
            "HTTP".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Http
        );
        assert_eq!(
            "TCP".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Tcp
        );
        assert!("invalid".parse::<RequestPlaneMode>().is_err());
    }

    #[test]
    fn test_request_plane_mode_display() {
        assert_eq!(RequestPlaneMode::Nats.to_string(), "nats");
        assert_eq!(RequestPlaneMode::Http.to_string(), "http");
        assert_eq!(RequestPlaneMode::Tcp.to_string(), "tcp");
    }

    /// Display output must always parse back to the same mode.
    #[test]
    fn test_request_plane_mode_round_trip() {
        for mode in [
            RequestPlaneMode::Nats,
            RequestPlaneMode::Http,
            RequestPlaneMode::Tcp,
        ] {
            assert_eq!(mode.to_string().parse::<RequestPlaneMode>().unwrap(), mode);
        }
    }

    #[test]
    fn test_request_plane_mode_default_is_tcp() {
        assert_eq!(RequestPlaneMode::default(), RequestPlaneMode::Tcp);
    }
}