// dynamo_runtime/distributed.rs
1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::component::{Component, Instance};
5use crate::pipeline::PipelineError;
6use crate::pipeline::network::manager::NetworkManager;
7use crate::service::{ServiceClient, ServiceSet};
8use crate::storage::kv;
9use crate::{
10    component::{self, ComponentBuilder, Endpoint, Namespace},
11    discovery::Discovery,
12    metrics::PrometheusUpdateCallback,
13    metrics::{MetricsHierarchy, MetricsRegistry},
14    transports::{etcd, nats, tcp},
15};
16use crate::{discovery, system_status_server, transports};
17
18use super::utils::GracefulShutdownTracker;
19use crate::SystemHealth;
20use crate::runtime::Runtime;
21
22// Used instead of std::cell::OnceCell because get_or_try_init there is nightly
23use async_once_cell::OnceCell;
24
25use std::fmt;
26use std::sync::{Arc, OnceLock, Weak};
27use std::time::Duration;
28use tokio::sync::watch::Receiver;
29
30use anyhow::Result;
31use derive_getters::Dissolve;
32use figment::error;
33use std::collections::HashMap;
34use tokio::sync::Mutex;
35use tokio_util::sync::CancellationToken;
36
/// Maps an [`Endpoint`] to a weak handle on the watch channel of its live [`Instance`]s.
/// Held weakly so the map itself does not keep an endpoint watcher alive once all
/// clients have dropped their receivers — presumably to cap background watch tasks
/// (see the `component_registry` comment below); TODO(review): confirm teardown path.
type InstanceMap = HashMap<Endpoint, Weak<Receiver<Vec<Instance>>>>;
38
/// Distributed [Runtime] which provides access to shared resources across the cluster, this includes
/// communication protocols and transports.
///
/// Cloning is cheap: all shared state is behind `Arc`s (plus the `Clone` impls of the
/// registries), so clones observe the same runtime.
#[derive(Clone)]
pub struct DistributedRuntime {
    // local runtime
    runtime: Runtime,

    // NATS client; `None` when no NATS server is configured (e.g. TCP/HTTP request
    // plane without NATS-backed KV routing — see `DistributedConfig::from_settings`).
    nats_client: Option<transports::nats::Client>,
    // Consolidates request-plane servers/clients (NATS, HTTP, TCP).
    network_manager: Arc<NetworkManager>,
    // Lazily created on first `tcp_server()` call; async `OnceCell` because creation awaits.
    tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
    // Set at most once, when the system status HTTP server starts successfully in `new()`.
    system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,
    // How the frontend talks to the backend (nats/http/tcp); `Copy`, fixed at construction.
    request_plane: RequestPlaneMode,

    // Service discovery client
    discovery_client: Arc<dyn discovery::Discovery>,

    // Discovery metadata (only used for Kubernetes backend)
    // Shared with system status server to expose via /metadata endpoint
    discovery_metadata: Option<Arc<tokio::sync::RwLock<discovery::DiscoveryMetadata>>>,

    // local registry for components
    // the registry allows us to use share runtime resources across instances of the same component object.
    // take for example two instances of a client to the same remote component. The registry allows us to use
    // a single endpoint watcher for both clients, this keeps the number background tasking watching specific
    // paths in etcd to a minimum.
    component_registry: component::Registry,

    // Shared watch-channel receivers per endpoint; see [`InstanceMap`].
    instance_sources: Arc<tokio::sync::Mutex<InstanceMap>>,

    // Health Status
    system_health: Arc<parking_lot::Mutex<SystemHealth>>,

    // Local endpoint registry for in-process calls
    local_endpoint_registry: crate::local_endpoint_registry::LocalEndpointRegistry,

    // This hierarchy's own metrics registry
    metrics_registry: MetricsRegistry,

