// dynamo_runtime/distributed.rs
1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::component::{Component, Instance};
5use crate::pipeline::PipelineError;
6use crate::pipeline::network::manager::NetworkManager;
7use crate::service::{ServiceClient, ServiceSet};
8use crate::storage::kv::{self, Store as _};
9use crate::{
10    component::{self, ComponentBuilder, Endpoint, Namespace},
11    discovery::Discovery,
12    metrics::PrometheusUpdateCallback,
13    metrics::{MetricsHierarchy, MetricsRegistry},
14    transports::{etcd, nats, tcp},
15};
16use crate::{discovery, system_status_server, transports};
17
18use super::utils::GracefulShutdownTracker;
19use crate::SystemHealth;
20use crate::runtime::Runtime;
21
22// Used instead of std::cell::OnceCell because get_or_try_init there is nightly
23use async_once_cell::OnceCell;
24
25use std::fmt;
26use std::sync::{Arc, OnceLock, Weak};
27use std::time::Duration;
28use tokio::sync::watch::Receiver;
29
30use anyhow::Result;
31use derive_getters::Dissolve;
32use figment::error;
33use std::collections::HashMap;
34use tokio::sync::Mutex;
35use tokio_util::sync::CancellationToken;
36
/// Cache of per-endpoint instance watchers. `Weak` lets an entry expire once
/// every client has dropped its receiver, so stale watchers don't accumulate.
type InstanceMap = HashMap<Endpoint, Weak<Receiver<Vec<Instance>>>>;
38
/// Distributed [Runtime] which provides access to shared resources across the cluster, this includes
/// communication protocols and transports.
#[derive(Clone)]
pub struct DistributedRuntime {
    // local runtime
    runtime: Runtime,

    // Optional NATS connection; `None` when running without NATS (e.g. pure TCP request plane).
    nats_client: Option<transports::nats::Client>,
    // Key-value store backend (etcd, file-backed, or in-memory).
    store: kv::Manager,
    // Consolidated request-plane networking (servers and clients).
    network_manager: Arc<NetworkManager>,
    // Lazily initialized TCP stream server; async OnceCell because
    // std's get_or_try_init is nightly-only (see import comment above).
    tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
    // Set exactly once, after the system status HTTP server starts (if enabled).
    system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,
    // How the frontend talks to the backend (nats/http/tcp). Copy type.
    request_plane: RequestPlaneMode,

    // Service discovery client
    discovery_client: Arc<dyn discovery::Discovery>,

    // Discovery metadata (only used for Kubernetes backend)
    // Shared with system status server to expose via /metadata endpoint
    discovery_metadata: Option<Arc<tokio::sync::RwLock<discovery::DiscoveryMetadata>>>,

    // local registry for components
    // the registry allows us to use share runtime resources across instances of the same component object.
    // take for example two instances of a client to the same remote component. The registry allows us to use
    // a single endpoint watcher for both clients, this keeps the number background tasking watching specific
    // paths in etcd to a minimum.
    component_registry: component::Registry,

    // Shared cache of endpoint instance watchers; see `InstanceMap`.
    instance_sources: Arc<tokio::sync::Mutex<InstanceMap>>,

    // Health Status
    system_health: Arc<parking_lot::Mutex<SystemHealth>>,

    // Local endpoint registry for in-process calls
    local_endpoint_registry: crate::local_endpoint_registry::LocalEndpointRegistry,

    // This hierarchy's own metrics registry
    metrics_registry: MetricsRegistry,

