1mod checker;
4pub mod http;
5pub mod log;
6mod machine;
7mod service;
8mod state;
9
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13
14use chrono::{DateTime, Utc};
15use tokio::sync::{broadcast, RwLock};
16use tokio_util::sync::CancellationToken;
17
18use koi_common::capability::{Capability, CapabilityStatus};
19use koi_common::integration::{CertmeshSnapshot, DnsProbe, MdnsSnapshot, ProxySnapshot};
20
21use crate::checker::{run_checks_loop, run_checks_once, ServiceCheckState};
22use crate::machine::{collect_machine_health, MdnsTracker};
23use crate::state::{load_health_state, save_health_state, HealthCheckConfig, HealthChecksState};
24use crate::state::{DEFAULT_INTERVAL_SECS, DEFAULT_TIMEOUT_SECS};
25
26pub use machine::MachineHealth;
27pub use service::ServiceCheckKind;
28pub use service::ServiceStatus;
29pub use service::ServiceStatus as HealthStatus;
30pub use state::HealthCheckConfig as HealthCheck;
31
/// Default machine threshold, in seconds.
///
/// `HealthCore::new` converts this into the `Duration` that is later handed to
/// `collect_machine_health` when a snapshot is built.
pub const DEFAULT_MACHINE_THRESHOLD_SECS: u64 = 60;
34
/// Capacity passed to `broadcast::channel` when `HealthCore::new` creates the
/// `HealthEvent` channel.
const BROADCAST_CHANNEL_CAPACITY: usize = 256;
37
38#[derive(Debug, Clone)]
40pub enum HealthEvent {
41 StatusChanged { name: String, status: ServiceStatus },
43}
44
45#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
47pub struct HealthSnapshot {
48 pub machines: Vec<MachineHealth>,
49 pub services: Vec<ServiceHealth>,
50}
51
52#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
54pub struct ServiceHealth {
55 pub name: String,
56 pub kind: ServiceCheckKind,
57 pub target: String,
58 pub interval_secs: u64,
59 pub timeout_secs: u64,
60 pub status: ServiceStatus,
61 pub last_checked: Option<DateTime<Utc>>,
62 pub last_ok: Option<DateTime<Utc>>,
63 pub message: Option<String>,
64}
65
66#[derive(Debug, thiserror::Error)]
68pub enum HealthError {
69 #[error("invalid health check: {0}")]
70 InvalidCheck(String),
71
72 #[error("health check not found: {0}")]
73 NotFound(String),
74
75 #[error("io error: {0}")]
76 Io(String),
77}
78
/// Timeout, in seconds, configured on the shared `reqwest::Client` built in
/// `HealthCore::new`.
const HTTP_CLIENT_TIMEOUT_SECS: u64 = 10;
81
82pub struct HealthCore {
84 mdns_tracker: Option<MdnsTracker>,
85 dns: Option<Arc<dyn DnsProbe>>,
86 certmesh: Option<Arc<dyn CertmeshSnapshot>>,
87 proxy: Option<Arc<dyn ProxySnapshot>>,
88 checks: Arc<RwLock<Vec<HealthCheckConfig>>>,
89 service_states: Arc<RwLock<HashMap<String, ServiceCheckState>>>,
90 machine_threshold: Duration,
91 started_at: Instant,
92 event_tx: broadcast::Sender<HealthEvent>,
93 http_client: reqwest::Client,
94}
95
96impl HealthCore {
97 pub async fn new(
98 mdns: Option<Arc<dyn MdnsSnapshot>>,
99 dns: Option<Arc<dyn DnsProbe>>,
100 certmesh: Option<Arc<dyn CertmeshSnapshot>>,
101 proxy: Option<Arc<dyn ProxySnapshot>>,
102 ) -> Self {
103 let checks = load_health_state()
104 .map(|state| state.checks)
105 .unwrap_or_default();
106
107 let mdns_tracker = match mdns {
108 Some(snapshot) => Some(MdnsTracker::spawn(snapshot).await),
109 None => None,
110 };
111
112 let (event_tx, _) = broadcast::channel(BROADCAST_CHANNEL_CAPACITY);
113
114 let http_client = reqwest::Client::builder()
115 .timeout(Duration::from_secs(HTTP_CLIENT_TIMEOUT_SECS))
116 .build()
117 .unwrap_or_default();
118
119 Self {
120 mdns_tracker,
121 dns,
122 certmesh,
123 proxy,
124 checks: Arc::new(RwLock::new(checks)),
125 service_states: Arc::new(RwLock::new(HashMap::new())),
126 machine_threshold: Duration::from_secs(DEFAULT_MACHINE_THRESHOLD_SECS),
127 started_at: Instant::now(),
128 event_tx,
129 http_client,
130 }
131 }
132
133 pub fn started_at(&self) -> Instant {
134 self.started_at
135 }
136
137 pub(crate) fn http_client(&self) -> &reqwest::Client {
139 &self.http_client
140 }
141
142 pub async fn snapshot(&self) -> HealthSnapshot {
143 let mdns_snapshot = self
144 .mdns_tracker
145 .as_ref()
146 .map(|tracker| tracker.snapshot())
147 .unwrap_or_default();
148
149 let machines = collect_machine_health(
150 &mdns_snapshot,
151 self.dns.as_ref(),
152 self.certmesh.as_ref(),
153 self.machine_threshold,
154 );
155
156 let mut checks = self.checks.read().await.clone();
157 checks.extend(self.proxy_checks());
158 let states = self.service_states.read().await.clone();
159 let services = checks
160 .into_iter()
161 .map(|check| {
162 let state = states.get(&check.name).cloned().unwrap_or_default();
163 ServiceHealth {
164 name: check.name,
165 kind: check.kind,
166 target: check.target,
167 interval_secs: check.interval_secs,
168 timeout_secs: check.timeout_secs,
169 status: state.status,
170 last_checked: state.last_checked,
171 last_ok: state.last_ok,
172 message: state.message,
173 }
174 })
175 .collect();
176
177 HealthSnapshot { machines, services }
178 }
179
180 pub async fn list_checks(&self) -> Vec<HealthCheckConfig> {
181 self.checks.read().await.clone()
182 }
183
184 pub async fn add_check(&self, check: HealthCheckConfig) -> Result<(), HealthError> {
185 service::validate_check(&check).map_err(HealthError::InvalidCheck)?;
186
187 let mut checks = self.checks.write().await;
188 if checks.iter().any(|c| c.name == check.name) {
189 return Err(HealthError::InvalidCheck(format!(
190 "check already exists: {}",
191 check.name
192 )));
193 }
194
195 checks.push(check);
196 let state = HealthChecksState {
197 checks: checks.clone(),
198 };
199 save_health_state(&state).map_err(|e| HealthError::Io(e.to_string()))?;
200 Ok(())
201 }
202
203 pub async fn remove_check(&self, name: &str) -> Result<(), HealthError> {
204 let mut checks = self.checks.write().await;
205 let before = checks.len();
206 checks.retain(|c| c.name != name);
207 if checks.len() == before {
208 return Err(HealthError::NotFound(name.to_string()));
209 }
210 let state = HealthChecksState {
211 checks: checks.clone(),
212 };
213 save_health_state(&state).map_err(|e| HealthError::Io(e.to_string()))?;
214
215 let mut states = self.service_states.write().await;
216 states.remove(name);
217 Ok(())
218 }
219
220 pub async fn run_checks_once(&self) {
221 run_checks_once(self, &self.service_states).await;
222 }
223
224 pub fn subscribe(&self) -> broadcast::Receiver<HealthEvent> {
226 self.event_tx.subscribe()
227 }
228
229 pub(crate) fn emit(&self, event: HealthEvent) {
231 let _ = self.event_tx.send(event);
232 }
233
234 pub(crate) fn proxy_checks(&self) -> Vec<HealthCheckConfig> {
236 let Some(proxy) = &self.proxy else {
237 return Vec::new();
238 };
239 proxy
240 .entries()
241 .into_iter()
242 .map(|entry| HealthCheckConfig {
243 name: format!("proxy:{}", entry.name),
244 kind: ServiceCheckKind::Http,
245 target: entry.backend,
246 interval_secs: DEFAULT_INTERVAL_SECS,
247 timeout_secs: DEFAULT_TIMEOUT_SECS,
248 })
249 .collect()
250 }
251}
252
#[cfg(test)]
mod tests {
    use super::*;

