1use crate::component::{
5 self, Component, ComponentBuilder, Endpoint, EndpointDiscoverySource, Instance, Namespace,
6 RoutingOccupancyState,
7};
8use crate::config::environment_names::tcp_response_stream;
9use crate::pipeline::PipelineError;
10use crate::pipeline::network::manager::NetworkManager;
11use crate::service::{ServiceClient, ServiceSet};
12use crate::storage::kv;
13use crate::{discovery, system_status_server, transports};
14use crate::{
15 discovery::Discovery,
16 metrics::PrometheusUpdateCallback,
17 metrics::{MetricsHierarchy, MetricsRegistry},
18 transports::{etcd, nats, tcp},
19};
20
21use super::utils::GracefulShutdownTracker;
22use crate::SystemHealth;
23use crate::runtime::Runtime;
24
25use async_once_cell::OnceCell;
27
28use std::fmt;
29use std::sync::{Arc, OnceLock, Weak};
30use std::time::Duration;
31use tokio::sync::watch::Receiver;
32
33use anyhow::Result;
34use derive_getters::Dissolve;
35use figment::error;
36use std::collections::HashMap;
37use tokio::sync::Mutex;
38use tokio_util::sync::CancellationToken;
39
40type EndpointDiscoverySourceMap = HashMap<Endpoint, Weak<EndpointDiscoverySource>>;
41type RoutingOccupancyMap = HashMap<Endpoint, Weak<RoutingOccupancyState>>;
42
43#[derive(Clone)]
46pub struct DistributedRuntime {
47 runtime: Runtime,
49
50 nats_client: Option<transports::nats::Client>,
51 network_manager: Arc<NetworkManager>,
52 tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
53 system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,
54 request_plane: RequestPlaneMode,
55
56 discovery_client: Arc<dyn discovery::Discovery>,
58
59 discovery_metadata: Option<Arc<tokio::sync::RwLock<discovery::DiscoveryMetadata>>>,
62
63 component_registry: component::Registry,
69
70 endpoint_discovery_sources: Arc<tokio::sync::Mutex<EndpointDiscoverySourceMap>>,
71 routing_occupancy_states: Arc<tokio::sync::Mutex<RoutingOccupancyMap>>,
72
73 system_health: Arc<parking_lot::Mutex<SystemHealth>>,
75
76 local_endpoint_registry: crate::local_endpoint_registry::LocalEndpointRegistry,
78
79 metrics_registry: MetricsRegistry,
81
82 engine_routes: crate::engine_routes::EngineRouteRegistry,
84
85 metadata_artifacts: crate::metadata_registry::MetadataArtifactRegistry,
87
88 event_transport_kind: crate::discovery::EventTransportKind,
91}
92
93impl MetricsHierarchy for DistributedRuntime {
94 fn basename(&self) -> String {
95 "".to_string() }
97
98 fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
99 vec![] }
101
102 fn get_metrics_registry(&self) -> &MetricsRegistry {
103 &self.metrics_registry
104 }
105
106 fn connection_id(&self) -> Option<u64> {
107 Some(self.discovery_client.instance_id())
108 }
109}
110
111impl std::fmt::Debug for DistributedRuntime {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 write!(f, "DistributedRuntime")
114 }
115}
116
117impl DistributedRuntime {
118 pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result<Self> {
119 let (discovery_backend, nats_config, request_plane, event_transport_kind) =
120 config.dissolve();
121
122 let nats_client = match nats_config {
123 Some(nc) => Some(nc.connect().await?),
124 None => None,
125 };
126
127 let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default();
129 let cancel_token = if config.system_server_enabled() {
132 Some(runtime.clone().child_token())
133 } else {
134 None
135 };
136 let starting_health_status = config.starting_health_status.clone();
137 let use_endpoint_health_status = config.use_endpoint_health_status.clone();
138 let health_endpoint_path = config.system_health_path.clone();
139 let live_endpoint_path = config.system_live_path.clone();
140 let system_health = Arc::new(parking_lot::Mutex::new(SystemHealth::new(
141 starting_health_status,
142 use_endpoint_health_status,
143 config.health_check_enabled,
144 health_endpoint_path,
145 live_endpoint_path,
146 )));
147
148 let (discovery_client, discovery_metadata) = match discovery_backend {
150 DiscoveryBackend::Kubernetes => {
151 tracing::info!("Initializing Kubernetes discovery backend");
152 let metadata = Arc::new(tokio::sync::RwLock::new(
153 crate::discovery::DiscoveryMetadata::new(),
154 ));
155 let client = crate::discovery::KubeDiscoveryClient::new(
156 metadata.clone(),
157 runtime.primary_token(),
158 )
159 .await
160 .inspect_err(
161 |err| tracing::error!(%err, "Failed to initialize Kubernetes discovery client"),
162 )?;
163 (Arc::new(client) as Arc<dyn Discovery>, Some(metadata))
164 }
165 DiscoveryBackend::KvStore(kv_selector) => {
166 tracing::info!("Initializing KV store discovery backend: {kv_selector}");
167 let runtime_clone = runtime.clone();
168 let store = match kv_selector {
169 kv::Selector::Etcd(etcd_config) => {
170 let etcd_client = etcd::Client::new(*etcd_config, runtime_clone).await.inspect_err(|err|
171 tracing::error!(%err, "Could not connect to etcd. Pass `--discovery-backend ..` to use a different backend or start etcd."))?;
172 kv::Manager::etcd(etcd_client)
173 }
174 kv::Selector::File(root) => kv::Manager::file(runtime.primary_token(), root),
175 kv::Selector::Memory => kv::Manager::memory(),
176 };
177 use crate::discovery::KVStoreDiscovery;
178 (
179 Arc::new(KVStoreDiscovery::new(store, runtime.primary_token()))
180 as Arc<dyn Discovery>,
181 None,
182 )
183 }
184 };
185
186 let component_registry = component::Registry::new();
187
188 let network_manager = NetworkManager::new(
190 runtime.child_token(),
191 nats_client.clone().map(|c| c.client().clone()),
192 component_registry.clone(),
193 request_plane,
194 );
195
196 let distributed_runtime = Self {
197 runtime,
198 network_manager: Arc::new(network_manager),
199 nats_client,
200 tcp_server: Arc::new(OnceCell::new()),
201 system_status_server: Arc::new(OnceLock::new()),
202 discovery_client,
203 discovery_metadata,
204 component_registry,
205 endpoint_discovery_sources: Arc::new(Mutex::new(HashMap::new())),
206 routing_occupancy_states: Arc::new(Mutex::new(HashMap::new())),
207 metrics_registry: crate::MetricsRegistry::new(),
208 system_health,
209 request_plane,
210 local_endpoint_registry: crate::local_endpoint_registry::LocalEndpointRegistry::new(),
211 engine_routes: crate::engine_routes::EngineRouteRegistry::new(),
212 metadata_artifacts: crate::metadata_registry::MetadataArtifactRegistry::new(),
213 event_transport_kind,
214 };
215
216 distributed_runtime
218 .system_health
219 .lock()
220 .initialize_uptime_gauge(&distributed_runtime)?;
221
222 {
225 let system_health = distributed_runtime.system_health.clone();
226 distributed_runtime
227 .metrics_registry
228 .add_update_callback(std::sync::Arc::new(move || {
229 system_health.lock().update_uptime_gauge();
230 Ok(())
231 }));
232 }
233
234 if let Some(cancel_token) = cancel_token {
236 let host = config.system_host.clone();
238 let port = config.system_port as u16;
239
240 match crate::system_status_server::spawn_system_status_server(
242 &host,
243 port,
244 cancel_token,
245 Arc::new(distributed_runtime.clone()),
246 distributed_runtime.discovery_metadata.clone(),
247 )
248 .await
249 {
250 Ok((addr, handle)) => {
251 tracing::info!("System status server started successfully on {addr}");
252
253 let system_status_server_info =
255 crate::system_status_server::SystemStatusServerInfo::new(
256 addr,
257 Some(handle),
258 );
259
260 distributed_runtime
262 .system_status_server
263 .set(Arc::new(system_status_server_info))
264 .expect("System status server info should only be set once");
265 }
266 Err(e) => {
267 tracing::error!("System status server startup failed: {e}");
268 }
269 }
270 } else {
271 tracing::debug!(
273 "System status server HTTP endpoints disabled, but uptime metrics are being tracked"
274 );
275 }
276
277 if config.health_check_enabled {
279 let health_check_config = crate::health_check::HealthCheckConfig {
280 canary_wait_time: std::time::Duration::from_secs(config.canary_wait_time_secs),
281 request_timeout: std::time::Duration::from_secs(
282 config.health_check_request_timeout_secs,
283 ),
284 };
285
286 match crate::health_check::start_health_check_manager(
288 distributed_runtime.clone(),
289 Some(health_check_config),
290 )
291 .await
292 {
293 Ok(()) => tracing::info!(
294 "Health check manager started (canary_wait_time: {}s, request_timeout: {}s)",
295 config.canary_wait_time_secs,
296 config.health_check_request_timeout_secs
297 ),
298 Err(e) => tracing::error!("Health check manager failed to start: {e}"),
299 }
300 }
301
302 Ok(distributed_runtime)
303 }
304
305 pub async fn from_settings(runtime: Runtime) -> Result<Self> {
306 let config = DistributedConfig::from_settings();
307 Self::new(runtime, config).await
308 }
309
310 pub fn runtime(&self) -> &Runtime {
311 &self.runtime
312 }
313
314 pub fn primary_token(&self) -> CancellationToken {
315 self.runtime.primary_token()
316 }
317
318 pub fn component_registry(&self) -> &component::Registry {
321 &self.component_registry
322 }
323
324 pub fn system_health(&self) -> Arc<parking_lot::Mutex<SystemHealth>> {
326 self.system_health.clone()
327 }
328
329 pub fn local_endpoint_registry(
331 &self,
332 ) -> &crate::local_endpoint_registry::LocalEndpointRegistry {
333 &self.local_endpoint_registry
334 }
335
336 pub fn engine_routes(&self) -> &crate::engine_routes::EngineRouteRegistry {
338 &self.engine_routes
339 }
340
341 pub fn metadata_artifacts(&self) -> &crate::metadata_registry::MetadataArtifactRegistry {
342 &self.metadata_artifacts
343 }
344
345 pub fn connection_id(&self) -> u64 {
346 self.discovery_client.instance_id()
347 }
348
349 pub fn shutdown(&self) {
350 self.runtime.shutdown();
351 self.discovery_client.shutdown();
352 }
353
354 pub fn namespace(&self, name: impl Into<String>) -> Result<Namespace> {
356 Namespace::new(self.clone(), name.into())
357 }
358
359 pub fn discovery(&self) -> Arc<dyn Discovery> {
361 self.discovery_client.clone()
362 }
363
364 pub async fn tcp_server(&self) -> Result<Arc<tcp::server::TcpStreamServer>> {
365 Ok(self
366 .tcp_server
367 .get_or_try_init(async move {
368 let port = match std::env::var(tcp_response_stream::DYN_TCP_RESPONSE_STREAM_PORT) {
369 Ok(p) => p.parse::<u16>().map_err(|_| {
370 PipelineError::Generic(format!(
371 "invalid {}: '{}' is not a valid port number",
372 tcp_response_stream::DYN_TCP_RESPONSE_STREAM_PORT,
373 p
374 ))
375 })?,
376 Err(_) => 0,
377 };
378 let interface = std::env::var(tcp_response_stream::DYN_TCP_RESPONSE_STREAM_HOST)
379 .ok()
380 .filter(|h| !h.is_empty());
381
382 let host_suffix = interface
383 .as_ref()
384 .map_or(String::new(), |h| format!(" on host {h}"));
385 if port == 0 {
386 tracing::info!(
387 "TCP response stream server using OS-assigned port{host_suffix}"
388 );
389 } else {
390 tracing::info!(
391 "TCP response stream server using fixed port {port}{host_suffix}"
392 );
393 }
394
395 let options = tcp::server::ServerOptions { port, interface };
396 let server = tcp::server::TcpStreamServer::new(options).await?;
397 Ok::<_, PipelineError>(server)
398 })
399 .await?
400 .clone())
401 }
402
403 pub fn network_manager(&self) -> Arc<NetworkManager> {
408 self.network_manager.clone()
409 }
410
411 pub async fn request_plane_server(
415 &self,
416 ) -> Result<Arc<dyn crate::pipeline::network::ingress::unified_server::RequestPlaneServer>>
417 {
418 self.network_manager().server().await
419 }
420
421 pub fn system_status_server_info(
423 &self,
424 ) -> Option<Arc<crate::system_status_server::SystemStatusServerInfo>> {
425 self.system_status_server.get().cloned()
426 }
427
428 pub fn request_plane(&self) -> RequestPlaneMode {
430 self.request_plane
431 }
432
433 pub fn default_event_transport_kind(&self) -> crate::discovery::EventTransportKind {
443 self.event_transport_kind
444 }
445
446 pub fn child_token(&self) -> CancellationToken {
447 self.runtime.child_token()
448 }
449
450 pub(crate) fn graceful_shutdown_tracker(&self) -> Arc<GracefulShutdownTracker> {
451 self.runtime.graceful_shutdown_tracker()
452 }
453
454 pub(crate) fn endpoint_discovery_sources(&self) -> Arc<Mutex<EndpointDiscoverySourceMap>> {
455 self.endpoint_discovery_sources.clone()
456 }
457
458 pub fn register_graceful_task(&self) -> crate::utils::GracefulTaskGuard {
464 self.runtime.graceful_shutdown_tracker().register_task()
465 }
466
467 pub(crate) fn routing_occupancy_states(&self) -> Arc<Mutex<RoutingOccupancyMap>> {
468 self.routing_occupancy_states.clone()
469 }
470
471 pub async fn kv_router_nats_publish(
477 &self,
478 subject: String,
479 payload: bytes::Bytes,
480 ) -> anyhow::Result<()> {
481 let Some(nats_client) = self.nats_client.as_ref() else {
482 tracing::trace!("Skipping NATS publish (NATS not configured): {subject}");
484 return Ok(());
485 };
486 Ok(nats_client.client().publish(subject, payload).await?)
487 }
488
489 pub(crate) async fn kv_router_nats_subscribe(
492 &self,
493 subject: String,
494 ) -> Result<async_nats::Subscriber> {
495 let Some(nats_client) = self.nats_client.as_ref() else {
496 anyhow::bail!("KV router's EventSubscriber requires NATS");
497 };
498 Ok(nats_client.client().subscribe(subject).await?)
499 }
500
501 pub async fn kv_router_nats_request(
505 &self,
506 subject: String,
507 payload: bytes::Bytes,
508 timeout: std::time::Duration,
509 ) -> anyhow::Result<async_nats::Message> {
510 let Some(nats_client) = self.nats_client.as_ref() else {
511 anyhow::bail!("KV router's request requires NATS");
512 };
513 let response =
514 tokio::time::timeout(timeout, nats_client.client().request(subject, payload))
515 .await
516 .map_err(|_| anyhow::anyhow!("Request timed out after {:?}", timeout))??;
517 Ok(response)
518 }
519
520 pub fn register_nats_service(
527 &self,
528 component: Component,
529 ) -> tokio::sync::mpsc::Receiver<Result<(), String>> {
530 let (tx, rx) = tokio::sync::mpsc::channel::<Result<(), String>>(1);
532
533 let drt = self.clone();
534 self.runtime().secondary().spawn(async move {
535 let service_name = component.service_name();
536
537 if drt
539 .component_registry()
540 .inner
541 .lock()
542 .await
543 .services
544 .contains_key(&service_name)
545 {
546 tracing::trace!("Service {service_name} already exists");
549 let _ = tx.send(Ok(())).await;
551 return;
552 }
553
554 let Some(nats_client) = drt.nats_client.as_ref() else {
555 tracing::error!("Cannot create NATS service without NATS.");
556 let _ = tx
557 .send(Err("Cannot create NATS service without NATS".to_string()))
558 .await;
559 return;
560 };
561 let description = None;
562 let nats_service = match crate::component::service::build_nats_service(
563 nats_client,
564 &component,
565 description,
566 )
567 .await
568 {
569 Ok(service) => service,
570 Err(err) => {
571 tracing::error!(error = %err, component = service_name, "Failed to build NATS service");
572 let _ = tx.send(Err(format!("Failed to build NATS service: {err}"))).await;
573 return;
574 }
575 };
576
577 let mut guard = drt.component_registry().inner.lock().await;
578 if !guard.services.contains_key(&service_name) {
579 guard.services.insert(service_name.clone(), nats_service);
581
582 tracing::info!("Added NATS service {service_name}");
583
584 drop(guard);
585 } else {
586 drop(guard);
587 let _ = nats_service.stop().await;
588 }
592
593 let _ = tx.send(Ok(())).await;
595 });
596
597 rx
598 }
599}
600
601#[derive(Clone, Debug)]
603pub enum DiscoveryBackend {
604 Kubernetes,
606 KvStore(kv::Selector),
608}
609
610impl DiscoveryBackend {
611 pub fn is_local(&self) -> bool {
617 matches!(
618 self,
619 DiscoveryBackend::KvStore(kv::Selector::File(_))
620 | DiscoveryBackend::KvStore(kv::Selector::Memory)
621 )
622 }
623
624 pub fn resolve_event_transport_kind(&self) -> crate::discovery::EventTransportKind {
633 use crate::config::environment_names::event_plane::DYN_EVENT_PLANE;
634 use crate::discovery::EventTransportKind;
635 match std::env::var(DYN_EVENT_PLANE).as_deref() {
636 Ok("nats") => EventTransportKind::Nats,
637 Ok("zmq") => EventTransportKind::Zmq,
638 Ok("") | Err(_) => {
640 if self.is_local() {
641 EventTransportKind::Zmq
642 } else {
643 EventTransportKind::Nats
644 }
645 }
646 Ok(other) => {
647 let default_kind = if self.is_local() {
648 EventTransportKind::Zmq
649 } else {
650 EventTransportKind::Nats
651 };
652 tracing::warn!(
653 "Invalid DYN_EVENT_PLANE value '{}'. Valid values: 'nats', 'zmq'. \
654 Defaulting to {:?}.",
655 other,
656 default_kind
657 );
658 default_kind
659 }
660 }
661 }
662}
663
664#[derive(Dissolve)]
665pub struct DistributedConfig {
666 pub discovery_backend: DiscoveryBackend,
667 pub nats_config: Option<nats::ClientOptions>,
668 pub request_plane: RequestPlaneMode,
669 pub event_transport_kind: crate::discovery::EventTransportKind,
674}
675
676impl DistributedConfig {
677 pub fn from_settings() -> DistributedConfig {
678 let request_plane = RequestPlaneMode::from_env();
679
680 let backend_str =
683 std::env::var("DYN_DISCOVERY_BACKEND").unwrap_or_else(|_| "etcd".to_string());
684
685 let discovery_backend = match backend_str.as_str() {
686 "kubernetes" => {
687 tracing::info!("Using Kubernetes discovery backend");
688 DiscoveryBackend::Kubernetes
689 }
690 other => {
691 let selector: kv::Selector = other.parse().unwrap_or_else(|_| {
692 panic!(
693 "Unknown DYN_DISCOVERY_BACKEND value: '{other}'. \
694 Valid options: kubernetes, etcd, file, mem"
695 )
696 });
697 DiscoveryBackend::KvStore(selector)
698 }
699 };
700
701 let event_transport_kind = discovery_backend.resolve_event_transport_kind();
705
706 let nats_enabled = request_plane.is_nats()
715 || std::env::var(crate::config::environment_names::nats::NATS_SERVER).is_ok()
716 || matches!(
717 event_transport_kind,
718 crate::discovery::EventTransportKind::Nats
719 );
720
721 DistributedConfig {
722 discovery_backend,
723 nats_config: if nats_enabled {
724 Some(nats::ClientOptions::default())
725 } else {
726 None
727 },
728 request_plane,
729 event_transport_kind,
730 }
731 }
732
733 pub fn for_cli() -> DistributedConfig {
734 let etcd_config = etcd::ClientOptions {
735 attach_lease: false,
736 ..Default::default()
737 };
738 let request_plane = RequestPlaneMode::from_env();
739 let discovery_backend =
740 DiscoveryBackend::KvStore(kv::Selector::Etcd(Box::new(etcd_config)));
741 let event_transport_kind = discovery_backend.resolve_event_transport_kind();
742 let nats_enabled = request_plane.is_nats()
743 || std::env::var(crate::config::environment_names::nats::NATS_SERVER).is_ok()
744 || matches!(
745 event_transport_kind,
746 crate::discovery::EventTransportKind::Nats
747 );
748 DistributedConfig {
749 discovery_backend,
750 nats_config: if nats_enabled {
751 Some(nats::ClientOptions::default())
752 } else {
753 None
754 },
755 request_plane,
756 event_transport_kind,
757 }
758 }
759
760 pub fn process_local() -> DistributedConfig {
763 DistributedConfig {
764 discovery_backend: DiscoveryBackend::KvStore(kv::Selector::Memory),
765 nats_config: None,
766 request_plane: RequestPlaneMode::Tcp,
769 event_transport_kind: crate::discovery::EventTransportKind::Zmq,
770 }
771 }
772}
773
774#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
780pub enum RequestPlaneMode {
781 Nats,
783 #[default]
785 Tcp,
786}
787
788impl fmt::Display for RequestPlaneMode {
789 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
790 match self {
791 Self::Nats => write!(f, "nats"),
792 Self::Tcp => write!(f, "tcp"),
793 }
794 }
795}
796
797impl std::str::FromStr for RequestPlaneMode {
798 type Err = anyhow::Error;
799
800 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
801 match s.to_lowercase().as_str() {
802 "nats" => Ok(Self::Nats),
803 "tcp" => Ok(Self::Tcp),
804 _ => Err(anyhow::anyhow!(
805 "Invalid request plane mode: '{}'. Valid options are: 'nats', 'tcp'",
806 s
807 )),
808 }
809 }
810}
811
812impl RequestPlaneMode {
813 fn from_env() -> Self {
816 std::env::var("DYN_REQUEST_PLANE")
817 .ok()
818 .and_then(|s| s.parse().ok())
819 .unwrap_or_default()
820 }
821
822 pub fn is_nats(&self) -> bool {
823 matches!(self, RequestPlaneMode::Nats)
824 }
825}
826
827pub mod distributed_test_utils {
828 #[cfg(feature = "integration")]
834 pub async fn create_test_drt_async() -> super::DistributedRuntime {
835 use crate::transports::nats;
836
837 let rt = crate::Runtime::from_current().unwrap();
838 let config = super::DistributedConfig {
839 discovery_backend: super::DiscoveryBackend::KvStore(
840 crate::storage::kv::Selector::Memory,
841 ),
842 nats_config: Some(nats::ClientOptions::default()),
843 request_plane: crate::distributed::RequestPlaneMode::default(),
844 event_transport_kind: crate::discovery::EventTransportKind::Nats,
845 };
846 super::DistributedRuntime::new(rt, config).await.unwrap()
847 }
848
849 pub async fn create_test_shared_drt_async(
856 store_path: &std::path::Path,
857 ) -> super::DistributedRuntime {
858 use crate::transports::nats;
859
860 let rt = crate::Runtime::from_current().unwrap();
861 let config = super::DistributedConfig {
862 discovery_backend: super::DiscoveryBackend::KvStore(
863 crate::storage::kv::Selector::File(store_path.to_path_buf()),
864 ),
865 nats_config: Some(nats::ClientOptions::default()),
866 request_plane: crate::distributed::RequestPlaneMode::default(),
867 event_transport_kind: crate::discovery::EventTransportKind::Nats,
868 };
869 super::DistributedRuntime::new(rt, config).await.unwrap()
870 }
871}
872
873#[cfg(all(test, feature = "integration"))]
874mod tests {
875 use super::RequestPlaneMode;
876 use super::distributed_test_utils::create_test_drt_async;
877
878 #[tokio::test]
879 async fn test_drt_uptime_after_delay_system_disabled() {
880 use crate::config::environment_names::runtime::system as env_system;
881 temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
883 let drt = create_test_drt_async().await;
885
886 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
888
889 let uptime = drt.system_health.lock().uptime();
891 assert!(
892 uptime >= std::time::Duration::from_millis(50),
893 "Expected uptime to be at least 50ms, but got {:?}",
894 uptime
895 );
896
897 println!(
898 "✓ DRT uptime test passed (system disabled): uptime = {:?}",
899 uptime
900 );
901 })
902 .await;
903 }
904
905 #[tokio::test]
906 async fn test_drt_uptime_after_delay_system_enabled() {
907 use crate::config::environment_names::runtime::system as env_system;
908 temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, Some("8081"))], async {
910 let drt = create_test_drt_async().await;
912
913 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
915
916 let uptime = drt.system_health.lock().uptime();
918 assert!(
919 uptime >= std::time::Duration::from_millis(50),
920 "Expected uptime to be at least 50ms, but got {:?}",
921 uptime
922 );
923
924 println!(
925 "✓ DRT uptime test passed (system enabled): uptime = {:?}",
926 uptime
927 );
928 })
929 .await;
930 }
931
932 #[test]
933 fn test_request_plane_mode_from_str() {
934 assert_eq!(
935 "nats".parse::<RequestPlaneMode>().unwrap(),
936 RequestPlaneMode::Nats
937 );
938 assert_eq!(
939 "tcp".parse::<RequestPlaneMode>().unwrap(),
940 RequestPlaneMode::Tcp
941 );
942 assert_eq!(
943 "NATS".parse::<RequestPlaneMode>().unwrap(),
944 RequestPlaneMode::Nats
945 );
946 assert_eq!(
947 "TCP".parse::<RequestPlaneMode>().unwrap(),
948 RequestPlaneMode::Tcp
949 );
950 assert!("invalid".parse::<RequestPlaneMode>().is_err());
951 }
952}