1use crate::component::{Component, Instance};
5use crate::pipeline::PipelineError;
6use crate::pipeline::network::manager::NetworkManager;
7use crate::service::{ServiceClient, ServiceSet};
8use crate::storage::kv;
9use crate::{
10 component::{self, ComponentBuilder, Endpoint, Namespace},
11 discovery::Discovery,
12 metrics::PrometheusUpdateCallback,
13 metrics::{MetricsHierarchy, MetricsRegistry},
14 transports::{etcd, nats, tcp},
15};
16use crate::{discovery, system_status_server, transports};
17
18use super::utils::GracefulShutdownTracker;
19use crate::SystemHealth;
20use crate::runtime::Runtime;
21
22use async_once_cell::OnceCell;
24
25use std::fmt;
26use std::sync::{Arc, OnceLock, Weak};
27use std::time::Duration;
28use tokio::sync::watch::Receiver;
29
30use anyhow::Result;
31use derive_getters::Dissolve;
32use figment::error;
33use std::collections::HashMap;
34use tokio::sync::Mutex;
35use tokio_util::sync::CancellationToken;
36
// Per-endpoint watch channels of live instances. `Weak` lets an entry be
// reclaimed once no client holds the corresponding `Arc<Receiver<..>>`.
type InstanceMap = HashMap<Endpoint, Weak<Receiver<Vec<Instance>>>>;
38
/// Shared handle to the process-wide distributed runtime.
///
/// Cloning is cheap: every field is either `Copy`, a cheap handle, or behind
/// an `Arc`, so clones all observe the same underlying state.
#[derive(Clone)]
pub struct DistributedRuntime {
    // Underlying (local) runtime: executors, cancellation tokens, shutdown.
    runtime: Runtime,

    // NATS connection; `None` when NATS was not configured.
    nats_client: Option<transports::nats::Client>,
    network_manager: Arc<NetworkManager>,
    // Lazily started TCP stream server (see `tcp_server()`).
    tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
    // Set once by `new()` if the system status HTTP server starts successfully.
    system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,
    request_plane: RequestPlaneMode,

    // Active discovery backend (Kubernetes or KV-store based).
    discovery_client: Arc<dyn discovery::Discovery>,

    // Only populated for the Kubernetes backend; shared with the status server.
    discovery_metadata: Option<Arc<tokio::sync::RwLock<discovery::DiscoveryMetadata>>>,

    component_registry: component::Registry,

    // Cache of per-endpoint instance watch channels (see `InstanceMap`).
    instance_sources: Arc<tokio::sync::Mutex<InstanceMap>>,

    system_health: Arc<parking_lot::Mutex<SystemHealth>>,

    local_endpoint_registry: crate::local_endpoint_registry::LocalEndpointRegistry,

    metrics_registry: MetricsRegistry,