    // Registry for /engine/* route callbacks
    engine_routes: crate::engine_routes::EngineRouteRegistry,
}
80
81impl MetricsHierarchy for DistributedRuntime {
82    fn basename(&self) -> String {
83        "".to_string() // drt has no basename. Basename only begins with the Namespace.
84    }
85
86    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
87        vec![] // drt is the root, so no parent hierarchies
88    }
89
90    fn get_metrics_registry(&self) -> &MetricsRegistry {
91        &self.metrics_registry
92    }
93}
94
95impl std::fmt::Debug for DistributedRuntime {
96    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97        write!(f, "DistributedRuntime")
98    }
99}
100
impl DistributedRuntime {
    /// Construct a [`DistributedRuntime`] on top of a local [`Runtime`].
    ///
    /// Connects the optional NATS client, initializes the configured discovery
    /// backend, builds the request-plane [`NetworkManager`], and — depending on
    /// runtime settings read from the environment — starts the system status HTTP
    /// server and the health check manager.
    ///
    /// # Errors
    /// Fails if the NATS connection, discovery-backend initialization, or
    /// uptime-gauge registration fails. Failures starting the system status
    /// server or health check manager are logged but do NOT fail construction.
    pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result<Self> {
        let (discovery_backend, nats_config, request_plane) = config.dissolve();

        let nats_client = match nats_config {
            Some(nc) => Some(nc.connect().await?),
            None => None,
        };

        // Start system status server for health and metrics if enabled in configuration.
        // NOTE: the `DistributedConfig` was fully dissolved above, so shadowing `config`
        // with the env-derived RuntimeConfig here is safe.
        let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default();
        // IMPORTANT: We must extract cancel_token from runtime BEFORE moving runtime into the struct below.
        // This is because after moving, runtime is no longer accessible in this scope (ownership rules).
        let cancel_token = if config.system_server_enabled() {
            Some(runtime.clone().child_token())
        } else {
            None
        };
        let starting_health_status = config.starting_health_status.clone();
        let use_endpoint_health_status = config.use_endpoint_health_status.clone();
        let health_endpoint_path = config.system_health_path.clone();
        let live_endpoint_path = config.system_live_path.clone();
        let system_health = Arc::new(parking_lot::Mutex::new(SystemHealth::new(
            starting_health_status,
            use_endpoint_health_status,
            health_endpoint_path,
            live_endpoint_path,
        )));

        // Initialize discovery client based on backend configuration
        let (discovery_client, discovery_metadata) = match discovery_backend {
            DiscoveryBackend::Kubernetes => {
                tracing::info!("Initializing Kubernetes discovery backend");
                // Metadata is shared with the system status server so it can be
                // served from the /metadata endpoint (Kubernetes backend only).
                let metadata = Arc::new(tokio::sync::RwLock::new(
                    crate::discovery::DiscoveryMetadata::new(),
                ));
                let client = crate::discovery::KubeDiscoveryClient::new(
                    metadata.clone(),
                    runtime.primary_token(),
                )
                .await
                .inspect_err(
                    |err| tracing::error!(%err, "Failed to initialize Kubernetes discovery client"),
                )?;
                (Arc::new(client) as Arc<dyn Discovery>, Some(metadata))
            }
            DiscoveryBackend::KvStore(kv_selector) => {
                tracing::info!("Initializing KV store discovery backend: {}", kv_selector);
                let runtime_clone = runtime.clone();
                let store = match kv_selector {
                    kv::Selector::Etcd(etcd_config) => {
                        let etcd_client = etcd::Client::new(*etcd_config, runtime_clone).await.inspect_err(|err|
                            tracing::error!(%err, "Could not connect to etcd. Pass `--discovery-backend ..` to use a different backend or start etcd."))?;
                        kv::Manager::etcd(etcd_client)
                    }
                    kv::Selector::File(root) => kv::Manager::file(runtime.primary_token(), root),
                    kv::Selector::Memory => kv::Manager::memory(),
                };
                use crate::discovery::KVStoreDiscovery;
                // KV-store backends have no /metadata payload, hence `None`.
                (
                    Arc::new(KVStoreDiscovery::new(store, runtime.primary_token()))
                        as Arc<dyn Discovery>,
                    None,
                )
            }
        };

        let component_registry = component::Registry::new();

        // NetworkManager for request plane
        let network_manager = NetworkManager::new(
            runtime.child_token(),
            nats_client.clone().map(|c| c.client().clone()),
            component_registry.clone(),
            request_plane,
        );

        let distributed_runtime = Self {
            runtime,
            network_manager: Arc::new(network_manager),
            nats_client,
            tcp_server: Arc::new(OnceCell::new()),
            system_status_server: Arc::new(OnceLock::new()),
            discovery_client,
            discovery_metadata,
            component_registry,
            instance_sources: Arc::new(Mutex::new(HashMap::new())),
            metrics_registry: crate::MetricsRegistry::new(),
            system_health,
            request_plane,
            local_endpoint_registry: crate::local_endpoint_registry::LocalEndpointRegistry::new(),
            engine_routes: crate::engine_routes::EngineRouteRegistry::new(),
        };

        // Initialize the uptime gauge in SystemHealth
        distributed_runtime
            .system_health
            .lock()
            .initialize_uptime_gauge(&distributed_runtime)?;

        // Register an update callback so the uptime gauge is refreshed before
        // every Prometheus scrape (both system status server and frontend).
        {
            let system_health = distributed_runtime.system_health.clone();
            distributed_runtime
                .metrics_registry
                .add_update_callback(std::sync::Arc::new(move || {
                    system_health.lock().update_uptime_gauge();
                    Ok(())
                }));
        }

        // Handle system status server initialization
        if let Some(cancel_token) = cancel_token {
            // System server is enabled - start both the state and HTTP server
            let host = config.system_host.clone();
            let port = config.system_port as u16;

            // Start system status server (it creates SystemStatusState internally)
            match crate::system_status_server::spawn_system_status_server(
                &host,
                port,
                cancel_token,
                Arc::new(distributed_runtime.clone()),
                distributed_runtime.discovery_metadata.clone(),
            )
            .await
            {
                Ok((addr, handle)) => {
                    tracing::info!("System status server started successfully on {}", addr);

                    // Store system status server information
                    let system_status_server_info =
                        crate::system_status_server::SystemStatusServerInfo::new(
                            addr,
                            Some(handle),
                        );

                    // Initialize the system_status_server field
                    distributed_runtime
                        .system_status_server
                        .set(Arc::new(system_status_server_info))
                        .expect("System status server info should only be set once");
                }
                Err(e) => {
                    // Deliberately non-fatal: the runtime is usable without the HTTP endpoints.
                    tracing::error!("System status server startup failed: {}", e);
                }
            }
        } else {
            // System server HTTP is disabled, but uptime metrics are still being tracked via SystemHealth
            tracing::debug!(
                "System status server HTTP endpoints disabled, but uptime metrics are being tracked"
            );
        }

        // Start health check manager if enabled
        if config.health_check_enabled {
            let health_check_config = crate::health_check::HealthCheckConfig {
                canary_wait_time: std::time::Duration::from_secs(config.canary_wait_time_secs),
                request_timeout: std::time::Duration::from_secs(
                    config.health_check_request_timeout_secs,
                ),
            };

            // Start the health check manager (spawns per-endpoint monitoring tasks)
            match crate::health_check::start_health_check_manager(
                distributed_runtime.clone(),
                Some(health_check_config),
            )
            .await
            {
                Ok(()) => tracing::info!(
                    "Health check manager started (canary_wait_time: {}s, request_timeout: {}s)",
                    config.canary_wait_time_secs,
                    config.health_check_request_timeout_secs
                ),
                // Also non-fatal; the runtime runs without health checking.
                Err(e) => tracing::error!("Health check manager failed to start: {}", e),
            }
        }

        Ok(distributed_runtime)
    }