    // Registry for /engine/* route callbacks
    engine_routes: crate::engine_routes::EngineRouteRegistry,
}
81
82impl MetricsHierarchy for DistributedRuntime {
83    fn basename(&self) -> String {
84        "".to_string() // drt has no basename. Basename only begins with the Namespace.
85    }
86
87    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
88        vec![] // drt is the root, so no parent hierarchies
89    }
90
91    fn get_metrics_registry(&self) -> &MetricsRegistry {
92        &self.metrics_registry
93    }
94}
95
96impl std::fmt::Debug for DistributedRuntime {
97    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98        write!(f, "DistributedRuntime")
99    }
100}
101
impl DistributedRuntime {
    /// Build a distributed runtime on top of a local [Runtime].
    ///
    /// Wires up, in order: the KV store backend, an optional NATS client, the
    /// system-health state, the discovery client (Kubernetes or KV-store based,
    /// selected via `DYN_DISCOVERY_BACKEND`), the request-plane network
    /// manager, and — when enabled by runtime configuration — the system
    /// status HTTP server and the health check manager.
    pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result<Self> {
        // `dissolve()` yields fields in declaration order (see DistributedConfig).
        let (selected_kv_store, nats_config, request_plane) = config.dissolve();

        let runtime_clone = runtime.clone();

        let store = match selected_kv_store {
            kv::Selector::Etcd(etcd_config) => {
                let etcd_client = etcd::Client::new(*etcd_config, runtime_clone).await.inspect_err(|err|
                    // The returned error doesn't show because of a dropped runtime error, so
                    // log it first.
                    tracing::error!(%err, "Could not connect to etcd. Pass `--store-kv ..` to use a different backend or start etcd."))?;
                kv::Manager::etcd(etcd_client)
            }
            kv::Selector::File(root) => kv::Manager::file(runtime.primary_token(), root),
            kv::Selector::Memory => kv::Manager::memory(),
        };

        // NATS is optional; features that require it either degrade gracefully
        // or fail lazily (see the kv_router_nats_* methods below).
        let nats_client = match nats_config {
            Some(nc) => Some(nc.connect().await?),
            None => None,
        };

        // Start system status server for health and metrics if enabled in configuration
        // NOTE: this shadows the dissolved DistributedConfig with the RuntimeConfig.
        let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default();
        // IMPORTANT: We must extract cancel_token from runtime BEFORE moving runtime into the struct below.
        // This is because after moving, runtime is no longer accessible in this scope (ownership rules).
        let cancel_token = if config.system_server_enabled() {
            Some(runtime.clone().child_token())
        } else {
            None
        };
        let starting_health_status = config.starting_health_status.clone();
        let use_endpoint_health_status = config.use_endpoint_health_status.clone();
        let health_endpoint_path = config.system_health_path.clone();
        let live_endpoint_path = config.system_live_path.clone();
        let system_health = Arc::new(parking_lot::Mutex::new(SystemHealth::new(
            starting_health_status,
            use_endpoint_health_status,
            health_endpoint_path,
            live_endpoint_path,
        )));

        // Initialize discovery client based on backend configuration
        let discovery_backend =
            std::env::var("DYN_DISCOVERY_BACKEND").unwrap_or_else(|_| "kv_store".to_string());

        let (discovery_client, discovery_metadata) = match discovery_backend.as_str() {
            "kubernetes" => {
                tracing::info!("Initializing Kubernetes discovery backend");
                // Metadata is shared with the system status server so it can be
                // exposed via the /metadata endpoint.
                let metadata = Arc::new(tokio::sync::RwLock::new(
                    crate::discovery::DiscoveryMetadata::new(),
                ));
                let client = crate::discovery::KubeDiscoveryClient::new(
                    metadata.clone(),
                    runtime.primary_token(),
                )
                .await
                .inspect_err(
                    |err| tracing::error!(%err, "Failed to initialize Kubernetes discovery client"),
                )?;
                (Arc::new(client) as Arc<dyn Discovery>, Some(metadata))
            }
            // Any other value (including the "kv_store" default) uses the KV store backend.
            _ => {
                tracing::info!("Initializing KV store discovery backend");
                use crate::discovery::KVStoreDiscovery;
                (
                    Arc::new(KVStoreDiscovery::new(
                        store.clone(),
                        runtime.primary_token(),
                    )) as Arc<dyn Discovery>,
                    None,
                )
            }
        };

        let component_registry = component::Registry::new();

        // NetworkManager for request plane
        let network_manager = NetworkManager::new(
            runtime.child_token(),
            nats_client.clone().map(|c| c.client().clone()),
            component_registry.clone(),
            request_plane,
        );

        let distributed_runtime = Self {
            runtime,
            store,
            network_manager: Arc::new(network_manager),
            nats_client,
            tcp_server: Arc::new(OnceCell::new()),
            system_status_server: Arc::new(OnceLock::new()),
            discovery_client,
            discovery_metadata,
            component_registry,
            instance_sources: Arc::new(Mutex::new(HashMap::new())),
            metrics_registry: crate::MetricsRegistry::new(),
            system_health,
            request_plane,
            local_endpoint_registry: crate::local_endpoint_registry::LocalEndpointRegistry::new(),
            engine_routes: crate::engine_routes::EngineRouteRegistry::new(),
        };

        // Initialize the uptime gauge in SystemHealth
        distributed_runtime
            .system_health
            .lock()
            .initialize_uptime_gauge(&distributed_runtime)?;

        // Handle system status server initialization
        if let Some(cancel_token) = cancel_token {
            // System server is enabled - start both the state and HTTP server
            let host = config.system_host.clone();
            let port = config.system_port as u16;

            // Start system status server (it creates SystemStatusState internally)
            match crate::system_status_server::spawn_system_status_server(
                &host,
                port,
                cancel_token,
                Arc::new(distributed_runtime.clone()),
                distributed_runtime.discovery_metadata.clone(),
            )
            .await
            {
                Ok((addr, handle)) => {
                    tracing::info!("System status server started successfully on {}", addr);

                    // Store system status server information
                    let system_status_server_info =
                        crate::system_status_server::SystemStatusServerInfo::new(
                            addr,
                            Some(handle),
                        );

                    // Initialize the system_status_server field
                    distributed_runtime
                        .system_status_server
                        .set(Arc::new(system_status_server_info))
                        .expect("System status server info should only be set once");
                }
                // Startup failure is logged but not fatal: the runtime still
                // works without the HTTP status endpoints.
                Err(e) => {
                    tracing::error!("System status server startup failed: {}", e);
                }
            }
        } else {
            // System server HTTP is disabled, but uptime metrics are still being tracked via SystemHealth
            tracing::debug!(
                "System status server HTTP endpoints disabled, but uptime metrics are being tracked"
            );
        }

        // Start health check manager if enabled
        if config.health_check_enabled {
            let health_check_config = crate::health_check::HealthCheckConfig {
                canary_wait_time: std::time::Duration::from_secs(config.canary_wait_time_secs),
                request_timeout: std::time::Duration::from_secs(
                    config.health_check_request_timeout_secs,
                ),
            };

            // Start the health check manager (spawns per-endpoint monitoring tasks)
            match crate::health_check::start_health_check_manager(
                distributed_runtime.clone(),
                Some(health_check_config),
            )
            .await
            {
                Ok(()) => tracing::info!(
                    "Health check manager started (canary_wait_time: {}s, request_timeout: {}s)",
                    config.canary_wait_time_secs,
                    config.health_check_request_timeout_secs
                ),
                Err(e) => tracing::error!("Health check manager failed to start: {}", e),
            }
        }

        Ok(distributed_runtime)
    }