    engine_routes: crate::engine_routes::EngineRouteRegistry,
}
80
81impl MetricsHierarchy for DistributedRuntime {
82 fn basename(&self) -> String {
83 "".to_string() }
85
86 fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
87 vec![] }
89
90 fn get_metrics_registry(&self) -> &MetricsRegistry {
91 &self.metrics_registry
92 }
93}
94
95impl std::fmt::Debug for DistributedRuntime {
96 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97 write!(f, "DistributedRuntime")
98 }
99}
100
impl DistributedRuntime {
    /// Build a distributed runtime from `config`.
    ///
    /// Connects NATS (if configured), initializes the discovery backend, and
    /// wires up the network manager. Depending on runtime settings it then
    /// spawns the system status HTTP server and the health-check manager;
    /// failures in those two are logged but do NOT fail construction, whereas
    /// NATS / discovery setup errors do.
    pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result<Self> {
        let (discovery_backend, nats_config, request_plane) = config.dissolve();

        // NATS is optional: only connect when the config asked for it.
        let nats_client = match nats_config {
            Some(nc) => Some(nc.connect().await?),
            None => None,
        };

        // NOTE: shadows the `DistributedConfig` parameter (already dissolved
        // above) with the process-wide runtime settings.
        let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default();
        // Child cancellation token is only needed if the status server runs.
        let cancel_token = if config.system_server_enabled() {
            Some(runtime.clone().child_token())
        } else {
            None
        };
        let starting_health_status = config.starting_health_status.clone();
        let use_endpoint_health_status = config.use_endpoint_health_status.clone();
        let health_endpoint_path = config.system_health_path.clone();
        let live_endpoint_path = config.system_live_path.clone();
        let system_health = Arc::new(parking_lot::Mutex::new(SystemHealth::new(
            starting_health_status,
            use_endpoint_health_status,
            health_endpoint_path,
            live_endpoint_path,
        )));

        // Kubernetes discovery carries shared metadata (later handed to the
        // status server); KV-store discovery has none (`None`).
        let (discovery_client, discovery_metadata) = match discovery_backend {
            DiscoveryBackend::Kubernetes => {
                tracing::info!("Initializing Kubernetes discovery backend");
                let metadata = Arc::new(tokio::sync::RwLock::new(
                    crate::discovery::DiscoveryMetadata::new(),
                ));
                let client = crate::discovery::KubeDiscoveryClient::new(
                    metadata.clone(),
                    runtime.primary_token(),
                )
                .await
                .inspect_err(
                    |err| tracing::error!(%err, "Failed to initialize Kubernetes discovery client"),
                )?;
                (Arc::new(client) as Arc<dyn Discovery>, Some(metadata))
            }
            DiscoveryBackend::KvStore(kv_selector) => {
                tracing::info!("Initializing KV store discovery backend: {}", kv_selector);
                let runtime_clone = runtime.clone();
                let store = match kv_selector {
                    kv::Selector::Etcd(etcd_config) => {
                        let etcd_client = etcd::Client::new(*etcd_config, runtime_clone).await.inspect_err(|err|
                            tracing::error!(%err, "Could not connect to etcd. Pass `--discovery-backend ..` to use a different backend or start etcd."))?;
                        kv::Manager::etcd(etcd_client)
                    }
                    kv::Selector::File(root) => kv::Manager::file(runtime.primary_token(), root),
                    kv::Selector::Memory => kv::Manager::memory(),
                };
                use crate::discovery::KVStoreDiscovery;
                (
                    Arc::new(KVStoreDiscovery::new(store, runtime.primary_token()))
                        as Arc<dyn Discovery>,
                    None,
                )
            }
        };

        let component_registry = component::Registry::new();

        let network_manager = NetworkManager::new(
            runtime.child_token(),
            nats_client.clone().map(|c| c.client().clone()),
            component_registry.clone(),
            request_plane,
        );

        let distributed_runtime = Self {
            runtime,
            network_manager: Arc::new(network_manager),
            nats_client,
            tcp_server: Arc::new(OnceCell::new()),
            system_status_server: Arc::new(OnceLock::new()),
            discovery_client,
            discovery_metadata,
            component_registry,
            instance_sources: Arc::new(Mutex::new(HashMap::new())),
            metrics_registry: crate::MetricsRegistry::new(),
            system_health,
            request_plane,
            local_endpoint_registry: crate::local_endpoint_registry::LocalEndpointRegistry::new(),
            engine_routes: crate::engine_routes::EngineRouteRegistry::new(),
        };

        // Uptime gauge is registered against the runtime's metrics registry.
        distributed_runtime
            .system_health
            .lock()
            .initialize_uptime_gauge(&distributed_runtime)?;

        // Refresh the uptime gauge whenever metrics are scraped/updated.
        {
            let system_health = distributed_runtime.system_health.clone();
            distributed_runtime
                .metrics_registry
                .add_update_callback(std::sync::Arc::new(move || {
                    system_health.lock().update_uptime_gauge();
                    Ok(())
                }));
        }

        // Status server only runs when enabled; a startup failure is logged
        // and construction continues without it.
        if let Some(cancel_token) = cancel_token {
            let host = config.system_host.clone();
            // NOTE(review): `as u16` truncates silently if system_port is
            // wider than u16 — presumably validated upstream; confirm.
            let port = config.system_port as u16;

            match crate::system_status_server::spawn_system_status_server(
                &host,
                port,
                cancel_token,
                Arc::new(distributed_runtime.clone()),
                distributed_runtime.discovery_metadata.clone(),
            )
            .await
            {
                Ok((addr, handle)) => {
                    tracing::info!("System status server started successfully on {}", addr);

                    let system_status_server_info =
                        crate::system_status_server::SystemStatusServerInfo::new(
                            addr,
                            Some(handle),
                        );

                    // OnceLock: this is the only place the value is set.
                    distributed_runtime
                        .system_status_server
                        .set(Arc::new(system_status_server_info))
                        .expect("System status server info should only be set once");
                }
                Err(e) => {
                    tracing::error!("System status server startup failed: {}", e);
                }
            }
        } else {
            tracing::debug!(
                "System status server HTTP endpoints disabled, but uptime metrics are being tracked"
            );
        }

        // Health-check manager is likewise best-effort: failure is logged.
        if config.health_check_enabled {
            let health_check_config = crate::health_check::HealthCheckConfig {
                canary_wait_time: std::time::Duration::from_secs(config.canary_wait_time_secs),
                request_timeout: std::time::Duration::from_secs(
                    config.health_check_request_timeout_secs,
                ),
            };

            match crate::health_check::start_health_check_manager(
                distributed_runtime.clone(),
                Some(health_check_config),
            )
            .await
            {
                Ok(()) => tracing::info!(
                    "Health check manager started (canary_wait_time: {}s, request_timeout: {}s)",
                    config.canary_wait_time_secs,
                    config.health_check_request_timeout_secs
                ),
                Err(e) => tracing::error!("Health check manager failed to start: {}", e),
            }
        }

        Ok(distributed_runtime)
    }