    /// Build a runtime from environment-derived settings; see
    /// [`DistributedConfig::from_settings`].
    pub async fn from_settings(runtime: Runtime) -> Result<Self> {
        let config = DistributedConfig::from_settings();
        Self::new(runtime, config).await
    }

    /// Borrow the underlying local [`Runtime`].
    pub fn runtime(&self) -> &Runtime {
        &self.runtime
    }

    /// The runtime's primary cancellation token (delegates to [`Runtime`]).
    pub fn primary_token(&self) -> CancellationToken {
        self.runtime.primary_token()
    }

    // TODO: Don't hand out pointers, instead have methods to use the registry in friendly ways
    // (without being aware of async locks and so on)
    pub fn component_registry(&self) -> &component::Registry {
        &self.component_registry
    }

    // TODO: Don't hand out pointers, instead provide system health related services.
    pub fn system_health(&self) -> Arc<parking_lot::Mutex<SystemHealth>> {
        self.system_health.clone()
    }

    /// Get the local endpoint registry for in-process endpoint calls
    pub fn local_endpoint_registry(
        &self,
    ) -> &crate::local_endpoint_registry::LocalEndpointRegistry {
        &self.local_endpoint_registry
    }

    /// Get the engine route registry for registering custom /engine/* routes
    pub fn engine_routes(&self) -> &crate::engine_routes::EngineRouteRegistry {
        &self.engine_routes
    }

