Skip to main content

koi_health/
lib.rs

1//! Koi Health - network health monitoring (Phase 7).
2
3mod checker;
4pub mod http;
5pub mod log;
6mod machine;
7mod service;
8mod state;
9
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13
14use chrono::{DateTime, Utc};
15use tokio::sync::{broadcast, RwLock};
16
17use koi_common::capability::{Capability, CapabilityStatus};
18use koi_common::integration::{CertmeshSnapshot, DnsProbe, MdnsSnapshot, ProxySnapshot};
19use koi_common::runtime_state::DomainRuntime;
20
21use crate::checker::{run_checks_loop, run_checks_once, ServiceCheckState};
22use crate::machine::{collect_machine_health, MdnsTracker};
23use crate::state::{load_health_state, save_health_state, HealthCheckConfig, HealthChecksState};
24use crate::state::{DEFAULT_INTERVAL_SECS, DEFAULT_TIMEOUT_SECS};
25
26pub use machine::MachineHealth;
27pub use service::ServiceCheckKind;
28pub use service::ServiceStatus;
29pub use service::ServiceStatus as HealthStatus;
30pub use state::HealthCheckConfig as HealthCheck;
31
/// Default machine health threshold (seconds since last seen).
///
/// Converted to a `Duration` and handed to the machine-health collector by
/// `HealthCore`; how the threshold is applied lives in `machine.rs`.
pub const DEFAULT_MACHINE_THRESHOLD_SECS: u64 = 60;
34
/// Events emitted by the health subsystem when service status changes.
///
/// Delivered over the broadcast channel returned by `HealthCore::subscribe`.
#[derive(Debug, Clone)]
pub enum HealthEvent {
    /// A service's health status changed.
    ///
    /// `name` is the check name (proxy-derived checks use the `proxy:` prefix);
    /// `status` is the newly observed status.
    StatusChanged { name: String, status: ServiceStatus },
}
41
/// Snapshot returned by health status queries.
///
/// Produced by `HealthCore::snapshot`. Field order is the serialized order.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
pub struct HealthSnapshot {
    /// Per-machine health, derived from mDNS/DNS/certmesh data when available.
    pub machines: Vec<MachineHealth>,
    /// One entry per configured check followed by one per proxy-derived check.
    pub services: Vec<ServiceHealth>,
}
48
/// Service health summary (config + current status).
///
/// The first five fields come from the check's configuration; the remaining
/// fields come from the check's recorded runtime state (defaults when the
/// check has no recorded state yet). Field order is the serialized order.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
pub struct ServiceHealth {
    /// Unique check name (proxy-derived checks are named `proxy:<entry>`).
    pub name: String,
    /// Kind of probe performed for this check.
    pub kind: ServiceCheckKind,
    /// Probe target; its format depends on `kind`.
    pub target: String,
    /// Configured interval between checks, in seconds.
    pub interval_secs: u64,
    /// Configured per-check timeout, in seconds.
    pub timeout_secs: u64,
    /// Most recently recorded status.
    pub status: ServiceStatus,
    /// When the check last ran, if recorded.
    pub last_checked: Option<DateTime<Utc>>,
    /// Presumably the last time the check succeeded, if ever — TODO confirm in checker.rs.
    pub last_ok: Option<DateTime<Utc>>,
    /// Optional human-readable detail recorded with the last result.
    pub message: Option<String>,
}
62
/// Errors surfaced by the health domain.
#[derive(Debug, thiserror::Error)]
pub enum HealthError {
    /// The check failed validation or duplicates an existing check name.
    #[error("invalid health check: {0}")]
    InvalidCheck(String),

    /// No persisted check has the given name.
    #[error("health check not found: {0}")]
    NotFound(String),

    /// Persisting health state failed; carries the underlying error's text.
    #[error("io error: {0}")]
    Io(String),
}
75
/// Default timeout for the shared HTTP client used by health checks.
///
/// Applied client-wide via `reqwest::ClientBuilder::timeout`.
/// NOTE(review): each check also has its own `timeout_secs`; confirm how the
/// checker combines the two.
const HTTP_CLIENT_TIMEOUT_SECS: u64 = 10;

