1use crate::component::Component;
5use crate::pipeline::PipelineError;
6use crate::storage::key_value_store::{
7 EtcdStore, KeyValueStore, KeyValueStoreEnum, KeyValueStoreManager, KeyValueStoreSelect,
8 MemoryStore,
9};
10use crate::transports::nats::DRTNatsClientPrometheusMetrics;
11use crate::{
12 component::{self, ComponentBuilder, Endpoint, InstanceSource, Namespace},
13 discovery::Discovery,
14 metrics::PrometheusUpdateCallback,
15 metrics::{MetricsHierarchy, MetricsRegistry},
16 service::ServiceClient,
17 transports::{etcd, nats, tcp},
18};
19use crate::{discovery, system_status_server, transports};
20
21use super::utils::GracefulShutdownTracker;
22use crate::SystemHealth;
23use crate::runtime::Runtime;
24
25use async_once_cell::OnceCell;
26use std::sync::{Arc, OnceLock, Weak};
27
28use anyhow::Result;
29use derive_getters::Dissolve;
30use figment::error;
31use std::collections::HashMap;
32use tokio::sync::Mutex;
33use tokio_util::sync::CancellationToken;
34
/// Process-wide handle bundling the distributed runtime's transports
/// (etcd/NATS/TCP), key-value storage, discovery backend, metrics registry,
/// and health state. `#[derive(Clone)]` hands out shared handles — most
/// fields are `Arc`-wrapped, so clones observe the same underlying state.
#[derive(Clone)]
pub struct DistributedRuntime {
    // Underlying runtime; source of cancellation tokens and shutdown.
    runtime: Runtime,

    // `Some` only when the etcd KV backend was selected in `new()`.
    etcd_client: Option<transports::etcd::Client>,
    // NATS connection; always set to `Some` by `new()` (accessor returns Option).
    nats_client: Option<transports::nats::Client>,
    // Abstraction over the selected KV backend (etcd / file / memory).
    store: KeyValueStoreManager,
    // Lazily initialized on first call to `tcp_server()`.
    tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
    // Lazily initialized on first call to `network_manager()`.
    network_manager: Arc<OnceCell<Arc<crate::pipeline::network::manager::NetworkManager>>>,
    // Set once if/when the system status HTTP server starts successfully.
    system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,

    // Active discovery backend (KV-store based or Kubernetes).
    discovery_client: Arc<dyn discovery::Discovery>,

    // Shared discovery metadata; populated only for the Kubernetes backend.
    discovery_metadata: Option<Arc<tokio::sync::RwLock<discovery::DiscoveryMetadata>>>,

    // Registry of components created through this runtime.
    component_registry: component::Registry,

    // True when running in static mode (forces the in-memory KV store).
    is_static: bool,

    // Per-endpoint instance sources, held weakly so entries can be dropped.
    instance_sources: Arc<tokio::sync::Mutex<HashMap<Endpoint, Weak<InstanceSource>>>>,

    // Health/uptime tracking shared with the status server and health checks.
    system_health: Arc<parking_lot::Mutex<SystemHealth>>,

