1use crate::component::{Component, Instance};
5use crate::pipeline::PipelineError;
6use crate::pipeline::network::manager::NetworkManager;
7use crate::service::{ServiceClient, ServiceSet};
8use crate::storage::kv::{self, Store as _};
9use crate::{
10 component::{self, ComponentBuilder, Endpoint, Namespace},
11 discovery::Discovery,
12 metrics::PrometheusUpdateCallback,
13 metrics::{MetricsHierarchy, MetricsRegistry},
14 transports::{etcd, nats, tcp},
15};
16use crate::{discovery, system_status_server, transports};
17
18use super::utils::GracefulShutdownTracker;
19use crate::SystemHealth;
20use crate::runtime::Runtime;
21
22use async_once_cell::OnceCell;
24
25use std::fmt;
26use std::sync::{Arc, OnceLock, Weak};
27use std::time::Duration;
28use tokio::sync::watch::Receiver;
29
30use anyhow::Result;
31use derive_getters::Dissolve;
32use figment::error;
33use std::collections::HashMap;
34use tokio::sync::Mutex;
35use tokio_util::sync::CancellationToken;
36
/// Cache of per-endpoint instance-list watch channels. `Weak` references let
/// entries expire once no consumer holds the `Receiver` any longer.
type InstanceMap = HashMap<Endpoint, Weak<Receiver<Vec<Instance>>>>;
38
/// Process-wide handle bundling the distributed facilities: transports,
/// service discovery, KV storage, metrics, and health state.
///
/// Cloning is cheap — every field is either a small handle or reference-counted.
#[derive(Clone)]
pub struct DistributedRuntime {
    /// Underlying local runtime (task spawning, cancellation tokens).
    runtime: Runtime,

    /// Optional NATS connection; `None` when NATS is not configured.
    nats_client: Option<transports::nats::Client>,
    /// Key-value store backend (etcd, file-backed, or in-memory).
    store: kv::Manager,
    /// Request-plane network manager (servers/clients for the selected plane).
    network_manager: Arc<NetworkManager>,
    /// Lazily started TCP stream server (async `OnceCell`: init may await).
    tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
    /// Info about the system status HTTP server, set once after spawn succeeds.
    system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,
    /// Which transport the request plane uses (NATS / HTTP / TCP).
    request_plane: RequestPlaneMode,

    /// Active discovery backend (Kubernetes or KV-store based).
    discovery_client: Arc<dyn discovery::Discovery>,

    /// Shared discovery metadata; only populated for the Kubernetes backend.
    discovery_metadata: Option<Arc<tokio::sync::RwLock<discovery::DiscoveryMetadata>>>,

    /// Registry of components/services known to this runtime.
    component_registry: component::Registry,

    /// Cached instance-list watchers, keyed by endpoint (see [`InstanceMap`]).
    instance_sources: Arc<tokio::sync::Mutex<InstanceMap>>,

    /// Health/liveness state shared with the status server and health checks.
    system_health: Arc<parking_lot::Mutex<SystemHealth>>,

    /// Registry of endpoints served in-process.
    local_endpoint_registry: crate::local_endpoint_registry::LocalEndpointRegistry,

    /// Root metrics registry (this runtime is the top of the hierarchy).
    metrics_registry: MetricsRegistry,

