Skip to main content

koi_health/
lib.rs

1//! Koi Health - network health monitoring (Phase 7).
2
3mod checker;
4pub mod http;
5pub mod log;
6mod machine;
7mod service;
8mod state;
9
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13
14use chrono::{DateTime, Utc};
15use tokio::sync::{broadcast, RwLock};
16
17use koi_common::capability::{Capability, CapabilityStatus};
18use koi_common::integration::{CertmeshSnapshot, DnsProbe, MdnsSnapshot, ProxySnapshot};
19use koi_common::runtime_state::DomainRuntime;
20
21use crate::checker::{run_checks_loop, run_checks_once, ServiceCheckState};
22use crate::machine::{collect_machine_health, MdnsTracker};
23use crate::state::{load_health_state, save_health_state, HealthCheckConfig, HealthChecksState};
24use crate::state::{DEFAULT_INTERVAL_SECS, DEFAULT_TIMEOUT_SECS};
25
26pub use machine::MachineHealth;
27pub use service::ServiceCheckKind;
28pub use service::ServiceStatus;
29pub use service::ServiceStatus as HealthStatus;
30pub use state::HealthCheckConfig as HealthCheck;
31
/// Default machine health threshold, in seconds since a machine was last seen.
///
/// `HealthCore::new` uses it as the `machine_threshold` passed to
/// `collect_machine_health`.
pub const DEFAULT_MACHINE_THRESHOLD_SECS: u64 = 60;
34
/// Events emitted by the health subsystem when service status changes.
///
/// Delivered over the broadcast channel obtained from `HealthCore::subscribe`.
#[derive(Debug, Clone)]
pub enum HealthEvent {
    /// A service's health status changed; `name` is the check name and `status`
    /// the status it changed to.
    StatusChanged { name: String, status: ServiceStatus },
}
41
/// Snapshot returned by health status queries (`HealthCore::snapshot`).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
pub struct HealthSnapshot {
    /// Per-machine health as computed by `collect_machine_health`.
    pub machines: Vec<MachineHealth>,
    /// User-defined checks followed by proxy-derived checks, each merged with
    /// its last recorded state.
    pub services: Vec<ServiceHealth>,
}
48
/// Service health summary (config + current status).
///
/// Built by `HealthCore::snapshot` from a check's configuration and the last
/// recorded state for that check name (a default state if none is recorded).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
pub struct ServiceHealth {
    /// Check name; proxy-derived checks are named `proxy:<entry name>`.
    pub name: String,
    /// Kind of probe this check performs.
    pub kind: ServiceCheckKind,
    /// What the check probes; its format presumably depends on `kind` — TODO confirm.
    pub target: String,
    /// Configured interval between checks, in seconds.
    pub interval_secs: u64,
    /// Configured per-check timeout, in seconds.
    pub timeout_secs: u64,
    /// Last recorded status for this check.
    pub status: ServiceStatus,
    /// Presumably when the check last ran (`None` if never) — TODO confirm.
    pub last_checked: Option<DateTime<Utc>>,
    /// Presumably when the check last succeeded (`None` if never) — TODO confirm.
    pub last_ok: Option<DateTime<Utc>>,
    /// Optional detail recorded with the last check result.
    pub message: Option<String>,
}
62
/// Errors surfaced by the health domain.
#[derive(Debug, thiserror::Error)]
pub enum HealthError {
    /// The check failed validation, or a check with the same name already exists.
    #[error("invalid health check: {0}")]
    InvalidCheck(String),

    /// No user-defined check with the given name exists.
    #[error("health check not found: {0}")]
    NotFound(String),