    // Root Prometheus metrics registry (this type is the hierarchy root).
    metrics_registry: MetricsRegistry,
}
76
77impl MetricsHierarchy for DistributedRuntime {
78 fn basename(&self) -> String {
79 "".to_string() }
81
82 fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
83 vec![] }
85
86 fn get_metrics_registry(&self) -> &MetricsRegistry {
87 &self.metrics_registry
88 }
89}
90
91impl std::fmt::Debug for DistributedRuntime {
92 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93 write!(f, "DistributedRuntime")
94 }
95}
96
impl DistributedRuntime {
    /// Build a fully wired distributed runtime from `config`.
    ///
    /// Construction order matters:
    /// 1. select the KV store backend (etcd only for non-static runtimes;
    ///    static mode always gets the in-memory store),
    /// 2. connect to NATS (mandatory — failure aborts construction),
    /// 3. build the `SystemHealth` tracker from runtime settings,
    /// 4. pick the discovery backend from `DYN_DISCOVERY_BACKEND`
    ///    ("kubernetes", or the KV-store backend for any other value),
    /// 5. assemble `Self`, then register NATS client metrics and the
    ///    uptime gauge,
    /// 6. optionally spawn the system status HTTP server,
    /// 7. optionally start the health-check manager.
    ///
    /// # Errors
    /// Propagates failures from etcd/NATS connection, Kubernetes discovery
    /// initialization, NATS metrics registration, and uptime-gauge setup.
    /// Status-server and health-check-manager startup failures are logged
    /// but do NOT fail construction.
    pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result<Self> {
        // `Dissolve` splits the config into its three fields.
        let (selected_kv_store, nats_config, is_static) = config.dissolve();

        let runtime_clone = runtime.clone();

        // A static runtime always uses the in-memory store, regardless of the
        // requested backend.
        let (etcd_client, store) = match (is_static, selected_kv_store) {
            (false, KeyValueStoreSelect::Etcd(etcd_config)) => {
                let etcd_client = etcd::Client::new(*etcd_config, runtime_clone).await.inspect_err(|err|
                    tracing::error!(%err, "Could not connect to etcd. Pass `--store-kv ..` to use a different backend or start etcd."))?;
                let store = KeyValueStoreManager::etcd(etcd_client.clone());
                (Some(etcd_client), store)
            }
            (false, KeyValueStoreSelect::File(root)) => (None, KeyValueStoreManager::file(root)),
            (true, _) | (false, KeyValueStoreSelect::Memory) => {
                (None, KeyValueStoreManager::memory())
            }
        };

        // NATS is always connected, even in static mode.
        let nats_client = Some(nats_config.clone().connect().await?);

        // NOTE: shadows the dissolved `DistributedConfig` — from here on,
        // `config` is the process-level `RuntimeConfig` read from settings.
        let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default();
        // A cancellation token is only needed if the status server will run;
        // `None` doubles as the "server disabled" signal below.
        let cancel_token = if config.system_server_enabled() {
            Some(runtime.clone().child_token())
        } else {
            None
        };
        let starting_health_status = config.starting_health_status.clone();
        let use_endpoint_health_status = config.use_endpoint_health_status.clone();
        let health_endpoint_path = config.system_health_path.clone();
        let live_endpoint_path = config.system_live_path.clone();
        let system_health = Arc::new(parking_lot::Mutex::new(SystemHealth::new(
            starting_health_status,
            use_endpoint_health_status,
            health_endpoint_path,
            live_endpoint_path,
        )));

        // Keep a handle for metrics registration after `Self` is assembled,
        // since `nats_client` is moved into the struct below.
        let nats_client_for_metrics = nats_client.clone();

        // Discovery backend selection; any value other than "kubernetes"
        // (including the unset default "kv_store") uses the KV-store backend.
        let discovery_backend =
            std::env::var("DYN_DISCOVERY_BACKEND").unwrap_or_else(|_| "kv_store".to_string());

        let (discovery_client, discovery_metadata) = match discovery_backend.as_str() {
            "kubernetes" => {
                tracing::info!("Initializing Kubernetes discovery backend");
                // Metadata is shared with the status server (passed to
                // `spawn_system_status_server` below).
                let metadata = Arc::new(tokio::sync::RwLock::new(
                    crate::discovery::DiscoveryMetadata::new(),
                ));
                let client = crate::discovery::KubeDiscoveryClient::new(
                    metadata.clone(),
                    runtime.primary_token(),
                )
                .await
                .inspect_err(
                    |err| tracing::error!(%err, "Failed to initialize Kubernetes discovery client"),
                )?;
                (Arc::new(client) as Arc<dyn Discovery>, Some(metadata))
            }
            _ => {
                tracing::info!("Initializing KV store discovery backend");
                use crate::discovery::KVStoreDiscovery;
                (
                    Arc::new(KVStoreDiscovery::new(
                        store.clone(),
                        runtime.primary_token(),
                    )) as Arc<dyn Discovery>,
                    None,
                )
            }
        };

        let distributed_runtime = Self {
            runtime,
            etcd_client,
            store,
            nats_client,
            tcp_server: Arc::new(OnceCell::new()),
            network_manager: Arc::new(OnceCell::new()),
            system_status_server: Arc::new(OnceLock::new()),
            discovery_client,
            discovery_metadata,
            component_registry: component::Registry::new_with_static(is_static),
            is_static,
            instance_sources: Arc::new(Mutex::new(HashMap::new())),
            metrics_registry: crate::MetricsRegistry::new(),
            system_health,
        };

        // Wire NATS client statistics into the metrics registry; the
        // registered callback refreshes gauges from client stats whenever the
        // registry runs its update callbacks.
        if let Some(nats_client_for_metrics) = nats_client_for_metrics {
            let nats_client_metrics = DRTNatsClientPrometheusMetrics::new(
                &distributed_runtime,
                nats_client_for_metrics.client().clone(),
            )?;
            let nats_client_callback = Arc::new({
                let nats_client_clone = nats_client_metrics.clone();
                move || {
                    nats_client_clone.set_from_client_stats();
                    Ok(())
                }
            });
            distributed_runtime
                .metrics_registry
                .add_update_callback(nats_client_callback);
        }

        // Uptime is tracked even when the HTTP status endpoints are disabled.
        distributed_runtime
            .system_health
            .lock()
            .initialize_uptime_gauge(&distributed_runtime)?;

        if let Some(cancel_token) = cancel_token {
            let host = config.system_host.clone();
            // NOTE(review): narrowing `as` cast — assumes `system_port`
            // already fits in u16; confirm validation in RuntimeConfig.
            let port = config.system_port as u16;

            match crate::system_status_server::spawn_system_status_server(
                &host,
                port,
                cancel_token,
                Arc::new(distributed_runtime.clone()),
                distributed_runtime.discovery_metadata.clone(),
            )
            .await
            {
                Ok((addr, handle)) => {
                    tracing::info!("System status server started successfully on {}", addr);

                    let system_status_server_info =
                        crate::system_status_server::SystemStatusServerInfo::new(
                            addr,
                            Some(handle),
                        );

                    // First (and only) write to the OnceLock; a second set
                    // would indicate a logic error, hence the expect.
                    distributed_runtime
                        .system_status_server
                        .set(Arc::new(system_status_server_info))
                        .expect("System status server info should only be set once");
                }
                Err(e) => {
                    // Non-fatal: the runtime remains usable without the server.
                    tracing::error!("System status server startup failed: {}", e);
                }
            }
        } else {
            tracing::debug!(
                "System status server HTTP endpoints disabled, but uptime metrics are being tracked"
            );
        }

        if config.health_check_enabled {
            let health_check_config = crate::health_check::HealthCheckConfig {
                canary_wait_time: std::time::Duration::from_secs(config.canary_wait_time_secs),
                request_timeout: std::time::Duration::from_secs(
                    config.health_check_request_timeout_secs,
                ),
            };

            // Also non-fatal on failure; errors are only logged.
            match crate::health_check::start_health_check_manager(
                distributed_runtime.clone(),
                Some(health_check_config),
            )
            .await
            {
                Ok(()) => tracing::info!(
                    "Health check manager started (canary_wait_time: {}s, request_timeout: {}s)",
                    config.canary_wait_time_secs,
                    config.health_check_request_timeout_secs
                ),
                Err(e) => tracing::error!("Health check manager failed to start: {}", e),
            }
        }

        Ok(distributed_runtime)
    }