    /// This runtime's instance id as reported by the discovery backend.
    pub fn connection_id(&self) -> u64 {
        self.discovery_client.instance_id()
    }

    /// Shut down the local runtime and the discovery client.
    pub fn shutdown(&self) {
        self.runtime.shutdown();
        self.discovery_client.shutdown();
    }

    /// Create a [`Namespace`]
    pub fn namespace(&self, name: impl Into<String>) -> Result<Namespace> {
        Namespace::new(self.clone(), name.into())
    }

    /// Returns the discovery interface for service registration and discovery
    pub fn discovery(&self) -> Arc<dyn Discovery> {
        self.discovery_client.clone()
    }

    /// Get (lazily creating on first call) the shared TCP stream server,
    /// started with default [`tcp::server::ServerOptions`].
    pub async fn tcp_server(&self) -> Result<Arc<tcp::server::TcpStreamServer>> {
        Ok(self
            .tcp_server
            .get_or_try_init(async move {
                let options = tcp::server::ServerOptions::default();
                let server = tcp::server::TcpStreamServer::new(options).await?;
                Ok::<_, PipelineError>(server)
            })
            .await?
            .clone())
    }

    /// Get the network manager
    ///
    /// The network manager consolidates all network configuration and provides
    /// unified access to request plane servers and clients.
    pub fn network_manager(&self) -> Arc<NetworkManager> {
        self.network_manager.clone()
    }

    /// Get the request plane server (convenience method)
    ///
    /// This is a shortcut for `network_manager().await?.server().await`.
    pub async fn request_plane_server(
        &self,
    ) -> Result<Arc<dyn crate::pipeline::network::ingress::unified_server::RequestPlaneServer>>
    {
        self.network_manager().server().await
    }

    /// Get system status server information if available
    pub fn system_status_server_info(
        &self,
    ) -> Option<Arc<crate::system_status_server::SystemStatusServerInfo>> {
        self.system_status_server.get().cloned()
    }

    /// How the frontend should talk to the backend.
    pub fn request_plane(&self) -> RequestPlaneMode {
        self.request_plane
    }

    /// A child cancellation token of the local runtime.
    pub fn child_token(&self) -> CancellationToken {
        self.runtime.child_token()
    }

    /// Tracker used to coordinate graceful shutdown (delegates to [`Runtime`]).
    pub(crate) fn graceful_shutdown_tracker(&self) -> Arc<GracefulShutdownTracker> {
        self.runtime.graceful_shutdown_tracker()
    }

    /// Shared map of per-endpoint instance watch channels; see [`InstanceMap`].
    pub fn instance_sources(&self) -> Arc<Mutex<InstanceMap>> {
        self.instance_sources.clone()
    }