    /// Build a runtime using [DistributedConfig::from_settings] (env-driven).
    pub async fn from_settings(runtime: Runtime) -> Result<Self> {
        let config = DistributedConfig::from_settings();
        Self::new(runtime, config).await
    }

    /// The underlying local runtime.
    pub fn runtime(&self) -> &Runtime {
        &self.runtime
    }

    /// Cancellation token tied to the runtime's primary lifetime.
    pub fn primary_token(&self) -> CancellationToken {
        self.runtime.primary_token()
    }

    // TODO: Don't hand out pointers, instead have methods to use the registry in friendly ways
    // (without being aware of async locks and so on)
    pub fn component_registry(&self) -> &component::Registry {
        &self.component_registry
    }

    // TODO: Don't hand out pointers, instead provide system health related services.
    pub fn system_health(&self) -> Arc<parking_lot::Mutex<SystemHealth>> {
        self.system_health.clone()
    }

    /// Get the local endpoint registry for in-process endpoint calls
    pub fn local_endpoint_registry(
        &self,
    ) -> &crate::local_endpoint_registry::LocalEndpointRegistry {
        &self.local_endpoint_registry
    }

    /// Get the engine route registry for registering custom /engine/* routes
    pub fn engine_routes(&self) -> &crate::engine_routes::EngineRouteRegistry {
        &self.engine_routes
    }

    /// This process's unique instance id, as assigned by the discovery client.
    pub fn connection_id(&self) -> u64 {
        self.discovery_client.instance_id()
    }

    /// Shut down the local runtime and the KV store backend.
    pub fn shutdown(&self) {
        self.runtime.shutdown();
        self.store.shutdown();
    }

