1mod checker;
4pub mod http;
5pub mod log;
6mod machine;
7mod service;
8mod state;
9
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13
14use chrono::{DateTime, Utc};
15use tokio::sync::{broadcast, RwLock};
16
17use koi_common::capability::{Capability, CapabilityStatus};
18use koi_common::integration::{CertmeshSnapshot, DnsProbe, MdnsSnapshot, ProxySnapshot};
19use koi_common::runtime_state::DomainRuntime;
20
21use crate::checker::{run_checks_loop, run_checks_once, ServiceCheckState};
22use crate::machine::{collect_machine_health, MdnsTracker};
23use crate::state::{load_health_state, save_health_state, HealthCheckConfig, HealthChecksState};
24use crate::state::{DEFAULT_INTERVAL_SECS, DEFAULT_TIMEOUT_SECS};
25
26pub use machine::MachineHealth;
27pub use service::ServiceCheckKind;
28pub use service::ServiceStatus;
29pub use service::ServiceStatus as HealthStatus;
30pub use state::HealthCheckConfig as HealthCheck;
31
/// Default `machine_threshold`, in seconds, that [`HealthCore`] passes to
/// machine-health collection.
///
/// NOTE(review): presumably the maximum age of a machine observation before
/// it is considered unhealthy — confirm against `collect_machine_health`.
pub const DEFAULT_MACHINE_THRESHOLD_SECS: u64 = 60;
34
35#[derive(Debug, Clone)]
37pub enum HealthEvent {
38 StatusChanged { name: String, status: ServiceStatus },
40}
41
42#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
44pub struct HealthSnapshot {
45 pub machines: Vec<MachineHealth>,
46 pub services: Vec<ServiceHealth>,
47}
48
49#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
51pub struct ServiceHealth {
52 pub name: String,
53 pub kind: ServiceCheckKind,
54 pub target: String,
55 pub interval_secs: u64,
56 pub timeout_secs: u64,
57 pub status: ServiceStatus,
58 pub last_checked: Option<DateTime<Utc>>,
59 pub last_ok: Option<DateTime<Utc>>,
60 pub message: Option<String>,
61}
62
63#[derive(Debug, thiserror::Error)]
65pub enum HealthError {
66 #[error("invalid health check: {0}")]
67 InvalidCheck(String),
68
69 #[error("health check not found: {0}")]
70 NotFound(String),
71
72 #[error("io error: {0}")]
73 Io(String),
74}
75
/// Overall request timeout, in seconds, configured on the shared HTTP client
/// built in [`HealthCore::new`].
const HTTP_CLIENT_TIMEOUT_SECS: u64 = 10;
78
/// Shared state of the health domain: configured checks, their latest
/// results, integration handles and the event channel.
pub struct HealthCore {
    // Present only when an mDNS snapshot source was supplied to `new`.
    mdns_tracker: Option<MdnsTracker>,
    // Optional integrations consulted when building machine/service health.
    dns: Option<Arc<dyn DnsProbe>>,
    certmesh: Option<Arc<dyn CertmeshSnapshot>>,
    proxy: Option<Arc<dyn ProxySnapshot>>,
    // User-configured checks; this list is what gets persisted.
    checks: Arc<RwLock<Vec<HealthCheckConfig>>>,
    // Latest probe result per check name.
    service_states: Arc<RwLock<HashMap<String, ServiceCheckState>>>,
    // Threshold handed to `collect_machine_health`.
    machine_threshold: Duration,
    started_at: Instant,
    // Broadcast sender for `HealthEvent`s; receivers come from `subscribe`.
    event_tx: broadcast::Sender<HealthEvent>,
    // Client shared by HTTP probes (exposed crate-wide via `http_client()`).
    http_client: reqwest::Client,
}
92
93impl HealthCore {
94 pub async fn new(
95 mdns: Option<Arc<dyn MdnsSnapshot>>,
96 dns: Option<Arc<dyn DnsProbe>>,
97 certmesh: Option<Arc<dyn CertmeshSnapshot>>,
98 proxy: Option<Arc<dyn ProxySnapshot>>,
99 ) -> Self {
100 let checks = load_health_state()
101 .map(|state| state.checks)
102 .unwrap_or_default();
103
104 let mdns_tracker = match mdns {
105 Some(snapshot) => Some(MdnsTracker::spawn(snapshot).await),
106 None => None,
107 };
108
109 let (event_tx, _) = koi_common::events::event_channel();
110
111 let http_client = reqwest::Client::builder()
112 .timeout(Duration::from_secs(HTTP_CLIENT_TIMEOUT_SECS))
113 .build()
114 .unwrap_or_default();
115
116 Self {
117 mdns_tracker,
118 dns,
119 certmesh,
120 proxy,
121 checks: Arc::new(RwLock::new(checks)),
122 service_states: Arc::new(RwLock::new(HashMap::new())),
123 machine_threshold: Duration::from_secs(DEFAULT_MACHINE_THRESHOLD_SECS),
124 started_at: Instant::now(),
125 event_tx,
126 http_client,
127 }
128 }
129
130 pub fn started_at(&self) -> Instant {
131 self.started_at
132 }
133
134 pub(crate) fn http_client(&self) -> &reqwest::Client {
136 &self.http_client
137 }
138
139 pub async fn snapshot(&self) -> HealthSnapshot {
140 let mdns_snapshot = self
141 .mdns_tracker
142 .as_ref()
143 .map(|tracker| tracker.snapshot())
144 .unwrap_or_default();
145
146 let machines = collect_machine_health(
147 &mdns_snapshot,
148 self.dns.as_ref(),
149 self.certmesh.as_ref(),
150 self.machine_threshold,
151 );
152
153 let mut checks = self.checks.read().await.clone();
154 checks.extend(self.proxy_checks());
155 let states = self.service_states.read().await.clone();
156 let services = checks
157 .into_iter()
158 .map(|check| {
159 let state = states.get(&check.name).cloned().unwrap_or_default();
160 ServiceHealth {
161 name: check.name,
162 kind: check.kind,
163 target: check.target,
164 interval_secs: check.interval_secs,
165 timeout_secs: check.timeout_secs,
166 status: state.status,
167 last_checked: state.last_checked,
168 last_ok: state.last_ok,
169 message: state.message,
170 }
171 })
172 .collect();
173
174 HealthSnapshot { machines, services }
175 }
176
177 pub async fn list_checks(&self) -> Vec<HealthCheckConfig> {
178 self.checks.read().await.clone()
179 }
180
181 pub async fn add_check(&self, check: HealthCheckConfig) -> Result<(), HealthError> {
182 service::validate_check(&check).map_err(HealthError::InvalidCheck)?;
183
184 let mut checks = self.checks.write().await;
185 if checks.iter().any(|c| c.name == check.name) {
186 return Err(HealthError::InvalidCheck(format!(
187 "check already exists: {}",
188 check.name
189 )));
190 }
191
192 checks.push(check);
193 let state = HealthChecksState {
194 checks: checks.clone(),
195 };
196 save_health_state(&state).map_err(|e| HealthError::Io(e.to_string()))?;
197 Ok(())
198 }
199
200 pub async fn remove_check(&self, name: &str) -> Result<(), HealthError> {
201 let mut checks = self.checks.write().await;
202 let before = checks.len();
203 checks.retain(|c| c.name != name);
204 if checks.len() == before {
205 return Err(HealthError::NotFound(name.to_string()));
206 }
207 let state = HealthChecksState {
208 checks: checks.clone(),
209 };
210 save_health_state(&state).map_err(|e| HealthError::Io(e.to_string()))?;
211
212 let mut states = self.service_states.write().await;
213 states.remove(name);
214 Ok(())
215 }
216
217 pub async fn run_checks_once(&self) {
218 run_checks_once(self, &self.service_states).await;
219 }
220
221 pub fn subscribe(&self) -> broadcast::Receiver<HealthEvent> {
223 self.event_tx.subscribe()
224 }
225
226 pub(crate) fn emit(&self, event: HealthEvent) {
228 let _ = self.event_tx.send(event);
229 }
230
231 pub(crate) fn proxy_checks(&self) -> Vec<HealthCheckConfig> {
233 let Some(proxy) = &self.proxy else {
234 return Vec::new();
235 };
236 proxy
237 .entries()
238 .into_iter()
239 .map(|entry| HealthCheckConfig {
240 name: format!("proxy:{}", entry.name),
241 kind: ServiceCheckKind::Http,
242 target: entry.backend,
243 interval_secs: DEFAULT_INTERVAL_SECS,
244 timeout_secs: DEFAULT_TIMEOUT_SECS,
245 })
246 .collect()
247 }
248}
249
#[cfg(test)]
mod tests {
    use super::*;
    use crate::service::ServiceCheckKind;
    use crate::state::HealthCheckConfig;