/// Core health facade.
///
/// Owns the configured checks, their latest recorded states, the optional
/// integrations used for machine health and proxy-derived checks, and the
/// broadcast channel for `HealthEvent`s.
pub struct HealthCore {
    /// Tracks mDNS data; present only when an mDNS snapshot source was supplied.
    mdns_tracker: Option<MdnsTracker>,
    /// Optional DNS probe used when collecting machine health.
    dns: Option<Arc<dyn DnsProbe>>,
    /// Optional certmesh view used when collecting machine health.
    certmesh: Option<Arc<dyn CertmeshSnapshot>>,
    /// Optional proxy view; each entry becomes a synthetic HTTP check.
    proxy: Option<Arc<dyn ProxySnapshot>>,
    /// User-configured checks; loaded at startup and persisted on add/remove.
    checks: Arc<RwLock<Vec<HealthCheckConfig>>>,
    /// Latest recorded state per check, keyed by check name.
    service_states: Arc<RwLock<HashMap<String, ServiceCheckState>>>,
    /// Threshold passed to the machine-health collector.
    machine_threshold: Duration,
    /// When this core was constructed.
    started_at: Instant,
    /// Sender side of the health event broadcast channel.
    event_tx: broadcast::Sender<HealthEvent>,
    /// Shared client reused by HTTP checks.
    http_client: reqwest::Client,
}
92
93impl HealthCore {
94    pub async fn new(
95        mdns: Option<Arc<dyn MdnsSnapshot>>,
96        dns: Option<Arc<dyn DnsProbe>>,
97        certmesh: Option<Arc<dyn CertmeshSnapshot>>,
98        proxy: Option<Arc<dyn ProxySnapshot>>,
99    ) -> Self {
100        let checks = load_health_state()
101            .map(|state| state.checks)
102            .unwrap_or_default();
103
104        let mdns_tracker = match mdns {
105            Some(snapshot) => Some(MdnsTracker::spawn(snapshot).await),
106            None => None,
107        };
108
109        let (event_tx, _) = koi_common::events::event_channel();
110
111        let http_client = reqwest::Client::builder()
112            .timeout(Duration::from_secs(HTTP_CLIENT_TIMEOUT_SECS))
113            .build()
114            .unwrap_or_default();
115
116        Self {
117            mdns_tracker,
118            dns,
119            certmesh,
120            proxy,
121            checks: Arc::new(RwLock::new(checks)),
122            service_states: Arc::new(RwLock::new(HashMap::new())),
123            machine_threshold: Duration::from_secs(DEFAULT_MACHINE_THRESHOLD_SECS),
124            started_at: Instant::now(),
125            event_tx,
126            http_client,
127        }
128    }
129
130    pub fn started_at(&self) -> Instant {
131        self.started_at
132    }
133
134    /// Shared HTTP client for health checks.
135    pub(crate) fn http_client(&self) -> &reqwest::Client {
136        &self.http_client
137    }
138
139    pub async fn snapshot(&self) -> HealthSnapshot {
140        let mdns_snapshot = self
141            .mdns_tracker
142            .as_ref()
143            .map(|tracker| tracker.snapshot())
144            .unwrap_or_default();
145
146        let machines = collect_machine_health(
147            &mdns_snapshot,
148            self.dns.as_ref(),
149            self.certmesh.as_ref(),
150            self.machine_threshold,
151        );
152
153        let mut checks = self.checks.read().await.clone();
154        checks.extend(self.proxy_checks());
155        let states = self.service_states.read().await.clone();
156        let services = checks
157            .into_iter()
158            .map(|check| {
159                let state = states.get(&check.name).cloned().unwrap_or_default();
160                ServiceHealth {
161                    name: check.name,
162                    kind: check.kind,
163                    target: check.target,
164                    interval_secs: check.interval_secs,
165                    timeout_secs: check.timeout_secs,
166                    status: state.status,
167                    last_checked: state.last_checked,
168                    last_ok: state.last_ok,
169                    message: state.message,
170                }
171            })
172            .collect();
173
174        HealthSnapshot { machines, services }
175    }
176
177    pub async fn list_checks(&self) -> Vec<HealthCheckConfig> {
178        self.checks.read().await.clone()
179    }
180
181    pub async fn add_check(&self, check: HealthCheckConfig) -> Result<(), HealthError> {
182        service::validate_check(&check).map_err(HealthError::InvalidCheck)?;
183
184        let mut checks = self.checks.write().await;
185        if checks.iter().any(|c| c.name == check.name) {
186            return Err(HealthError::InvalidCheck(format!(
187                "check already exists: {}",
188                check.name
189            )));
190        }
191
192        checks.push(check);
193        let state = HealthChecksState {
194            checks: checks.clone(),
195        };
196        save_health_state(&state).map_err(|e| HealthError::Io(e.to_string()))?;
197        Ok(())
198    }
199
200    pub async fn remove_check(&self, name: &str) -> Result<(), HealthError> {
201        let mut checks = self.checks.write().await;
202        let before = checks.len();
203        checks.retain(|c| c.name != name);
204        if checks.len() == before {
205            return Err(HealthError::NotFound(name.to_string()));
206        }
207        let state = HealthChecksState {
208            checks: checks.clone(),
209        };
210        save_health_state(&state).map_err(|e| HealthError::Io(e.to_string()))?;
211
212        let mut states = self.service_states.write().await;
213        states.remove(name);
214        Ok(())
215    }
216
217    pub async fn run_checks_once(&self) {
218        run_checks_once(self, &self.service_states).await;
219    }
220
221    /// Subscribe to health events.
222    pub fn subscribe(&self) -> broadcast::Receiver<HealthEvent> {
223        self.event_tx.subscribe()
224    }
225
226    /// Emit a health event (used by the checker loop).
227    pub(crate) fn emit(&self, event: HealthEvent) {
228        let _ = self.event_tx.send(event);
229    }
230
231    /// Generate health checks from proxy entries.
232    pub(crate) fn proxy_checks(&self) -> Vec<HealthCheckConfig> {
233        let Some(proxy) = &self.proxy else {
234            return Vec::new();
235        };
236        proxy
237            .entries()
238            .into_iter()
239            .map(|entry| HealthCheckConfig {
240                name: format!("proxy:{}", entry.name),
241                kind: ServiceCheckKind::Http,
242                target: entry.backend,
243                interval_secs: DEFAULT_INTERVAL_SECS,
244                timeout_secs: DEFAULT_TIMEOUT_SECS,
245            })
246            .collect()
247    }
248}
249
#[cfg(test)]
mod tests {
    use super::*;
    use crate::service::ServiceCheckKind;
    use crate::state::HealthCheckConfig;