    /// Build a runtime from environment-derived settings.
    pub async fn from_settings(runtime: Runtime) -> Result<Self> {
        let config = DistributedConfig::from_settings();
        Self::new(runtime, config).await
    }

    /// The underlying local runtime.
    pub fn runtime(&self) -> &Runtime {
        &self.runtime
    }

    /// Primary cancellation token (fires on full shutdown).
    pub fn primary_token(&self) -> CancellationToken {
        self.runtime.primary_token()
    }

    pub fn component_registry(&self) -> &component::Registry {
        &self.component_registry
    }

    /// Shared handle to the system health state.
    pub fn system_health(&self) -> Arc<parking_lot::Mutex<SystemHealth>> {
        self.system_health.clone()
    }

    pub fn local_endpoint_registry(
        &self,
    ) -> &crate::local_endpoint_registry::LocalEndpointRegistry {
        &self.local_endpoint_registry
    }

    pub fn engine_routes(&self) -> &crate::engine_routes::EngineRouteRegistry {
        &self.engine_routes
    }

    /// Identifier of this process's discovery instance.
    pub fn connection_id(&self) -> u64 {
        self.discovery_client.instance_id()
    }

    /// Shut down the runtime and the discovery client.
    pub fn shutdown(&self) {
        self.runtime.shutdown();
        self.discovery_client.shutdown();
    }

    /// Create (or address) a namespace within this runtime.
    pub fn namespace(&self, name: impl Into<String>) -> Result<Namespace> {
        Namespace::new(self.clone(), name.into())
    }

    pub fn discovery(&self) -> Arc<dyn Discovery> {
        self.discovery_client.clone()
    }

    /// Lazily start (on first call) and return the TCP stream server.
    pub async fn tcp_server(&self) -> Result<Arc<tcp::server::TcpStreamServer>> {
        Ok(self
            .tcp_server
            .get_or_try_init(async move {
                let options = tcp::server::ServerOptions::default();
                let server = tcp::server::TcpStreamServer::new(options).await?;
                Ok::<_, PipelineError>(server)
            })
            .await?
            .clone())
    }

    pub fn network_manager(&self) -> Arc<NetworkManager> {
        self.network_manager.clone()
    }

    /// Request-plane server for the configured transport mode.
    pub async fn request_plane_server(
        &self,
    ) -> Result<Arc<dyn crate::pipeline::network::ingress::unified_server::RequestPlaneServer>>
    {
        self.network_manager().server().await
    }

    /// Status server info; `None` if it was disabled or failed to start.
    pub fn system_status_server_info(
        &self,
    ) -> Option<Arc<crate::system_status_server::SystemStatusServerInfo>> {
        self.system_status_server.get().cloned()
    }

    pub fn request_plane(&self) -> RequestPlaneMode {
        self.request_plane
    }

    /// New child cancellation token scoped below the runtime's primary token.
    pub fn child_token(&self) -> CancellationToken {
        self.runtime.child_token()
    }

