saorsa_core/health/
checks.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: saorsalabs@gmail.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Component health checkers for various P2P subsystems
15
16#![allow(dead_code)]
17
18use super::HealthStatus;
19use crate::Result;
20use async_trait::async_trait;
21use serde_json::Value as JsonValue;
22use tokio::time::{Duration, timeout};
23
24// Reduce type complexity with aliases for boxed async fn types
25type BoxedAsyncUsizeFn = Box<
26    dyn Fn() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<usize>> + Send>>
27        + Send
28        + Sync,
29>;
30type BoxedAsyncBoolFn = Box<
31    dyn Fn() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<bool>> + Send>>
32        + Send
33        + Sync,
34>;
35
/// Trait for component health checkers
///
/// Implementors report the state of a single subsystem as a `HealthStatus`.
/// The `Send + Sync` supertraits are required of every implementor.
#[async_trait]
pub trait ComponentChecker: Send + Sync {
    /// Check the health of the component
    ///
    /// Returns the component's current `HealthStatus`, or an error if the
    /// checker itself could not produce a status.
    async fn check(&self) -> Result<HealthStatus>;

    /// Get debug information about the component
    ///
    /// The default implementation returns `None`; implementors override it
    /// to expose a JSON value describing the component.
    async fn debug_info(&self) -> Option<JsonValue> {
        None
    }
}
47
48/// Network connectivity health checker
49///
50/// This is a placeholder implementation that will be connected to the actual
51/// Network type once it's available
52pub struct NetworkHealthChecker {
53    get_peer_count: BoxedAsyncUsizeFn,
54    min_peers: usize,
55    timeout_duration: Duration,
56}
57
58impl NetworkHealthChecker {
59    /// Create a new network health checker with a peer count function
60    pub fn new<F, Fut>(get_peer_count: F) -> Self
61    where
62        F: Fn() -> Fut + Send + Sync + 'static,
63        Fut: std::future::Future<Output = Result<usize>> + Send + 'static,
64    {
65        Self {
66            get_peer_count: Box::new(move || Box::pin(get_peer_count())),
67            min_peers: 1,
68            timeout_duration: Duration::from_millis(50),
69        }
70    }
71
72    /// Set minimum number of peers for healthy status
73    pub fn with_min_peers(mut self, min_peers: usize) -> Self {
74        self.min_peers = min_peers;
75        self
76    }
77}
78
79#[async_trait]
80impl ComponentChecker for NetworkHealthChecker {
81    async fn check(&self) -> Result<HealthStatus> {
82        // Check network connectivity with timeout
83        let future = (self.get_peer_count)();
84        match timeout(self.timeout_duration, future).await {
85            Ok(Ok(count)) => {
86                if count >= self.min_peers {
87                    Ok(HealthStatus::Healthy)
88                } else if count > 0 {
89                    Ok(HealthStatus::Degraded)
90                } else {
91                    Ok(HealthStatus::Unhealthy)
92                }
93            }
94            Ok(Err(_)) => Ok(HealthStatus::Unhealthy),
95            Err(_) => Ok(HealthStatus::Unhealthy), // Timeout
96        }
97    }
98
99    async fn debug_info(&self) -> Option<JsonValue> {
100        let future = (self.get_peer_count)();
101        if let Ok(Ok(count)) = timeout(self.timeout_duration, future).await {
102            Some(serde_json::json!({
103                "peer_count": count,
104                "min_peers": self.min_peers,
105            }))
106        } else {
107            None
108        }
109    }
110}
111
112/// DHT availability health checker
113///
114/// This is a placeholder implementation that will be connected to the actual
115/// DHT type once it's available
116pub struct DhtHealthChecker {
117    get_routing_table_size: BoxedAsyncUsizeFn,
118    min_nodes: usize,
119    timeout_duration: Duration,
120}
121
122impl DhtHealthChecker {
123    /// Create a new DHT health checker with a routing table size function
124    pub fn new<F, Fut>(get_routing_table_size: F) -> Self
125    where
126        F: Fn() -> Fut + Send + Sync + 'static,
127        Fut: std::future::Future<Output = Result<usize>> + Send + 'static,
128    {
129        Self {
130            get_routing_table_size: Box::new(move || Box::pin(get_routing_table_size())),
131            min_nodes: 3,
132            timeout_duration: Duration::from_millis(50),
133        }
134    }
135
136    /// Set minimum number of DHT nodes for healthy status
137    pub fn with_min_nodes(mut self, min_nodes: usize) -> Self {
138        self.min_nodes = min_nodes;
139        self
140    }
141}
142
143#[async_trait]
144impl ComponentChecker for DhtHealthChecker {
145    async fn check(&self) -> Result<HealthStatus> {
146        // Check DHT routing table
147        let future = (self.get_routing_table_size)();
148        match timeout(self.timeout_duration, future).await {
149            Ok(Ok(size)) => {
150                if size >= self.min_nodes {
151                    Ok(HealthStatus::Healthy)
152                } else if size > 0 {
153                    Ok(HealthStatus::Degraded)
154                } else {
155                    Ok(HealthStatus::Unhealthy)
156                }
157            }
158            Ok(Err(_)) => Ok(HealthStatus::Unhealthy),
159            Err(_) => Ok(HealthStatus::Unhealthy), // Timeout
160        }
161    }
162
163    async fn debug_info(&self) -> Option<JsonValue> {
164        let future = (self.get_routing_table_size)();
165        if let Ok(Ok(size)) = timeout(self.timeout_duration, future).await {
166            Some(serde_json::json!({
167                "routing_table_size": size,
168                "min_nodes": self.min_nodes,
169                "replication_factor": 8, // Default K value
170            }))
171        } else {
172            None
173        }
174    }
175}
176
/// Storage access health checker
pub struct StorageHealthChecker {
    /// Directory whose accessibility is probed.
    storage_path: std::path::PathBuf,
    /// Free space, in bytes, at or above which storage is reported `Healthy`.
    min_free_space: u64,
    /// Maximum time the storage probe may take.
    timeout_duration: Duration,
}