    /// Create a [`Namespace`]
    pub fn namespace(&self, name: impl Into<String>) -> Result<Namespace> {
        Namespace::new(self.clone(), name.into())
    }

    /// Returns the discovery interface for service registration and discovery
    pub fn discovery(&self) -> Arc<dyn Discovery> {
        self.discovery_client.clone()
    }

    /// The shared TCP stream server, created lazily on first call.
    /// Concurrent callers race on the same OnceCell; only one server is built.
    pub async fn tcp_server(&self) -> Result<Arc<tcp::server::TcpStreamServer>> {
        Ok(self
            .tcp_server
            .get_or_try_init(async move {
                let options = tcp::server::ServerOptions::default();
                let server = tcp::server::TcpStreamServer::new(options).await?;
                Ok::<_, PipelineError>(server)
            })
            .await?
            .clone())
    }

    /// Get the network manager
    ///
    /// The network manager consolidates all network configuration and provides
    /// unified access to request plane servers and clients.
    pub fn network_manager(&self) -> Arc<NetworkManager> {
        self.network_manager.clone()
    }

    /// Get the request plane server (convenience method)
    ///
    /// This is a shortcut for `network_manager().await?.server().await`.
    pub async fn request_plane_server(
        &self,
    ) -> Result<Arc<dyn crate::pipeline::network::ingress::unified_server::RequestPlaneServer>>
    {
        self.network_manager().server().await
    }

    /// Get system status server information if available
    pub fn system_status_server_info(
        &self,
    ) -> Option<Arc<crate::system_status_server::SystemStatusServerInfo>> {
        self.system_status_server.get().cloned()
    }

    /// An interface to store things outside of the process. Usually backed by something like etcd.
    /// Currently does key-value, but will grow to include whatever we need to store.
    pub fn store(&self) -> &kv::Manager {
        &self.store
    }

    /// How the frontend should talk to the backend.
    pub fn request_plane(&self) -> RequestPlaneMode {
        self.request_plane
    }

    /// A child cancellation token from the underlying runtime.
    pub fn child_token(&self) -> CancellationToken {
        self.runtime.child_token()
    }

    pub(crate) fn graceful_shutdown_tracker(&self) -> Arc<GracefulShutdownTracker> {
        self.runtime.graceful_shutdown_tracker()
    }

    /// Shared handle to the endpoint instance-watcher cache.
    pub fn instance_sources(&self) -> Arc<Mutex<InstanceMap>> {
        self.instance_sources.clone()
    }

    /// TODO: This is a temporary KV router measure for component/component.rs EventPublisher impl for
    /// Component, to allow it to publish to NATS. KV Router is the only user.
    ///
    /// When NATS is not available (e.g., running in approximate mode with --no-kv-events),
    /// this function returns Ok(()) silently since publishing is optional in that mode.
    pub async fn kv_router_nats_publish(
        &self,
        subject: String,
        payload: bytes::Bytes,
    ) -> anyhow::Result<()> {
        let Some(nats_client) = self.nats_client.as_ref() else {
            // NATS not available - this is expected in approximate mode (--no-kv-events)
            tracing::trace!("Skipping NATS publish (NATS not configured): {}", subject);
            return Ok(());
        };
        Ok(nats_client.client().publish(subject, payload).await?)
    }

    /// TODO: This is a temporary KV router measure for component/component.rs EventSubscriber impl for
    /// Component, to allow it to subscribe to NATS. KV Router is the only user.
    pub(crate) async fn kv_router_nats_subscribe(
        &self,
        subject: String,
    ) -> Result<async_nats::Subscriber> {
        let Some(nats_client) = self.nats_client.as_ref() else {
            // Unlike publish, subscribing without NATS is a hard error.
            anyhow::bail!("KV router's EventSubscriber requires NATS");
        };
        Ok(nats_client.client().subscribe(subject).await?)
    }