    /// Drives a real status transition through HealthCore: subscribe → add a TCP
    /// check pointing at a closed local port → run_checks_once() (real TCP attempt
    /// fails) → the Unknown→Down transition emits StatusChanged through the core's
    /// own channel. Fails if the checker stops emitting (tests Koi, not tokio).
    #[tokio::test]
    async fn run_checks_emits_status_changed_through_core() {
        let _ = koi_common::test::ensure_data_dir("koi-health-event-tests");

        let core = HealthCore::new(None, None, None, None).await;
        let mut rx = core.subscribe();

        // Unique name so concurrent / repeated tests in the same process don't
        // collide on the persisted check list. Port 1 is effectively never
        // listening, so the TCP connect fails fast on localhost (no network).
        let name = format!("evt-{}", koi_common::id::generate_short_id());
        core.add_check(HealthCheckConfig {
            name: name.clone(),
            kind: ServiceCheckKind::Tcp,
            target: "127.0.0.1:1".to_string(),
            interval_secs: 1,
            timeout_secs: 1,
        })
        .await
        .expect("add_check should succeed");

        // First run: Unknown -> Down, which emits a StatusChanged for our check.
        core.run_checks_once().await;

        // `HealthCore::new` reloads whatever checks are persisted in this data
        // dir (e.g. left behind by an earlier run that panicked before cleanup),
        // and those emit on their first run too. Scan for OUR check's event
        // instead of assuming it is the first one in the channel.
        let status = loop {
            match rx.try_recv() {
                Ok(HealthEvent::StatusChanged {
                    name: evt_name,
                    status,
                }) if evt_name == name => break status,
                Ok(_) | Err(tokio::sync::broadcast::error::TryRecvError::Lagged(_)) => continue,
                Err(err) => {
                    panic!("a StatusChanged for {name} should have been emitted: {err:?}")
                }
            }
        };
        assert!(
            matches!(status, ServiceStatus::Down),
            "closed local port should report Down, got {status:?}"
        );

        // Cleanup the persisted check so the shared state file stays tidy.
        let _ = core.remove_check(&name).await;
    }

