1pub use crate::component::Component;
5use crate::storage::key_value_store::{
6 EtcdStore, KeyValueStore, KeyValueStoreEnum, KeyValueStoreManager, MemoryStore,
7};
8use crate::transports::nats::DRTNatsClientPrometheusMetrics;
9use crate::{
10 ErrorContext,
11 component::{self, ComponentBuilder, Endpoint, InstanceSource, Namespace},
12 discovery::DiscoveryClient,
13 metrics::PrometheusUpdateCallback,
14 metrics::{MetricsHierarchy, MetricsRegistry},
15 service::ServiceClient,
16 transports::{etcd, nats, tcp},
17};
18
19use super::utils::GracefulShutdownTracker;
20use super::{Arc, DistributedRuntime, OK, OnceCell, Result, Runtime, SystemHealth, Weak, error};
21use std::sync::OnceLock;
22
23use derive_getters::Dissolve;
24use figment::error;
25use std::collections::HashMap;
26use tokio::sync::Mutex;
27use tokio_util::sync::CancellationToken;
28
29impl MetricsHierarchy for DistributedRuntime {
30 fn basename(&self) -> String {
31 "".to_string() }
33
34 fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
35 vec![] }
37
38 fn get_metrics_registry(&self) -> &MetricsRegistry {
39 &self.metrics_registry
40 }
41}
42
43impl std::fmt::Debug for DistributedRuntime {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 write!(f, "DistributedRuntime")
46 }
47}
48
impl DistributedRuntime {
    /// Build a fully wired distributed runtime from `config`.
    ///
    /// In static mode (`is_static == true`) no etcd client is created and the
    /// key-value store is in-memory; otherwise an etcd client is connected and
    /// backs the store. A NATS connection is always established. Depending on
    /// process-level runtime settings, this may also spawn the system status
    /// HTTP server and start the health check manager; failures in either are
    /// logged but do not abort construction.
    ///
    /// # Errors
    /// Fails if the etcd or NATS connection cannot be established, or if NATS
    /// metrics / uptime-gauge registration fails.
    pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result<Self> {
        let (etcd_config, nats_config, is_static) = config.dissolve();

        let runtime_clone = runtime.clone();

        // Static deployments skip etcd entirely and use an in-memory KV store.
        let (etcd_client, store) = if is_static {
            (None, KeyValueStoreManager::memory())
        } else {
            let etcd_client = etcd::Client::new(etcd_config.clone(), runtime_clone).await?;
            let store = KeyValueStoreManager::etcd(etcd_client.clone());
            (Some(etcd_client), store)
        };

        let nats_client = Some(nats_config.clone().connect().await?);

        // Process-level runtime settings; falls back to defaults on error.
        // NOTE: this shadows the `DistributedConfig` parameter, which was
        // fully consumed by `dissolve()` above.
        let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default();
        // Only allocate a cancellation token when the system status server
        // will actually be spawned below.
        let cancel_token = if config.system_server_enabled() {
            Some(runtime.clone().child_token())
        } else {
            None
        };
        let starting_health_status = config.starting_health_status.clone();
        let use_endpoint_health_status = config.use_endpoint_health_status.clone();
        let health_endpoint_path = config.system_health_path.clone();
        let live_endpoint_path = config.system_live_path.clone();
        let system_health = Arc::new(parking_lot::Mutex::new(SystemHealth::new(
            starting_health_status,
            use_endpoint_health_status,
            health_endpoint_path,
            live_endpoint_path,
        )));

        // Keep a handle for metrics registration below; `nats_client` itself
        // is moved into the struct.
        let nats_client_for_metrics = nats_client.clone();

        let distributed_runtime = Self {
            runtime,
            etcd_client,
            store,
            nats_client,
            tcp_server: Arc::new(OnceCell::new()),
            system_status_server: Arc::new(OnceLock::new()),
            component_registry: component::Registry::new(),
            is_static,
            instance_sources: Arc::new(Mutex::new(HashMap::new())),
            metrics_registry: crate::MetricsRegistry::new(),
            system_health,
        };

        // Register a callback that copies NATS client stats into Prometheus
        // metrics whenever the metrics registry runs its update callbacks.
        if let Some(nats_client_for_metrics) = nats_client_for_metrics {
            let nats_client_metrics = DRTNatsClientPrometheusMetrics::new(
                &distributed_runtime,
                nats_client_for_metrics.client().clone(),
            )?;
            let nats_client_callback = Arc::new({
                let nats_client_clone = nats_client_metrics.clone();
                move || {
                    nats_client_clone.set_from_client_stats();
                    Ok(())
                }
            });
            distributed_runtime
                .metrics_registry
                .add_update_callback(nats_client_callback);
        }

        // Uptime is tracked regardless of whether the HTTP server is enabled.
        distributed_runtime
            .system_health
            .lock()
            .initialize_uptime_gauge(&distributed_runtime)?;

        if let Some(cancel_token) = cancel_token {
            let host = config.system_host.clone();
            let port = config.system_port;

            match crate::system_status_server::spawn_system_status_server(
                &host,
                port,
                cancel_token,
                Arc::new(distributed_runtime.clone()),
            )
            .await
            {
                Ok((addr, handle)) => {
                    tracing::info!("System status server started successfully on {}", addr);

                    let system_status_server_info =
                        crate::system_status_server::SystemStatusServerInfo::new(
                            addr,
                            Some(handle),
                        );

                    // First (and only) initialization of the OnceLock; a
                    // second `set` would indicate a logic error, hence expect.
                    distributed_runtime
                        .system_status_server
                        .set(Arc::new(system_status_server_info))
                        .expect("System status server info should only be set once");
                }
                Err(e) => {
                    // Non-fatal: the runtime works without the status server.
                    tracing::error!("System status server startup failed: {}", e);
                }
            }
        } else {
            tracing::debug!(
                "System status server HTTP endpoints disabled, but uptime metrics are being tracked"
            );
        }

        // Optionally start the background health check manager; startup
        // failure is logged but does not abort runtime construction.
        if config.health_check_enabled {
            let health_check_config = crate::health_check::HealthCheckConfig {
                canary_wait_time: std::time::Duration::from_secs(config.canary_wait_time_secs),
                request_timeout: std::time::Duration::from_secs(
                    config.health_check_request_timeout_secs,
                ),
            };

            match crate::health_check::start_health_check_manager(
                distributed_runtime.clone(),
                Some(health_check_config),
            )
            .await
            {
                Ok(()) => tracing::info!(
                    "Health check manager started (canary_wait_time: {}s, request_timeout: {}s)",
                    config.canary_wait_time_secs,
                    config.health_check_request_timeout_secs
                ),
                Err(e) => tracing::error!("Health check manager failed to start: {}", e),
            }
        }

        Ok(distributed_runtime)
    }