    /// Drains the events currently queued on `rx` and returns the status
    /// reported for the check called `name`, if one was emitted.
    ///
    /// The core may load checks persisted by earlier (possibly aborted) test
    /// runs, and probes run concurrently. The first queued event is therefore
    /// not guaranteed to belong to the check under test.
    ///
    /// Lagged notifications are skipped. Draining stops once the queue is
    /// empty or closed.
    fn status_for(
        rx: &mut tokio::sync::broadcast::Receiver<HealthEvent>,
        name: &str,
    ) -> Option<ServiceStatus> {
        use tokio::sync::broadcast::error::TryRecvError;
        loop {
            match rx.try_recv() {
                Ok(HealthEvent::StatusChanged {
                    name: evt_name,
                    status,
                }) if evt_name == name => return Some(status),
                Ok(_) | Err(TryRecvError::Lagged(_)) => continue,
                Err(_) => return None,
            }
        }
    }

    #[tokio::test]
    async fn run_checks_emits_status_changed_through_core() {
        let _ = koi_common::test::ensure_data_dir("koi-health-event-tests");

        let core = HealthCore::new(None, None, None, None).await;
        let mut rx = core.subscribe();

        let name = format!("evt-{}", koi_common::id::generate_short_id());
        // Port 1 on loopback is assumed closed, so the TCP probe should fail.
        core.add_check(HealthCheckConfig {
            name: name.clone(),
            kind: ServiceCheckKind::Tcp,
            target: "127.0.0.1:1".to_string(),
            interval_secs: 1,
            timeout_secs: 1,
        })
        .await
        .expect("add_check should succeed");

        core.run_checks_once().await;

        let status = status_for(&mut rx, &name)
            .expect("a StatusChanged should have been emitted for the new check");
        assert!(
            matches!(status, ServiceStatus::Down),
            "closed local port should report Down, got {status:?}"
        );

        let _ = core.remove_check(&name).await;
    }