    /// Probes for a tick must run CONCURRENTLY: N checks against a server that
    /// accepts but never replies each hang until their own timeout, so the tick
    /// should take ~one timeout, not N. Guards the regression where a slow /
    /// timing-out check serialized (delayed) all the others.
    #[tokio::test]
    async fn run_checks_probe_concurrently() {
        let _ = koi_common::test::ensure_data_dir("koi-health-concurrency-tests");

        // Accept connections but never respond, and HOLD the streams open so the
        // client side blocks until its request timeout. Dropping the streams
        // would close the connection and fail fast, defeating the measurement.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let _server = tokio::spawn(async move {
            let mut held = Vec::new();
            while let Ok((stream, _)) = listener.accept().await {
                held.push(stream);
            }
        });

        let core = HealthCore::new(None, None, None, None).await;

        let timeout_secs = 1u64;
        let n = 4usize;
        let mut names = Vec::new();
        for _ in 0..n {
            let name = format!("conc-{}", koi_common::id::generate_short_id());
            core.add_check(HealthCheckConfig {
                name: name.clone(),
                kind: ServiceCheckKind::Http,
                target: format!("http://{addr}/"),
                interval_secs: 1,
                timeout_secs,
            })
            .await
            .expect("add_check should succeed");
            names.push(name);
        }

        let start = std::time::Instant::now();
        core.run_checks_once().await;
        let elapsed = start.elapsed();

        // Lower bound: the probes really hit their ~1s timeout (not a fast-fail).
        // Upper bound: well under the sequential floor (n * timeout = 4s).
        assert!(
            elapsed >= std::time::Duration::from_millis(700),
            "probes should have hit their ~1s timeout, took {elapsed:?}"
        );
        assert!(
            elapsed < std::time::Duration::from_millis(3000),
            "{n} x {timeout_secs}s checks must run concurrently (sequential would be ~{}s), took {elapsed:?}",
            n as u64 * timeout_secs
        );

        for name in &names {
            let _ = core.remove_check(name).await;
        }
    }
}
361
362#[async_trait::async_trait]
363impl Capability for HealthCore {
364    fn name(&self) -> &str {
365        "health"
366    }
367
368    async fn status(&self) -> CapabilityStatus {
369        let (total, up) = match self.service_states.try_read() {
370            Ok(services) => {
371                let total = services.len();
372                let up = services
373                    .values()
374                    .filter(|s| matches!(s.status, ServiceStatus::Up))
375                    .count();
376                (total, up)
377            }
378            Err(_) => (0, 0),
379        };
380        let summary = format!("{} services up ({} total)", up, total);
381        CapabilityStatus {
382            name: "health".to_string(),
383            summary,
384            healthy: true,
385        }
386    }
387}
388
/// Runtime controller for the background service checks.
///
/// A thin wrapper over the shared [`DomainRuntime`] start/stop machine; the only
/// health-specific piece is the spawned loop (`run_checks_loop(core, token)`).
/// NOTE(review): clones presumably share the same underlying runtime state —
/// confirm against `DomainRuntime`'s `Clone` impl.
#[derive(Clone)]
pub struct HealthRuntime {
    /// Generic start/stop state machine owning the `HealthCore` handle.
    inner: DomainRuntime<HealthCore>,
}
397
/// Serializable status of the health runtime, returned by `HealthRuntime::status`.
#[derive(Debug, Clone, Copy, serde::Serialize)]
pub struct HealthRuntimeStatus {
    /// Whether the background check loop is running, as reported by `DomainRuntime`.
    pub running: bool,
}
402
403impl HealthRuntime {
404    pub fn new(core: Arc<HealthCore>) -> Self {
405        Self {
406            inner: DomainRuntime::new(core),
407        }
408    }
409
410    pub fn core(&self) -> Arc<HealthCore> {
411        self.inner.core()
412    }
413
414    pub async fn start(&self) -> Result<bool, HealthError> {
415        let core = self.inner.core();
416        // DomainRuntime::start signals already-running via Ok(false) and never yields
417        // AlreadyRunning for this launcher; the Result<_, HealthError> shape is preserved.
418        let started = self
419            .inner
420            .start(move |token| tokio::spawn(run_checks_loop(core, token)))
421            .await
422            .unwrap_or(false);
423        Ok(started)
424    }
425
426    pub async fn stop(&self) -> bool {
427        self.inner.stop().await
428    }
429
430    pub async fn status(&self) -> HealthRuntimeStatus {
431        HealthRuntimeStatus {
432            running: self.inner.status().await.running,
433        }
434    }
435}