    /// Build a runtime from settings with discovery enabled (non-static).
    pub async fn from_settings(runtime: Runtime) -> Result<Self> {
        let config = DistributedConfig::from_settings(false);
        Self::new(runtime, config).await
    }

    /// Build a runtime in static mode: no etcd/discovery, in-memory KV store.
    pub async fn from_settings_without_discovery(runtime: Runtime) -> Result<Self> {
        let config = DistributedConfig::from_settings(true);
        Self::new(runtime, config).await
    }

    /// Borrow the underlying (non-distributed) runtime.
    pub fn runtime(&self) -> &Runtime {
        &self.runtime
    }

    /// Cancellation token tied to the primary runtime lifecycle.
    pub fn primary_token(&self) -> CancellationToken {
        self.runtime.primary_token()
    }

    /// Primary etcd lease; `None` in static mode (no etcd client).
    pub fn primary_lease(&self) -> Option<etcd::Lease> {
        self.etcd_client.as_ref().map(|c| c.primary_lease())
    }

    /// Trigger shutdown of the underlying runtime.
    pub fn shutdown(&self) {
        self.runtime.shutdown();
    }

    /// Create a [`Namespace`] rooted at this runtime.
    pub fn namespace(&self, name: impl Into<String>) -> Result<Namespace> {
        Namespace::new(self.clone(), name.into(), self.is_static)
    }

    /// Discovery client scoped to `namespace`.
    ///
    /// # Panics
    /// Panics if called on a static runtime, which has no etcd client.
    pub(crate) fn discovery_client(&self, namespace: impl Into<String>) -> DiscoveryClient {
        DiscoveryClient::new(
            namespace.into(),
            self.etcd_client
                .clone()
                .expect("Attempt to get discovery_client on static DistributedRuntime"),
        )
    }

    /// NATS-backed service client; `None` when there is no NATS connection.
    pub(crate) fn service_client(&self) -> Option<ServiceClient> {
        self.nats_client().map(|nc| ServiceClient::new(nc.clone()))
    }

    /// Lazily create (on first call) and return the shared TCP stream server.
    /// Concurrent callers race on the `OnceCell`; only one server is built.
    pub async fn tcp_server(&self) -> Result<Arc<tcp::server::TcpStreamServer>> {
        Ok(self
            .tcp_server
            .get_or_try_init(async move {
                let options = tcp::server::ServerOptions::default();
                let server = tcp::server::TcpStreamServer::new(options).await?;
                // `OK` is the crate's re-exported Ok alias (see module imports).
                OK(server)
            })
            .await?
            .clone())
    }