    pub(crate) fn graceful_shutdown_tracker(&self) -> Arc<GracefulShutdownTracker> {
        self.runtime.graceful_shutdown_tracker()
    }

    pub fn instance_sources(&self) -> Arc<Mutex<InstanceMap>> {
        self.instance_sources.clone()
    }

    /// Publish a KV-router event over NATS.
    ///
    /// Best-effort: when NATS is not configured this is a silent no-op
    /// (traced at TRACE level) rather than an error.
    pub async fn kv_router_nats_publish(
        &self,
        subject: String,
        payload: bytes::Bytes,
    ) -> anyhow::Result<()> {
        let Some(nats_client) = self.nats_client.as_ref() else {
            tracing::trace!("Skipping NATS publish (NATS not configured): {}", subject);
            return Ok(());
        };
        Ok(nats_client.client().publish(subject, payload).await?)
    }

    /// Subscribe to a KV-router subject. Errors if NATS is not configured.
    pub(crate) async fn kv_router_nats_subscribe(
        &self,
        subject: String,
    ) -> Result<async_nats::Subscriber> {
        let Some(nats_client) = self.nats_client.as_ref() else {
            anyhow::bail!("KV router's EventSubscriber requires NATS");
        };
        Ok(nats_client.client().subscribe(subject).await?)
    }

    /// NATS request/reply with a caller-supplied timeout.
    ///
    /// Errors if NATS is not configured, on transport failure, or when no
    /// reply arrives within `timeout`.
    pub async fn kv_router_nats_request(
        &self,
        subject: String,
        payload: bytes::Bytes,
        timeout: std::time::Duration,
    ) -> anyhow::Result<async_nats::Message> {
        let Some(nats_client) = self.nats_client.as_ref() else {
            anyhow::bail!("KV router's request requires NATS");
        };
        let response =
            tokio::time::timeout(timeout, nats_client.client().request(subject, payload))
                .await
                .map_err(|_| anyhow::anyhow!("Request timed out after {:?}", timeout))??;
        Ok(response)
    }