    /// Persisting the check list failed; carries the stringified underlying error.
    #[error("io error: {0}")]
    Io(String),
}
75
/// Default timeout, in seconds, for the shared HTTP client used by health checks.
///
/// Applied as the request timeout on the client built in `HealthCore::new`.
const HTTP_CLIENT_TIMEOUT_SECS: u64 = 10;
78
/// Core health facade.
///
/// Holds the user-defined check list and the per-check state, combines them
/// with the optional mDNS/DNS/certmesh/proxy integrations into
/// [`HealthSnapshot`]s, and broadcasts [`HealthEvent`]s to subscribers.
pub struct HealthCore {
    /// Present only when an mDNS snapshot source was passed to `new`.
    mdns_tracker: Option<MdnsTracker>,
    /// Optional DNS integration, passed through to `collect_machine_health`.
    dns: Option<Arc<dyn DnsProbe>>,
    /// Optional certmesh integration, passed through to `collect_machine_health`.
    certmesh: Option<Arc<dyn CertmeshSnapshot>>,
    /// Optional proxy integration; each entry becomes an extra HTTP check.
    proxy: Option<Arc<dyn ProxySnapshot>>,
    /// User-defined checks; written to persisted state on add/remove.
    checks: Arc<RwLock<Vec<HealthCheckConfig>>>,
    /// Last recorded check state, keyed by check name.
    service_states: Arc<RwLock<HashMap<String, ServiceCheckState>>>,
    /// Threshold passed to `collect_machine_health`.
    machine_threshold: Duration,
    /// When this core was constructed.
    started_at: Instant,
    /// Sender side of the health event broadcast channel.
    event_tx: broadcast::Sender<HealthEvent>,
    /// Shared HTTP client for health checks.
    http_client: reqwest::Client,
}
92
93impl HealthCore {
94    pub async fn new(
95        mdns: Option<Arc<dyn MdnsSnapshot>>,
96        dns: Option<Arc<dyn DnsProbe>>,
97        certmesh: Option<Arc<dyn CertmeshSnapshot>>,
98        proxy: Option<Arc<dyn ProxySnapshot>>,
99    ) -> Self {
100        let checks = load_health_state()
101            .map(|state| state.checks)
102            .unwrap_or_default();
103
104        let mdns_tracker = match mdns {
105            Some(snapshot) => Some(MdnsTracker::spawn(snapshot).await),
106            None => None,
107        };
108
109        let (event_tx, _) = koi_common::events::event_channel();
110
111        let http_client = reqwest::Client::builder()
112            .timeout(Duration::from_secs(HTTP_CLIENT_TIMEOUT_SECS))
113            .build()
114            .unwrap_or_default();
115
116        Self {
117            mdns_tracker,
118            dns,
119            certmesh,
120            proxy,
121            checks: Arc::new(RwLock::new(checks)),
122            service_states: Arc::new(RwLock::new(HashMap::new())),
123            machine_threshold: Duration::from_secs(DEFAULT_MACHINE_THRESHOLD_SECS),
124            started_at: Instant::now(),
125            event_tx,
126            http_client,
127        }
128    }
129
130    pub fn started_at(&self) -> Instant {
131        self.started_at
132    }
133
134    /// Shared HTTP client for health checks.
135    pub(crate) fn http_client(&self) -> &reqwest::Client {
136        &self.http_client
137    }
138
139    pub async fn snapshot(&self) -> HealthSnapshot {
140        let mdns_snapshot = self
141            .mdns_tracker
142            .as_ref()
143            .map(|tracker| tracker.snapshot())
144            .unwrap_or_default();
145
146        let machines = collect_machine_health(
147            &mdns_snapshot,
148            self.dns.as_ref(),
149            self.certmesh.as_ref(),
150            self.machine_threshold,
151        );
152
153        let mut checks = self.checks.read().await.clone();
154        checks.extend(self.proxy_checks());
155        let states = self.service_states.read().await.clone();
156        let services = checks
157            .into_iter()
158            .map(|check| {
159                let state = states.get(&check.name).cloned().unwrap_or_default();
160                ServiceHealth {
161                    name: check.name,
162                    kind: check.kind,
163                    target: check.target,
164                    interval_secs: check.interval_secs,
165                    timeout_secs: check.timeout_secs,
166                    status: state.status,
167                    last_checked: state.last_checked,
168                    last_ok: state.last_ok,
169                    message: state.message,
170                }
171            })
172            .collect();
173
174        HealthSnapshot { machines, services }
175    }
176
177    pub async fn list_checks(&self) -> Vec<HealthCheckConfig> {
178        self.checks.read().await.clone()
179    }
180
181    pub async fn add_check(&self, check: HealthCheckConfig) -> Result<(), HealthError> {
182        service::validate_check(&check).map_err(HealthError::InvalidCheck)?;
183
184        let mut checks = self.checks.write().await;
185        if checks.iter().any(|c| c.name == check.name) {
186            return Err(HealthError::InvalidCheck(format!(
187                "check already exists: {}",
188                check.name
189            )));
190        }
191
192        checks.push(check);
193        let state = HealthChecksState {
194            checks: checks.clone(),
195        };
196        save_health_state(&state).map_err(|e| HealthError::Io(e.to_string()))?;
197        Ok(())
198    }
199
200    pub async fn remove_check(&self, name: &str) -> Result<(), HealthError> {
201        let mut checks = self.checks.write().await;
202        let before = checks.len();
203        checks.retain(|c| c.name != name);
204        if checks.len() == before {
205            return Err(HealthError::NotFound(name.to_string()));
206        }
207        let state = HealthChecksState {
208            checks: checks.clone(),
209        };
210        save_health_state(&state).map_err(|e| HealthError::Io(e.to_string()))?;
211
212        let mut states = self.service_states.write().await;
213        states.remove(name);
214        Ok(())
215    }
216
217    pub async fn run_checks_once(&self) {
218        run_checks_once(self, &self.service_states).await;
219    }
220
221    /// Subscribe to health events.
222    pub fn subscribe(&self) -> broadcast::Receiver<HealthEvent> {
223        self.event_tx.subscribe()
224    }
225
226    /// Emit a health event (used by the checker loop).
227    pub(crate) fn emit(&self, event: HealthEvent) {
228        let _ = self.event_tx.send(event);
229    }
230
231    /// Generate health checks from proxy entries.
232    pub(crate) fn proxy_checks(&self) -> Vec<HealthCheckConfig> {
233        let Some(proxy) = &self.proxy else {
234            return Vec::new();
235        };
236        proxy
237            .entries()
238            .into_iter()
239            .map(|entry| HealthCheckConfig {
240                name: format!("proxy:{}", entry.name),
241                kind: ServiceCheckKind::Http,
242                target: entry.backend,
243                interval_secs: DEFAULT_INTERVAL_SECS,
244                timeout_secs: DEFAULT_TIMEOUT_SECS,
245            })
246            .collect()
247    }
248}
249
#[cfg(test)]
mod tests {
    use super::*;
    use crate::service::ServiceCheckKind;
    use crate::state::HealthCheckConfig;

