dynamo_runtime/
system_health.rs1use std::{
19 collections::HashMap,
20 sync::{Arc, OnceLock},
21 time::Instant,
22};
23use tokio::sync::mpsc;
24
25use crate::component;
26use crate::config::HealthStatus;
27use crate::metrics::{MetricsHierarchy, prometheus_names::distributed_runtime};
28
29#[derive(Clone, Debug)]
31pub struct HealthCheckTarget {
32 pub instance: component::Instance,
33 pub payload: serde_json::Value,
34}
35
36#[derive(Clone)]
41pub struct SystemHealth {
42 system_health: HealthStatus,
43 endpoint_health: Arc<std::sync::RwLock<HashMap<String, HealthStatus>>>,
44 health_check_targets: Arc<std::sync::RwLock<HashMap<String, HealthCheckTarget>>>,
46 health_check_notifiers: Arc<std::sync::RwLock<HashMap<String, Arc<tokio::sync::Notify>>>>,
48 new_endpoint_tx: mpsc::UnboundedSender<String>,
52 new_endpoint_rx: Arc<parking_lot::Mutex<Option<mpsc::UnboundedReceiver<String>>>>,
53 use_endpoint_health_status: Vec<String>,
54 health_check_enabled: bool,
55 health_path: String,
56 live_path: String,
57 start_time: Instant,
58 uptime_gauge: OnceLock<prometheus::Gauge>,
59}
60
61impl SystemHealth {
62 pub fn new(
63 starting_health_status: HealthStatus,
64 use_endpoint_health_status: Vec<String>,
65 health_check_enabled: bool,
66 health_path: String,
67 live_path: String,
68 ) -> Self {
69 let initial_endpoint_status = if health_check_enabled {
71 HealthStatus::NotReady
72 } else {
73 starting_health_status.clone()
74 };
75 let mut endpoint_health = HashMap::new();
76 for endpoint in &use_endpoint_health_status {
77 endpoint_health.insert(endpoint.clone(), initial_endpoint_status.clone());
78 }
79
80 let (tx, rx) = mpsc::unbounded_channel();
82
83 SystemHealth {
84 system_health: starting_health_status,
85 endpoint_health: Arc::new(std::sync::RwLock::new(endpoint_health)),
86 health_check_targets: Arc::new(std::sync::RwLock::new(HashMap::new())),
87 health_check_notifiers: Arc::new(std::sync::RwLock::new(HashMap::new())),
88 new_endpoint_tx: tx,
89 new_endpoint_rx: Arc::new(parking_lot::Mutex::new(Some(rx))),
90 use_endpoint_health_status,
91 health_check_enabled,
92 health_path,
93 live_path,
94 start_time: Instant::now(),
95 uptime_gauge: OnceLock::new(),
96 }
97 }
98
99 pub fn health_check_enabled(&self) -> bool {
100 self.health_check_enabled
101 }
102
103 pub fn set_endpoint_registered(&self, endpoint: &str) {
106 if !self.health_check_enabled {
107 self.set_endpoint_health_status(endpoint, HealthStatus::Ready);
108 }
109 }
110
111 pub fn set_health_status(&mut self, status: HealthStatus) {
112 self.system_health = status;
113 }
114
115 pub fn set_endpoint_health_status(&self, endpoint: &str, status: HealthStatus) {
116 let mut endpoint_health = self.endpoint_health.write().unwrap();
117 endpoint_health.insert(endpoint.to_string(), status);
118 }
119
120 pub fn get_health_status(&self) -> (bool, HashMap<String, String>) {
123 let health_check_targets = self.health_check_targets.read().unwrap();
124 let endpoint_health = self.endpoint_health.read().unwrap();
125 let mut endpoints: HashMap<String, String> = HashMap::new();
126
127 for (endpoint, status) in endpoint_health.iter() {
128 endpoints.insert(
129 endpoint.clone(),
130 if *status == HealthStatus::Ready {
131 "ready".to_string()
132 } else {
133 "notready".to_string()
134 },
135 );
136 }
137
138 let healthy = if !self.use_endpoint_health_status.is_empty() {
139 self.use_endpoint_health_status.iter().all(|endpoint| {
140 endpoint_health
141 .get(endpoint)
142 .is_some_and(|status| *status == HealthStatus::Ready)
143 })
144 } else {
145 if !health_check_targets.is_empty() {
147 health_check_targets
148 .iter()
149 .all(|(endpoint_subject, _target)| {
150 endpoint_health
151 .get(endpoint_subject)
152 .is_some_and(|status| *status == HealthStatus::Ready)
153 })
154 } else {
155 self.system_health == HealthStatus::Ready
157 }
158 };
159
160 (healthy, endpoints)
161 }
162
163 pub fn register_health_check_target(
165 &self,
166 endpoint_subject: &str,
167 instance: component::Instance,
168 payload: serde_json::Value,
169 ) {
170 let key = endpoint_subject.to_owned();
171
172 let inserted = {
174 let mut targets = self.health_check_targets.write().unwrap();
175 match targets.entry(key.clone()) {
176 std::collections::hash_map::Entry::Occupied(_) => false,
177 std::collections::hash_map::Entry::Vacant(v) => {
178 v.insert(HealthCheckTarget { instance, payload });
179 true
180 }
181 }
182 };
183
184 if !inserted {
185 tracing::warn!(
186 "Attempted to re-register health check for endpoint '{}'; ignoring.",
187 key
188 );
189 return;
190 }
191
192 {
194 let mut notifiers = self.health_check_notifiers.write().unwrap();
195 notifiers
196 .entry(key.clone())
197 .or_insert_with(|| Arc::new(tokio::sync::Notify::new()));
198 }
199
200 {
202 let mut endpoint_health = self.endpoint_health.write().unwrap();
203 endpoint_health
204 .entry(key.clone())
205 .or_insert(HealthStatus::NotReady);
206 }
207
208 if let Err(e) = self.new_endpoint_tx.send(key.clone()) {
209 tracing::error!(
210 "Failed to send endpoint '{}' registration to health check manager: {}. \
211 Health checks will not be performed for this endpoint.",
212 key,
213 e
214 );
215 }
216 }
217
218 pub fn get_health_check_targets(&self) -> Vec<(String, HealthCheckTarget)> {
220 let targets = self.health_check_targets.read().unwrap();
221 targets
222 .iter()
223 .map(|(k, v)| (k.clone(), v.clone()))
224 .collect()
225 }
226
227 pub fn has_health_check_targets(&self) -> bool {
229 let targets = self.health_check_targets.read().unwrap();
230 !targets.is_empty()
231 }
232
233 pub fn get_health_check_endpoints(&self) -> Vec<String> {
235 let targets = self.health_check_targets.read().unwrap();
236 targets.keys().cloned().collect()
237 }
238
239 pub fn get_health_check_target(&self, endpoint: &str) -> Option<HealthCheckTarget> {
241 let targets = self.health_check_targets.read().unwrap();
242 targets.get(endpoint).cloned()
243 }
244
245 pub fn get_endpoint_health_status(&self, endpoint: &str) -> Option<HealthStatus> {
247 let endpoint_health = self.endpoint_health.read().unwrap();
248 endpoint_health.get(endpoint).cloned()
249 }
250
251 pub fn get_endpoint_health_check_notifier(
253 &self,
254 endpoint_subject: &str,
255 ) -> Option<Arc<tokio::sync::Notify>> {
256 let notifiers = self.health_check_notifiers.read().unwrap();
257 notifiers.get(endpoint_subject).cloned()
258 }
259
260 pub fn take_new_endpoint_receiver(&self) -> Option<mpsc::UnboundedReceiver<String>> {
263 self.new_endpoint_rx.lock().take()
264 }
265
266 pub fn initialize_uptime_gauge<T: MetricsHierarchy>(&self, registry: &T) -> anyhow::Result<()> {
268 let gauge = registry.metrics().create_gauge(
269 distributed_runtime::UPTIME_SECONDS,
270 "Total uptime of the DistributedRuntime in seconds",
271 &[],
272 )?;
273 self.uptime_gauge
274 .set(gauge)
275 .map_err(|_| anyhow::anyhow!("uptime_gauge already initialized"))?;
276 Ok(())
277 }
278
279 pub fn uptime(&self) -> std::time::Duration {
281 self.start_time.elapsed()
282 }
283
284 pub fn update_uptime_gauge(&self) {
286 if let Some(gauge) = self.uptime_gauge.get() {
287 gauge.set(self.uptime().as_secs_f64());
288 }
289 }
290
291 pub fn health_path(&self) -> &str {
293 &self.health_path
294 }
295
296 pub fn live_path(&self) -> &str {
298 &self.live_path
299 }
300}