    /// Ensure a NATS service is registered for `component`.
    ///
    /// Work happens asynchronously on the secondary runtime; the returned
    /// channel yields exactly one message: `Ok(())` when the service exists
    /// (newly built or already present), `Err(reason)` on failure.
    pub fn register_nats_service(
        &self,
        component: Component,
    ) -> tokio::sync::mpsc::Receiver<Result<(), String>> {
        // Capacity 1: only a single completion message is ever sent.
        let (tx, rx) = tokio::sync::mpsc::channel::<Result<(), String>>(1);

        let drt = self.clone();
        self.runtime().secondary().spawn(async move {
            let service_name = component.service_name();

            // Fast path: service already registered.
            if drt
                .component_registry()
                .inner
                .lock()
                .await
                .services
                .contains_key(&service_name)
            {
                tracing::trace!("Service {service_name} already exists");
                let _ = tx.send(Ok(())).await;
                return;
            }

            let Some(nats_client) = drt.nats_client.as_ref() else {
                tracing::error!("Cannot create NATS service without NATS.");
                let _ = tx
                    .send(Err("Cannot create NATS service without NATS".to_string()))
                    .await;
                return;
            };
            let description = None;
            let nats_service = match crate::component::service::build_nats_service(
                nats_client,
                &component,
                description,
            )
            .await
            {
                Ok(service) => service,
                Err(err) => {
                    tracing::error!(error = %err, component = service_name, "Failed to build NATS service");
                    let _ = tx.send(Err(format!("Failed to build NATS service: {err}"))).await;
                    return;
                }
            };

            // The registry lock was released while building, so another task
            // may have registered the same service: re-check under the lock
            // and, if we lost the race, stop our freshly-built service.
            let mut guard = drt.component_registry().inner.lock().await;
            if !guard.services.contains_key(&service_name) {
                guard.services.insert(service_name.clone(), nats_service);

                tracing::info!("Added NATS service {service_name}");

                drop(guard);
            } else {
                drop(guard);
                let _ = nats_service.stop().await;
            }

            // Either way the service now exists — report success.
            let _ = tx.send(Ok(())).await;
        });

        rx
    }
}
522
/// Which discovery mechanism backs the [`DistributedRuntime`].
#[derive(Clone, Debug)]
pub enum DiscoveryBackend {
    /// Discover peers through the Kubernetes API server.
    Kubernetes,
    /// Discover peers through a key-value store (etcd, file-backed, or in-memory).
    KvStore(kv::Selector),
}
531
/// Configuration consumed by [`DistributedRuntime::new`]; `Dissolve` lets the
/// constructor take the fields apart by value.
#[derive(Dissolve)]
pub struct DistributedConfig {
    /// Discovery backend selection (Kubernetes or a KV store).
    pub discovery_backend: DiscoveryBackend,
    /// NATS client options; `None` disables NATS entirely.
    pub nats_config: Option<nats::ClientOptions>,
    /// Transport used for request/response traffic between components.
    pub request_plane: RequestPlaneMode,
}
538
539impl DistributedConfig {
540 pub fn from_settings() -> DistributedConfig {
541 let request_plane = RequestPlaneMode::from_env();
542 let nats_enabled = request_plane.is_nats()
550 || std::env::var(crate::config::environment_names::nats::NATS_SERVER).is_ok();
551
552 let backend_str =
555 std::env::var("DYN_DISCOVERY_BACKEND").unwrap_or_else(|_| "etcd".to_string());
556
557 let discovery_backend = match backend_str.as_str() {
558 "kubernetes" => {
559 tracing::info!("Using Kubernetes discovery backend");
560 DiscoveryBackend::Kubernetes
561 }
562 other => {
563 let selector: kv::Selector = other.parse().unwrap_or_else(|_| {
564 panic!(
565 "Unknown DYN_DISCOVERY_BACKEND value: '{other}'. \
566 Valid options: kubernetes, etcd, file, mem"
567 )
568 });
569 DiscoveryBackend::KvStore(selector)
570 }
571 };
572
573 DistributedConfig {
574 discovery_backend,
575 nats_config: if nats_enabled {
576 Some(nats::ClientOptions::default())
577 } else {
578 None
579 },
580 request_plane,
581 }
582 }
583
584 pub fn for_cli() -> DistributedConfig {
585 let etcd_config = etcd::ClientOptions {
586 attach_lease: false,
587 ..Default::default()
588 };
589 let request_plane = RequestPlaneMode::from_env();
590 let nats_enabled = request_plane.is_nats()
591 || std::env::var(crate::config::environment_names::nats::NATS_SERVER).is_ok();
592 DistributedConfig {
593 discovery_backend: DiscoveryBackend::KvStore(kv::Selector::Etcd(Box::new(etcd_config))),
594 nats_config: if nats_enabled {
595 Some(nats::ClientOptions::default())
596 } else {
597 None
598 },
599 request_plane,
600 }
601 }
602
603 pub fn process_local() -> DistributedConfig {
606 DistributedConfig {
607 discovery_backend: DiscoveryBackend::KvStore(kv::Selector::Memory),
608 nats_config: None,
609 request_plane: RequestPlaneMode::Tcp,
612 }
613 }
614}
615
/// Transport used for the request/response plane between components.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RequestPlaneMode {
    /// Route requests over NATS.
    Nats,
    /// Route requests over HTTP.
    Http,
    /// Route requests over raw TCP (the default).
    Tcp,
}
631
632impl Default for RequestPlaneMode {
633 fn default() -> Self {
634 Self::Tcp
635 }
636}
637
638impl fmt::Display for RequestPlaneMode {
639 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
640 match self {
641 Self::Nats => write!(f, "nats"),
642 Self::Http => write!(f, "http"),
643 Self::Tcp => write!(f, "tcp"),
644 }
645 }
646}
647
648impl std::str::FromStr for RequestPlaneMode {
649 type Err = anyhow::Error;
650
651 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
652 match s.to_lowercase().as_str() {
653 "nats" => Ok(Self::Nats),
654 "http" => Ok(Self::Http),
655 "tcp" => Ok(Self::Tcp),
656 _ => Err(anyhow::anyhow!(
657 "Invalid request plane mode: '{}'. Valid options are: 'nats', 'http', 'tcp'",
658 s
659 )),
660 }
661 }
662}
663
664impl RequestPlaneMode {
665 fn from_env() -> Self {
668 std::env::var("DYN_REQUEST_PLANE")
669 .ok()
670 .and_then(|s| s.parse().ok())
671 .unwrap_or_default()
672 }
673
674 pub fn is_nats(&self) -> bool {
675 matches!(self, RequestPlaneMode::Nats)
676 }
677}
678
pub mod distributed_test_utils {
    /// Build a test runtime on the current Tokio runtime with in-memory KV
    /// discovery and default NATS options.
    ///
    /// # Panics
    /// Panics if not called from within a Tokio runtime or if construction
    /// fails.
    #[cfg(feature = "integration")]
    pub async fn create_test_drt_async() -> super::DistributedRuntime {
        use crate::transports::nats;

        let rt = crate::Runtime::from_current().unwrap();
        let config = super::DistributedConfig {
            discovery_backend: super::DiscoveryBackend::KvStore(
                crate::storage::kv::Selector::Memory,
            ),
            nats_config: Some(nats::ClientOptions::default()),
            request_plane: crate::distributed::RequestPlaneMode::default(),
        };
        super::DistributedRuntime::new(rt, config).await.unwrap()
    }