    /// TODO: This is a temporary KV router measure for component/component.rs EventPublisher impl for
    /// Component, to allow it to publish to NATS. KV Router is the only user.
    ///
    /// When NATS is not available (e.g., running in approximate mode with --no-kv-events),
    /// this function returns Ok(()) silently since publishing is optional in that mode.
    pub async fn kv_router_nats_publish(
        &self,
        subject: String,
        payload: bytes::Bytes,
    ) -> anyhow::Result<()> {
        let Some(nats_client) = self.nats_client.as_ref() else {
            // NATS not available - this is expected in approximate mode (--no-kv-events)
            tracing::trace!("Skipping NATS publish (NATS not configured): {}", subject);
            return Ok(());
        };
        Ok(nats_client.client().publish(subject, payload).await?)
    }

    /// TODO: This is a temporary KV router measure for component/component.rs EventSubscriber impl for
    /// Component, to allow it to subscribe to NATS. KV Router is the only user.
    ///
    /// # Errors
    /// Unlike publishing, subscribing is NOT optional: bails if NATS is not configured.
    pub(crate) async fn kv_router_nats_subscribe(
        &self,
        subject: String,
    ) -> Result<async_nats::Subscriber> {
        let Some(nats_client) = self.nats_client.as_ref() else {
            anyhow::bail!("KV router's EventSubscriber requires NATS");
        };
        Ok(nats_client.client().subscribe(subject).await?)
    }

    /// TODO (karenc): This is a temporary KV router measure for worker query requests.
    /// Allows KV Router to perform request/reply with workers. (versus the pub/sub pattern above)
    /// KV Router is the only user, made public for use in dynamo-llm crate
    ///
    /// # Errors
    /// Bails if NATS is not configured, or if no reply arrives within `timeout`.
    pub async fn kv_router_nats_request(
        &self,
        subject: String,
        payload: bytes::Bytes,
        timeout: std::time::Duration,
    ) -> anyhow::Result<async_nats::Message> {
        let Some(nats_client) = self.nats_client.as_ref() else {
            anyhow::bail!("KV router's request requires NATS");
        };
        let response =
            tokio::time::timeout(timeout, nats_client.client().request(subject, payload))
                .await
                .map_err(|_| anyhow::anyhow!("Request timed out after {:?}", timeout))??;
        Ok(response)
    }

