dynamo_runtime/
distributed.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::component::{Component, Instance};
5use crate::pipeline::PipelineError;
6use crate::pipeline::network::manager::NetworkManager;
7use crate::service::{ServiceClient, ServiceSet};
8use crate::storage::kv::{self, Store as _};
9use crate::{
10    component::{self, ComponentBuilder, Endpoint, Namespace},
11    discovery::Discovery,
12    metrics::PrometheusUpdateCallback,
13    metrics::{MetricsHierarchy, MetricsRegistry},
14    transports::{etcd, nats, tcp},
15};
16use crate::{discovery, system_status_server, transports};
17
18use super::utils::GracefulShutdownTracker;
19use crate::SystemHealth;
20use crate::runtime::Runtime;
21
22// Used instead of std::cell::OnceCell because get_or_try_init there is nightly
23use async_once_cell::OnceCell;
24
25use std::fmt;
26use std::sync::{Arc, OnceLock, Weak};
27use std::time::Duration;
28use tokio::sync::watch::Receiver;
29
30use anyhow::Result;
31use derive_getters::Dissolve;
32use figment::error;
33use std::collections::HashMap;
34use tokio::sync::Mutex;
35use tokio_util::sync::CancellationToken;
36
37type InstanceMap = HashMap<Endpoint, Weak<Receiver<Vec<Instance>>>>;
38
39/// Distributed [Runtime] which provides access to shared resources across the cluster, this includes
40/// communication protocols and transports.
41#[derive(Clone)]
42pub struct DistributedRuntime {
43    // local runtime
44    runtime: Runtime,
45
46    nats_client: Option<transports::nats::Client>,
47    store: kv::Manager,
48    network_manager: Arc<NetworkManager>,
49    tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
50    system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,
51    request_plane: RequestPlaneMode,
52
53    // Service discovery client
54    discovery_client: Arc<dyn discovery::Discovery>,
55
56    // Discovery metadata (only used for Kubernetes backend)
57    // Shared with system status server to expose via /metadata endpoint
58    discovery_metadata: Option<Arc<tokio::sync::RwLock<discovery::DiscoveryMetadata>>>,
59
60    // local registry for components
61    // the registry allows us to use share runtime resources across instances of the same component object.
62    // take for example two instances of a client to the same remote component. The registry allows us to use
63    // a single endpoint watcher for both clients, this keeps the number background tasking watching specific
64    // paths in etcd to a minimum.
65    component_registry: component::Registry,
66
67    instance_sources: Arc<tokio::sync::Mutex<InstanceMap>>,
68
69    // Health Status
70    system_health: Arc<parking_lot::Mutex<SystemHealth>>,
71
72    // Local endpoint registry for in-process calls
73    local_endpoint_registry: crate::local_endpoint_registry::LocalEndpointRegistry,
74
75    // This hierarchy's own metrics registry
76    metrics_registry: MetricsRegistry,
77
78    // Registry for /engine/* route callbacks
79    engine_routes: crate::engine_routes::EngineRouteRegistry,
80}
81
82impl MetricsHierarchy for DistributedRuntime {
83    fn basename(&self) -> String {
84        "".to_string() // drt has no basename. Basename only begins with the Namespace.
85    }
86
87    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
88        vec![] // drt is the root, so no parent hierarchies
89    }
90
91    fn get_metrics_registry(&self) -> &MetricsRegistry {
92        &self.metrics_registry
93    }
94}
95
96impl std::fmt::Debug for DistributedRuntime {
97    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98        write!(f, "DistributedRuntime")
99    }
100}
101
102impl DistributedRuntime {
103    pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result<Self> {
104        let (selected_kv_store, nats_config, request_plane) = config.dissolve();
105
106        let runtime_clone = runtime.clone();
107
108        let store = match selected_kv_store {
109            kv::Selector::Etcd(etcd_config) => {
110                let etcd_client = etcd::Client::new(*etcd_config, runtime_clone).await.inspect_err(|err|
111                    // The returned error doesn't show because of a dropped runtime error, so
112                    // log it first.
113                    tracing::error!(%err, "Could not connect to etcd. Pass `--store-kv ..` to use a different backend or start etcd."))?;
114                kv::Manager::etcd(etcd_client)
115            }
116            kv::Selector::File(root) => kv::Manager::file(runtime.primary_token(), root),
117            kv::Selector::Memory => kv::Manager::memory(),
118        };
119
120        let nats_client = match nats_config {
121            Some(nc) => Some(nc.connect().await?),
122            None => None,
123        };
124
125        // Start system status server for health and metrics if enabled in configuration
126        let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default();
127        // IMPORTANT: We must extract cancel_token from runtime BEFORE moving runtime into the struct below.
128        // This is because after moving, runtime is no longer accessible in this scope (ownership rules).
129        let cancel_token = if config.system_server_enabled() {
130            Some(runtime.clone().child_token())
131        } else {
132            None
133        };
134        let starting_health_status = config.starting_health_status.clone();
135        let use_endpoint_health_status = config.use_endpoint_health_status.clone();
136        let health_endpoint_path = config.system_health_path.clone();
137        let live_endpoint_path = config.system_live_path.clone();
138        let system_health = Arc::new(parking_lot::Mutex::new(SystemHealth::new(
139            starting_health_status,
140            use_endpoint_health_status,
141            health_endpoint_path,
142            live_endpoint_path,
143        )));
144
145        // Initialize discovery client based on backend configuration
146        let discovery_backend =
147            std::env::var("DYN_DISCOVERY_BACKEND").unwrap_or_else(|_| "kv_store".to_string());
148
149        let (discovery_client, discovery_metadata) = match discovery_backend.as_str() {
150            "kubernetes" => {
151                tracing::info!("Initializing Kubernetes discovery backend");
152                let metadata = Arc::new(tokio::sync::RwLock::new(
153                    crate::discovery::DiscoveryMetadata::new(),
154                ));
155                let client = crate::discovery::KubeDiscoveryClient::new(
156                    metadata.clone(),
157                    runtime.primary_token(),
158                )
159                .await
160                .inspect_err(
161                    |err| tracing::error!(%err, "Failed to initialize Kubernetes discovery client"),
162                )?;
163                (Arc::new(client) as Arc<dyn Discovery>, Some(metadata))
164            }
165            _ => {
166                tracing::info!("Initializing KV store discovery backend");
167                use crate::discovery::KVStoreDiscovery;
168                (
169                    Arc::new(KVStoreDiscovery::new(
170                        store.clone(),
171                        runtime.primary_token(),
172                    )) as Arc<dyn Discovery>,
173                    None,
174                )
175            }
176        };
177
178        let component_registry = component::Registry::new();
179
180        // NetworkManager for request plane
181        let network_manager = NetworkManager::new(
182            runtime.child_token(),
183            nats_client.clone().map(|c| c.client().clone()),
184            component_registry.clone(),
185            request_plane,
186        );
187
188        let distributed_runtime = Self {
189            runtime,
190            store,
191            network_manager: Arc::new(network_manager),
192            nats_client,
193            tcp_server: Arc::new(OnceCell::new()),
194            system_status_server: Arc::new(OnceLock::new()),
195            discovery_client,
196            discovery_metadata,
197            component_registry,
198            instance_sources: Arc::new(Mutex::new(HashMap::new())),
199            metrics_registry: crate::MetricsRegistry::new(),
200            system_health,
201            request_plane,
202            local_endpoint_registry: crate::local_endpoint_registry::LocalEndpointRegistry::new(),
203            engine_routes: crate::engine_routes::EngineRouteRegistry::new(),
204        };
205
206        // Initialize the uptime gauge in SystemHealth
207        distributed_runtime
208            .system_health
209            .lock()
210            .initialize_uptime_gauge(&distributed_runtime)?;
211
212        // Handle system status server initialization
213        if let Some(cancel_token) = cancel_token {
214            // System server is enabled - start both the state and HTTP server
215            let host = config.system_host.clone();
216            let port = config.system_port as u16;
217
218            // Start system status server (it creates SystemStatusState internally)
219            match crate::system_status_server::spawn_system_status_server(
220                &host,
221                port,
222                cancel_token,
223                Arc::new(distributed_runtime.clone()),
224                distributed_runtime.discovery_metadata.clone(),
225            )
226            .await
227            {
228                Ok((addr, handle)) => {
229                    tracing::info!("System status server started successfully on {}", addr);
230
231                    // Store system status server information
232                    let system_status_server_info =
233                        crate::system_status_server::SystemStatusServerInfo::new(
234                            addr,
235                            Some(handle),
236                        );
237
238                    // Initialize the system_status_server field
239                    distributed_runtime
240                        .system_status_server
241                        .set(Arc::new(system_status_server_info))
242                        .expect("System status server info should only be set once");
243                }
244                Err(e) => {
245                    tracing::error!("System status server startup failed: {}", e);
246                }
247            }
248        } else {
249            // System server HTTP is disabled, but uptime metrics are still being tracked via SystemHealth
250            tracing::debug!(
251                "System status server HTTP endpoints disabled, but uptime metrics are being tracked"
252            );
253        }
254
255        // Start health check manager if enabled
256        if config.health_check_enabled {
257            let health_check_config = crate::health_check::HealthCheckConfig {
258                canary_wait_time: std::time::Duration::from_secs(config.canary_wait_time_secs),
259                request_timeout: std::time::Duration::from_secs(
260                    config.health_check_request_timeout_secs,
261                ),
262            };
263
264            // Start the health check manager (spawns per-endpoint monitoring tasks)
265            match crate::health_check::start_health_check_manager(
266                distributed_runtime.clone(),
267                Some(health_check_config),
268            )
269            .await
270            {
271                Ok(()) => tracing::info!(
272                    "Health check manager started (canary_wait_time: {}s, request_timeout: {}s)",
273                    config.canary_wait_time_secs,
274                    config.health_check_request_timeout_secs
275                ),
276                Err(e) => tracing::error!("Health check manager failed to start: {}", e),
277            }
278        }
279
280        Ok(distributed_runtime)
281    }
282
283    pub async fn from_settings(runtime: Runtime) -> Result<Self> {
284        let config = DistributedConfig::from_settings();
285        Self::new(runtime, config).await
286    }
287
288    pub fn runtime(&self) -> &Runtime {
289        &self.runtime
290    }
291
292    pub fn primary_token(&self) -> CancellationToken {
293        self.runtime.primary_token()
294    }
295
296    // TODO: Don't hand out pointers, instead have methods to use the registry in friendly ways
297    // (without being aware of async locks and so on)
298    pub fn component_registry(&self) -> &component::Registry {
299        &self.component_registry
300    }
301
302    // TODO: Don't hand out pointers, instead provide system health related services.
303    pub fn system_health(&self) -> Arc<parking_lot::Mutex<SystemHealth>> {
304        self.system_health.clone()
305    }
306
307    /// Get the local endpoint registry for in-process endpoint calls
308    pub fn local_endpoint_registry(
309        &self,
310    ) -> &crate::local_endpoint_registry::LocalEndpointRegistry {
311        &self.local_endpoint_registry
312    }
313
314    /// Get the engine route registry for registering custom /engine/* routes
315    pub fn engine_routes(&self) -> &crate::engine_routes::EngineRouteRegistry {
316        &self.engine_routes
317    }
318
319    pub fn connection_id(&self) -> u64 {
320        self.discovery_client.instance_id()
321    }
322
323    pub fn shutdown(&self) {
324        self.runtime.shutdown();
325        self.store.shutdown();
326    }
327
328    /// Create a [`Namespace`]
329    pub fn namespace(&self, name: impl Into<String>) -> Result<Namespace> {
330        Namespace::new(self.clone(), name.into())
331    }
332
333    /// Returns the discovery interface for service registration and discovery
334    pub fn discovery(&self) -> Arc<dyn Discovery> {
335        self.discovery_client.clone()
336    }
337
338    pub async fn tcp_server(&self) -> Result<Arc<tcp::server::TcpStreamServer>> {
339        Ok(self
340            .tcp_server
341            .get_or_try_init(async move {
342                let options = tcp::server::ServerOptions::default();
343                let server = tcp::server::TcpStreamServer::new(options).await?;
344                Ok::<_, PipelineError>(server)
345            })
346            .await?
347            .clone())
348    }
349
350    /// Get the network manager
351    ///
352    /// The network manager consolidates all network configuration and provides
353    /// unified access to request plane servers and clients.
354    pub fn network_manager(&self) -> Arc<NetworkManager> {
355        self.network_manager.clone()
356    }
357
358    /// Get the request plane server (convenience method)
359    ///
360    /// This is a shortcut for `network_manager().await?.server().await`.
361    pub async fn request_plane_server(
362        &self,
363    ) -> Result<Arc<dyn crate::pipeline::network::ingress::unified_server::RequestPlaneServer>>
364    {
365        self.network_manager().server().await
366    }
367
368    /// Get system status server information if available
369    pub fn system_status_server_info(
370        &self,
371    ) -> Option<Arc<crate::system_status_server::SystemStatusServerInfo>> {
372        self.system_status_server.get().cloned()
373    }
374
375    /// An interface to store things outside of the process. Usually backed by something like etcd.
376    /// Currently does key-value, but will grow to include whatever we need to store.
377    pub fn store(&self) -> &kv::Manager {
378        &self.store
379    }
380
381    /// How the frontend should talk to the backend.
382    pub fn request_plane(&self) -> RequestPlaneMode {
383        self.request_plane
384    }
385
386    pub fn child_token(&self) -> CancellationToken {
387        self.runtime.child_token()
388    }
389
390    pub(crate) fn graceful_shutdown_tracker(&self) -> Arc<GracefulShutdownTracker> {
391        self.runtime.graceful_shutdown_tracker()
392    }
393
394    pub fn instance_sources(&self) -> Arc<Mutex<InstanceMap>> {
395        self.instance_sources.clone()
396    }
397
398    /// TODO: This is a temporary KV router measure for component/component.rs EventPublisher impl for
399    /// Component, to allow it to publish to NATS. KV Router is the only user.
400    ///
401    /// When NATS is not available (e.g., running in approximate mode with --no-kv-events),
402    /// this function returns Ok(()) silently since publishing is optional in that mode.
403    pub async fn kv_router_nats_publish(
404        &self,
405        subject: String,
406        payload: bytes::Bytes,
407    ) -> anyhow::Result<()> {
408        let Some(nats_client) = self.nats_client.as_ref() else {
409            // NATS not available - this is expected in approximate mode (--no-kv-events)
410            tracing::trace!("Skipping NATS publish (NATS not configured): {}", subject);
411            return Ok(());
412        };
413        Ok(nats_client.client().publish(subject, payload).await?)
414    }
415
416    /// TODO: This is a temporary KV router measure for component/component.rs EventSubscriber impl for
417    /// Component, to allow it to subscribe to NATS. KV Router is the only user.
418    pub(crate) async fn kv_router_nats_subscribe(
419        &self,
420        subject: String,
421    ) -> Result<async_nats::Subscriber> {
422        let Some(nats_client) = self.nats_client.as_ref() else {
423            anyhow::bail!("KV router's EventSubscriber requires NATS");
424        };
425        Ok(nats_client.client().subscribe(subject).await?)
426    }
427
428    /// TODO (karenc): This is a temporary KV router measure for worker query requests.
429    /// Allows KV Router to perform request/reply with workers. (versus the pub/sub pattern above)
430    /// KV Router is the only user, made public for use in dynamo-llm crate
431    pub async fn kv_router_nats_request(
432        &self,
433        subject: String,
434        payload: bytes::Bytes,
435        timeout: std::time::Duration,
436    ) -> anyhow::Result<async_nats::Message> {
437        let Some(nats_client) = self.nats_client.as_ref() else {
438            anyhow::bail!("KV router's request requires NATS");
439        };
440        let response =
441            tokio::time::timeout(timeout, nats_client.client().request(subject, payload))
442                .await
443                .map_err(|_| anyhow::anyhow!("Request timed out after {:?}", timeout))??;
444        Ok(response)
445    }
446
447    /// DEPRECATED: This method exists only for NATS request plane support.
448    /// Once everything uses the TCP request plane, this can be removed along with
449    /// the NATS service registration infrastructure.
450    ///
451    /// Returns a receiver that signals when the NATS service registration is complete.
452    /// The caller should use `blocking_recv()` to wait for completion.
453    pub fn register_nats_service(
454        &self,
455        component: Component,
456    ) -> tokio::sync::mpsc::Receiver<Result<(), String>> {
457        // Create a oneshot-style channel (capacity 1) to signal completion
458        let (tx, rx) = tokio::sync::mpsc::channel::<Result<(), String>>(1);
459
460        let drt = self.clone();
461        self.runtime().secondary().spawn(async move {
462            let service_name = component.service_name();
463
464            // Pre-check to save cost of creating the service, but don't hold the lock
465            if drt
466                .component_registry()
467                .inner
468                .lock()
469                .await
470                .services
471                .contains_key(&service_name)
472            {
473                // The NATS service is per component, but it is called from `serve_endpoint`, and there
474                // are often multiple endpoints for a component (e.g. `clear_kv_blocks` and `generate`).
475                tracing::trace!("Service {service_name} already exists");
476                // Signal success - service already exists
477                let _ = tx.send(Ok(())).await;
478                return;
479            }
480
481            let Some(nats_client) = drt.nats_client.as_ref() else {
482                tracing::error!("Cannot create NATS service without NATS.");
483                let _ = tx
484                    .send(Err("Cannot create NATS service without NATS".to_string()))
485                    .await;
486                return;
487            };
488            let description = None;
489            let nats_service = match crate::component::service::build_nats_service(
490                nats_client,
491                &component,
492                description,
493            )
494            .await
495            {
496                Ok(service) => service,
497                Err(err) => {
498                    tracing::error!(error = %err, component = service_name, "Failed to build NATS service");
499                    let _ = tx.send(Err(format!("Failed to build NATS service: {err}"))).await;
500                    return;
501                }
502            };
503
504            let mut guard = drt.component_registry().inner.lock().await;
505            if !guard.services.contains_key(&service_name) {
506                // Normal case
507                guard.services.insert(service_name.clone(), nats_service);
508
509                tracing::info!("Added NATS service {service_name}");
510
511                drop(guard);
512            } else {
513                drop(guard);
514                let _ = nats_service.stop().await;
515                // The NATS service is per component, but it is called from `serve_endpoint`, and there
516                // are often multiple endpoints for a component (e.g. `clear_kv_blocks` and `generate`).
517                // TODO: Is this still true?
518            }
519
520            // Signal completion - service registered successfully
521            let _ = tx.send(Ok(())).await;
522        });
523
524        rx
525    }
526}
527
528#[derive(Dissolve)]
529pub struct DistributedConfig {
530    pub store_backend: kv::Selector,
531    pub nats_config: Option<nats::ClientOptions>,
532    pub request_plane: RequestPlaneMode,
533}
534
535impl DistributedConfig {
536    pub fn from_settings() -> DistributedConfig {
537        let request_plane = RequestPlaneMode::from_env();
538        // NATS is used for more than just NATS request-plane RPC:
539        // - KV router events (JetStream or NATS core + local indexer)
540        // - inter-router replica sync (NATS core)
541        //
542        // Historically we only connected to NATS when the request plane was NATS, which made
543        // `DYN_REQUEST_PLANE=tcp|http` incompatible with KV routing modes that rely on NATS.
544        // If a NATS server is configured via env, enable the client regardless of request plane.
545        let nats_enabled = request_plane.is_nats()
546            || std::env::var(crate::config::environment_names::nats::NATS_SERVER).is_ok();
547        DistributedConfig {
548            store_backend: kv::Selector::Etcd(Box::default()),
549            nats_config: if nats_enabled {
550                Some(nats::ClientOptions::default())
551            } else {
552                None
553            },
554            request_plane,
555        }
556    }
557
558    pub fn for_cli() -> DistributedConfig {
559        let etcd_config = etcd::ClientOptions {
560            attach_lease: false,
561            ..Default::default()
562        };
563        let request_plane = RequestPlaneMode::from_env();
564        let nats_enabled = request_plane.is_nats()
565            || std::env::var(crate::config::environment_names::nats::NATS_SERVER).is_ok();
566        DistributedConfig {
567            store_backend: kv::Selector::Etcd(Box::new(etcd_config)),
568            nats_config: if nats_enabled {
569                Some(nats::ClientOptions::default())
570            } else {
571                None
572            },
573            request_plane,
574        }
575    }
576
577    /// A DistributedConfig that isn't distributed, for when the frontend and backend are in the
578    /// same process.
579    pub fn process_local() -> DistributedConfig {
580        DistributedConfig {
581            store_backend: kv::Selector::Memory,
582            nats_config: None,
583            // This won't be used in process local, so we likely need a "none" option to
584            // communicate that and avoid opening the ports.
585            request_plane: RequestPlaneMode::Tcp,
586        }
587    }
588}
589
590/// Request plane transport mode configuration
591///
592/// This determines how requests are distributed from routers to workers:
593/// - `Nats`: Use NATS for request distribution (legacy)
594/// - `Http`: Use HTTP/2 for request distribution
595/// - `Tcp`: Use raw TCP for request distribution with msgpack support (default)
596#[derive(Debug, Clone, Copy, PartialEq, Eq)]
597pub enum RequestPlaneMode {
598    /// Use NATS for request plane
599    Nats,
600    /// Use HTTP/2 for request plane
601    Http,
602    /// Use raw TCP for request plane with msgpack support
603    Tcp,
604}
605
606impl Default for RequestPlaneMode {
607    fn default() -> Self {
608        Self::Tcp
609    }
610}
611
612impl fmt::Display for RequestPlaneMode {
613    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
614        match self {
615            Self::Nats => write!(f, "nats"),
616            Self::Http => write!(f, "http"),
617            Self::Tcp => write!(f, "tcp"),
618        }
619    }
620}
621
622impl std::str::FromStr for RequestPlaneMode {
623    type Err = anyhow::Error;
624
625    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
626        match s.to_lowercase().as_str() {
627            "nats" => Ok(Self::Nats),
628            "http" => Ok(Self::Http),
629            "tcp" => Ok(Self::Tcp),
630            _ => Err(anyhow::anyhow!(
631                "Invalid request plane mode: '{}'. Valid options are: 'nats', 'http', 'tcp'",
632                s
633            )),
634        }
635    }
636}
637
638impl RequestPlaneMode {
639    /// Get the request plane mode from environment variable (uncached)
640    /// Reads from `DYN_REQUEST_PLANE` environment variable.
641    fn from_env() -> Self {
642        std::env::var("DYN_REQUEST_PLANE")
643            .ok()
644            .and_then(|s| s.parse().ok())
645            .unwrap_or_default()
646    }
647
648    pub fn is_nats(&self) -> bool {
649        matches!(self, RequestPlaneMode::Nats)
650    }
651}
652
653pub mod distributed_test_utils {
654    //! Common test helper functions for DistributedRuntime tests
655
656    /// Helper function to create a DRT instance for integration-only tests.
657    /// Uses from_current to leverage existing tokio runtime
658    /// Note: Settings are read from environment variables inside DistributedRuntime::from_settings
659    #[cfg(feature = "integration")]
660    pub async fn create_test_drt_async() -> super::DistributedRuntime {
661        use crate::{storage::kv, transports::nats};
662
663        let rt = crate::Runtime::from_current().unwrap();
664        let config = super::DistributedConfig {
665            store_backend: kv::Selector::Memory,
666            nats_config: Some(nats::ClientOptions::default()),
667            request_plane: crate::distributed::RequestPlaneMode::default(),
668        };
669        super::DistributedRuntime::new(rt, config).await.unwrap()
670    }
671
672    /// Helper function to create a DRT instance which points at
673    /// a (shared) file-backed KV store and ephemeral NATS transport so that
674    /// multiple DRT instances may observe the same registration state.
675    /// NOTE: This gets around the fact that create_test_drt_async() is
676    /// hardcoded to spin up a memory-backed discovery store
677    /// which means we can't share discovery state across runtimes.
678    pub async fn create_test_shared_drt_async(
679        store_path: &std::path::Path,
680    ) -> super::DistributedRuntime {
681        use crate::{storage::kv, transports::nats};
682
683        let rt = crate::Runtime::from_current().unwrap();
684        let config = super::DistributedConfig {
685            store_backend: kv::Selector::File(store_path.to_path_buf()),
686            nats_config: Some(nats::ClientOptions::default()),
687            request_plane: crate::distributed::RequestPlaneMode::default(),
688        };
689        super::DistributedRuntime::new(rt, config).await.unwrap()
690    }
691}
692
693#[cfg(all(test, feature = "integration"))]
694mod tests {
695    use super::RequestPlaneMode;
696    use super::distributed_test_utils::create_test_drt_async;
697
698    #[tokio::test]
699    async fn test_drt_uptime_after_delay_system_disabled() {
700        use crate::config::environment_names::runtime::system as env_system;
701        // Test uptime with system status server disabled
702        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
703            // Start a DRT
704            let drt = create_test_drt_async().await;
705
706            // Wait 50ms
707            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
708
709            // Check that uptime is 50+ ms
710            let uptime = drt.system_health.lock().uptime();
711            assert!(
712                uptime >= std::time::Duration::from_millis(50),
713                "Expected uptime to be at least 50ms, but got {:?}",
714                uptime
715            );
716
717            println!(
718                "✓ DRT uptime test passed (system disabled): uptime = {:?}",
719                uptime
720            );
721        })
722        .await;
723    }
724
725    #[tokio::test]
726    async fn test_drt_uptime_after_delay_system_enabled() {
727        use crate::config::environment_names::runtime::system as env_system;
728        // Test uptime with system status server enabled
729        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, Some("8081"))], async {
730            // Start a DRT
731            let drt = create_test_drt_async().await;
732
733            // Wait 50ms
734            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
735
736            // Check that uptime is 50+ ms
737            let uptime = drt.system_health.lock().uptime();
738            assert!(
739                uptime >= std::time::Duration::from_millis(50),
740                "Expected uptime to be at least 50ms, but got {:?}",
741                uptime
742            );
743
744            println!(
745                "✓ DRT uptime test passed (system enabled): uptime = {:?}",
746                uptime
747            );
748        })
749        .await;
750    }
751
752    #[test]
753    fn test_request_plane_mode_from_str() {
754        assert_eq!(
755            "nats".parse::<RequestPlaneMode>().unwrap(),
756            RequestPlaneMode::Nats
757        );
758        assert_eq!(
759            "http".parse::<RequestPlaneMode>().unwrap(),
760            RequestPlaneMode::Http
761        );
762        assert_eq!(
763            "tcp".parse::<RequestPlaneMode>().unwrap(),
764            RequestPlaneMode::Tcp
765        );
766        assert_eq!(
767            "NATS".parse::<RequestPlaneMode>().unwrap(),
768            RequestPlaneMode::Nats
769        );
770        assert_eq!(
771            "HTTP".parse::<RequestPlaneMode>().unwrap(),
772            RequestPlaneMode::Http
773        );
774        assert_eq!(
775            "TCP".parse::<RequestPlaneMode>().unwrap(),
776            RequestPlaneMode::Tcp
777        );
778        assert!("invalid".parse::<RequestPlaneMode>().is_err());
779    }
780
781    #[test]
782    fn test_request_plane_mode_display() {
783        assert_eq!(RequestPlaneMode::Nats.to_string(), "nats");
784        assert_eq!(RequestPlaneMode::Http.to_string(), "http");
785        assert_eq!(RequestPlaneMode::Tcp.to_string(), "tcp");
786    }
787}