    /// TODO (karenc): This is a temporary KV router measure for worker query requests.
    /// Allows KV Router to perform request/reply with workers. (versus the pub/sub pattern above)
    /// KV Router is the only user, made public for use in dynamo-llm crate
    pub async fn kv_router_nats_request(
        &self,
        subject: String,
        payload: bytes::Bytes,
        timeout: std::time::Duration,
    ) -> anyhow::Result<async_nats::Message> {
        let Some(nats_client) = self.nats_client.as_ref() else {
            anyhow::bail!("KV router's request requires NATS");
        };
        // Outer `?` is the timeout error, inner `?` is the NATS request error.
        let response =
            tokio::time::timeout(timeout, nats_client.client().request(subject, payload))
                .await
                .map_err(|_| anyhow::anyhow!("Request timed out after {:?}", timeout))??;
        Ok(response)
    }

    /// DEPRECATED: This method exists only for NATS request plane support.
    /// Once everything uses the TCP request plane, this can be removed along with
    /// the NATS service registration infrastructure.
    ///
    /// Returns a receiver that signals when the NATS service registration is complete.
    /// The caller should use `blocking_recv()` to wait for completion.
    pub fn register_nats_service(
        &self,
        component: Component,
    ) -> tokio::sync::mpsc::Receiver<Result<(), String>> {
        // Create a oneshot-style channel (capacity 1) to signal completion
        let (tx, rx) = tokio::sync::mpsc::channel::<Result<(), String>>(1);

        let drt = self.clone();
        self.runtime().secondary().spawn(async move {
            let service_name = component.service_name();

            // Pre-check to save cost of creating the service, but don't hold the lock
            if drt
                .component_registry()
                .inner
                .lock()
                .await
                .services
                .contains_key(&service_name)
            {
                // The NATS service is per component, but it is called from `serve_endpoint`, and there
                // are often multiple endpoints for a component (e.g. `clear_kv_blocks` and `generate`).
                tracing::trace!("Service {service_name} already exists");
                // Signal success - service already exists
                let _ = tx.send(Ok(())).await;
                return;
            }

            let Some(nats_client) = drt.nats_client.as_ref() else {
                tracing::error!("Cannot create NATS service without NATS.");
                let _ = tx
                    .send(Err("Cannot create NATS service without NATS".to_string()))
                    .await;
                return;
            };
            let description = None;
            // Build the service OUTSIDE the registry lock; only the insert below is locked.
            let nats_service = match crate::component::service::build_nats_service(
                nats_client,
                &component,
                description,
            )
            .await
            {
                Ok(service) => service,
                Err(err) => {
                    tracing::error!(error = %err, component = service_name, "Failed to build NATS service");
                    let _ = tx.send(Err(format!("Failed to build NATS service: {err}"))).await;
                    return;
                }
            };

            // Re-check under the lock: another task may have registered the
            // same service between the pre-check and here.
            let mut guard = drt.component_registry().inner.lock().await;
            if !guard.services.contains_key(&service_name) {
                // Normal case
                guard.services.insert(service_name.clone(), nats_service);

                tracing::info!("Added NATS service {service_name}");

                drop(guard);
            } else {
                // Lost the race: drop the lock before the async stop() call.
                drop(guard);
                let _ = nats_service.stop().await;
                // The NATS service is per component, but it is called from `serve_endpoint`, and there
                // are often multiple endpoints for a component (e.g. `clear_kv_blocks` and `generate`).
                // TODO: Is this still true?
            }

            // Signal completion - service registered successfully
            let _ = tx.send(Ok(())).await;
        });

        rx
    }
}
527
/// Configuration bundle consumed by `DistributedRuntime::new`.
///
/// NOTE: field order is load-bearing — `#[derive(Dissolve)]` produces a tuple
/// in declaration order, which `DistributedRuntime::new` destructures.
#[derive(Dissolve)]
pub struct DistributedConfig {
    // Which key-value store backend to use (etcd, file-backed, or in-memory).
    pub store_backend: kv::Selector,
    // NATS client options; `None` disables the NATS connection entirely.
    pub nats_config: Option<nats::ClientOptions>,
    // Transport used to carry requests from routers to workers.
    pub request_plane: RequestPlaneMode,
}
534
535impl DistributedConfig {
536    pub fn from_settings() -> DistributedConfig {
537        let request_plane = RequestPlaneMode::from_env();
538        // NATS is used for more than just NATS request-plane RPC:
539        // - KV router events (JetStream or NATS core + local indexer)
540        // - inter-router replica sync (NATS core)
541        //
542        // Historically we only connected to NATS when the request plane was NATS, which made
543        // `DYN_REQUEST_PLANE=tcp|http` incompatible with KV routing modes that rely on NATS.
544        // If a NATS server is configured via env, enable the client regardless of request plane.
545        let nats_enabled = request_plane.is_nats()
546            || std::env::var(crate::config::environment_names::nats::NATS_SERVER).is_ok();
547
548        // Check discovery backend to determine the appropriate KV store backend -
549        // kubernetes discovery, or etcd.
550        let discovery_backend =
551            std::env::var("DYN_DISCOVERY_BACKEND").unwrap_or_else(|_| "kv_store".to_string());
552
553        let store_backend = if discovery_backend == "kubernetes" {
554            tracing::info!("Using Kubernetes discovery backend");
555            kv::Selector::Memory
556        } else {
557            kv::Selector::Etcd(Box::default())
558        };
559
560        DistributedConfig {
561            store_backend,
562            nats_config: if nats_enabled {
563                Some(nats::ClientOptions::default())
564            } else {
565                None
566            },
567            request_plane,
568        }
569    }
570
571    pub fn for_cli() -> DistributedConfig {
572        let etcd_config = etcd::ClientOptions {
573            attach_lease: false,
574            ..Default::default()
575        };
576        let request_plane = RequestPlaneMode::from_env();
577        let nats_enabled = request_plane.is_nats()
578            || std::env::var(crate::config::environment_names::nats::NATS_SERVER).is_ok();
579        DistributedConfig {
580            store_backend: kv::Selector::Etcd(Box::new(etcd_config)),
581            nats_config: if nats_enabled {
582                Some(nats::ClientOptions::default())
583            } else {
584                None
585            },
586            request_plane,
587        }
588    }
589
590    /// A DistributedConfig that isn't distributed, for when the frontend and backend are in the
591    /// same process.
592    pub fn process_local() -> DistributedConfig {
593        DistributedConfig {
594            store_backend: kv::Selector::Memory,
595            nats_config: None,
596            // This won't be used in process local, so we likely need a "none" option to
597            // communicate that and avoid opening the ports.
598            request_plane: RequestPlaneMode::Tcp,
599        }
600    }
601}
602
/// Request plane transport mode configuration
///
/// This determines how requests are distributed from routers to workers:
/// - `Nats`: Use NATS for request distribution (legacy)
/// - `Http`: Use HTTP/2 for request distribution
/// - `Tcp`: Use raw TCP for request distribution with msgpack support (default)
//
// `Default` is derived with the `#[default]` variant attribute (stable since
// Rust 1.62; this file already requires >= 1.65 for let-else), replacing the
// previous hand-written `impl Default`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum RequestPlaneMode {
    /// Use NATS for request plane
    Nats,
    /// Use HTTP/2 for request plane
    Http,
    /// Use raw TCP for request plane with msgpack support
    #[default]
    Tcp,
}
624
625impl fmt::Display for RequestPlaneMode {
626    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
627        match self {
628            Self::Nats => write!(f, "nats"),
629            Self::Http => write!(f, "http"),
630            Self::Tcp => write!(f, "tcp"),
631        }
632    }
633}
634
635impl std::str::FromStr for RequestPlaneMode {
636    type Err = anyhow::Error;
637
638    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
639        match s.to_lowercase().as_str() {
640            "nats" => Ok(Self::Nats),
641            "http" => Ok(Self::Http),
642            "tcp" => Ok(Self::Tcp),
643            _ => Err(anyhow::anyhow!(
644                "Invalid request plane mode: '{}'. Valid options are: 'nats', 'http', 'tcp'",
645                s
646            )),
647        }
648    }
649}
650
651impl RequestPlaneMode {
652    /// Get the request plane mode from environment variable (uncached)
653    /// Reads from `DYN_REQUEST_PLANE` environment variable.
654    fn from_env() -> Self {
655        std::env::var("DYN_REQUEST_PLANE")
656            .ok()
657            .and_then(|s| s.parse().ok())
658            .unwrap_or_default()
659    }
660
661    pub fn is_nats(&self) -> bool {
662        matches!(self, RequestPlaneMode::Nats)
663    }
664}
665
pub mod distributed_test_utils {
    //! Common test helper functions for DistributedRuntime tests