    /// Drives a real status transition through HealthCore: subscribe → add a TCP
    /// check pointing at a closed local port → run_checks_once() (real TCP attempt
    /// fails) → the Unknown→Down transition emits StatusChanged through the core's
    /// own channel. Fails if the checker stops emitting (tests Koi, not tokio).
    ///
    /// Other checks may already be persisted in the shared test data dir (e.g.
    /// left behind by an earlier aborted run), and they emit events too. Those
    /// events are therefore skipped rather than assumed absent. The check is also
    /// removed before asserting, so a failure does not leak it into later runs.
    #[tokio::test]
    async fn run_checks_emits_status_changed_through_core() {
        let _ = koi_common::test::ensure_data_dir("koi-health-event-tests");

        let core = HealthCore::new(None, None, None, None).await;
        let mut rx = core.subscribe();

        // Unique name so concurrent / repeated tests in the same process don't
        // collide on the persisted check list. Port 1 is effectively never
        // listening, so the TCP connect fails fast on localhost (no network).
        let name = format!("evt-{}", koi_common::id::generate_short_id());
        core.add_check(HealthCheckConfig {
            name: name.clone(),
            kind: ServiceCheckKind::Tcp,
            target: "127.0.0.1:1".to_string(),
            interval_secs: 1,
            timeout_secs: 1,
        })
        .await
        .expect("add_check should succeed");

        // First run: Unknown -> Down, which emits a StatusChanged for our check.
        core.run_checks_once().await;

        // Scan the already-emitted events for the one belonging to our check.
        let observed = loop {
            match rx.try_recv() {
                Ok(HealthEvent::StatusChanged {
                    name: evt_name,
                    status,
                }) if evt_name == name => break Some(status),
                // An event for some other (pre-existing) check, or the receiver
                // fell behind: keep looking.
                Ok(_) | Err(tokio::sync::broadcast::error::TryRecvError::Lagged(_)) => continue,
                Err(_) => break None,
            }
        };

        // Cleanup the persisted check before asserting so the shared state file
        // stays tidy even when the assertions below fail.
        let _ = core.remove_check(&name).await;

        let status = observed.expect("a StatusChanged should have been emitted for our check");
        assert!(
            matches!(status, ServiceStatus::Down),
            "closed local port should report Down, got {status:?}"
        );
    }
}
301
302#[async_trait::async_trait]
303impl Capability for HealthCore {
304    fn name(&self) -> &str {
305        "health"
306    }
307
308    async fn status(&self) -> CapabilityStatus {
309        let (total, up) = match self.service_states.try_read() {
310            Ok(services) => {
311                let total = services.len();
312                let up = services
313                    .values()
314                    .filter(|s| matches!(s.status, ServiceStatus::Up))
315                    .count();
316                (total, up)
317            }
318            Err(_) => (0, 0),
319        };
320        let summary = format!("{} services up ({} total)", up, total);
321        CapabilityStatus {
322            name: "health".to_string(),
323            summary,
324            healthy: true,
325        }
326    }
327}
328
/// Runtime controller for the background service checks.
///
/// A thin wrapper over the shared [`DomainRuntime`] start/stop machine; the only
/// health-specific piece is the spawned loop (`run_checks_loop(core, token)`).
#[derive(Clone)]
pub struct HealthRuntime {
    /// Generic start/stop machinery; also holds the core handle returned by `core()`.
    inner: DomainRuntime<HealthCore>,
}
337
/// Serializable runtime state returned by `HealthRuntime::status`.
#[derive(Debug, Clone, Copy, serde::Serialize)]
pub struct HealthRuntimeStatus {
    /// Whether the background check loop is running, as reported by the
    /// underlying `DomainRuntime`.
    pub running: bool,
}
342
343impl HealthRuntime {
344    pub fn new(core: Arc<HealthCore>) -> Self {
345        Self {
346            inner: DomainRuntime::new(core),
347        }
348    }
349
350    pub fn core(&self) -> Arc<HealthCore> {
351        self.inner.core()
352    }
353
354    pub async fn start(&self) -> Result<bool, HealthError> {
355        let core = self.inner.core();
356        // DomainRuntime::start signals already-running via Ok(false) and never yields
357        // AlreadyRunning for this launcher; the Result<_, HealthError> shape is preserved.
358        let started = self
359            .inner
360            .start(move |token| tokio::spawn(run_checks_loop(core, token)))
361            .await
362            .unwrap_or(false);
363        Ok(started)
364    }
365
366    pub async fn stop(&self) -> bool {
367        self.inner.stop().await
368    }
369
370    pub async fn status(&self) -> HealthRuntimeStatus {
371        HealthRuntimeStatus {
372            running: self.inner.status().await.running,
373        }
374    }
375}