    /// Registry of engine routes exposed by this process.
    engine_routes: crate::engine_routes::EngineRouteRegistry,
}
81
82impl MetricsHierarchy for DistributedRuntime {
83 fn basename(&self) -> String {
84 "".to_string() }
86
87 fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
88 vec![] }
90
91 fn get_metrics_registry(&self) -> &MetricsRegistry {
92 &self.metrics_registry
93 }
94}
95
96impl std::fmt::Debug for DistributedRuntime {
97 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98 write!(f, "DistributedRuntime")
99 }
100}
101
impl DistributedRuntime {
    /// Build a distributed runtime from an explicit configuration.
    ///
    /// Order of initialization matters: KV store, then (optional) NATS, then
    /// health state, then the discovery backend (chosen via the
    /// `DYN_DISCOVERY_BACKEND` env var), then the network manager. The system
    /// status server and health-check manager are spawned last, against the
    /// fully constructed runtime.
    ///
    /// # Errors
    /// Fails if the etcd connection, the NATS connection, or the Kubernetes
    /// discovery client cannot be established, or if the uptime gauge cannot
    /// be initialized.
    pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result<Self> {
        let (selected_kv_store, nats_config, request_plane) = config.dissolve();

        let runtime_clone = runtime.clone();

        // Instantiate the KV store backend selected by the config.
        let store = match selected_kv_store {
            kv::Selector::Etcd(etcd_config) => {
                let etcd_client = etcd::Client::new(*etcd_config, runtime_clone).await.inspect_err(|err|
                    tracing::error!(%err, "Could not connect to etcd. Pass `--store-kv ..` to use a different backend or start etcd."))?;
                kv::Manager::etcd(etcd_client)
            }
            kv::Selector::File(root) => kv::Manager::file(runtime.primary_token(), root),
            kv::Selector::Memory => kv::Manager::memory(),
        };

        // NATS is optional; connect only when options were provided.
        let nats_client = match nats_config {
            Some(nc) => Some(nc.connect().await?),
            None => None,
        };

        // NOTE(review): shadows the `DistributedConfig` parameter; settings
        // errors are silently replaced with defaults here — confirm intended.
        let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default();
        // Only mint a cancellation token when the status server will run.
        let cancel_token = if config.system_server_enabled() {
            Some(runtime.clone().child_token())
        } else {
            None
        };
        let starting_health_status = config.starting_health_status.clone();
        let use_endpoint_health_status = config.use_endpoint_health_status.clone();
        let health_endpoint_path = config.system_health_path.clone();
        let live_endpoint_path = config.system_live_path.clone();
        let system_health = Arc::new(parking_lot::Mutex::new(SystemHealth::new(
            starting_health_status,
            use_endpoint_health_status,
            health_endpoint_path,
            live_endpoint_path,
        )));

        // Discovery backend selection; anything other than "kubernetes"
        // (including unset) falls back to the KV-store backend.
        let discovery_backend =
            std::env::var("DYN_DISCOVERY_BACKEND").unwrap_or_else(|_| "kv_store".to_string());

        let (discovery_client, discovery_metadata) = match discovery_backend.as_str() {
            "kubernetes" => {
                tracing::info!("Initializing Kubernetes discovery backend");
                // Metadata is shared with the status server (see spawn below).
                let metadata = Arc::new(tokio::sync::RwLock::new(
                    crate::discovery::DiscoveryMetadata::new(),
                ));
                let client = crate::discovery::KubeDiscoveryClient::new(
                    metadata.clone(),
                    runtime.primary_token(),
                )
                .await
                .inspect_err(
                    |err| tracing::error!(%err, "Failed to initialize Kubernetes discovery client"),
                )?;
                (Arc::new(client) as Arc<dyn Discovery>, Some(metadata))
            }
            _ => {
                tracing::info!("Initializing KV store discovery backend");
                use crate::discovery::KVStoreDiscovery;
                (
                    Arc::new(KVStoreDiscovery::new(
                        store.clone(),
                        runtime.primary_token(),
                    )) as Arc<dyn Discovery>,
                    None,
                )
            }
        };

        let component_registry = component::Registry::new();

        let network_manager = NetworkManager::new(
            runtime.child_token(),
            nats_client.clone().map(|c| c.client().clone()),
            component_registry.clone(),
            request_plane,
        );

        let distributed_runtime = Self {
            runtime,
            store,
            network_manager: Arc::new(network_manager),
            nats_client,
            tcp_server: Arc::new(OnceCell::new()),
            system_status_server: Arc::new(OnceLock::new()),
            discovery_client,
            discovery_metadata,
            component_registry,
            instance_sources: Arc::new(Mutex::new(HashMap::new())),
            metrics_registry: crate::MetricsRegistry::new(),
            system_health,
            request_plane,
            local_endpoint_registry: crate::local_endpoint_registry::LocalEndpointRegistry::new(),
            engine_routes: crate::engine_routes::EngineRouteRegistry::new(),
        };

        // Uptime is tracked regardless of whether the HTTP server runs.
        distributed_runtime
            .system_health
            .lock()
            .initialize_uptime_gauge(&distributed_runtime)?;

        if let Some(cancel_token) = cancel_token {
            let host = config.system_host.clone();
            // NOTE(review): `as u16` truncates silently if system_port is out
            // of range — confirm the config type already bounds it.
            let port = config.system_port as u16;

            match crate::system_status_server::spawn_system_status_server(
                &host,
                port,
                cancel_token,
                Arc::new(distributed_runtime.clone()),
                distributed_runtime.discovery_metadata.clone(),
            )
            .await
            {
                Ok((addr, handle)) => {
                    tracing::info!("System status server started successfully on {}", addr);

                    let system_status_server_info =
                        crate::system_status_server::SystemStatusServerInfo::new(
                            addr,
                            Some(handle),
                        );

                    // First (only) write to the OnceLock; a second set would be a bug.
                    distributed_runtime
                        .system_status_server
                        .set(Arc::new(system_status_server_info))
                        .expect("System status server info should only be set once");
                }
                Err(e) => {
                    // Startup failure is logged but does not abort runtime creation.
                    tracing::error!("System status server startup failed: {}", e);
                }
            }
        } else {
            tracing::debug!(
                "System status server HTTP endpoints disabled, but uptime metrics are being tracked"
            );
        }

        if config.health_check_enabled {
            let health_check_config = crate::health_check::HealthCheckConfig {
                canary_wait_time: std::time::Duration::from_secs(config.canary_wait_time_secs),
                request_timeout: std::time::Duration::from_secs(
                    config.health_check_request_timeout_secs,
                ),
            };

            // Like the status server, a failed health-check manager is logged
            // but non-fatal.
            match crate::health_check::start_health_check_manager(
                distributed_runtime.clone(),
                Some(health_check_config),
            )
            .await
            {
                Ok(()) => tracing::info!(
                    "Health check manager started (canary_wait_time: {}s, request_timeout: {}s)",
                    config.canary_wait_time_secs,
                    config.health_check_request_timeout_secs
                ),
                Err(e) => tracing::error!("Health check manager failed to start: {}", e),
            }
        }

        Ok(distributed_runtime)
    }