    /// Helper function to create a DRT instance for integration-only tests.
    /// Uses from_current to leverage existing tokio runtime
    /// Note: Settings are read from environment variables inside DistributedRuntime::from_settings
    ///
    /// Uses a memory-backed KV store, so discovery state is NOT shared across
    /// runtimes — see `create_test_shared_drt_async` for that.
    #[cfg(feature = "integration")]
    pub async fn create_test_drt_async() -> super::DistributedRuntime {
        use crate::{storage::kv, transports::nats};

        // Must be called from within a tokio runtime (hence from_current).
        let rt = crate::Runtime::from_current().unwrap();
        let config = super::DistributedConfig {
            store_backend: kv::Selector::Memory,
            nats_config: Some(nats::ClientOptions::default()),
            request_plane: crate::distributed::RequestPlaneMode::default(),
        };
        super::DistributedRuntime::new(rt, config).await.unwrap()
    }

    /// Helper function to create a DRT instance which points at
    /// a (shared) file-backed KV store and ephemeral NATS transport so that
    /// multiple DRT instances may observe the same registration state.
    /// NOTE: This gets around the fact that create_test_drt_async() is
    /// hardcoded to spin up a memory-backed discovery store
    /// which means we can't share discovery state across runtimes.
    pub async fn create_test_shared_drt_async(
        store_path: &std::path::Path,
    ) -> super::DistributedRuntime {
        use crate::{storage::kv, transports::nats};

        let rt = crate::Runtime::from_current().unwrap();
        let config = super::DistributedConfig {
            // File-backed store at a caller-chosen path; point multiple DRTs
            // at the same path to share state.
            store_backend: kv::Selector::File(store_path.to_path_buf()),
            nats_config: Some(nats::ClientOptions::default()),
            request_plane: crate::distributed::RequestPlaneMode::default(),
        };
        super::DistributedRuntime::new(rt, config).await.unwrap()
    }
}
705
#[cfg(all(test, feature = "integration"))]
mod tests {
    use super::RequestPlaneMode;
    use super::distributed_test_utils::create_test_drt_async;