    /// Borrow the NATS client, if connected.
    pub fn nats_client(&self) -> Option<&nats::Client> {
        self.nats_client.as_ref()
    }

    /// Address/handle info for the system status HTTP server, if it started.
    pub fn system_status_server_info(
        &self,
    ) -> Option<Arc<crate::system_status_server::SystemStatusServerInfo>> {
        self.system_status_server.get().cloned()
    }

    /// Clone of the etcd client; `None` in static mode.
    pub fn etcd_client(&self) -> Option<etcd::Client> {
        self.etcd_client.clone()
    }

    /// Identical to [`Self::etcd_client`]; kept for callers that have not yet
    /// migrated off direct etcd access (presumably toward `store()` — verify).
    pub fn deprecated_etcd_client(&self) -> Option<etcd::Client> {
        self.etcd_client.clone()
    }

    /// Key-value store manager (etcd-backed, or in-memory in static mode).
    pub fn store(&self) -> &KeyValueStoreManager {
        &self.store
    }

    /// New child cancellation token from the underlying runtime.
    pub fn child_token(&self) -> CancellationToken {
        self.runtime.child_token()
    }

    /// Tracker used to coordinate graceful shutdown of in-flight work.
    pub(crate) fn graceful_shutdown_tracker(&self) -> Arc<GracefulShutdownTracker> {
        self.runtime.graceful_shutdown_tracker()
    }

    /// Shared map of endpoint -> instance source (weakly held so entries can
    /// be dropped when the last strong reference goes away).
    pub fn instance_sources(&self) -> Arc<Mutex<HashMap<Endpoint, Weak<InstanceSource>>>> {
        self.instance_sources.clone()
    }
}
306
/// Connection/bootstrap options consumed by `DistributedRuntime::new`.
///
/// `derive(Dissolve)` generates the tuple-destructuring `dissolve()` that
/// `new()` calls to take ownership of all three fields at once.
#[derive(Dissolve)]
pub struct DistributedConfig {
    // etcd client options (unused when `is_static` is true).
    pub etcd_config: etcd::ClientOptions,
    // NATS client options; a connection is always attempted.
    pub nats_config: nats::ClientOptions,
    // Static mode: skip etcd/discovery and use an in-memory KV store.
    pub is_static: bool,
}
313
314impl DistributedConfig {
315 pub fn from_settings(is_static: bool) -> DistributedConfig {
316 DistributedConfig {
317 etcd_config: etcd::ClientOptions::default(),
318 nats_config: nats::ClientOptions::default(),
319 is_static,
320 }
321 }
322
323 pub fn for_cli() -> DistributedConfig {
324 let mut config = DistributedConfig {
325 etcd_config: etcd::ClientOptions::default(),
326 nats_config: nats::ClientOptions::default(),
327 is_static: false,
328 };
329
330 config.etcd_config.attach_lease = false;
331
332 config
333 }
334}
335
pub mod distributed_test_utils {
    /// Build a `DistributedRuntime` for integration tests from the current
    /// Tokio runtime, without discovery. Panics if either step fails.
    #[cfg(feature = "integration")]
    pub async fn create_test_drt_async() -> crate::DistributedRuntime {
        let rt = crate::Runtime::from_current().unwrap();
        let drt = crate::DistributedRuntime::from_settings_without_discovery(rt).await;
        drt.unwrap()
    }
}
351
#[cfg(all(test, feature = "integration"))]
mod tests {
    use super::distributed_test_utils::create_test_drt_async;

    /// Uptime must advance even when the system status server is disabled,
    /// because `new()` initializes the uptime gauge unconditionally.
    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_disabled() {
        // temp_env scopes the env var override to this async block.
        temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async {
            let drt = create_test_drt_async().await;

            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

            let uptime = drt.system_health.lock().uptime();
            // Lower bound only: sleeps can overshoot, never undershoot.
            assert!(
                uptime >= std::time::Duration::from_millis(50),
                "Expected uptime to be at least 50ms, but got {:?}",
                uptime
            );

            println!(
                "✓ DRT uptime test passed (system disabled): uptime = {:?}",
                uptime
            );
        })
        .await;
    }

    /// Same uptime check with the system status server enabled.
    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_enabled() {
        temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("true"))], async {
            let drt = create_test_drt_async().await;

            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

            let uptime = drt.system_health.lock().uptime();
            assert!(
                uptime >= std::time::Duration::from_millis(50),
                "Expected uptime to be at least 50ms, but got {:?}",
                uptime
            );

            println!(
                "✓ DRT uptime test passed (system enabled): uptime = {:?}",
                uptime
            );
        })
        .await;
    }
}