    /// Convenience constructor using [`DistributedConfig::from_settings`].
    pub async fn from_settings(runtime: Runtime) -> Result<Self> {
        let config = DistributedConfig::from_settings();
        Self::new(runtime, config).await
    }

    /// Underlying local runtime.
    pub fn runtime(&self) -> &Runtime {
        &self.runtime
    }

    /// Root cancellation token of the runtime.
    pub fn primary_token(&self) -> CancellationToken {
        self.runtime.primary_token()
    }

    /// Registry of components/services known to this runtime.
    pub fn component_registry(&self) -> &component::Registry {
        &self.component_registry
    }

    /// Shared health state handle (cloned `Arc`).
    pub fn system_health(&self) -> Arc<parking_lot::Mutex<SystemHealth>> {
        self.system_health.clone()
    }

    /// Registry of endpoints served in-process.
    pub fn local_endpoint_registry(
        &self,
    ) -> &crate::local_endpoint_registry::LocalEndpointRegistry {
        &self.local_endpoint_registry
    }

    /// Registry of engine routes exposed by this process.
    pub fn engine_routes(&self) -> &crate::engine_routes::EngineRouteRegistry {
        &self.engine_routes
    }

    /// Identifier of this instance as registered with discovery.
    pub fn connection_id(&self) -> u64 {
        self.discovery_client.instance_id()
    }

    /// Shut down the runtime and the KV store backend.
    pub fn shutdown(&self) {
        self.runtime.shutdown();
        self.store.shutdown();
    }

    /// Create a [`Namespace`] rooted at this runtime.
    pub fn namespace(&self, name: impl Into<String>) -> Result<Namespace> {
        Namespace::new(self.clone(), name.into())
    }

    /// Active discovery backend (cloned `Arc`).
    pub fn discovery(&self) -> Arc<dyn Discovery> {
        self.discovery_client.clone()
    }

    /// Lazily start (on first call) and return the TCP stream server.
    ///
    /// # Errors
    /// Returns the server-construction error from the first initialization
    /// attempt.
    pub async fn tcp_server(&self) -> Result<Arc<tcp::server::TcpStreamServer>> {
        Ok(self
            .tcp_server
            .get_or_try_init(async move {
                let options = tcp::server::ServerOptions::default();
                let server = tcp::server::TcpStreamServer::new(options).await?;
                Ok::<_, PipelineError>(server)
            })
            .await?
            .clone())
    }

