1#![allow(dead_code)]
17
18use super::HealthStatus;
19use crate::Result;
20use async_trait::async_trait;
21use serde_json::Value as JsonValue;
22use tokio::time::{Duration, timeout};
23
24type BoxedAsyncUsizeFn = Box<
26 dyn Fn() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<usize>> + Send>>
27 + Send
28 + Sync,
29>;
30type BoxedAsyncBoolFn = Box<
31 dyn Fn() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<bool>> + Send>>
32 + Send
33 + Sync,
34>;
35
36#[async_trait]
38pub trait ComponentChecker: Send + Sync {
39 async fn check(&self) -> Result<HealthStatus>;
41
42 async fn debug_info(&self) -> Option<JsonValue> {
44 None
45 }
46}
47
48pub struct NetworkHealthChecker {
53 get_peer_count: BoxedAsyncUsizeFn,
54 min_peers: usize,
55 timeout_duration: Duration,
56}
57
58impl NetworkHealthChecker {
59 pub fn new<F, Fut>(get_peer_count: F) -> Self
61 where
62 F: Fn() -> Fut + Send + Sync + 'static,
63 Fut: std::future::Future<Output = Result<usize>> + Send + 'static,
64 {
65 Self {
66 get_peer_count: Box::new(move || Box::pin(get_peer_count())),
67 min_peers: 1,
68 timeout_duration: Duration::from_millis(50),
69 }
70 }
71
72 pub fn with_min_peers(mut self, min_peers: usize) -> Self {
74 self.min_peers = min_peers;
75 self
76 }
77}
78
79#[async_trait]
80impl ComponentChecker for NetworkHealthChecker {
81 async fn check(&self) -> Result<HealthStatus> {
82 let future = (self.get_peer_count)();
84 match timeout(self.timeout_duration, future).await {
85 Ok(Ok(count)) => {
86 if count >= self.min_peers {
87 Ok(HealthStatus::Healthy)
88 } else if count > 0 {
89 Ok(HealthStatus::Degraded)
90 } else {
91 Ok(HealthStatus::Unhealthy)
92 }
93 }
94 Ok(Err(_)) => Ok(HealthStatus::Unhealthy),
95 Err(_) => Ok(HealthStatus::Unhealthy), }
97 }
98
99 async fn debug_info(&self) -> Option<JsonValue> {
100 let future = (self.get_peer_count)();
101 if let Ok(Ok(count)) = timeout(self.timeout_duration, future).await {
102 Some(serde_json::json!({
103 "peer_count": count,
104 "min_peers": self.min_peers,
105 }))
106 } else {
107 None
108 }
109 }
110}
111
112pub struct DhtHealthChecker {
117 get_routing_table_size: BoxedAsyncUsizeFn,
118 min_nodes: usize,
119 timeout_duration: Duration,
120}
121
122impl DhtHealthChecker {
123 pub fn new<F, Fut>(get_routing_table_size: F) -> Self
125 where
126 F: Fn() -> Fut + Send + Sync + 'static,
127 Fut: std::future::Future<Output = Result<usize>> + Send + 'static,
128 {
129 Self {
130 get_routing_table_size: Box::new(move || Box::pin(get_routing_table_size())),
131 min_nodes: 3,
132 timeout_duration: Duration::from_millis(50),
133 }
134 }
135
136 pub fn with_min_nodes(mut self, min_nodes: usize) -> Self {
138 self.min_nodes = min_nodes;
139 self
140 }
141}
142
143#[async_trait]
144impl ComponentChecker for DhtHealthChecker {
145 async fn check(&self) -> Result<HealthStatus> {
146 let future = (self.get_routing_table_size)();
148 match timeout(self.timeout_duration, future).await {
149 Ok(Ok(size)) => {
150 if size >= self.min_nodes {
151 Ok(HealthStatus::Healthy)
152 } else if size > 0 {
153 Ok(HealthStatus::Degraded)
154 } else {
155 Ok(HealthStatus::Unhealthy)
156 }
157 }
158 Ok(Err(_)) => Ok(HealthStatus::Unhealthy),
159 Err(_) => Ok(HealthStatus::Unhealthy), }
161 }
162
163 async fn debug_info(&self) -> Option<JsonValue> {
164 let future = (self.get_routing_table_size)();
165 if let Ok(Ok(size)) = timeout(self.timeout_duration, future).await {
166 Some(serde_json::json!({
167 "routing_table_size": size,
168 "min_nodes": self.min_nodes,
169 "replication_factor": 8, }))
171 } else {
172 None
173 }
174 }
175}
176
/// Health checker for a storage directory.
pub struct StorageHealthChecker {
    /// Directory that is probed for existence and writability.
    storage_path: std::path::PathBuf,
    /// Free-space threshold in bytes below which storage is reported degraded.
    min_free_space: u64,
    /// Maximum time a single storage probe may take.
    timeout_duration: Duration,
}