    /// DEPRECATED: This method exists only for NATS request plane support.
    /// Once everything uses the TCP request plane, this can be removed along with
    /// the NATS service registration infrastructure.
    ///
    /// Returns a receiver that signals when the NATS service registration is complete.
    /// The caller should use `blocking_recv()` to wait for completion.
    pub fn register_nats_service(
        &self,
        component: Component,
    ) -> tokio::sync::mpsc::Receiver<Result<(), String>> {
        // Create a oneshot-style channel (capacity 1) to signal completion
        let (tx, rx) = tokio::sync::mpsc::channel::<Result<(), String>>(1);

        let drt = self.clone();
        self.runtime().secondary().spawn(async move {
            let service_name = component.service_name();

            // Pre-check to save cost of creating the service, but don't hold the lock
            if drt
                .component_registry()
                .inner
                .lock()
                .await
                .services
                .contains_key(&service_name)
            {
                // The NATS service is per component, but it is called from `serve_endpoint`, and there
                // are often multiple endpoints for a component (e.g. `clear_kv_blocks` and `generate`).
                tracing::trace!("Service {service_name} already exists");
                // Signal success - service already exists
                let _ = tx.send(Ok(())).await;
                return;
            }

            let Some(nats_client) = drt.nats_client.as_ref() else {
                tracing::error!("Cannot create NATS service without NATS.");
                let _ = tx
                    .send(Err("Cannot create NATS service without NATS".to_string()))
                    .await;
                return;
            };
            let description = None;
            let nats_service = match crate::component::service::build_nats_service(
                nats_client,
                &component,
                description,
            )
            .await
            {
                Ok(service) => service,
                Err(err) => {
                    tracing::error!(error = %err, component = service_name, "Failed to build NATS service");
                    let _ = tx.send(Err(format!("Failed to build NATS service: {err}"))).await;
                    return;
                }
            };

            // Re-check under the lock: another task may have registered the service while
            // `build_nats_service` was awaiting (the pre-check above was lock-free).
            let mut guard = drt.component_registry().inner.lock().await;
            if !guard.services.contains_key(&service_name) {
                // Normal case
                guard.services.insert(service_name.clone(), nats_service);

                tracing::info!("Added NATS service {service_name}");

                drop(guard);
            } else {
                // Lost the race: drop the lock first, then stop the redundant service.
                drop(guard);
                let _ = nats_service.stop().await;
                // The NATS service is per component, but it is called from `serve_endpoint`, and there
                // are often multiple endpoints for a component (e.g. `clear_kv_blocks` and `generate`).
                // TODO: Is this still true?
            }

            // Signal completion - service registered successfully
            let _ = tx.send(Ok(())).await;
        });

        rx
    }
}
522
/// Selects which discovery backend to use and, for KV store backends, which KV store.
///
/// Chosen from the `DYN_DISCOVERY_BACKEND` environment variable in
/// [`DistributedConfig::from_settings`] and consumed by [`DistributedRuntime::new`].
#[derive(Clone, Debug)]
pub enum DiscoveryBackend {
    /// Use Kubernetes API for service discovery (no KV store needed)
    Kubernetes,
    /// Use a KV store (etcd, file, or memory) for service discovery
    KvStore(kv::Selector),
}
531
/// Configuration consumed by [`DistributedRuntime::new`].
///
/// `#[derive(Dissolve)]` (derive_getters) generates a `dissolve()` that decomposes
/// the struct into its fields, which `new()` uses to take ownership of each part.
#[derive(Dissolve)]
pub struct DistributedConfig {
    /// Which service discovery mechanism to use.
    pub discovery_backend: DiscoveryBackend,
    /// NATS client options; `None` disables the NATS client entirely.
    pub nats_config: Option<nats::ClientOptions>,
    /// How requests travel from routers to workers.
    pub request_plane: RequestPlaneMode,
}
538
539impl DistributedConfig {
540    pub fn from_settings() -> DistributedConfig {
541        let request_plane = RequestPlaneMode::from_env();
542        // NATS is used for more than just NATS request-plane RPC:
543        // - KV router events (JetStream or NATS core + local indexer)
544        // - inter-router replica sync (NATS core)
545        //
546        // Historically we only connected to NATS when the request plane was NATS, which made
547        // `DYN_REQUEST_PLANE=tcp|http` incompatible with KV routing modes that rely on NATS.
548        // If a NATS server is configured via env, enable the client regardless of request plane.
549        let nats_enabled = request_plane.is_nats()
550            || std::env::var(crate::config::environment_names::nats::NATS_SERVER).is_ok();
551
552        // DYN_DISCOVERY_BACKEND selects the discovery mechanism
553        // Valid values: "kubernetes", "etcd" (default), "file", "mem"
554        let backend_str =
555            std::env::var("DYN_DISCOVERY_BACKEND").unwrap_or_else(|_| "etcd".to_string());
556
557        let discovery_backend = match backend_str.as_str() {
558            "kubernetes" => {
559                tracing::info!("Using Kubernetes discovery backend");
560                DiscoveryBackend::Kubernetes
561            }
562            other => {
563                let selector: kv::Selector = other.parse().unwrap_or_else(|_| {
564                    panic!(
565                        "Unknown DYN_DISCOVERY_BACKEND value: '{other}'. \
566                         Valid options: kubernetes, etcd, file, mem"
567                    )
568                });
569                DiscoveryBackend::KvStore(selector)
570            }
571        };
572
573        DistributedConfig {
574            discovery_backend,
575            nats_config: if nats_enabled {
576                Some(nats::ClientOptions::default())
577            } else {
578                None
579            },
580            request_plane,
581        }
582    }
583
584    pub fn for_cli() -> DistributedConfig {
585        let etcd_config = etcd::ClientOptions {
586            attach_lease: false,
587            ..Default::default()
588        };
589        let request_plane = RequestPlaneMode::from_env();
590        let nats_enabled = request_plane.is_nats()
591            || std::env::var(crate::config::environment_names::nats::NATS_SERVER).is_ok();
592        DistributedConfig {
593            discovery_backend: DiscoveryBackend::KvStore(kv::Selector::Etcd(Box::new(etcd_config))),
594            nats_config: if nats_enabled {
595                Some(nats::ClientOptions::default())
596            } else {
597                None
598            },
599            request_plane,
600        }
601    }
602
603    /// A DistributedConfig that isn't distributed, for when the frontend and backend are in the
604    /// same process.
605    pub fn process_local() -> DistributedConfig {
606        DistributedConfig {
607            discovery_backend: DiscoveryBackend::KvStore(kv::Selector::Memory),
608            nats_config: None,
609            // This won't be used in process local, so we likely need a "none" option to
610            // communicate that and avoid opening the ports.
611            request_plane: RequestPlaneMode::Tcp,
612        }
613    }
614}
615
/// Request plane transport mode configuration
///
/// This determines how requests are distributed from routers to workers:
/// - `Nats`: Use NATS for request distribution (legacy)
/// - `Http`: Use HTTP/2 for request distribution
/// - `Tcp`: Use raw TCP for request distribution with msgpack support (default)
///
/// Parsed (case-insensitively) from the `DYN_REQUEST_PLANE` environment variable;
/// see the `FromStr` impl and [`RequestPlaneMode::from_env`] below.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RequestPlaneMode {
    /// Use NATS for request plane
    Nats,
    /// Use HTTP/2 for request plane
    Http,
    /// Use raw TCP for request plane with msgpack support
    Tcp,
}
631
632impl Default for RequestPlaneMode {
633    fn default() -> Self {
634        Self::Tcp
635    }
636}
637
638impl fmt::Display for RequestPlaneMode {
639    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
640        match self {
641            Self::Nats => write!(f, "nats"),
642            Self::Http => write!(f, "http"),
643            Self::Tcp => write!(f, "tcp"),
644        }
645    }
646}
647
648impl std::str::FromStr for RequestPlaneMode {
649    type Err = anyhow::Error;
650
651    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
652        match s.to_lowercase().as_str() {
653            "nats" => Ok(Self::Nats),
654            "http" => Ok(Self::Http),
655            "tcp" => Ok(Self::Tcp),
656            _ => Err(anyhow::anyhow!(
657                "Invalid request plane mode: '{}'. Valid options are: 'nats', 'http', 'tcp'",
658                s
659            )),
660        }
661    }
662}
663
664impl RequestPlaneMode {
665    /// Get the request plane mode from environment variable (uncached)
666    /// Reads from `DYN_REQUEST_PLANE` environment variable.
667    fn from_env() -> Self {
668        std::env::var("DYN_REQUEST_PLANE")
669            .ok()
670            .and_then(|s| s.parse().ok())
671            .unwrap_or_default()
672    }
673
674    pub fn is_nats(&self) -> bool {
675        matches!(self, RequestPlaneMode::Nats)
676    }
677}
678
679pub mod distributed_test_utils {
680    //! Common test helper functions for DistributedRuntime tests
681
682    /// Helper function to create a DRT instance for integration-only tests.
683    /// Uses from_current to leverage existing tokio runtime
684    /// Note: Settings are read from environment variables inside DistributedRuntime::from_settings
685    #[cfg(feature = "integration")]
686    pub async fn create_test_drt_async() -> super::DistributedRuntime {
687        use crate::transports::nats;
688
689        let rt = crate::Runtime::from_current().unwrap();
690        let config = super::DistributedConfig {
691            discovery_backend: super::DiscoveryBackend::KvStore(
692                crate::storage::kv::Selector::Memory,
693            ),
694            nats_config: Some(nats::ClientOptions::default()),
695            request_plane: crate::distributed::RequestPlaneMode::default(),
696        };
697        super::DistributedRuntime::new(rt, config).await.unwrap()
698    }
699
700    /// Helper function to create a DRT instance which points at
701    /// a (shared) file-backed KV store and ephemeral NATS transport so that
702    /// multiple DRT instances may observe the same registration state.
703    /// NOTE: This gets around the fact that create_test_drt_async() is
704    /// hardcoded to spin up a memory-backed discovery store
705    /// which means we can't share discovery state across runtimes.
706    pub async fn create_test_shared_drt_async(
707        store_path: &std::path::Path,
708    ) -> super::DistributedRuntime {
709        use crate::transports::nats;
710
711        let rt = crate::Runtime::from_current().unwrap();
712        let config = super::DistributedConfig {
713            discovery_backend: super::DiscoveryBackend::KvStore(
714                crate::storage::kv::Selector::File(store_path.to_path_buf()),
715            ),
716            nats_config: Some(nats::ClientOptions::default()),
717            request_plane: crate::distributed::RequestPlaneMode::default(),
718        };
719        super::DistributedRuntime::new(rt, config).await.unwrap()
720    }
721}
722
#[cfg(all(test, feature = "integration"))]
mod tests {
    use super::RequestPlaneMode;
    use super::distributed_test_utils::create_test_drt_async;

