Skip to main content

dynamo_runtime/
system_health.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! System health monitoring and health check management
17
18use std::{
19    collections::HashMap,
20    sync::{Arc, OnceLock},
21    time::Instant,
22};
23use tokio::sync::mpsc;
24
25use crate::component;
26use crate::config::HealthStatus;
27use crate::metrics::{MetricsHierarchy, prometheus_names::distributed_runtime};
28
29/// Health check target containing instance info and payload
30#[derive(Clone, Debug)]
31pub struct HealthCheckTarget {
32    pub instance: component::Instance,
33    pub payload: serde_json::Value,
34}
35
36/// Current Health Status
37/// If use_endpoint_health_status is set then
38/// initialize the endpoint_health hashmap to the
39/// starting health status
40#[derive(Clone)]
41pub struct SystemHealth {
42    system_health: HealthStatus,
43    endpoint_health: Arc<std::sync::RwLock<HashMap<String, HealthStatus>>>,
44    /// Maps endpoint subject to health check target (instance + payload)
45    health_check_targets: Arc<std::sync::RwLock<HashMap<String, HealthCheckTarget>>>,
46    /// Maps endpoint subject to its specific health check notifier
47    health_check_notifiers: Arc<std::sync::RwLock<HashMap<String, Arc<tokio::sync::Notify>>>>,
48    /// Channel for new endpoint registrations
49    /// This solves the race condition where HealthCheckManager starts before endpoints are registered
50    /// Using a channel ensures no registrations are lost.
51    new_endpoint_tx: mpsc::UnboundedSender<String>,
52    new_endpoint_rx: Arc<parking_lot::Mutex<Option<mpsc::UnboundedReceiver<String>>>>,
53    use_endpoint_health_status: Vec<String>,
54    health_check_enabled: bool,
55    health_path: String,
56    live_path: String,
57    start_time: Instant,
58    uptime_gauge: OnceLock<prometheus::Gauge>,
59}
60
61impl SystemHealth {
62    pub fn new(
63        starting_health_status: HealthStatus,
64        use_endpoint_health_status: Vec<String>,
65        health_check_enabled: bool,
66        health_path: String,
67        live_path: String,
68    ) -> Self {
69        // Force NotReady when canary is enabled — canary verifies before marking Ready.
70        let initial_endpoint_status = if health_check_enabled {
71            HealthStatus::NotReady
72        } else {
73            starting_health_status.clone()
74        };
75        let mut endpoint_health = HashMap::new();
76        for endpoint in &use_endpoint_health_status {
77            endpoint_health.insert(endpoint.clone(), initial_endpoint_status.clone());
78        }
79
80        // Create the channel for endpoint registration notifications
81        let (tx, rx) = mpsc::unbounded_channel();
82
83        SystemHealth {
84            system_health: starting_health_status,
85            endpoint_health: Arc::new(std::sync::RwLock::new(endpoint_health)),
86            health_check_targets: Arc::new(std::sync::RwLock::new(HashMap::new())),
87            health_check_notifiers: Arc::new(std::sync::RwLock::new(HashMap::new())),
88            new_endpoint_tx: tx,
89            new_endpoint_rx: Arc::new(parking_lot::Mutex::new(Some(rx))),
90            use_endpoint_health_status,
91            health_check_enabled,
92            health_path,
93            live_path,
94            start_time: Instant::now(),
95            uptime_gauge: OnceLock::new(),
96        }
97    }
98
99    pub fn health_check_enabled(&self) -> bool {
100        self.health_check_enabled
101    }
102
103    /// Signal endpoint transport registration. Sets Ready when canary is disabled;
104    /// no-op when canary is enabled (canary will set Ready after verification).
105    pub fn set_endpoint_registered(&self, endpoint: &str) {
106        if !self.health_check_enabled {
107            self.set_endpoint_health_status(endpoint, HealthStatus::Ready);
108        }
109    }
110
111    pub fn set_health_status(&mut self, status: HealthStatus) {
112        self.system_health = status;
113    }
114
115    pub fn set_endpoint_health_status(&self, endpoint: &str, status: HealthStatus) {
116        let mut endpoint_health = self.endpoint_health.write().unwrap();
117        endpoint_health.insert(endpoint.to_string(), status);
118    }
119
120    /// Returns the overall health status and endpoint health statuses
121    /// System health is determined by ALL endpoints that have registered health checks
122    pub fn get_health_status(&self) -> (bool, HashMap<String, String>) {
123        let health_check_targets = self.health_check_targets.read().unwrap();
124        let endpoint_health = self.endpoint_health.read().unwrap();
125        let mut endpoints: HashMap<String, String> = HashMap::new();
126
127        for (endpoint, status) in endpoint_health.iter() {
128            endpoints.insert(
129                endpoint.clone(),
130                if *status == HealthStatus::Ready {
131                    "ready".to_string()
132                } else {
133                    "notready".to_string()
134                },
135            );
136        }
137
138        let healthy = if !self.use_endpoint_health_status.is_empty() {
139            self.use_endpoint_health_status.iter().all(|endpoint| {
140                endpoint_health
141                    .get(endpoint)
142                    .is_some_and(|status| *status == HealthStatus::Ready)
143            })
144        } else {
145            // If we have registered health check targets, use them to determine health
146            if !health_check_targets.is_empty() {
147                health_check_targets
148                    .iter()
149                    .all(|(endpoint_subject, _target)| {
150                        endpoint_health
151                            .get(endpoint_subject)
152                            .is_some_and(|status| *status == HealthStatus::Ready)
153                    })
154            } else {
155                // No health check targets registered, use simple system health
156                self.system_health == HealthStatus::Ready
157            }
158        };
159
160        (healthy, endpoints)
161    }
162
163    /// Register a health check target for an endpoint
164    pub fn register_health_check_target(
165        &self,
166        endpoint_subject: &str,
167        instance: component::Instance,
168        payload: serde_json::Value,
169    ) {
170        let key = endpoint_subject.to_owned();
171
172        // Atomically check+insert under a single write lock to avoid races.
173        let inserted = {
174            let mut targets = self.health_check_targets.write().unwrap();
175            match targets.entry(key.clone()) {
176                std::collections::hash_map::Entry::Occupied(_) => false,
177                std::collections::hash_map::Entry::Vacant(v) => {
178                    v.insert(HealthCheckTarget { instance, payload });
179                    true
180                }
181            }
182        };
183
184        if !inserted {
185            tracing::warn!(
186                "Attempted to re-register health check for endpoint '{}'; ignoring.",
187                key
188            );
189            return;
190        }
191
192        // Create and store a unique notifier for this endpoint (idempotent).
193        {
194            let mut notifiers = self.health_check_notifiers.write().unwrap();
195            notifiers
196                .entry(key.clone())
197                .or_insert_with(|| Arc::new(tokio::sync::Notify::new()));
198        }
199
200        // Initialize endpoint health status conservatively to NotReady.
201        {
202            let mut endpoint_health = self.endpoint_health.write().unwrap();
203            endpoint_health
204                .entry(key.clone())
205                .or_insert(HealthStatus::NotReady);
206        }
207
208        if let Err(e) = self.new_endpoint_tx.send(key.clone()) {
209            tracing::error!(
210                "Failed to send endpoint '{}' registration to health check manager: {}. \
211                 Health checks will not be performed for this endpoint.",
212                key,
213                e
214            );
215        }
216    }
217
218    /// Get all health check targets
219    pub fn get_health_check_targets(&self) -> Vec<(String, HealthCheckTarget)> {
220        let targets = self.health_check_targets.read().unwrap();
221        targets
222            .iter()
223            .map(|(k, v)| (k.clone(), v.clone()))
224            .collect()
225    }
226
227    /// Check if any health check targets are registered
228    pub fn has_health_check_targets(&self) -> bool {
229        let targets = self.health_check_targets.read().unwrap();
230        !targets.is_empty()
231    }
232
233    /// Get list of endpoints with health check targets
234    pub fn get_health_check_endpoints(&self) -> Vec<String> {
235        let targets = self.health_check_targets.read().unwrap();
236        targets.keys().cloned().collect()
237    }
238
239    /// Get health check target for a specific endpoint
240    pub fn get_health_check_target(&self, endpoint: &str) -> Option<HealthCheckTarget> {
241        let targets = self.health_check_targets.read().unwrap();
242        targets.get(endpoint).cloned()
243    }
244
245    /// Get the endpoint health status (Ready/NotReady)
246    pub fn get_endpoint_health_status(&self, endpoint: &str) -> Option<HealthStatus> {
247        let endpoint_health = self.endpoint_health.read().unwrap();
248        endpoint_health.get(endpoint).cloned()
249    }
250
251    /// Get the endpoint-specific health check notifier
252    pub fn get_endpoint_health_check_notifier(
253        &self,
254        endpoint_subject: &str,
255    ) -> Option<Arc<tokio::sync::Notify>> {
256        let notifiers = self.health_check_notifiers.read().unwrap();
257        notifiers.get(endpoint_subject).cloned()
258    }
259
260    /// Take the receiver for new endpoint registrations (can only be called once)
261    /// This is used by HealthCheckManager to receive notifications of new endpoints
262    pub fn take_new_endpoint_receiver(&self) -> Option<mpsc::UnboundedReceiver<String>> {
263        self.new_endpoint_rx.lock().take()
264    }
265
266    /// Initialize the uptime gauge using the provided metrics registry
267    pub fn initialize_uptime_gauge<T: MetricsHierarchy>(&self, registry: &T) -> anyhow::Result<()> {
268        let gauge = registry.metrics().create_gauge(
269            distributed_runtime::UPTIME_SECONDS,
270            "Total uptime of the DistributedRuntime in seconds",
271            &[],
272        )?;
273        self.uptime_gauge
274            .set(gauge)
275            .map_err(|_| anyhow::anyhow!("uptime_gauge already initialized"))?;
276        Ok(())
277    }
278
279    /// Get the current uptime as a Duration
280    pub fn uptime(&self) -> std::time::Duration {
281        self.start_time.elapsed()
282    }
283
284    /// Update the uptime gauge with the current uptime value
285    pub fn update_uptime_gauge(&self) {
286        if let Some(gauge) = self.uptime_gauge.get() {
287            gauge.set(self.uptime().as_secs_f64());
288        }
289    }
290
291    /// Get the health check path
292    pub fn health_path(&self) -> &str {
293        &self.health_path
294    }
295
296    /// Get the liveness check path
297    pub fn live_path(&self) -> &str {
298        &self.live_path
299    }
300}