    /// Uptime must advance even when the system status HTTP server is disabled,
    /// since SystemHealth tracks it independently of the server.
    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_disabled() {
        use crate::config::environment_names::runtime::system as env_system;
        // Test uptime with system status server disabled
        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
            // Start a DRT
            let drt = create_test_drt_async().await;

            // Wait 50ms
            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

            // Check that uptime is 50+ ms
            let uptime = drt.system_health.lock().uptime();
            assert!(
                uptime >= std::time::Duration::from_millis(50),
                "Expected uptime to be at least 50ms, but got {:?}",
                uptime
            );

            println!(
                "✓ DRT uptime test passed (system disabled): uptime = {:?}",
                uptime
            );
        })
        .await;
    }

    /// Same uptime check, but with the system status server enabled on a port.
    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_enabled() {
        use crate::config::environment_names::runtime::system as env_system;
        // Test uptime with system status server enabled
        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, Some("8081"))], async {
            // Start a DRT
            let drt = create_test_drt_async().await;

            // Wait 50ms
            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

            // Check that uptime is 50+ ms
            let uptime = drt.system_health.lock().uptime();
            assert!(
                uptime >= std::time::Duration::from_millis(50),
                "Expected uptime to be at least 50ms, but got {:?}",
                uptime
            );

            println!(
                "✓ DRT uptime test passed (system enabled): uptime = {:?}",
                uptime
            );
        })
        .await;
    }

    /// FromStr accepts all three modes case-insensitively and rejects others.
    #[test]
    fn test_request_plane_mode_from_str() {
        assert_eq!(
            "nats".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Nats
        );
        assert_eq!(
            "http".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Http
        );
        assert_eq!(
            "tcp".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Tcp
        );
        assert_eq!(
            "NATS".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Nats
        );
        assert_eq!(
            "HTTP".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Http
        );
        assert_eq!(
            "TCP".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Tcp
        );
        assert!("invalid".parse::<RequestPlaneMode>().is_err());
    }

    /// Display renders the lower-case wire names (inverse of FromStr).
    #[test]
    fn test_request_plane_mode_display() {
        assert_eq!(RequestPlaneMode::Nats.to_string(), "nats");
        assert_eq!(RequestPlaneMode::Http.to_string(), "http");
        assert_eq!(RequestPlaneMode::Tcp.to_string(), "tcp");
    }
}