    /// Build a runtime from settings in non-static mode (`is_static = false`).
    pub async fn from_settings(runtime: Runtime) -> Result<Self> {
        let config = DistributedConfig::from_settings(false);
        Self::new(runtime, config).await
    }

    /// Build a static runtime (`is_static = true`): uses the in-memory KV
    /// store and no etcd connection.
    pub async fn from_settings_without_discovery(runtime: Runtime) -> Result<Self> {
        let config = DistributedConfig::from_settings(true);
        Self::new(runtime, config).await
    }

    /// Borrow the underlying [`Runtime`].
    pub fn runtime(&self) -> &Runtime {
        &self.runtime
    }

    /// Primary cancellation token, delegated to the runtime.
    pub fn primary_token(&self) -> CancellationToken {
        self.runtime.primary_token()
    }

    /// Registry of components created through this runtime.
    pub fn component_registry(&self) -> &component::Registry {
        &self.component_registry
    }

    /// Shared health/uptime tracker.
    pub fn system_health(&self) -> Arc<parking_lot::Mutex<SystemHealth>> {
        self.system_health.clone()
    }

    /// Instance id reported by the active discovery backend.
    pub fn connection_id(&self) -> u64 {
        self.discovery_client.instance_id()
    }

    /// Shut down the runtime and the key-value store manager.
    pub fn shutdown(&self) {
        self.runtime.shutdown();
        self.store.shutdown();
    }

