Skip to main content

koi_health/
lib.rs

1//! Koi Health - network health monitoring (Phase 7).
2
3mod checker;
4pub mod http;
5pub mod log;
6mod machine;
7mod service;
8mod state;
9
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13
14use chrono::{DateTime, Utc};
15use tokio::sync::{broadcast, RwLock};
16use tokio_util::sync::CancellationToken;
17
18use koi_common::capability::{Capability, CapabilityStatus};
19use koi_common::integration::{CertmeshSnapshot, DnsProbe, MdnsSnapshot, ProxySnapshot};
20
21use crate::checker::{run_checks_loop, run_checks_once, ServiceCheckState};
22use crate::machine::{collect_machine_health, MdnsTracker};
23use crate::state::{load_health_state, save_health_state, HealthCheckConfig, HealthChecksState};
24use crate::state::{DEFAULT_INTERVAL_SECS, DEFAULT_TIMEOUT_SECS};
25
26pub use machine::MachineHealth;
27pub use service::ServiceCheckKind;
28pub use service::ServiceStatus;
29pub use service::ServiceStatus as HealthStatus;
30pub use state::HealthCheckConfig as HealthCheck;
31
/// Seconds since a machine was last seen that are tolerated before it is
/// flagged; this is the default `machine_threshold` handed to
/// `collect_machine_health` by [`HealthCore`].
pub const DEFAULT_MACHINE_THRESHOLD_SECS: u64 = 60;