    /// Like `create_test_drt_async`, but discovery state is shared through a
    /// file-backed KV store at `store_path`, so multiple runtimes can see
    /// each other.
    ///
    /// NOTE(review): unlike its sibling this fn is not gated behind
    /// `#[cfg(feature = "integration")]` — confirm whether that asymmetry is
    /// intentional.
    pub async fn create_test_shared_drt_async(
        store_path: &std::path::Path,
    ) -> super::DistributedRuntime {
        use crate::transports::nats;

        let rt = crate::Runtime::from_current().unwrap();
        let config = super::DistributedConfig {
            discovery_backend: super::DiscoveryBackend::KvStore(
                crate::storage::kv::Selector::File(store_path.to_path_buf()),
            ),
            nats_config: Some(nats::ClientOptions::default()),
            request_plane: crate::distributed::RequestPlaneMode::default(),
        };
        super::DistributedRuntime::new(rt, config).await.unwrap()
    }
}
722
#[cfg(all(test, feature = "integration"))]
mod tests {
    use super::RequestPlaneMode;
    use super::distributed_test_utils::create_test_drt_async;

    // Uptime must advance even when the system status server is disabled
    // (DYN_SYSTEM_PORT unset), since uptime tracking is independent of it.
    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_disabled() {
        use crate::config::environment_names::runtime::system as env_system;
        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
            let drt = create_test_drt_async().await;

            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

            let uptime = drt.system_health.lock().uptime();
            assert!(
                uptime >= std::time::Duration::from_millis(50),
                "Expected uptime to be at least 50ms, but got {:?}",
                uptime
            );

            println!(
                "✓ DRT uptime test passed (system disabled): uptime = {:?}",
                uptime
            );
        })
        .await;
    }

    // Same as above with the system status server enabled on port 8081.
    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_enabled() {
        use crate::config::environment_names::runtime::system as env_system;
        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, Some("8081"))], async {
            let drt = create_test_drt_async().await;

            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

            let uptime = drt.system_health.lock().uptime();
            assert!(
                uptime >= std::time::Duration::from_millis(50),
                "Expected uptime to be at least 50ms, but got {:?}",
                uptime
            );

            println!(
                "✓ DRT uptime test passed (system enabled): uptime = {:?}",
                uptime
            );
        })
        .await;
    }

    // FromStr must be case-insensitive and reject unknown values.
    #[test]
    fn test_request_plane_mode_from_str() {
        assert_eq!(
            "nats".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Nats
        );
        assert_eq!(
            "http".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Http
        );
        assert_eq!(
            "tcp".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Tcp
        );
        assert_eq!(
            "NATS".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Nats
        );
        assert_eq!(
            "HTTP".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Http
        );
        assert_eq!(
            "TCP".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Tcp
        );
        assert!("invalid".parse::<RequestPlaneMode>().is_err());
    }

    // Display must produce the lowercase names FromStr accepts.
    #[test]
    fn test_request_plane_mode_display() {
        assert_eq!(RequestPlaneMode::Nats.to_string(), "nats");
        assert_eq!(RequestPlaneMode::Http.to_string(), "http");
        assert_eq!(RequestPlaneMode::Tcp.to_string(), "tcp");
    }
}