impl StorageHealthChecker {
    /// Create a new storage health checker
    ///
    /// Defaults: `min_free_space` is 100 MiB and the probe timeout is 50 ms.
    /// Use [`with_min_free_space`](Self::with_min_free_space) and
    /// [`with_timeout`](Self::with_timeout) to override them.
    pub fn new(storage_path: std::path::PathBuf) -> Self {
        Self {
            storage_path,
            min_free_space: 100 * 1024 * 1024, // 100MB default
            timeout_duration: Duration::from_millis(50),
        }
    }

    /// Set minimum free space for healthy status
    pub fn with_min_free_space(mut self, bytes: u64) -> Self {
        self.min_free_space = bytes;
        self
    }

    /// Set the maximum time the storage probe may take
    ///
    /// Replaces the 50 ms default chosen by [`new`](Self::new), which can be
    /// too tight for slow or heavily loaded disks.
    pub fn with_timeout(mut self, timeout_duration: Duration) -> Self {
        self.timeout_duration = timeout_duration;
        self
    }
}
200
201#[async_trait]
202impl ComponentChecker for StorageHealthChecker {
203    async fn check(&self) -> Result<HealthStatus> {
204        // Check storage accessibility and free space
205        let path = self.storage_path.clone();
206        let min_free = self.min_free_space;
207
208        match timeout(
209            self.timeout_duration,
210            tokio::task::spawn_blocking(move || check_storage_health(&path, min_free)),
211        )
212        .await
213        {
214            Ok(Ok(status)) => Ok(status),
215            Ok(Err(_)) => Ok(HealthStatus::Unhealthy),
216            Err(_) => Ok(HealthStatus::Unhealthy), // Timeout
217        }
218    }
219
220    async fn debug_info(&self) -> Option<JsonValue> {
221        if let Ok(metadata) = tokio::fs::metadata(&self.storage_path).await {
222            // Get disk usage info (simplified)
223            Some(serde_json::json!({
224                "path": self.storage_path.display().to_string(),
225                "exists": true,
226                "is_dir": metadata.is_dir(),
227                "min_free_space": self.min_free_space,
228            }))
229        } else {
230            Some(serde_json::json!({
231                "path": self.storage_path.display().to_string(),
232                "exists": false,
233            }))
234        }
235    }
236}
237
238/// Helper function to check storage health
239fn check_storage_health(path: &std::path::Path, min_free_space: u64) -> HealthStatus {
240    use std::fs;
241
242    // Check if path exists and is writable
243    if !path.exists() {
244        return HealthStatus::Unhealthy;
245    }
246
247    // Try to create a test file to verify write access
248    let test_file = path.join(".health_check");
249    match fs::write(&test_file, b"health_check") {
250        Ok(_) => {
251            // Clean up test file
252            let _ = fs::remove_file(&test_file);
253
254            // Check free space (platform-specific, simplified here)
255            // In production, use a crate like fs2 or sysinfo
256            if get_free_space(path) >= min_free_space {
257                HealthStatus::Healthy
258            } else {
259                HealthStatus::Degraded
260            }
261        }
262        Err(_) => HealthStatus::Unhealthy,
263    }
264}
265
/// Get free space for a path (stub implementation)
///
/// The path is ignored; a fixed 1 GiB is always reported.
fn get_free_space(_path: &std::path::Path) -> u64 {
    // In a real implementation, use platform-specific APIs or a crate
    const STUB_FREE_BYTES: u64 = 1 << 30; // 1GB default
    STUB_FREE_BYTES
}
271
272use crate::production::ResourceManager;
273use std::sync::Arc;
274
275/// Resource usage health checker
276pub struct ResourceHealthChecker {
277    resource_manager: Arc<ResourceManager>,
278    max_memory_percent: f64,
279    max_cpu_percent: f64,
280    timeout_duration: Duration,
281}
282
283impl ResourceHealthChecker {
284    /// Create a new resource health checker
285    pub fn new(resource_manager: Arc<ResourceManager>) -> Self {
286        Self {
287            resource_manager,
288            max_memory_percent: 80.0,
289            max_cpu_percent: 90.0,
290            timeout_duration: Duration::from_millis(50),
291        }
292    }
293}
294
295#[async_trait]
296impl ComponentChecker for ResourceHealthChecker {
297    async fn check(&self) -> Result<HealthStatus> {
298        match timeout(self.timeout_duration, async {
299            self.resource_manager.get_metrics().await
300        })
301        .await
302        {
303            Ok(metrics) => {
304                // Check CPU usage
305                if metrics.cpu_usage > self.max_cpu_percent {
306                    return Ok(HealthStatus::Unhealthy);
307                }
308
309                // Check memory usage (simplified - compare against configured limit)
310                let memory_percent = if self.resource_manager.config.max_memory_bytes > 0 {
311                    (metrics.memory_used as f64
312                        / self.resource_manager.config.max_memory_bytes as f64)
313                        * 100.0
314                } else {
315                    0.0
316                };
317
318                if memory_percent > self.max_memory_percent {
319                    Ok(HealthStatus::Degraded)
320                } else {
321                    Ok(HealthStatus::Healthy)
322                }
323            }
324            _ => Ok(HealthStatus::Unhealthy), // Timeout
325        }
326    }
327
328    async fn debug_info(&self) -> Option<JsonValue> {
329        let metrics = self.resource_manager.get_metrics().await;
330        Some(serde_json::json!({
331            "memory_used": metrics.memory_used,
332            "active_connections": metrics.active_connections,
333            "bandwidth_usage": metrics.bandwidth_usage,
334            "cpu_usage": metrics.cpu_usage,
335            "dht_ops_per_sec": metrics.dht_metrics.ops_per_sec,
336        }))
337    }
338}
339
340/// Transport health checker
341///
342/// This is a placeholder implementation that will be connected to the actual
343/// Transport type once it's available
344pub struct TransportHealthChecker {
345    is_listening: BoxedAsyncBoolFn,
346    timeout_duration: Duration,
347}
348
349impl TransportHealthChecker {
350    /// Create a new transport health checker with a listening check function
351    pub fn new<F, Fut>(is_listening: F) -> Self
352    where
353        F: Fn() -> Fut + Send + Sync + 'static,
354        Fut: std::future::Future<Output = Result<bool>> + Send + 'static,
355    {
356        Self {
357            is_listening: Box::new(move || Box::pin(is_listening())),
358            timeout_duration: Duration::from_millis(50),
359        }
360    }
361}
362
363#[async_trait]
364impl ComponentChecker for TransportHealthChecker {
365    async fn check(&self) -> Result<HealthStatus> {
366        let future = (self.is_listening)();
367        match timeout(self.timeout_duration, future).await {
368            Ok(Ok(true)) => Ok(HealthStatus::Healthy),
369            Ok(Ok(false)) => Ok(HealthStatus::Unhealthy),
370            Ok(Err(_)) => Ok(HealthStatus::Unhealthy),
371            Err(_) => Ok(HealthStatus::Unhealthy), // Timeout
372        }
373    }
374
375    async fn debug_info(&self) -> Option<JsonValue> {
376        let future = (self.is_listening)();
377        if let Ok(Ok(listening)) = timeout(self.timeout_duration, future).await {
378            Some(serde_json::json!({
379                "is_listening": listening,
380                "transport_type": "p2p",
381            }))
382        } else {
383            None
384        }
385    }
386}
387
388/// Peer connections health checker
389///
390/// This is a placeholder implementation that will be connected to the actual
391/// Network type once it's available
392pub struct PeerHealthChecker {
393    get_peer_count: BoxedAsyncUsizeFn,
394    min_peers: usize,
395    max_peers: usize,
396    timeout_duration: Duration,
397}
398
399impl PeerHealthChecker {
400    /// Create a new peer health checker with a peer count function
401    pub fn new<F, Fut>(get_peer_count: F) -> Self
402    where
403        F: Fn() -> Fut + Send + Sync + 'static,
404        Fut: std::future::Future<Output = Result<usize>> + Send + 'static,
405    {
406        Self {
407            get_peer_count: Box::new(move || Box::pin(get_peer_count())),
408            min_peers: 1,
409            max_peers: 1000,
410            timeout_duration: Duration::from_millis(50),
411        }
412    }
413
414    /// Set peer count thresholds
415    pub fn with_peer_limits(mut self, min: usize, max: usize) -> Self {
416        self.min_peers = min;
417        self.max_peers = max;
418        self
419    }
420}
421
422#[async_trait]
423impl ComponentChecker for PeerHealthChecker {
424    async fn check(&self) -> Result<HealthStatus> {
425        let future = (self.get_peer_count)();
426        match timeout(self.timeout_duration, future).await {
427            Ok(Ok(count)) => {
428                if count < self.min_peers {
429                    Ok(HealthStatus::Unhealthy)
430                } else if count > self.max_peers {
431                    Ok(HealthStatus::Degraded) // Too many peers
432                } else {
433                    Ok(HealthStatus::Healthy)
434                }
435            }
436            Ok(Err(_)) => Ok(HealthStatus::Unhealthy),
437            Err(_) => Ok(HealthStatus::Unhealthy), // Timeout
438        }
439    }
440
441    async fn debug_info(&self) -> Option<JsonValue> {
442        let future = (self.get_peer_count)();
443        if let Ok(Ok(count)) = timeout(self.timeout_duration, future).await {
444            Some(serde_json::json!({
445                "peer_count": count,
446                "min_peers": self.min_peers,
447                "max_peers": self.max_peers,
448            }))
449        } else {
450            None
451        }
452    }
453}
454
455/// Composite health checker that runs multiple checks
456pub struct CompositeHealthChecker {
457    checkers: Vec<(&'static str, Box<dyn ComponentChecker>)>,
458}
459
460impl Default for CompositeHealthChecker {
461    fn default() -> Self {
462        Self::new()
463    }
464}
465
466impl CompositeHealthChecker {
467    /// Create a new composite health checker
468    pub fn new() -> Self {
469        Self {
470            checkers: Vec::new(),
471        }
472    }
473
474    /// Add a component checker
475    pub fn add_checker(mut self, name: &'static str, checker: Box<dyn ComponentChecker>) -> Self {
476        self.checkers.push((name, checker));
477        self
478    }
479}
480
481#[async_trait]
482impl ComponentChecker for CompositeHealthChecker {
483    async fn check(&self) -> Result<HealthStatus> {
484        let mut overall_status = HealthStatus::Healthy;
485
486        for (_, checker) in &self.checkers {
487            match checker.check().await {
488                Ok(HealthStatus::Unhealthy) => return Ok(HealthStatus::Unhealthy),
489                Ok(HealthStatus::Degraded) => overall_status = HealthStatus::Degraded,
490                Ok(HealthStatus::Healthy) => {}
491                Err(_) => return Ok(HealthStatus::Unhealthy),
492            }
493        }
494
495        Ok(overall_status)
496    }
497
498    async fn debug_info(&self) -> Option<JsonValue> {
499        let mut info = serde_json::json!({});
500
501        for (name, checker) in &self.checkers {
502            if let Some(debug) = checker.debug_info().await {
503                info[name] = debug;
504            }
505        }
506
507        Some(info)
508    }
509}
510
/// Unit tests for the storage and composite checkers.
#[cfg(test)]
mod tests {
    use super::*;

    // Simple test implementations
    //
    // NOTE(review): `TestNetwork`, `TestDHT`, `TestTransport` and their debug-info
    // structs are not referenced by any test in this module.
    /// Stand-in network that reports a fixed peer count.
    struct TestNetwork {
        peer_count: usize,
    }

    impl TestNetwork {
        fn new(peer_count: usize) -> Self {
            Self { peer_count }
        }

        async fn peer_count(&self) -> Result<usize> {
            Ok(self.peer_count)
        }

        async fn debug_info(&self) -> Result<NetworkDebugInfo> {
            Ok(NetworkDebugInfo {
                peer_count: self.peer_count,
                active_connections: self.peer_count,
                listening_addresses: vec![],
                protocols: vec![],
            })
        }
    }

    /// Debug payload returned by `TestNetwork::debug_info`.
    struct NetworkDebugInfo {
        peer_count: usize,
        active_connections: usize,
        listening_addresses: Vec<String>,
        protocols: Vec<String>,
    }

    /// Stand-in DHT that reports a fixed routing table size.
    struct TestDHT {
        routing_table_size: usize,
    }

    impl TestDHT {
        fn new(size: usize) -> Self {
            Self {
                routing_table_size: size,
            }
        }

        async fn routing_table_size(&self) -> Result<usize> {
            Ok(self.routing_table_size)
        }

        async fn debug_info(&self) -> Result<DhtDebugInfo> {
            Ok(DhtDebugInfo {
                routing_table_size: self.routing_table_size,
                stored_values: 0,
                pending_queries: 0,
                replication_factor: 8,
            })
        }
    }

    /// Debug payload returned by `TestDHT::debug_info`.
    struct DhtDebugInfo {
        routing_table_size: usize,
        stored_values: usize,
        pending_queries: usize,
        replication_factor: usize,
    }

    /// Stand-in transport that reports a fixed listening flag.
    struct TestTransport {
        listening: bool,
    }

    impl TestTransport {
        fn new(listening: bool) -> Self {
            Self { listening }
        }

        async fn is_listening(&self) -> Result<bool> {
            Ok(self.listening)
        }

        async fn debug_info(&self) -> Result<TransportDebugInfo> {
            Ok(TransportDebugInfo {
                transport_type: "test".to_string(),
                listening_addresses: vec![],
                active_connections: 0,
                bytes_sent: 0,
                bytes_received: 0,
            })
        }
    }

    /// Debug payload returned by `TestTransport::debug_info`.
    struct TransportDebugInfo {
        transport_type: String,
        listening_addresses: Vec<String>,
        active_connections: usize,
        bytes_sent: u64,
        bytes_received: u64,
    }

    /// A fresh temporary directory is expected to be reported as `Healthy`.
    #[tokio::test]
    async fn test_storage_health_checker() {
        let temp_dir = tempfile::tempdir().unwrap();
        let checker = StorageHealthChecker::new(temp_dir.path().to_path_buf());

        let status = checker.check().await.unwrap();
        assert_eq!(status, HealthStatus::Healthy);
    }

    // TODO: Uncomment when ResourceHealthChecker is implemented
    // NOTE(review): `ResourceHealthChecker` is defined in this file, so this
    // TODO looks stale - confirm whether the test can be enabled (it also
    // needs `ProductionConfig` in scope).
    // #[tokio::test]
    // async fn test_resource_health_checker() {
    //     let config = ProductionConfig::default();
    //     let manager = Arc::new(ResourceManager::new(config));
    //
    //     let checker = ResourceHealthChecker::new(manager);
    //     let status = checker.check().await.unwrap();
    //     assert_eq!(status, HealthStatus::Healthy);
    // }

    /// A healthy checker plus a degraded checker yields `Degraded` overall.
    #[tokio::test]
    async fn test_composite_health_checker() {
        // Test composite with simple checkers
        struct AlwaysHealthy;
        #[async_trait]
        impl ComponentChecker for AlwaysHealthy {
            async fn check(&self) -> Result<HealthStatus> {
                Ok(HealthStatus::Healthy)
            }
        }

        struct AlwaysDegraded;
        #[async_trait]
        impl ComponentChecker for AlwaysDegraded {
            async fn check(&self) -> Result<HealthStatus> {
                Ok(HealthStatus::Degraded)
            }
        }

        let checker = CompositeHealthChecker::new()
            .add_checker("healthy", Box::new(AlwaysHealthy))
            .add_checker("degraded", Box::new(AlwaysDegraded));

        let status = checker.check().await.unwrap();
        assert_eq!(status, HealthStatus::Degraded);
    }

    /// A single unhealthy checker makes the composite `Unhealthy`.
    #[tokio::test]
    async fn test_composite_health_checker_unhealthy() {
        struct AlwaysUnhealthy;
        #[async_trait]
        impl ComponentChecker for AlwaysUnhealthy {
            async fn check(&self) -> Result<HealthStatus> {
                Ok(HealthStatus::Unhealthy)
            }
        }

        let checker =
            CompositeHealthChecker::new().add_checker("unhealthy", Box::new(AlwaysUnhealthy));

        let status = checker.check().await.unwrap();
        assert_eq!(status, HealthStatus::Unhealthy);
    }
}
673}