    #[tokio::test]
    async fn run_checks_probe_concurrently() {
        let _ = koi_common::test::ensure_data_dir("koi-health-concurrency-tests");

        // A server that accepts connections but never responds, so every HTTP
        // probe runs into its timeout.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let _server = tokio::spawn(async move {
            let mut held = Vec::new();
            while let Ok((stream, _)) = listener.accept().await {
                held.push(stream);
            }
        });

        let core = HealthCore::new(None, None, None, None).await;

        let timeout_secs = 1u64;
        let n = 4usize;
        let mut names = Vec::new();
        for _ in 0..n {
            let name = format!("conc-{}", koi_common::id::generate_short_id());
            core.add_check(HealthCheckConfig {
                name: name.clone(),
                kind: ServiceCheckKind::Http,
                target: format!("http://{addr}/"),
                interval_secs: 1,
                timeout_secs,
            })
            .await
            .expect("add_check should succeed");
            names.push(name);
        }

        let start = std::time::Instant::now();
        core.run_checks_once().await;
        let elapsed = start.elapsed();

        assert!(
            elapsed >= std::time::Duration::from_millis(700),
            "probes should have hit their ~1s timeout, took {elapsed:?}"
        );
        assert!(
            elapsed < std::time::Duration::from_millis(3000),
            "{n} x {timeout_secs}s checks must run concurrently (sequential would be ~{}s), took {elapsed:?}",
            n as u64 * timeout_secs
        );

        for name in &names {
            let _ = core.remove_check(name).await;
        }
    }
}
361
362#[async_trait::async_trait]
363impl Capability for HealthCore {
364 fn name(&self) -> &str {
365 "health"
366 }
367
368 async fn status(&self) -> CapabilityStatus {
369 let (total, up) = match self.service_states.try_read() {
370 Ok(services) => {
371 let total = services.len();
372 let up = services
373 .values()
374 .filter(|s| matches!(s.status, ServiceStatus::Up))
375 .count();
376 (total, up)
377 }
378 Err(_) => (0, 0),
379 };
380 let summary = format!("{} services up ({} total)", up, total);
381 CapabilityStatus {
382 name: "health".to_string(),
383 summary,
384 healthy: true,
385 }
386 }
387}
388
/// Start/stop handle for the health domain's background check loop.
///
/// Wraps a `DomainRuntime` around a shared [`HealthCore`]; clones share the
/// same underlying runtime handle.
#[derive(Clone)]
pub struct HealthRuntime {
    inner: DomainRuntime<HealthCore>,
}
397
398#[derive(Debug, Clone, Copy, serde::Serialize)]
399pub struct HealthRuntimeStatus {
400 pub running: bool,
401}
402
403impl HealthRuntime {
404 pub fn new(core: Arc<HealthCore>) -> Self {
405 Self {
406 inner: DomainRuntime::new(core),
407 }
408 }
409
410 pub fn core(&self) -> Arc<HealthCore> {
411 self.inner.core()
412 }
413
414 pub async fn start(&self) -> Result<bool, HealthError> {
415 let core = self.inner.core();
416 let started = self
419 .inner
420 .start(move |token| tokio::spawn(run_checks_loop(core, token)))
421 .await
422 .unwrap_or(false);
423 Ok(started)
424 }
425
426 pub async fn stop(&self) -> bool {
427 self.inner.stop().await
428 }
429
430 pub async fn status(&self) -> HealthRuntimeStatus {
431 HealthRuntimeStatus {
432 running: self.inner.status().await.running,
433 }
434 }
435}