1use crate::component::{
5 self, Component, ComponentBuilder, Endpoint, Instance, Namespace, RoutingOccupancyState,
6};
7use crate::config::environment_names::tcp_response_stream;
8use crate::pipeline::PipelineError;
9use crate::pipeline::network::manager::NetworkManager;
10use crate::service::{ServiceClient, ServiceSet};
11use crate::storage::kv;
12use crate::{discovery, system_status_server, transports};
13use crate::{
14 discovery::Discovery,
15 metrics::PrometheusUpdateCallback,
16 metrics::{MetricsHierarchy, MetricsRegistry},
17 transports::{etcd, nats, tcp},
18};
19
20use super::utils::GracefulShutdownTracker;
21use crate::SystemHealth;
22use crate::runtime::Runtime;
23
24use async_once_cell::OnceCell;
26
27use std::fmt;
28use std::sync::{Arc, OnceLock, Weak};
29use std::time::Duration;
30use tokio::sync::watch::Receiver;
31
32use anyhow::Result;
33use derive_getters::Dissolve;
34use figment::error;
35use std::collections::HashMap;
36use tokio::sync::Mutex;
37use tokio_util::sync::CancellationToken;
38
39type InstanceMap = HashMap<Endpoint, Weak<Receiver<Vec<Instance>>>>;
40type RoutingOccupancyMap = HashMap<Endpoint, Weak<RoutingOccupancyState>>;
41
42#[derive(Clone)]
45pub struct DistributedRuntime {
46 runtime: Runtime,
48
49 nats_client: Option<transports::nats::Client>,
50 network_manager: Arc<NetworkManager>,
51 tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
52 system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,
53 request_plane: RequestPlaneMode,
54
55 discovery_client: Arc<dyn discovery::Discovery>,
57
58 discovery_metadata: Option<Arc<tokio::sync::RwLock<discovery::DiscoveryMetadata>>>,
61
62 component_registry: component::Registry,
68
69 instance_sources: Arc<tokio::sync::Mutex<InstanceMap>>,
70 routing_occupancy_states: Arc<tokio::sync::Mutex<RoutingOccupancyMap>>,
71
72 system_health: Arc<parking_lot::Mutex<SystemHealth>>,
74
75 local_endpoint_registry: crate::local_endpoint_registry::LocalEndpointRegistry,
77
78 metrics_registry: MetricsRegistry,
80
81 engine_routes: crate::engine_routes::EngineRouteRegistry,
83
84 event_transport_kind: crate::discovery::EventTransportKind,
87}
88
89impl MetricsHierarchy for DistributedRuntime {
90 fn basename(&self) -> String {
91 "".to_string() }
93
94 fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
95 vec![] }
97
98 fn get_metrics_registry(&self) -> &MetricsRegistry {
99 &self.metrics_registry
100 }
101
102 fn connection_id(&self) -> Option<u64> {
103 Some(self.discovery_client.instance_id())
104 }
105}
106
107impl std::fmt::Debug for DistributedRuntime {
108 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109 write!(f, "DistributedRuntime")
110 }
111}
112
113impl DistributedRuntime {
114 pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result<Self> {
115 let (discovery_backend, nats_config, request_plane, event_transport_kind) =
116 config.dissolve();
117
118 let nats_client = match nats_config {
119 Some(nc) => Some(nc.connect().await?),
120 None => None,
121 };
122
123 let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default();
125 let cancel_token = if config.system_server_enabled() {
128 Some(runtime.clone().child_token())
129 } else {
130 None
131 };
132 let starting_health_status = config.starting_health_status.clone();
133 let use_endpoint_health_status = config.use_endpoint_health_status.clone();
134 let health_endpoint_path = config.system_health_path.clone();
135 let live_endpoint_path = config.system_live_path.clone();
136 let system_health = Arc::new(parking_lot::Mutex::new(SystemHealth::new(
137 starting_health_status,
138 use_endpoint_health_status,
139 health_endpoint_path,
140 live_endpoint_path,
141 )));
142
143 let (discovery_client, discovery_metadata) = match discovery_backend {
145 DiscoveryBackend::Kubernetes => {
146 tracing::info!("Initializing Kubernetes discovery backend");
147 let metadata = Arc::new(tokio::sync::RwLock::new(
148 crate::discovery::DiscoveryMetadata::new(),
149 ));
150 let client = crate::discovery::KubeDiscoveryClient::new(
151 metadata.clone(),
152 runtime.primary_token(),
153 )
154 .await
155 .inspect_err(
156 |err| tracing::error!(%err, "Failed to initialize Kubernetes discovery client"),
157 )?;
158 (Arc::new(client) as Arc<dyn Discovery>, Some(metadata))
159 }
160 DiscoveryBackend::KvStore(kv_selector) => {
161 tracing::info!("Initializing KV store discovery backend: {kv_selector}");
162 let runtime_clone = runtime.clone();
163 let store = match kv_selector {
164 kv::Selector::Etcd(etcd_config) => {
165 let etcd_client = etcd::Client::new(*etcd_config, runtime_clone).await.inspect_err(|err|
166 tracing::error!(%err, "Could not connect to etcd. Pass `--discovery-backend ..` to use a different backend or start etcd."))?;
167 kv::Manager::etcd(etcd_client)
168 }
169 kv::Selector::File(root) => kv::Manager::file(runtime.primary_token(), root),
170 kv::Selector::Memory => kv::Manager::memory(),
171 };
172 use crate::discovery::KVStoreDiscovery;
173 (
174 Arc::new(KVStoreDiscovery::new(store, runtime.primary_token()))
175 as Arc<dyn Discovery>,
176 None,
177 )
178 }
179 };
180
181 let component_registry = component::Registry::new();
182
183 let network_manager = NetworkManager::new(
185 runtime.child_token(),
186 nats_client.clone().map(|c| c.client().clone()),
187 component_registry.clone(),
188 request_plane,
189 );
190
191 let distributed_runtime = Self {
192 runtime,
193 network_manager: Arc::new(network_manager),
194 nats_client,
195 tcp_server: Arc::new(OnceCell::new()),
196 system_status_server: Arc::new(OnceLock::new()),
197 discovery_client,
198 discovery_metadata,
199 component_registry,
200 instance_sources: Arc::new(Mutex::new(HashMap::new())),
201 routing_occupancy_states: Arc::new(Mutex::new(HashMap::new())),
202 metrics_registry: crate::MetricsRegistry::new(),
203 system_health,
204 request_plane,
205 local_endpoint_registry: crate::local_endpoint_registry::LocalEndpointRegistry::new(),
206 engine_routes: crate::engine_routes::EngineRouteRegistry::new(),
207 event_transport_kind,
208 };
209
210 distributed_runtime
212 .system_health
213 .lock()
214 .initialize_uptime_gauge(&distributed_runtime)?;
215
216 {
219 let system_health = distributed_runtime.system_health.clone();
220 distributed_runtime
221 .metrics_registry
222 .add_update_callback(std::sync::Arc::new(move || {
223 system_health.lock().update_uptime_gauge();
224 Ok(())
225 }));
226 }
227
228 if let Some(cancel_token) = cancel_token {
230 let host = config.system_host.clone();
232 let port = config.system_port as u16;
233
234 match crate::system_status_server::spawn_system_status_server(
236 &host,
237 port,
238 cancel_token,
239 Arc::new(distributed_runtime.clone()),
240 distributed_runtime.discovery_metadata.clone(),
241 )
242 .await
243 {
244 Ok((addr, handle)) => {
245 tracing::info!("System status server started successfully on {addr}");
246
247 let system_status_server_info =
249 crate::system_status_server::SystemStatusServerInfo::new(
250 addr,
251 Some(handle),
252 );
253
254 distributed_runtime
256 .system_status_server
257 .set(Arc::new(system_status_server_info))
258 .expect("System status server info should only be set once");
259 }
260 Err(e) => {
261 tracing::error!("System status server startup failed: {e}");
262 }
263 }
264 } else {
265 tracing::debug!(
267 "System status server HTTP endpoints disabled, but uptime metrics are being tracked"
268 );
269 }
270
271 if config.health_check_enabled {
273 let health_check_config = crate::health_check::HealthCheckConfig {
274 canary_wait_time: std::time::Duration::from_secs(config.canary_wait_time_secs),
275 request_timeout: std::time::Duration::from_secs(
276 config.health_check_request_timeout_secs,
277 ),
278 };
279
280 match crate::health_check::start_health_check_manager(
282 distributed_runtime.clone(),
283 Some(health_check_config),
284 )
285 .await
286 {
287 Ok(()) => tracing::info!(
288 "Health check manager started (canary_wait_time: {}s, request_timeout: {}s)",
289 config.canary_wait_time_secs,
290 config.health_check_request_timeout_secs
291 ),
292 Err(e) => tracing::error!("Health check manager failed to start: {e}"),
293 }
294 }
295
296 Ok(distributed_runtime)
297 }
298
299 pub async fn from_settings(runtime: Runtime) -> Result<Self> {
300 let config = DistributedConfig::from_settings();
301 Self::new(runtime, config).await
302 }
303
304 pub fn runtime(&self) -> &Runtime {
305 &self.runtime
306 }
307
308 pub fn primary_token(&self) -> CancellationToken {
309 self.runtime.primary_token()
310 }
311
312 pub fn component_registry(&self) -> &component::Registry {
315 &self.component_registry
316 }
317
318 pub fn system_health(&self) -> Arc<parking_lot::Mutex<SystemHealth>> {
320 self.system_health.clone()
321 }
322
323 pub fn local_endpoint_registry(
325 &self,
326 ) -> &crate::local_endpoint_registry::LocalEndpointRegistry {
327 &self.local_endpoint_registry
328 }
329
330 pub fn engine_routes(&self) -> &crate::engine_routes::EngineRouteRegistry {
332 &self.engine_routes
333 }
334
335 pub fn connection_id(&self) -> u64 {
336 self.discovery_client.instance_id()
337 }
338
339 pub fn shutdown(&self) {
340 self.runtime.shutdown();
341 self.discovery_client.shutdown();
342 }
343
344 pub fn namespace(&self, name: impl Into<String>) -> Result<Namespace> {
346 Namespace::new(self.clone(), name.into())
347 }
348
349 pub fn discovery(&self) -> Arc<dyn Discovery> {
351 self.discovery_client.clone()
352 }
353
354 pub async fn tcp_server(&self) -> Result<Arc<tcp::server::TcpStreamServer>> {
355 Ok(self
356 .tcp_server
357 .get_or_try_init(async move {
358 let port = match std::env::var(tcp_response_stream::DYN_TCP_RESPONSE_STREAM_PORT) {
359 Ok(p) => p.parse::<u16>().map_err(|_| {
360 PipelineError::Generic(format!(
361 "invalid {}: '{}' is not a valid port number",
362 tcp_response_stream::DYN_TCP_RESPONSE_STREAM_PORT,
363 p
364 ))
365 })?,
366 Err(_) => 0,
367 };
368 let interface = std::env::var(tcp_response_stream::DYN_TCP_RESPONSE_STREAM_HOST)
369 .ok()
370 .filter(|h| !h.is_empty());
371
372 let host_suffix = interface
373 .as_ref()
374 .map_or(String::new(), |h| format!(" on host {h}"));
375 if port == 0 {
376 tracing::info!(
377 "TCP response stream server using OS-assigned port{host_suffix}"
378 );
379 } else {
380 tracing::info!(
381 "TCP response stream server using fixed port {port}{host_suffix}"
382 );
383 }
384
385 let options = tcp::server::ServerOptions { port, interface };
386 let server = tcp::server::TcpStreamServer::new(options).await?;
387 Ok::<_, PipelineError>(server)
388 })
389 .await?
390 .clone())
391 }
392
393 pub fn network_manager(&self) -> Arc<NetworkManager> {
398 self.network_manager.clone()
399 }
400
401 pub async fn request_plane_server(
405 &self,
406 ) -> Result<Arc<dyn crate::pipeline::network::ingress::unified_server::RequestPlaneServer>>
407 {
408 self.network_manager().server().await
409 }
410
411 pub fn system_status_server_info(
413 &self,
414 ) -> Option<Arc<crate::system_status_server::SystemStatusServerInfo>> {
415 self.system_status_server.get().cloned()
416 }
417
418 pub fn request_plane(&self) -> RequestPlaneMode {
420 self.request_plane
421 }
422
423 pub fn default_event_transport_kind(&self) -> crate::discovery::EventTransportKind {
433 self.event_transport_kind
434 }
435
436 pub fn child_token(&self) -> CancellationToken {
437 self.runtime.child_token()
438 }
439
440 pub(crate) fn graceful_shutdown_tracker(&self) -> Arc<GracefulShutdownTracker> {
441 self.runtime.graceful_shutdown_tracker()
442 }
443
444 pub fn instance_sources(&self) -> Arc<Mutex<InstanceMap>> {
445 self.instance_sources.clone()
446 }
447
448 pub(crate) fn routing_occupancy_states(&self) -> Arc<Mutex<RoutingOccupancyMap>> {
449 self.routing_occupancy_states.clone()
450 }
451
452 pub async fn kv_router_nats_publish(
458 &self,
459 subject: String,
460 payload: bytes::Bytes,
461 ) -> anyhow::Result<()> {
462 let Some(nats_client) = self.nats_client.as_ref() else {
463 tracing::trace!("Skipping NATS publish (NATS not configured): {subject}");
465 return Ok(());
466 };
467 Ok(nats_client.client().publish(subject, payload).await?)
468 }
469
470 pub(crate) async fn kv_router_nats_subscribe(
473 &self,
474 subject: String,
475 ) -> Result<async_nats::Subscriber> {
476 let Some(nats_client) = self.nats_client.as_ref() else {
477 anyhow::bail!("KV router's EventSubscriber requires NATS");
478 };
479 Ok(nats_client.client().subscribe(subject).await?)
480 }
481
482 pub async fn kv_router_nats_request(
486 &self,
487 subject: String,
488 payload: bytes::Bytes,
489 timeout: std::time::Duration,
490 ) -> anyhow::Result<async_nats::Message> {
491 let Some(nats_client) = self.nats_client.as_ref() else {
492 anyhow::bail!("KV router's request requires NATS");
493 };
494 let response =
495 tokio::time::timeout(timeout, nats_client.client().request(subject, payload))
496 .await
497 .map_err(|_| anyhow::anyhow!("Request timed out after {:?}", timeout))??;
498 Ok(response)
499 }
500
501 pub fn register_nats_service(
508 &self,
509 component: Component,
510 ) -> tokio::sync::mpsc::Receiver<Result<(), String>> {
511 let (tx, rx) = tokio::sync::mpsc::channel::<Result<(), String>>(1);
513
514 let drt = self.clone();
515 self.runtime().secondary().spawn(async move {
516 let service_name = component.service_name();
517
518 if drt
520 .component_registry()
521 .inner
522 .lock()
523 .await
524 .services
525 .contains_key(&service_name)
526 {
527 tracing::trace!("Service {service_name} already exists");
530 let _ = tx.send(Ok(())).await;
532 return;
533 }
534
535 let Some(nats_client) = drt.nats_client.as_ref() else {
536 tracing::error!("Cannot create NATS service without NATS.");
537 let _ = tx
538 .send(Err("Cannot create NATS service without NATS".to_string()))
539 .await;
540 return;
541 };
542 let description = None;
543 let nats_service = match crate::component::service::build_nats_service(
544 nats_client,
545 &component,
546 description,
547 )
548 .await
549 {
550 Ok(service) => service,
551 Err(err) => {
552 tracing::error!(error = %err, component = service_name, "Failed to build NATS service");
553 let _ = tx.send(Err(format!("Failed to build NATS service: {err}"))).await;
554 return;
555 }
556 };
557
558 let mut guard = drt.component_registry().inner.lock().await;
559 if !guard.services.contains_key(&service_name) {
560 guard.services.insert(service_name.clone(), nats_service);
562
563 tracing::info!("Added NATS service {service_name}");
564
565 drop(guard);
566 } else {
567 drop(guard);
568 let _ = nats_service.stop().await;
569 }
573
574 let _ = tx.send(Ok(())).await;
576 });
577
578 rx
579 }
580}
581
582#[derive(Clone, Debug)]
584pub enum DiscoveryBackend {
585 Kubernetes,
587 KvStore(kv::Selector),
589}
590
591impl DiscoveryBackend {
592 pub fn is_local(&self) -> bool {
598 matches!(
599 self,
600 DiscoveryBackend::KvStore(kv::Selector::File(_))
601 | DiscoveryBackend::KvStore(kv::Selector::Memory)
602 )
603 }
604
605 pub fn resolve_event_transport_kind(&self) -> crate::discovery::EventTransportKind {
614 use crate::config::environment_names::event_plane::DYN_EVENT_PLANE;
615 use crate::discovery::EventTransportKind;
616 match std::env::var(DYN_EVENT_PLANE).as_deref() {
617 Ok("nats") => EventTransportKind::Nats,
618 Ok("zmq") => EventTransportKind::Zmq,
619 Ok("") | Err(_) => {
621 if self.is_local() {
622 EventTransportKind::Zmq
623 } else {
624 EventTransportKind::Nats
625 }
626 }
627 Ok(other) => {
628 let default_kind = if self.is_local() {
629 EventTransportKind::Zmq
630 } else {
631 EventTransportKind::Nats
632 };
633 tracing::warn!(
634 "Invalid DYN_EVENT_PLANE value '{}'. Valid values: 'nats', 'zmq'. \
635 Defaulting to {:?}.",
636 other,
637 default_kind
638 );
639 default_kind
640 }
641 }
642 }
643}
644
645#[derive(Dissolve)]
646pub struct DistributedConfig {
647 pub discovery_backend: DiscoveryBackend,
648 pub nats_config: Option<nats::ClientOptions>,
649 pub request_plane: RequestPlaneMode,
650 pub event_transport_kind: crate::discovery::EventTransportKind,
655}
656
657impl DistributedConfig {
658 pub fn from_settings() -> DistributedConfig {
659 let request_plane = RequestPlaneMode::from_env();
660
661 let backend_str =
664 std::env::var("DYN_DISCOVERY_BACKEND").unwrap_or_else(|_| "etcd".to_string());
665
666 let discovery_backend = match backend_str.as_str() {
667 "kubernetes" => {
668 tracing::info!("Using Kubernetes discovery backend");
669 DiscoveryBackend::Kubernetes
670 }
671 other => {
672 let selector: kv::Selector = other.parse().unwrap_or_else(|_| {
673 panic!(
674 "Unknown DYN_DISCOVERY_BACKEND value: '{other}'. \
675 Valid options: kubernetes, etcd, file, mem"
676 )
677 });
678 DiscoveryBackend::KvStore(selector)
679 }
680 };
681
682 let event_transport_kind = discovery_backend.resolve_event_transport_kind();
686
687 let nats_enabled = request_plane.is_nats()
696 || std::env::var(crate::config::environment_names::nats::NATS_SERVER).is_ok()
697 || matches!(
698 event_transport_kind,
699 crate::discovery::EventTransportKind::Nats
700 );
701
702 DistributedConfig {
703 discovery_backend,
704 nats_config: if nats_enabled {
705 Some(nats::ClientOptions::default())
706 } else {
707 None
708 },
709 request_plane,
710 event_transport_kind,
711 }
712 }
713
714 pub fn for_cli() -> DistributedConfig {
715 let etcd_config = etcd::ClientOptions {
716 attach_lease: false,
717 ..Default::default()
718 };
719 let request_plane = RequestPlaneMode::from_env();
720 let discovery_backend =
721 DiscoveryBackend::KvStore(kv::Selector::Etcd(Box::new(etcd_config)));
722 let event_transport_kind = discovery_backend.resolve_event_transport_kind();
723 let nats_enabled = request_plane.is_nats()
724 || std::env::var(crate::config::environment_names::nats::NATS_SERVER).is_ok()
725 || matches!(
726 event_transport_kind,
727 crate::discovery::EventTransportKind::Nats
728 );
729 DistributedConfig {
730 discovery_backend,
731 nats_config: if nats_enabled {
732 Some(nats::ClientOptions::default())
733 } else {
734 None
735 },
736 request_plane,
737 event_transport_kind,
738 }
739 }
740
741 pub fn process_local() -> DistributedConfig {
744 DistributedConfig {
745 discovery_backend: DiscoveryBackend::KvStore(kv::Selector::Memory),
746 nats_config: None,
747 request_plane: RequestPlaneMode::Tcp,
750 event_transport_kind: crate::discovery::EventTransportKind::Zmq,
751 }
752 }
753}
754
755#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
762pub enum RequestPlaneMode {
763 Nats,
765 Http,
767 #[default]
769 Tcp,
770}
771
772impl fmt::Display for RequestPlaneMode {
773 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
774 match self {
775 Self::Nats => write!(f, "nats"),
776 Self::Http => write!(f, "http"),
777 Self::Tcp => write!(f, "tcp"),
778 }
779 }
780}
781
782impl std::str::FromStr for RequestPlaneMode {
783 type Err = anyhow::Error;
784
785 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
786 match s.to_lowercase().as_str() {
787 "nats" => Ok(Self::Nats),
788 "http" => Ok(Self::Http),
789 "tcp" => Ok(Self::Tcp),
790 _ => Err(anyhow::anyhow!(
791 "Invalid request plane mode: '{}'. Valid options are: 'nats', 'http', 'tcp'",
792 s
793 )),
794 }
795 }
796}
797
798impl RequestPlaneMode {
799 fn from_env() -> Self {
802 std::env::var("DYN_REQUEST_PLANE")
803 .ok()
804 .and_then(|s| s.parse().ok())
805 .unwrap_or_default()
806 }
807
808 pub fn is_nats(&self) -> bool {
809 matches!(self, RequestPlaneMode::Nats)
810 }
811}
812
813pub mod distributed_test_utils {
814 #[cfg(feature = "integration")]
820 pub async fn create_test_drt_async() -> super::DistributedRuntime {
821 use crate::transports::nats;
822
823 let rt = crate::Runtime::from_current().unwrap();
824 let config = super::DistributedConfig {
825 discovery_backend: super::DiscoveryBackend::KvStore(
826 crate::storage::kv::Selector::Memory,
827 ),
828 nats_config: Some(nats::ClientOptions::default()),
829 request_plane: crate::distributed::RequestPlaneMode::default(),
830 event_transport_kind: crate::discovery::EventTransportKind::Nats,
831 };
832 super::DistributedRuntime::new(rt, config).await.unwrap()
833 }
834
835 pub async fn create_test_shared_drt_async(
842 store_path: &std::path::Path,
843 ) -> super::DistributedRuntime {
844 use crate::transports::nats;
845
846 let rt = crate::Runtime::from_current().unwrap();
847 let config = super::DistributedConfig {
848 discovery_backend: super::DiscoveryBackend::KvStore(
849 crate::storage::kv::Selector::File(store_path.to_path_buf()),
850 ),
851 nats_config: Some(nats::ClientOptions::default()),
852 request_plane: crate::distributed::RequestPlaneMode::default(),
853 event_transport_kind: crate::discovery::EventTransportKind::Nats,
854 };
855 super::DistributedRuntime::new(rt, config).await.unwrap()
856 }
857}
858
859#[cfg(all(test, feature = "integration"))]
860mod tests {
861 use super::RequestPlaneMode;
862 use super::distributed_test_utils::create_test_drt_async;
863
864 #[tokio::test]
865 async fn test_drt_uptime_after_delay_system_disabled() {
866 use crate::config::environment_names::runtime::system as env_system;
867 temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
869 let drt = create_test_drt_async().await;
871
872 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
874
875 let uptime = drt.system_health.lock().uptime();
877 assert!(
878 uptime >= std::time::Duration::from_millis(50),
879 "Expected uptime to be at least 50ms, but got {:?}",
880 uptime
881 );
882
883 println!(
884 "✓ DRT uptime test passed (system disabled): uptime = {:?}",
885 uptime
886 );
887 })
888 .await;
889 }
890
891 #[tokio::test]
892 async fn test_drt_uptime_after_delay_system_enabled() {
893 use crate::config::environment_names::runtime::system as env_system;
894 temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, Some("8081"))], async {
896 let drt = create_test_drt_async().await;
898
899 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
901
902 let uptime = drt.system_health.lock().uptime();
904 assert!(
905 uptime >= std::time::Duration::from_millis(50),
906 "Expected uptime to be at least 50ms, but got {:?}",
907 uptime
908 );
909
910 println!(
911 "✓ DRT uptime test passed (system enabled): uptime = {:?}",
912 uptime
913 );
914 })
915 .await;
916 }
917
918 #[test]
919 fn test_request_plane_mode_from_str() {
920 assert_eq!(
921 "nats".parse::<RequestPlaneMode>().unwrap(),
922 RequestPlaneMode::Nats
923 );
924 assert_eq!(
925 "http".parse::<RequestPlaneMode>().unwrap(),
926 RequestPlaneMode::Http
927 );
928 assert_eq!(
929 "tcp".parse::<RequestPlaneMode>().unwrap(),
930 RequestPlaneMode::Tcp
931 );
932 assert_eq!(
933 "NATS".parse::<RequestPlaneMode>().unwrap(),
934 RequestPlaneMode::Nats
935 );
936 assert_eq!(
937 "HTTP".parse::<RequestPlaneMode>().unwrap(),
938 RequestPlaneMode::Http
939 );
940 assert_eq!(
941 "TCP".parse::<RequestPlaneMode>().unwrap(),
942 RequestPlaneMode::Tcp
943 );
944 assert!("invalid".parse::<RequestPlaneMode>().is_err());
945 }
946
947 #[test]
948 fn test_request_plane_mode_display() {
949 assert_eq!(RequestPlaneMode::Nats.to_string(), "nats");
950 assert_eq!(RequestPlaneMode::Http.to_string(), "http");
951 assert_eq!(RequestPlaneMode::Tcp.to_string(), "tcp");
952 }
953}