    /// Create a [`Namespace`] handle scoped to this runtime.
    pub fn namespace(&self, name: impl Into<String>) -> Result<Namespace> {
        Namespace::new(self.clone(), name.into(), self.is_static)
    }

    /// The active discovery backend (KV-store based or Kubernetes).
    pub fn discovery(&self) -> Arc<dyn Discovery> {
        self.discovery_client.clone()
    }

    /// NATS-backed service client; `None` when no NATS client is available.
    pub(crate) fn service_client(&self) -> Option<ServiceClient> {
        self.nats_client().map(|nc| ServiceClient::new(nc.clone()))
    }

    /// Lazily start (at most once) and return the shared TCP stream server,
    /// created with default server options.
    ///
    /// # Errors
    /// Propagates the server's startup error from the first initialization.
    pub async fn tcp_server(&self) -> Result<Arc<tcp::server::TcpStreamServer>> {
        Ok(self
            .tcp_server
            .get_or_try_init(async move {
                let options = tcp::server::ServerOptions::default();
                let server = tcp::server::TcpStreamServer::new(options).await?;
                Ok::<_, PipelineError>(server)
            })
            .await?
            .clone())
    }

    /// Lazily create (at most once) and return the shared network manager.
    pub async fn network_manager(
        &self,
    ) -> Result<Arc<crate::pipeline::network::manager::NetworkManager>> {
        use crate::pipeline::network::manager::NetworkManager;

        let manager = self
            .network_manager
            .get_or_try_init(async {
                // The manager gets its own child token and, when NATS is
                // connected, the raw NATS client.
                let nats_client = self.nats_client().map(|c| c.client().clone());

                anyhow::Ok(NetworkManager::new(
                    self.child_token(),
                    nats_client,
                    self.component_registry.clone(),
                ))
            })
            .await?;

        Ok(manager.clone())
    }

    /// Transport-agnostic request-plane server, obtained via the network
    /// manager (initializing it on first use).
    pub async fn request_plane_server(
        &self,
    ) -> Result<Arc<dyn crate::pipeline::network::ingress::unified_server::RequestPlaneServer>>
    {
        let manager = self.network_manager().await?;
        manager.server().await
    }

    /// Deprecated shim: still initializes the request-plane server (so any
    /// startup error surfaces first), then always bails with a migration
    /// message pointing callers at `request_plane_server()`.
    #[deprecated(note = "Use request_plane_server() or network_manager().server() instead")]
    pub async fn http_server(
        &self,
    ) -> Result<Arc<crate::pipeline::network::ingress::http_endpoint::SharedHttpServer>> {
        let _server = self.request_plane_server().await?;
        anyhow::bail!(
            "http_server() is deprecated. Use request_plane_server() instead, which returns a trait object that works with all transport types."
        )
    }

    /// Deprecated shim: same pattern as [`Self::http_server`] — always errors
    /// after initializing the request-plane server.
    #[deprecated(note = "Use request_plane_server() or network_manager().server() instead")]
    pub async fn shared_tcp_server(
        &self,
    ) -> Result<Arc<crate::pipeline::network::ingress::shared_tcp_endpoint::SharedTcpServer>> {
        let _server = self.request_plane_server().await?;
        anyhow::bail!(
            "shared_tcp_server() is deprecated. Use request_plane_server() instead, which returns a trait object that works with all transport types."
        )
    }

    /// The NATS client, if connected.
    pub fn nats_client(&self) -> Option<&nats::Client> {
        self.nats_client.as_ref()
    }

    /// Status-server info; `None` until (and unless) the server started.
    pub fn system_status_server_info(
        &self,
    ) -> Option<Arc<crate::system_status_server::SystemStatusServerInfo>> {
        self.system_status_server.get().cloned()
    }