impl StorageHealthChecker {
    /// Default free-space threshold: 100 MiB.
    const DEFAULT_MIN_FREE_SPACE: u64 = 100 * 1024 * 1024;

    /// Probe timeout used unless overridden with [`Self::with_timeout`].
    const DEFAULT_TIMEOUT: Duration = Duration::from_millis(50);

    /// Creates a checker for `storage_path` with a 100 MiB free-space
    /// threshold and a 50 ms probe timeout.
    pub fn new(storage_path: std::path::PathBuf) -> Self {
        Self {
            storage_path,
            min_free_space: Self::DEFAULT_MIN_FREE_SPACE,
            timeout_duration: Self::DEFAULT_TIMEOUT,
        }
    }

    /// Sets the free-space threshold, in bytes.
    pub fn with_min_free_space(mut self, bytes: u64) -> Self {
        self.min_free_space = bytes;
        self
    }

    /// Overrides how long a single storage probe may take before the check
    /// gives up (useful for slow or remote filesystems).
    pub fn with_timeout(mut self, timeout_duration: Duration) -> Self {
        self.timeout_duration = timeout_duration;
        self
    }
}
200
201#[async_trait]
202impl ComponentChecker for StorageHealthChecker {
203 async fn check(&self) -> Result<HealthStatus> {
204 let path = self.storage_path.clone();
206 let min_free = self.min_free_space;
207
208 match timeout(
209 self.timeout_duration,
210 tokio::task::spawn_blocking(move || check_storage_health(&path, min_free)),
211 )
212 .await
213 {
214 Ok(Ok(status)) => Ok(status),
215 Ok(Err(_)) => Ok(HealthStatus::Unhealthy),
216 Err(_) => Ok(HealthStatus::Unhealthy), }
218 }
219
220 async fn debug_info(&self) -> Option<JsonValue> {
221 if let Ok(metadata) = tokio::fs::metadata(&self.storage_path).await {
222 Some(serde_json::json!({
224 "path": self.storage_path.display().to_string(),
225 "exists": true,
226 "is_dir": metadata.is_dir(),
227 "min_free_space": self.min_free_space,
228 }))
229 } else {
230 Some(serde_json::json!({
231 "path": self.storage_path.display().to_string(),
232 "exists": false,
233 }))
234 }
235 }
236}
237
238fn check_storage_health(path: &std::path::Path, min_free_space: u64) -> HealthStatus {
240 use std::fs;
241
242 if !path.exists() {
244 return HealthStatus::Unhealthy;
245 }
246
247 let test_file = path.join(".health_check");
249 match fs::write(&test_file, b"health_check") {
250 Ok(_) => {
251 let _ = fs::remove_file(&test_file);
253
254 if get_free_space(path) >= min_free_space {
257 HealthStatus::Healthy
258 } else {
259 HealthStatus::Degraded
260 }
261 }
262 Err(_) => HealthStatus::Unhealthy,
263 }
264}
265
/// Reports the free space available at `_path`, in bytes.
///
/// NOTE(review): this is a placeholder. The path is ignored and a constant
/// 1 GiB is returned, so the free-space threshold in the storage check is not
/// backed by a real filesystem query yet.
fn get_free_space(_path: &std::path::Path) -> u64 {
    const ASSUMED_FREE_BYTES: u64 = 1 << 30; // 1 GiB
    ASSUMED_FREE_BYTES
}
271
272use crate::production::ResourceManager;
273use std::sync::Arc;
274
275pub struct ResourceHealthChecker {
277 resource_manager: Arc<ResourceManager>,
278 max_memory_percent: f64,
279 max_cpu_percent: f64,
280 timeout_duration: Duration,
281}
282
283impl ResourceHealthChecker {
284 pub fn new(resource_manager: Arc<ResourceManager>) -> Self {
286 Self {
287 resource_manager,
288 max_memory_percent: 80.0,
289 max_cpu_percent: 90.0,
290 timeout_duration: Duration::from_millis(50),
291 }
292 }
293}
294
295#[async_trait]
296impl ComponentChecker for ResourceHealthChecker {
297 async fn check(&self) -> Result<HealthStatus> {
298 match timeout(self.timeout_duration, async {
299 self.resource_manager.get_metrics().await
300 })
301 .await
302 {
303 Ok(metrics) => {
304 if metrics.cpu_usage > self.max_cpu_percent {
306 return Ok(HealthStatus::Unhealthy);
307 }
308
309 let memory_percent = if self.resource_manager.config.max_memory_bytes > 0 {
311 (metrics.memory_used as f64
312 / self.resource_manager.config.max_memory_bytes as f64)
313 * 100.0
314 } else {
315 0.0
316 };
317
318 if memory_percent > self.max_memory_percent {
319 Ok(HealthStatus::Degraded)
320 } else {
321 Ok(HealthStatus::Healthy)
322 }
323 }
324 _ => Ok(HealthStatus::Unhealthy), }
326 }
327
328 async fn debug_info(&self) -> Option<JsonValue> {
329 let metrics = self.resource_manager.get_metrics().await;
330 Some(serde_json::json!({
331 "memory_used": metrics.memory_used,
332 "active_connections": metrics.active_connections,
333 "bandwidth_usage": metrics.bandwidth_usage,
334 "cpu_usage": metrics.cpu_usage,
335 "dht_ops_per_sec": metrics.dht_metrics.ops_per_sec,
336 }))
337 }
338}
339
340pub struct TransportHealthChecker {
345 is_listening: BoxedAsyncBoolFn,
346 timeout_duration: Duration,
347}
348
349impl TransportHealthChecker {
350 pub fn new<F, Fut>(is_listening: F) -> Self
352 where
353 F: Fn() -> Fut + Send + Sync + 'static,
354 Fut: std::future::Future<Output = Result<bool>> + Send + 'static,
355 {
356 Self {
357 is_listening: Box::new(move || Box::pin(is_listening())),
358 timeout_duration: Duration::from_millis(50),
359 }
360 }
361}
362
363#[async_trait]
364impl ComponentChecker for TransportHealthChecker {
365 async fn check(&self) -> Result<HealthStatus> {
366 let future = (self.is_listening)();
367 match timeout(self.timeout_duration, future).await {
368 Ok(Ok(true)) => Ok(HealthStatus::Healthy),
369 Ok(Ok(false)) => Ok(HealthStatus::Unhealthy),
370 Ok(Err(_)) => Ok(HealthStatus::Unhealthy),
371 Err(_) => Ok(HealthStatus::Unhealthy), }
373 }
374
375 async fn debug_info(&self) -> Option<JsonValue> {
376 let future = (self.is_listening)();
377 if let Ok(Ok(listening)) = timeout(self.timeout_duration, future).await {
378 Some(serde_json::json!({
379 "is_listening": listening,
380 "transport_type": "p2p",
381 }))
382 } else {
383 None
384 }
385 }
386}
387
388pub struct PeerHealthChecker {
393 get_peer_count: BoxedAsyncUsizeFn,
394 min_peers: usize,
395 max_peers: usize,
396 timeout_duration: Duration,
397}
398
399impl PeerHealthChecker {
400 pub fn new<F, Fut>(get_peer_count: F) -> Self
402 where
403 F: Fn() -> Fut + Send + Sync + 'static,
404 Fut: std::future::Future<Output = Result<usize>> + Send + 'static,
405 {
406 Self {
407 get_peer_count: Box::new(move || Box::pin(get_peer_count())),
408 min_peers: 1,
409 max_peers: 1000,
410 timeout_duration: Duration::from_millis(50),
411 }
412 }
413
414 pub fn with_peer_limits(mut self, min: usize, max: usize) -> Self {
416 self.min_peers = min;
417 self.max_peers = max;
418 self
419 }
420}
421
422#[async_trait]
423impl ComponentChecker for PeerHealthChecker {
424 async fn check(&self) -> Result<HealthStatus> {
425 let future = (self.get_peer_count)();
426 match timeout(self.timeout_duration, future).await {
427 Ok(Ok(count)) => {
428 if count < self.min_peers {
429 Ok(HealthStatus::Unhealthy)
430 } else if count > self.max_peers {
431 Ok(HealthStatus::Degraded) } else {
433 Ok(HealthStatus::Healthy)
434 }
435 }
436 Ok(Err(_)) => Ok(HealthStatus::Unhealthy),
437 Err(_) => Ok(HealthStatus::Unhealthy), }
439 }
440
441 async fn debug_info(&self) -> Option<JsonValue> {
442 let future = (self.get_peer_count)();
443 if let Ok(Ok(count)) = timeout(self.timeout_duration, future).await {
444 Some(serde_json::json!({
445 "peer_count": count,
446 "min_peers": self.min_peers,
447 "max_peers": self.max_peers,
448 }))
449 } else {
450 None
451 }
452 }
453}
454
455pub struct CompositeHealthChecker {
457 checkers: Vec<(&'static str, Box<dyn ComponentChecker>)>,
458}
459
460impl Default for CompositeHealthChecker {
461 fn default() -> Self {
462 Self::new()
463 }
464}
465
466impl CompositeHealthChecker {
467 pub fn new() -> Self {
469 Self {
470 checkers: Vec::new(),
471 }
472 }
473
474 pub fn add_checker(mut self, name: &'static str, checker: Box<dyn ComponentChecker>) -> Self {
476 self.checkers.push((name, checker));
477 self
478 }
479}
480
481#[async_trait]
482impl ComponentChecker for CompositeHealthChecker {
483 async fn check(&self) -> Result<HealthStatus> {
484 let mut overall_status = HealthStatus::Healthy;
485
486 for (_, checker) in &self.checkers {
487 match checker.check().await {
488 Ok(HealthStatus::Unhealthy) => return Ok(HealthStatus::Unhealthy),
489 Ok(HealthStatus::Degraded) => overall_status = HealthStatus::Degraded,
490 Ok(HealthStatus::Healthy) => {}
491 Err(_) => return Ok(HealthStatus::Unhealthy),
492 }
493 }
494
495 Ok(overall_status)
496 }
497
498 async fn debug_info(&self) -> Option<JsonValue> {
499 let mut info = serde_json::json!({});
500
501 for (name, checker) in &self.checkers {
502 if let Some(debug) = checker.debug_info().await {
503 info[name] = debug;
504 }
505 }
506
507 Some(info)
508 }
509}
510
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    /// Stand-in network that reports a fixed peer count.
    struct TestNetwork {
        peer_count: usize,
    }

    impl TestNetwork {
        fn new(peer_count: usize) -> Self {
            Self { peer_count }
        }

        async fn peer_count(&self) -> Result<usize> {
            Ok(self.peer_count)
        }

        async fn debug_info(&self) -> Result<NetworkDebugInfo> {
            Ok(NetworkDebugInfo {
                peer_count: self.peer_count,
                active_connections: self.peer_count,
                listening_addresses: vec![],
                protocols: vec![],
            })
        }
    }

    struct NetworkDebugInfo {
        peer_count: usize,
        active_connections: usize,
        listening_addresses: Vec<String>,
        protocols: Vec<String>,
    }

    /// Stand-in DHT that reports a fixed routing-table size.
    struct TestDHT {
        routing_table_size: usize,
    }

    impl TestDHT {
        fn new(size: usize) -> Self {
            Self {
                routing_table_size: size,
            }
        }

        async fn routing_table_size(&self) -> Result<usize> {
            Ok(self.routing_table_size)
        }

        async fn debug_info(&self) -> Result<DhtDebugInfo> {
            Ok(DhtDebugInfo {
                routing_table_size: self.routing_table_size,
                stored_values: 0,
                pending_queries: 0,
                replication_factor: 8,
            })
        }
    }

    struct DhtDebugInfo {
        routing_table_size: usize,
        stored_values: usize,
        pending_queries: usize,
        replication_factor: usize,
    }

    /// Stand-in transport that reports a fixed listening flag.
    struct TestTransport {
        listening: bool,
    }

    impl TestTransport {
        fn new(listening: bool) -> Self {
            Self { listening }
        }

        async fn is_listening(&self) -> Result<bool> {
            Ok(self.listening)
        }

        async fn debug_info(&self) -> Result<TransportDebugInfo> {
            Ok(TransportDebugInfo {
                transport_type: "test".to_string(),
                listening_addresses: vec![],
                active_connections: 0,
                bytes_sent: 0,
                bytes_received: 0,
            })
        }
    }

    struct TransportDebugInfo {
        transport_type: String,
        listening_addresses: Vec<String>,
        active_connections: usize,
        bytes_sent: u64,
        bytes_received: u64,
    }

    /// Builds a network checker backed by a `TestNetwork` with `peer_count` peers.
    fn network_checker(peer_count: usize) -> NetworkHealthChecker {
        let network = Arc::new(TestNetwork::new(peer_count));
        NetworkHealthChecker::new(move || {
            let network = Arc::clone(&network);
            async move { network.peer_count().await }
        })
    }

    /// Builds a DHT checker backed by a `TestDHT` with `size` routing entries.
    fn dht_checker(size: usize) -> DhtHealthChecker {
        let dht = Arc::new(TestDHT::new(size));
        DhtHealthChecker::new(move || {
            let dht = Arc::clone(&dht);
            async move { dht.routing_table_size().await }
        })
    }

    /// Builds a transport checker backed by a `TestTransport`.
    fn transport_checker(listening: bool) -> TransportHealthChecker {
        let transport = Arc::new(TestTransport::new(listening));
        TransportHealthChecker::new(move || {
            let transport = Arc::clone(&transport);
            async move { transport.is_listening().await }
        })
    }

    /// Builds a peer checker backed by a `TestNetwork` with `peer_count` peers.
    fn peer_checker(peer_count: usize) -> PeerHealthChecker {
        let network = Arc::new(TestNetwork::new(peer_count));
        PeerHealthChecker::new(move || {
            let network = Arc::clone(&network);
            async move { network.peer_count().await }
        })
    }

    #[tokio::test]
    async fn test_network_health_checker() {
        let healthy = network_checker(5).with_min_peers(3);
        assert_eq!(healthy.check().await.unwrap(), HealthStatus::Healthy);

        let degraded = network_checker(1).with_min_peers(3);
        assert_eq!(degraded.check().await.unwrap(), HealthStatus::Degraded);

        let unhealthy = network_checker(0);
        assert_eq!(unhealthy.check().await.unwrap(), HealthStatus::Unhealthy);
    }

    #[tokio::test]
    async fn test_network_health_checker_debug_info() {
        let checker = network_checker(5).with_min_peers(3);

        let info = checker.debug_info().await.unwrap();
        assert_eq!(info.get("peer_count").and_then(JsonValue::as_u64), Some(5));
        assert_eq!(info.get("min_peers").and_then(JsonValue::as_u64), Some(3));
    }

    #[tokio::test]
    async fn test_dht_health_checker() {
        // Default threshold is three routing-table entries.
        assert_eq!(dht_checker(5).check().await.unwrap(), HealthStatus::Healthy);
        assert_eq!(dht_checker(1).check().await.unwrap(), HealthStatus::Degraded);
        assert_eq!(dht_checker(0).check().await.unwrap(), HealthStatus::Unhealthy);

        let relaxed = dht_checker(1).with_min_nodes(1);
        assert_eq!(relaxed.check().await.unwrap(), HealthStatus::Healthy);
    }

    #[tokio::test]
    async fn test_transport_health_checker() {
        assert_eq!(
            transport_checker(true).check().await.unwrap(),
            HealthStatus::Healthy
        );
        assert_eq!(
            transport_checker(false).check().await.unwrap(),
            HealthStatus::Unhealthy
        );
    }

    #[tokio::test]
    async fn test_peer_health_checker() {
        let too_few = peer_checker(1).with_peer_limits(2, 10);
        assert_eq!(too_few.check().await.unwrap(), HealthStatus::Unhealthy);

        let in_band = peer_checker(5).with_peer_limits(2, 10);
        assert_eq!(in_band.check().await.unwrap(), HealthStatus::Healthy);

        let too_many = peer_checker(11).with_peer_limits(2, 10);
        assert_eq!(too_many.check().await.unwrap(), HealthStatus::Degraded);
    }

    #[tokio::test]
    async fn test_storage_health_checker() {
        let temp_dir = tempfile::tempdir().unwrap();
        let checker = StorageHealthChecker::new(temp_dir.path().to_path_buf());

        let status = checker.check().await.unwrap();
        assert_eq!(status, HealthStatus::Healthy);
    }

    #[tokio::test]
    async fn test_composite_health_checker() {
        struct AlwaysHealthy;
        #[async_trait]
        impl ComponentChecker for AlwaysHealthy {
            async fn check(&self) -> Result<HealthStatus> {
                Ok(HealthStatus::Healthy)
            }
        }

        struct AlwaysDegraded;
        #[async_trait]
        impl ComponentChecker for AlwaysDegraded {
            async fn check(&self) -> Result<HealthStatus> {
                Ok(HealthStatus::Degraded)
            }
        }

        let checker = CompositeHealthChecker::new()
            .add_checker("healthy", Box::new(AlwaysHealthy))
            .add_checker("degraded", Box::new(AlwaysDegraded));

        let status = checker.check().await.unwrap();
        assert_eq!(status, HealthStatus::Degraded);
    }

    #[tokio::test]
    async fn test_composite_health_checker_unhealthy() {
        struct AlwaysUnhealthy;
        #[async_trait]
        impl ComponentChecker for AlwaysUnhealthy {
            async fn check(&self) -> Result<HealthStatus> {
                Ok(HealthStatus::Unhealthy)
            }
        }

        let checker =
            CompositeHealthChecker::new().add_checker("unhealthy", Box::new(AlwaysUnhealthy));

        let status = checker.check().await.unwrap();
        assert_eq!(status, HealthStatus::Unhealthy);
    }
}