    /// Uptime must advance even when the system status HTTP server is disabled,
    /// since SystemHealth tracks it independently of the server.
    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_disabled() {
        use crate::config::environment_names::runtime::system as env_system;
        // Test uptime with system status server disabled
        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
            // Start a DRT
            let drt = create_test_drt_async().await;

            // Wait 50ms
            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

            // Check that uptime is 50+ ms
            let uptime = drt.system_health.lock().uptime();
            assert!(
                uptime >= std::time::Duration::from_millis(50),
                "Expected uptime to be at least 50ms, but got {:?}",
                uptime
            );

            println!(
                "✓ DRT uptime test passed (system disabled): uptime = {:?}",
                uptime
            );
        })
        .await;
    }

    /// Same as above, but with the system status server enabled on port 8081.
    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_enabled() {
        use crate::config::environment_names::runtime::system as env_system;
        // Test uptime with system status server enabled
        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, Some("8081"))], async {
            // Start a DRT
            let drt = create_test_drt_async().await;

            // Wait 50ms
            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

            // Check that uptime is 50+ ms
            let uptime = drt.system_health.lock().uptime();
            assert!(
                uptime >= std::time::Duration::from_millis(50),
                "Expected uptime to be at least 50ms, but got {:?}",
                uptime
            );

            println!(
                "✓ DRT uptime test passed (system enabled): uptime = {:?}",
                uptime
            );
        })
        .await;
    }

    /// Parsing is case-insensitive and rejects unknown mode names.
    #[test]
    fn test_request_plane_mode_from_str() {
        assert_eq!(
            "nats".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Nats
        );
        assert_eq!(
            "http".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Http
        );
        assert_eq!(
            "tcp".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Tcp
        );
        assert_eq!(
            "NATS".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Nats
        );
        assert_eq!(
            "HTTP".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Http
        );
        assert_eq!(
            "TCP".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Tcp
        );
        assert!("invalid".parse::<RequestPlaneMode>().is_err());
    }

    /// Display output must round-trip through FromStr (lowercase names).
    #[test]
    fn test_request_plane_mode_display() {
        assert_eq!(RequestPlaneMode::Nats.to_string(), "nats");
        assert_eq!(RequestPlaneMode::Http.to_string(), "http");
        assert_eq!(RequestPlaneMode::Tcp.to_string(), "tcp");
    }
}