    /// The etcd client; `None` unless the etcd backend was selected.
    pub fn etcd_client(&self) -> Option<etcd::Client> {
        self.etcd_client.clone()
    }

    /// The key-value store manager (etcd / file / memory backed).
    pub fn store(&self) -> &KeyValueStoreManager {
        &self.store
    }

    /// New child cancellation token, delegated to the runtime.
    pub fn child_token(&self) -> CancellationToken {
        self.runtime.child_token()
    }

    /// Tracker used to coordinate graceful shutdown, delegated to the runtime.
    pub(crate) fn graceful_shutdown_tracker(&self) -> Arc<GracefulShutdownTracker> {
        self.runtime.graceful_shutdown_tracker()
    }

    /// Shared map of per-endpoint instance sources (weakly held).
    pub fn instance_sources(&self) -> Arc<Mutex<HashMap<Endpoint, Weak<InstanceSource>>>> {
        self.instance_sources.clone()
    }
}
455
/// Configuration bundle consumed by `DistributedRuntime::new`.
///
/// `Dissolve` (from `derive_getters`) generates a `dissolve()` method that
/// decomposes the struct into a `(store_backend, nats_config, is_static)`
/// tuple, which `new()` destructures.
#[derive(Dissolve)]
pub struct DistributedConfig {
    // Which key-value store backend to use (etcd / file / memory).
    pub store_backend: KeyValueStoreSelect,
    // Options used to establish the NATS connection.
    pub nats_config: nats::ClientOptions,
    // When true, `new()` uses the in-memory store regardless of `store_backend`.
    pub is_static: bool,
}
462
463impl DistributedConfig {
464 pub fn from_settings(is_static: bool) -> DistributedConfig {
465 DistributedConfig {
466 store_backend: KeyValueStoreSelect::Etcd(Box::default()),
467 nats_config: nats::ClientOptions::default(),
468 is_static,
469 }
470 }
471
472 pub fn for_cli() -> DistributedConfig {
473 let etcd_config = etcd::ClientOptions {
474 attach_lease: false,
475 ..Default::default()
476 };
477 DistributedConfig {
478 store_backend: KeyValueStoreSelect::Etcd(Box::new(etcd_config)),
479 nats_config: nats::ClientOptions::default(),
480 is_static: false,
481 }
482 }
483}
484
/// Helpers for constructing a `DistributedRuntime` in integration tests.
pub mod distributed_test_utils {
    /// Build a `DistributedRuntime` on the current tokio runtime in static
    /// mode (no discovery). Panics if no runtime is active or construction
    /// fails — acceptable in test-only code.
    #[cfg(feature = "integration")]
    pub async fn create_test_drt_async() -> crate::DistributedRuntime {
        let runtime = crate::Runtime::from_current().unwrap();
        let drt = crate::DistributedRuntime::from_settings_without_discovery(runtime).await;
        drt.unwrap()
    }
}
500
#[cfg(all(test, feature = "integration"))]
mod tests {
    use super::distributed_test_utils::create_test_drt_async;

    /// Shared body for the uptime tests (previously duplicated verbatim):
    /// builds a DRT, sleeps 50ms, then asserts the tracked uptime is at
    /// least that long. `label` only affects the success log line.
    async fn assert_uptime_at_least_50ms(label: &str) {
        let drt = create_test_drt_async().await;

        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

        let uptime = drt.system_health.lock().uptime();
        assert!(
            uptime >= std::time::Duration::from_millis(50),
            "Expected uptime to be at least 50ms, but got {:?}",
            uptime
        );

        println!("✓ DRT uptime test passed ({}): uptime = {:?}", label, uptime);
    }

    /// Uptime must be tracked even with the system status server disabled.
    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_disabled() {
        temp_env::async_with_vars([("DYN_SYSTEM_PORT", None::<&str>)], async {
            assert_uptime_at_least_50ms("system disabled").await;
        })
        .await;
    }

    /// Uptime must also be tracked with the system status server enabled.
    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_enabled() {
        temp_env::async_with_vars([("DYN_SYSTEM_PORT", Some("8081"))], async {
            assert_uptime_at_least_50ms("system enabled").await;
        })
        .await;
    }
}