    /// Sends one `StatusChanged` event through a fresh 16-slot channel and
    /// returns what a subscriber created before the send receives.
    fn round_trip(name: &str, status: ServiceStatus) -> HealthEvent {
        let (tx, _) = broadcast::channel::<HealthEvent>(16);
        let mut rx = tx.subscribe();

        let _ = tx.send(HealthEvent::StatusChanged {
            name: name.to_string(),
            status,
        });

        rx.try_recv().expect("should receive event")
    }

    #[test]
    fn subscribe_receives_emitted_status_changed() {
        let HealthEvent::StatusChanged { name, status } =
            round_trip("my-tcp", ServiceStatus::Up);
        assert_eq!(name, "my-tcp");
        assert!(matches!(status, ServiceStatus::Up));
    }

    #[test]
    fn subscribe_receives_status_down() {
        let HealthEvent::StatusChanged { name, status } =
            round_trip("my-http", ServiceStatus::Down);
        assert_eq!(name, "my-http");
        assert!(matches!(status, ServiceStatus::Down));
    }

    #[test]
    fn no_event_when_no_send() {
        let (tx, _) = broadcast::channel::<HealthEvent>(16);
        let mut rx = tx.subscribe();
        // Nothing was sent, so a non-blocking receive must report an error.
        let outcome = rx.try_recv();
        assert!(outcome.is_err());
        drop(tx);
    }
}
303
304impl Capability for HealthCore {
305 fn name(&self) -> &str {
306 "health"
307 }
308
309 fn status(&self) -> CapabilityStatus {
310 let (total, up) = match self.service_states.try_read() {
311 Ok(services) => {
312 let total = services.len();
313 let up = services
314 .values()
315 .filter(|s| matches!(s.status, ServiceStatus::Up))
316 .count();
317 (total, up)
318 }
319 Err(_) => (0, 0),
320 };
321 let summary = format!("{} services up ({} total)", up, total);
322 CapabilityStatus {
323 name: "health".to_string(),
324 summary,
325 healthy: true,
326 }
327 }
328}
329
330pub struct HealthRuntime {
332 core: Arc<HealthCore>,
333 state: Arc<tokio::sync::Mutex<RuntimeState>>,
334}
335
336#[derive(Debug, Clone, Copy, serde::Serialize)]
337pub struct HealthRuntimeStatus {
338 pub running: bool,
339}
340
341struct RuntimeState {
342 running: bool,
343 cancel: Option<CancellationToken>,
344}
345
346impl HealthRuntime {
347 pub fn new(core: Arc<HealthCore>) -> Self {
348 Self {
349 core,
350 state: Arc::new(tokio::sync::Mutex::new(RuntimeState {
351 running: false,
352 cancel: None,
353 })),
354 }
355 }
356
357 pub fn core(&self) -> Arc<HealthCore> {
358 Arc::clone(&self.core)
359 }
360
361 pub async fn start(&self) -> Result<bool, HealthError> {
362 let mut state = self.state.lock().await;
363 if state.running {
364 return Ok(false);
365 }
366 let token = CancellationToken::new();
367 state.cancel = Some(token.clone());
368 state.running = true;
369 drop(state);
370
371 let core = Arc::clone(&self.core);
372 let state = Arc::clone(&self.state);
373 tokio::spawn(async move {
374 run_checks_loop(core, token).await;
375 let mut guard = state.lock().await;
376 guard.running = false;
377 guard.cancel = None;
378 });
379
380 Ok(true)
381 }
382
383 pub async fn stop(&self) -> bool {
384 let mut state = self.state.lock().await;
385 if let Some(token) = state.cancel.take() {
386 token.cancel();
387 state.running = false;
388 true
389 } else {
390 false
391 }
392 }
393
394 pub async fn status(&self) -> HealthRuntimeStatus {
395 let state = self.state.lock().await;
396 HealthRuntimeStatus {
397 running: state.running,
398 }
399 }
400}