/// Buffer size of the [`HealthEvent`] broadcast channel. Per tokio's broadcast
/// semantics, a subscriber that falls further behind than this observes a lag.
const BROADCAST_CHANNEL_CAPACITY: usize = 256;
37
38/// Events emitted by the health subsystem when service status changes.
39#[derive(Debug, Clone)]
40pub enum HealthEvent {
41    /// A service's health status changed.
42    StatusChanged { name: String, status: ServiceStatus },
43}
44
45/// Snapshot returned by health status queries.
46#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
47pub struct HealthSnapshot {
48    pub machines: Vec<MachineHealth>,
49    pub services: Vec<ServiceHealth>,
50}
51
52/// Service health summary (config + current status).
53#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
54pub struct ServiceHealth {
55    pub name: String,
56    pub kind: ServiceCheckKind,
57    pub target: String,
58    pub interval_secs: u64,
59    pub timeout_secs: u64,
60    pub status: ServiceStatus,
61    pub last_checked: Option<DateTime<Utc>>,
62    pub last_ok: Option<DateTime<Utc>>,
63    pub message: Option<String>,
64}
65
66/// Errors surfaced by the health domain.
67#[derive(Debug, thiserror::Error)]
68pub enum HealthError {
69    #[error("invalid health check: {0}")]
70    InvalidCheck(String),
71
72    #[error("health check not found: {0}")]
73    NotFound(String),
74
75    #[error("io error: {0}")]
76    Io(String),
77}
78
/// Per-request timeout, in seconds, configured on the single `reqwest` client
/// that [`HealthCore`] shares across its HTTP health checks.
const HTTP_CLIENT_TIMEOUT_SECS: u64 = 10;
81
82/// Core health facade.
83pub struct HealthCore {
84    mdns_tracker: Option<MdnsTracker>,
85    dns: Option<Arc<dyn DnsProbe>>,
86    certmesh: Option<Arc<dyn CertmeshSnapshot>>,
87    proxy: Option<Arc<dyn ProxySnapshot>>,
88    checks: Arc<RwLock<Vec<HealthCheckConfig>>>,
89    service_states: Arc<RwLock<HashMap<String, ServiceCheckState>>>,
90    machine_threshold: Duration,
91    started_at: Instant,
92    event_tx: broadcast::Sender<HealthEvent>,
93    http_client: reqwest::Client,
94}
95
96impl HealthCore {
97    pub async fn new(
98        mdns: Option<Arc<dyn MdnsSnapshot>>,
99        dns: Option<Arc<dyn DnsProbe>>,
100        certmesh: Option<Arc<dyn CertmeshSnapshot>>,
101        proxy: Option<Arc<dyn ProxySnapshot>>,
102    ) -> Self {
103        let checks = load_health_state()
104            .map(|state| state.checks)
105            .unwrap_or_default();
106
107        let mdns_tracker = match mdns {
108            Some(snapshot) => Some(MdnsTracker::spawn(snapshot).await),
109            None => None,
110        };
111
112        let (event_tx, _) = broadcast::channel(BROADCAST_CHANNEL_CAPACITY);
113
114        let http_client = reqwest::Client::builder()
115            .timeout(Duration::from_secs(HTTP_CLIENT_TIMEOUT_SECS))
116            .build()
117            .unwrap_or_default();
118
119        Self {
120            mdns_tracker,
121            dns,
122            certmesh,
123            proxy,
124            checks: Arc::new(RwLock::new(checks)),
125            service_states: Arc::new(RwLock::new(HashMap::new())),
126            machine_threshold: Duration::from_secs(DEFAULT_MACHINE_THRESHOLD_SECS),
127            started_at: Instant::now(),
128            event_tx,
129            http_client,
130        }
131    }
132
133    pub fn started_at(&self) -> Instant {
134        self.started_at
135    }
136
137    /// Shared HTTP client for health checks.
138    pub(crate) fn http_client(&self) -> &reqwest::Client {
139        &self.http_client
140    }
141
142    pub async fn snapshot(&self) -> HealthSnapshot {
143        let mdns_snapshot = self
144            .mdns_tracker
145            .as_ref()
146            .map(|tracker| tracker.snapshot())
147            .unwrap_or_default();
148
149        let machines = collect_machine_health(
150            &mdns_snapshot,
151            self.dns.as_ref(),
152            self.certmesh.as_ref(),
153            self.machine_threshold,
154        );
155
156        let mut checks = self.checks.read().await.clone();
157        checks.extend(self.proxy_checks());
158        let states = self.service_states.read().await.clone();
159        let services = checks
160            .into_iter()
161            .map(|check| {
162                let state = states.get(&check.name).cloned().unwrap_or_default();
163                ServiceHealth {
164                    name: check.name,
165                    kind: check.kind,
166                    target: check.target,
167                    interval_secs: check.interval_secs,
168                    timeout_secs: check.timeout_secs,
169                    status: state.status,
170                    last_checked: state.last_checked,
171                    last_ok: state.last_ok,
172                    message: state.message,
173                }
174            })
175            .collect();
176
177        HealthSnapshot { machines, services }
178    }
179
180    pub async fn list_checks(&self) -> Vec<HealthCheckConfig> {
181        self.checks.read().await.clone()
182    }
183
184    pub async fn add_check(&self, check: HealthCheckConfig) -> Result<(), HealthError> {
185        service::validate_check(&check).map_err(HealthError::InvalidCheck)?;
186
187        let mut checks = self.checks.write().await;
188        if checks.iter().any(|c| c.name == check.name) {
189            return Err(HealthError::InvalidCheck(format!(
190                "check already exists: {}",
191                check.name
192            )));
193        }
194
195        checks.push(check);
196        let state = HealthChecksState {
197            checks: checks.clone(),
198        };
199        save_health_state(&state).map_err(|e| HealthError::Io(e.to_string()))?;
200        Ok(())
201    }
202
203    pub async fn remove_check(&self, name: &str) -> Result<(), HealthError> {
204        let mut checks = self.checks.write().await;
205        let before = checks.len();
206        checks.retain(|c| c.name != name);
207        if checks.len() == before {
208            return Err(HealthError::NotFound(name.to_string()));
209        }
210        let state = HealthChecksState {
211            checks: checks.clone(),
212        };
213        save_health_state(&state).map_err(|e| HealthError::Io(e.to_string()))?;
214
215        let mut states = self.service_states.write().await;
216        states.remove(name);
217        Ok(())
218    }
219
220    pub async fn run_checks_once(&self) {
221        run_checks_once(self, &self.service_states).await;
222    }
223
224    /// Subscribe to health events.
225    pub fn subscribe(&self) -> broadcast::Receiver<HealthEvent> {
226        self.event_tx.subscribe()
227    }
228
229    /// Emit a health event (used by the checker loop).
230    pub(crate) fn emit(&self, event: HealthEvent) {
231        let _ = self.event_tx.send(event);
232    }
233
234    /// Generate health checks from proxy entries.
235    pub(crate) fn proxy_checks(&self) -> Vec<HealthCheckConfig> {
236        let Some(proxy) = &self.proxy else {
237            return Vec::new();
238        };
239        proxy
240            .entries()
241            .into_iter()
242            .map(|entry| HealthCheckConfig {
243                name: format!("proxy:{}", entry.name),
244                kind: ServiceCheckKind::Http,
245                target: entry.backend,
246                interval_secs: DEFAULT_INTERVAL_SECS,
247                timeout_secs: DEFAULT_TIMEOUT_SECS,
248            })
249            .collect()
250    }
251}
252
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::sync::broadcast::error::TryRecvError;

    /// Send `event` through a fresh channel and return what a subscriber sees.
    fn send_and_receive(event: HealthEvent) -> HealthEvent {
        let (tx, _) = broadcast::channel::<HealthEvent>(16);
        let mut rx = tx.subscribe();
        tx.send(event).expect("a live subscriber exists");
        rx.try_recv().expect("should receive event")
    }

    #[test]
    fn subscribe_receives_emitted_status_changed() {
        let event = send_and_receive(HealthEvent::StatusChanged {
            name: "my-tcp".to_string(),
            status: ServiceStatus::Up,
        });
        match event {
            HealthEvent::StatusChanged { name, status } => {
                assert_eq!(name, "my-tcp");
                assert!(matches!(status, ServiceStatus::Up));
            }
        }
    }

    #[test]
    fn subscribe_receives_status_down() {
        let event = send_and_receive(HealthEvent::StatusChanged {
            name: "my-http".to_string(),
            status: ServiceStatus::Down,
        });
        match event {
            HealthEvent::StatusChanged { name, status } => {
                assert_eq!(name, "my-http");
                assert!(matches!(status, ServiceStatus::Down));
            }
        }
    }

    #[test]
    fn no_event_when_no_send() {
        let (tx, _) = broadcast::channel::<HealthEvent>(16);
        let mut rx = tx.subscribe();
        // The sender is alive, so the only correct outcome is `Empty`; a bare
        // `is_err()` would also accept `Closed`.
        assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
        drop(tx);
        assert!(matches!(rx.try_recv(), Err(TryRecvError::Closed)));
    }
}
303
304impl Capability for HealthCore {
305    fn name(&self) -> &str {
306        "health"
307    }
308
309    fn status(&self) -> CapabilityStatus {
310        let (total, up) = match self.service_states.try_read() {
311            Ok(services) => {
312                let total = services.len();
313                let up = services
314                    .values()
315                    .filter(|s| matches!(s.status, ServiceStatus::Up))
316                    .count();
317                (total, up)
318            }
319            Err(_) => (0, 0),
320        };
321        let summary = format!("{} services up ({} total)", up, total);
322        CapabilityStatus {
323            name: "health".to_string(),
324            summary,
325            healthy: true,
326        }
327    }
328}
329
330/// Runtime controller for the background service checks.
331pub struct HealthRuntime {
332    core: Arc<HealthCore>,
333    state: Arc<tokio::sync::Mutex<RuntimeState>>,
334}
335
336#[derive(Debug, Clone, Copy, serde::Serialize)]
337pub struct HealthRuntimeStatus {
338    pub running: bool,
339}
340
341struct RuntimeState {
342    running: bool,
343    cancel: Option<CancellationToken>,
344}
345
346impl HealthRuntime {
347    pub fn new(core: Arc<HealthCore>) -> Self {
348        Self {
349            core,
350            state: Arc::new(tokio::sync::Mutex::new(RuntimeState {
351                running: false,
352                cancel: None,
353            })),
354        }
355    }
356
357    pub fn core(&self) -> Arc<HealthCore> {
358        Arc::clone(&self.core)
359    }
360
361    pub async fn start(&self) -> Result<bool, HealthError> {
362        let mut state = self.state.lock().await;
363        if state.running {
364            return Ok(false);
365        }
366        let token = CancellationToken::new();
367        state.cancel = Some(token.clone());
368        state.running = true;
369        drop(state);
370
371        let core = Arc::clone(&self.core);
372        let state = Arc::clone(&self.state);
373        tokio::spawn(async move {
374            run_checks_loop(core, token).await;
375            let mut guard = state.lock().await;
376            guard.running = false;
377            guard.cancel = None;
378        });
379
380        Ok(true)
381    }
382
383    pub async fn stop(&self) -> bool {
384        let mut state = self.state.lock().await;
385        if let Some(token) = state.cancel.take() {
386            token.cancel();
387            state.running = false;
388            true
389        } else {
390            false
391        }
392    }
393
394    pub async fn status(&self) -> HealthRuntimeStatus {
395        let state = self.state.lock().await;
396        HealthRuntimeStatus {
397            running: state.running,
398        }
399    }
400}