    /// Request-plane network manager (cloned `Arc`).
    pub fn network_manager(&self) -> Arc<NetworkManager> {
        self.network_manager.clone()
    }

    /// Server side of the request plane, from the network manager.
    pub async fn request_plane_server(
        &self,
    ) -> Result<Arc<dyn crate::pipeline::network::ingress::unified_server::RequestPlaneServer>>
    {
        self.network_manager().server().await
    }

    /// Status-server info, or `None` if the server is disabled or failed to
    /// start.
    pub fn system_status_server_info(
        &self,
    ) -> Option<Arc<crate::system_status_server::SystemStatusServerInfo>> {
        self.system_status_server.get().cloned()
    }

    /// KV store backend.
    pub fn store(&self) -> &kv::Manager {
        &self.store
    }

    /// Configured request-plane transport.
    pub fn request_plane(&self) -> RequestPlaneMode {
        self.request_plane
    }

    /// Child cancellation token derived from the runtime.
    pub fn child_token(&self) -> CancellationToken {
        self.runtime.child_token()
    }

    /// Tracker used to coordinate graceful shutdown of in-flight work.
    pub(crate) fn graceful_shutdown_tracker(&self) -> Arc<GracefulShutdownTracker> {
        self.runtime.graceful_shutdown_tracker()
    }

    /// Shared cache of per-endpoint instance watchers (cloned `Arc`).
    pub fn instance_sources(&self) -> Arc<Mutex<InstanceMap>> {
        self.instance_sources.clone()
    }

    /// Publish a KV-router event over NATS.
    ///
    /// Best-effort: when NATS is not configured the publish is skipped and
    /// `Ok(())` is returned.
    pub async fn kv_router_nats_publish(
        &self,
        subject: String,
        payload: bytes::Bytes,
    ) -> anyhow::Result<()> {
        let Some(nats_client) = self.nats_client.as_ref() else {
            tracing::trace!("Skipping NATS publish (NATS not configured): {}", subject);
            return Ok(());
        };
        Ok(nats_client.client().publish(subject, payload).await?)
    }

    /// Subscribe to a KV-router subject over NATS.
    ///
    /// # Errors
    /// Unlike publish, subscription is not best-effort: fails when NATS is
    /// not configured.
    pub(crate) async fn kv_router_nats_subscribe(
        &self,
        subject: String,
    ) -> Result<async_nats::Subscriber> {
        let Some(nats_client) = self.nats_client.as_ref() else {
            anyhow::bail!("KV router's EventSubscriber requires NATS");
        };
        Ok(nats_client.client().subscribe(subject).await?)
    }

    /// Request/reply over NATS for the KV router, bounded by `timeout`.
    ///
    /// # Errors
    /// Fails when NATS is not configured, on transport errors, or when the
    /// reply does not arrive within `timeout`.
    pub async fn kv_router_nats_request(
        &self,
        subject: String,
        payload: bytes::Bytes,
        timeout: std::time::Duration,
    ) -> anyhow::Result<async_nats::Message> {
        let Some(nats_client) = self.nats_client.as_ref() else {
            anyhow::bail!("KV router's request requires NATS");
        };
        let response =
            tokio::time::timeout(timeout, nats_client.client().request(subject, payload))
                .await
                .map_err(|_| anyhow::anyhow!("Request timed out after {:?}", timeout))??;
        Ok(response)
    }

    /// Register a NATS service for `component` in the background.
    ///
    /// Returns a one-slot channel that receives `Ok(())` once the service is
    /// registered (or already existed), or `Err(msg)` when registration
    /// fails. The caller may drop the receiver; sends are best-effort.
    pub fn register_nats_service(
        &self,
        component: Component,
    ) -> tokio::sync::mpsc::Receiver<Result<(), String>> {
        let (tx, rx) = tokio::sync::mpsc::channel::<Result<(), String>>(1);

        let drt = self.clone();
        self.runtime().secondary().spawn(async move {
            let service_name = component.service_name();

            // Fast path: already registered — report success immediately.
            if drt
                .component_registry()
                .inner
                .lock()
                .await
                .services
                .contains_key(&service_name)
            {
                tracing::trace!("Service {service_name} already exists");
                let _ = tx.send(Ok(())).await;
                return;
            }

            let Some(nats_client) = drt.nats_client.as_ref() else {
                tracing::error!("Cannot create NATS service without NATS.");
                let _ = tx
                    .send(Err("Cannot create NATS service without NATS".to_string()))
                    .await;
                return;
            };
            let description = None;
            // Build the service outside the registry lock (it awaits NATS).
            let nats_service = match crate::component::service::build_nats_service(
                nats_client,
                &component,
                description,
            )
            .await
            {
                Ok(service) => service,
                Err(err) => {
                    tracing::error!(error = %err, component = service_name, "Failed to build NATS service");
                    let _ = tx.send(Err(format!("Failed to build NATS service: {err}"))).await;
                    return;
                }
            };

            // Re-check under the lock: another task may have registered the
            // same service while we were building ours.
            let mut guard = drt.component_registry().inner.lock().await;
            if !guard.services.contains_key(&service_name) {
                guard.services.insert(service_name.clone(), nats_service);

                tracing::info!("Added NATS service {service_name}");

                drop(guard);
            } else {
                // Lost the race: stop the redundant service we just built.
                drop(guard);
                let _ = nats_service.stop().await;
            }

            // Either way the service now exists, so report success.
            let _ = tx.send(Ok(())).await;
        });

        rx
    }
}
527
/// Configuration inputs for [`DistributedRuntime::new`]; `Dissolve` lets the
/// constructor unpack all fields by value.
#[derive(Dissolve)]
pub struct DistributedConfig {
    /// KV store backend to use (etcd, file-backed, or in-memory).
    pub store_backend: kv::Selector,
    /// NATS client options; `None` disables NATS entirely.
    pub nats_config: Option<nats::ClientOptions>,
    /// Transport used for the request plane.
    pub request_plane: RequestPlaneMode,
}
534
535impl DistributedConfig {
536 pub fn from_settings() -> DistributedConfig {
537 let request_plane = RequestPlaneMode::from_env();
538 let nats_enabled = request_plane.is_nats()
546 || std::env::var(crate::config::environment_names::nats::NATS_SERVER).is_ok();
547
548 let discovery_backend =
551 std::env::var("DYN_DISCOVERY_BACKEND").unwrap_or_else(|_| "kv_store".to_string());
552
553 let store_backend = if discovery_backend == "kubernetes" {
554 tracing::info!("Using Kubernetes discovery backend");
555 kv::Selector::Memory
556 } else {
557 kv::Selector::Etcd(Box::default())
558 };
559
560 DistributedConfig {
561 store_backend,
562 nats_config: if nats_enabled {
563 Some(nats::ClientOptions::default())
564 } else {
565 None
566 },
567 request_plane,
568 }
569 }
570
571 pub fn for_cli() -> DistributedConfig {
572 let etcd_config = etcd::ClientOptions {
573 attach_lease: false,
574 ..Default::default()
575 };
576 let request_plane = RequestPlaneMode::from_env();
577 let nats_enabled = request_plane.is_nats()
578 || std::env::var(crate::config::environment_names::nats::NATS_SERVER).is_ok();
579 DistributedConfig {
580 store_backend: kv::Selector::Etcd(Box::new(etcd_config)),
581 nats_config: if nats_enabled {
582 Some(nats::ClientOptions::default())
583 } else {
584 None
585 },
586 request_plane,
587 }
588 }
589
590 pub fn process_local() -> DistributedConfig {
593 DistributedConfig {
594 store_backend: kv::Selector::Memory,
595 nats_config: None,
596 request_plane: RequestPlaneMode::Tcp,
599 }
600 }
601}
602
/// Transport used for the request plane between components.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RequestPlaneMode {
    /// Route requests over NATS messaging.
    Nats,
    /// Route requests over HTTP.
    Http,
    /// Route requests over raw TCP streams (the default).
    Tcp,
}
618
619impl Default for RequestPlaneMode {
620 fn default() -> Self {
621 Self::Tcp
622 }
623}
624
625impl fmt::Display for RequestPlaneMode {
626 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
627 match self {
628 Self::Nats => write!(f, "nats"),
629 Self::Http => write!(f, "http"),
630 Self::Tcp => write!(f, "tcp"),
631 }
632 }
633}
634
635impl std::str::FromStr for RequestPlaneMode {
636 type Err = anyhow::Error;
637
638 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
639 match s.to_lowercase().as_str() {
640 "nats" => Ok(Self::Nats),
641 "http" => Ok(Self::Http),
642 "tcp" => Ok(Self::Tcp),
643 _ => Err(anyhow::anyhow!(
644 "Invalid request plane mode: '{}'. Valid options are: 'nats', 'http', 'tcp'",
645 s
646 )),
647 }
648 }
649}
650
651impl RequestPlaneMode {
652 fn from_env() -> Self {
655 std::env::var("DYN_REQUEST_PLANE")
656 .ok()
657 .and_then(|s| s.parse().ok())
658 .unwrap_or_default()
659 }
660
661 pub fn is_nats(&self) -> bool {
662 matches!(self, RequestPlaneMode::Nats)
663 }
664}
665
666pub mod distributed_test_utils {
667 #[cfg(feature = "integration")]
673 pub async fn create_test_drt_async() -> super::DistributedRuntime {
674 use crate::{storage::kv, transports::nats};
675
676 let rt = crate::Runtime::from_current().unwrap();
677 let config = super::DistributedConfig {
678 store_backend: kv::Selector::Memory,
679 nats_config: Some(nats::ClientOptions::default()),
680 request_plane: crate::distributed::RequestPlaneMode::default(),
681 };
682 super::DistributedRuntime::new(rt, config).await.unwrap()
683 }
684
685 pub async fn create_test_shared_drt_async(
692 store_path: &std::path::Path,
693 ) -> super::DistributedRuntime {
694 use crate::{storage::kv, transports::nats};
695
696 let rt = crate::Runtime::from_current().unwrap();
697 let config = super::DistributedConfig {
698 store_backend: kv::Selector::File(store_path.to_path_buf()),
699 nats_config: Some(nats::ClientOptions::default()),
700 request_plane: crate::distributed::RequestPlaneMode::default(),
701 };
702 super::DistributedRuntime::new(rt, config).await.unwrap()
703 }
704}
705
#[cfg(all(test, feature = "integration"))]
mod tests {
    use super::RequestPlaneMode;
    use super::distributed_test_utils::create_test_drt_async;

    /// Shared body for the uptime tests: build a runtime, sleep 50ms, and
    /// verify the reported uptime covers at least that delay.
    async fn check_uptime_after_delay(label: &str) {
        let drt = create_test_drt_async().await;

        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

        let uptime = drt.system_health.lock().uptime();
        assert!(
            uptime >= std::time::Duration::from_millis(50),
            "Expected uptime to be at least 50ms, but got {:?}",
            uptime
        );

        println!("✓ DRT uptime test passed ({}): uptime = {:?}", label, uptime);
    }

    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_disabled() {
        use crate::config::environment_names::runtime::system as env_system;
        temp_env::async_with_vars(
            [(env_system::DYN_SYSTEM_PORT, None::<&str>)],
            check_uptime_after_delay("system disabled"),
        )
        .await;
    }

    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_enabled() {
        use crate::config::environment_names::runtime::system as env_system;
        temp_env::async_with_vars(
            [(env_system::DYN_SYSTEM_PORT, Some("8081"))],
            check_uptime_after_delay("system enabled"),
        )
        .await;
    }

    /// Mode names parse case-insensitively; unknown names are errors.
    #[test]
    fn test_request_plane_mode_from_str() {
        let cases = [
            ("nats", RequestPlaneMode::Nats),
            ("http", RequestPlaneMode::Http),
            ("tcp", RequestPlaneMode::Tcp),
            ("NATS", RequestPlaneMode::Nats),
            ("HTTP", RequestPlaneMode::Http),
            ("TCP", RequestPlaneMode::Tcp),
        ];
        for (input, expected) in cases {
            assert_eq!(input.parse::<RequestPlaneMode>().unwrap(), expected);
        }
        assert!("invalid".parse::<RequestPlaneMode>().is_err());
    }

    /// Display renders the lowercase canonical names.
    #[test]
    fn test_request_plane_mode_display() {
        for (mode, expected) in [
            (RequestPlaneMode::Nats, "nats"),
            (RequestPlaneMode::Http, "http"),
            (RequestPlaneMode::Tcp, "tcp"),
        ] {
            assert_eq!(mode.to_string(), expected);
        }
    }
}