saorsa_core/adaptive/churn.rs

// Copyright 2024 Saorsa Labs Limited
//
// This software is dual-licensed under:
// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
// - Commercial License
//
// For AGPL-3.0 license, see LICENSE-AGPL-3.0
// For commercial licensing, contact: saorsalabs@gmail.com
//
// Unless required by applicable law or agreed to in writing, software
// distributed under these licenses is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

//! Churn handling and recovery system
//!
//! This module implements churn detection and recovery mechanisms:
//! - Node failure detection with 30-second heartbeat timeout
//! - Proactive content replication based on churn predictions
//! - Routing table repair and maintenance
//! - Trust score updates for unexpected departures
//! - Topology rebalancing for network health
//! - Graceful degradation under high churn
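//!
//! # Example
//!
//! A minimal sketch of wiring up the handler (illustrative only; it assumes the
//! collaborating subsystems - predictor, trust provider, replication manager,
//! router, and gossip - are constructed elsewhere, and `peer_id` stands for a
//! remote node's ID):
//!
//! ```rust,ignore
//! let handler = ChurnHandler::new(
//!     node_id,
//!     predictor,
//!     trust_provider,
//!     replication_manager,
//!     router,
//!     gossip,
//!     ChurnConfig::default(),
//! );
//!
//! // Spawn the background monitoring loop, then feed it liveness signals.
//! handler.start_monitoring().await;
//! handler.handle_heartbeat(&peer_id).await?;
//!
//! // Inspect churn statistics collected by the monitoring cycles.
//! let stats = handler.get_stats().await;
//! ```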

use crate::adaptive::{
    ContentHash, NodeId, TrustProvider,
    gossip::{AdaptiveGossipSub, GossipMessage},
    learning::ChurnPredictor,
    replication::ReplicationManager,
    routing::AdaptiveRouter,
};
use anyhow::Result;
use std::{
    collections::{HashMap, HashSet},
    sync::Arc,
    time::{Duration, Instant},
};
use tokio::sync::RwLock;

/// Churn detection and recovery system
pub struct ChurnHandler {
    /// Local node ID
    node_id: NodeId,

    /// Churn predictor for proactive measures
    predictor: Arc<ChurnPredictor>,

    /// Node monitoring system
    node_monitor: Arc<NodeMonitor>,

    /// Recovery manager for content
    recovery_manager: Arc<RecoveryManager>,

    /// Trust provider for reputation updates
    trust_provider: Arc<dyn TrustProvider>,

    /// Replication manager
    replication_manager: Arc<ReplicationManager>,

    /// Routing system
    router: Arc<AdaptiveRouter>,

    /// Gossip system for announcements
    gossip: Arc<AdaptiveGossipSub>,

    /// Configuration
    config: ChurnConfig,

    /// Churn statistics
    stats: Arc<RwLock<ChurnStats>>,
}

/// Configuration for churn handling
#[derive(Debug, Clone)]
pub struct ChurnConfig {
    /// Heartbeat timeout (default: 30 seconds)
    pub heartbeat_timeout: Duration,

    /// Gossip absence timeout (default: 5 minutes)
    pub gossip_timeout: Duration,

    /// Prediction threshold for proactive measures (default: 0.7)
    pub prediction_threshold: f64,

    /// Monitoring interval (default: 30 seconds)
    pub monitoring_interval: Duration,

    /// Maximum acceptable churn rate (default: 30%)
    pub max_churn_rate: f64,
}

impl Default for ChurnConfig {
    fn default() -> Self {
        Self {
            heartbeat_timeout: Duration::from_secs(30),
            gossip_timeout: Duration::from_secs(300),
            prediction_threshold: 0.7,
            monitoring_interval: Duration::from_secs(30),
            max_churn_rate: 0.3,
        }
    }
}
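
// Example (added for illustration): individual `ChurnConfig` fields can be
// overridden while keeping the remaining defaults via struct update syntax:
//
//     let config = ChurnConfig {
//         heartbeat_timeout: Duration::from_secs(10),
//         ..ChurnConfig::default()
//     };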

/// Node monitoring system
pub struct NodeMonitor {
    /// Node status tracking
    node_status: Arc<RwLock<HashMap<NodeId, NodeStatus>>>,

    /// Heartbeat tracking
    heartbeats: Arc<RwLock<HashMap<NodeId, Instant>>>,

    /// Configuration
    config: ChurnConfig,
}

/// Status of a monitored node
#[derive(Debug, Clone)]
pub struct NodeStatus {
    /// Node identifier
    pub node_id: NodeId,

    /// Last seen timestamp
    pub last_seen: Instant,

    /// Last heartbeat received
    pub last_heartbeat: Option<Instant>,

    /// Last gossip activity
    pub last_gossip: Option<Instant>,

    /// Current status
    pub status: NodeState,

    /// Reliability score (0.0-1.0)
    pub reliability: f64,

    /// Content stored by this node
    pub stored_content: HashSet<ContentHash>,
}

/// Node state in the network
#[derive(Debug, Clone, PartialEq)]
pub enum NodeState {
    /// Node is active and healthy
    Active,

    /// Node is suspicious (missed heartbeats)
    Suspicious,

    /// Node is departing (predicted or announced)
    Departing,

    /// Node has failed
    Failed,
}

/// Recovery manager for handling failures
pub struct RecoveryManager {
    /// Content tracking
    content_tracker: Arc<RwLock<HashMap<ContentHash, ContentTracker>>>,

    /// Recovery queue
    recovery_queue: Arc<RwLock<Vec<RecoveryTask>>>,

    /// Active recoveries
    _active_recoveries: Arc<RwLock<HashMap<ContentHash, RecoveryStatus>>>,
}

/// Content tracking information
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct ContentTracker {
    /// Content hash
    hash: ContentHash,

    /// Nodes storing this content
    storing_nodes: HashSet<NodeId>,

    /// Target replication factor
    target_replicas: u32,

    /// Last verification time
    last_verified: Instant,
}

/// Recovery task
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct RecoveryTask {
    /// Content to recover
    content_hash: ContentHash,

    /// Failed nodes
    failed_nodes: Vec<NodeId>,

    /// Priority level
    priority: RecoveryPriority,

    /// Creation time
    created_at: Instant,
}

/// Recovery priority levels
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum RecoveryPriority {
    /// Low priority - can wait
    Low,

    /// Normal priority
    Normal,

    /// High priority - important content
    High,

    /// Critical - immediate action needed
    Critical,
}

/// Recovery status
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct RecoveryStatus {
    /// Start time
    started_at: Instant,

    /// Nodes contacted for recovery
    contacted_nodes: Vec<NodeId>,

    /// Successful recoveries
    successful_nodes: Vec<NodeId>,

    /// Failed attempts
    failed_attempts: u32,
}

/// Churn handling statistics
#[derive(Debug, Default, Clone)]
pub struct ChurnStats {
    /// Total nodes monitored
    pub total_nodes: u64,

    /// Active nodes
    pub active_nodes: u64,

    /// Failed nodes
    pub failed_nodes: u64,

    /// Suspicious nodes
    pub suspicious_nodes: u64,

    /// Current churn rate
    pub churn_rate: f64,

    /// Successful recoveries
    pub successful_recoveries: u64,

    /// Failed recoveries
    pub failed_recoveries: u64,

    /// Proactive replications
    pub proactive_replications: u64,

    /// Average detection time
    pub avg_detection_time_ms: f64,
}

impl ChurnHandler {
    /// Create a new churn handler
    pub fn new(
        node_id: NodeId,
        predictor: Arc<ChurnPredictor>,
        trust_provider: Arc<dyn TrustProvider>,
        replication_manager: Arc<ReplicationManager>,
        router: Arc<AdaptiveRouter>,
        gossip: Arc<AdaptiveGossipSub>,
        config: ChurnConfig,
    ) -> Self {
        let node_monitor = Arc::new(NodeMonitor::new(config.clone()));
        let recovery_manager = Arc::new(RecoveryManager::new());

        Self {
            node_id,
            predictor,
            node_monitor,
            recovery_manager,
            trust_provider,
            replication_manager,
            router,
            gossip,
            config,
            stats: Arc::new(RwLock::new(ChurnStats::default())),
        }
    }

    /// Start monitoring network for churn
    pub async fn start_monitoring(&self) {
        let monitoring_interval = self.config.monitoring_interval;
        let handler = self.clone_for_task();

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(monitoring_interval);

            loop {
                interval.tick().await;

                if let Err(_e) = handler.monitor_cycle().await {
                    // log::error!("Churn monitoring error: {}", e);
                }
            }
        });
    }

    /// Perform one monitoring cycle
    async fn monitor_cycle(&self) -> Result<()> {
        let nodes = self.node_monitor.get_all_nodes().await;
        let mut stats = self.stats.write().await;

        stats.total_nodes = nodes.len() as u64;
        stats.active_nodes = 0;
        stats.suspicious_nodes = 0;
        stats.failed_nodes = 0;

        for node_id in nodes {
            let node_status = self.node_monitor.get_node_status(&node_id).await;

            match node_status.status {
                NodeState::Active => {
                    stats.active_nodes += 1;

                    // Check churn prediction
                    let prediction = self.predictor.predict(&node_id).await;

                    if prediction.probability_1h > self.config.prediction_threshold {
                        self.handle_imminent_departure(&node_id).await?;
                        stats.proactive_replications += 1;
                    }
                }
                NodeState::Suspicious => {
                    stats.suspicious_nodes += 1;

                    // Check if node should be marked as failed
                    if node_status.last_seen.elapsed() > self.config.heartbeat_timeout {
                        self.handle_node_failure(&node_id).await?;
                    }
                }
                NodeState::Failed => {
                    stats.failed_nodes += 1;
                }
                _ => {}
            }
        }

        // Calculate churn rate
        if stats.total_nodes > 0 {
            stats.churn_rate = stats.failed_nodes as f64 / stats.total_nodes as f64;
        }

        // Check if network is experiencing high churn
        if stats.churn_rate > self.config.max_churn_rate {
            self.handle_high_churn().await?;
        }

        Ok(())
    }

    /// Handle imminent node departure (predicted)
    async fn handle_imminent_departure(&self, node_id: &NodeId) -> Result<()> {
        // log::info!("Handling imminent departure for node {:?}", node_id);

        // 1. Mark node as departing
        self.node_monitor
            .update_node_state(node_id, NodeState::Departing)
            .await;

        // 2. Get content stored by this node
        let stored_content = self.get_content_stored_by(node_id).await?;

        // 3. Start aggressive replication
        for content_hash in stored_content {
            self.recovery_manager
                .increase_replication(&content_hash, RecoveryPriority::High)
                .await?;
        }

        // 4. Reroute ongoing connections
        self.router.mark_node_unreliable(node_id).await;

        // 5. Notify network via gossip
        let message = GossipMessage {
            topic: "node_departing".to_string(),
            data: bincode::serialize(&node_id)
                .map_err(|e| anyhow::anyhow!("Serialization error: {}", e))?,
            from: self.node_id.clone(),
            seqno: 0, // Will be set by gossip subsystem
            timestamp: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.as_secs())
                .unwrap_or(0),
        };
        self.gossip.publish("node_departing", message).await?;

        Ok(())
    }

    /// Handle confirmed node failure
    async fn handle_node_failure(&self, node_id: &NodeId) -> Result<()> {
        let start_time = Instant::now();
        // log::warn!("Handling node failure for {:?}", node_id);

        // 1. Mark node as failed
        self.node_monitor
            .update_node_state(node_id, NodeState::Failed)
            .await;

        // 2. Remove from all routing structures
        self.remove_from_routing_tables(node_id).await?;

        // 3. Identify lost content
        let lost_content = self.identify_lost_content(node_id).await?;

        // 4. Queue recovery tasks
        for content_hash in lost_content {
            self.recovery_manager
                .queue_recovery(
                    content_hash,
                    vec![node_id.clone()],
                    RecoveryPriority::Critical,
                )
                .await?;
        }

        // 5. Update trust scores
        self.penalize_unexpected_departure(node_id).await;

        // 6. Trigger topology rebalancing
        self.trigger_topology_rebalance().await?;

        // Update stats
        let mut stats = self.stats.write().await;
        stats.failed_nodes += 1;
        let detection_time = start_time.elapsed().as_millis() as f64;
        stats.avg_detection_time_ms =
            (stats.avg_detection_time_ms * (stats.failed_nodes - 1) as f64 + detection_time)
                / stats.failed_nodes as f64;

        Ok(())
    }

    /// Handle high churn conditions
    async fn handle_high_churn(&self) -> Result<()> {
        // log::warn!("Network experiencing high churn, entering defensive mode");

        // 1. Increase replication factors globally
        self.replication_manager
            .increase_global_replication(1.5)
            .await;

        // 2. Reduce gossip fanout to conserve bandwidth
        self.gossip.reduce_fanout(0.75).await;

        // 3. Enable aggressive caching
        self.router.enable_aggressive_caching().await;

        // 4. Notify applications of degraded conditions
        let churn_rate = self.stats.read().await.churn_rate;
        let message = GossipMessage {
            topic: "high_churn_alert".to_string(),
            data: bincode::serialize(&churn_rate)
                .map_err(|e| anyhow::anyhow!("Serialization error: {}", e))?,
            from: self.node_id.clone(),
            seqno: 0, // Will be set by gossip subsystem
            timestamp: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.as_secs())
                .unwrap_or(0),
        };
        self.gossip.publish("high_churn_alert", message).await?;

        Ok(())
    }

    /// Remove failed node from routing tables
    async fn remove_from_routing_tables(&self, node_id: &NodeId) -> Result<()> {
        // Remove from Kademlia routing table
        self.router.remove_node(node_id).await;

        // Remove from hyperbolic space
        self.router.remove_hyperbolic_coordinate(node_id).await;

        // Remove from SOM
        self.router.remove_from_som(node_id).await;

        // Remove from trust system
        self.trust_provider.remove_node(node_id);

        Ok(())
    }

    /// Get content stored by a specific node
    async fn get_content_stored_by(&self, node_id: &NodeId) -> Result<Vec<ContentHash>> {
        let status = self.node_monitor.get_node_status(node_id).await;
        Ok(status.stored_content.into_iter().collect())
    }

    /// Identify content that may be lost due to node failure
    async fn identify_lost_content(&self, failed_node: &NodeId) -> Result<Vec<ContentHash>> {
        let all_content = self.get_content_stored_by(failed_node).await?;
        let mut at_risk_content = Vec::new();

        for content_hash in all_content {
            let remaining_replicas = self
                .recovery_manager
                .get_remaining_replicas(&content_hash, failed_node)
                .await?;

            // If below minimum replication factor, mark as at risk
            if remaining_replicas < 5 {
                at_risk_content.push(content_hash);
            }
        }

        Ok(at_risk_content)
    }

    /// Penalize node for unexpected departure
    async fn penalize_unexpected_departure(&self, node_id: &NodeId) {
        self.trust_provider.update_trust(
            &NodeId { hash: [0u8; 32] }, // System node
            node_id,
            false, // Negative interaction
        );
    }

    /// Trigger topology rebalancing after failures
    async fn trigger_topology_rebalance(&self) -> Result<()> {
        // Adjust hyperbolic coordinates
        self.router.rebalance_hyperbolic_space().await;

        // Update SOM grid if needed
        self.router.update_som_grid().await;

        // Recompute trust scores
        self.router.trigger_trust_recomputation().await;

        Ok(())
    }

    /// Handle heartbeat from a node
    pub async fn handle_heartbeat(&self, node_id: &NodeId) -> Result<()> {
        self.node_monitor.record_heartbeat(node_id).await;
        Ok(())
    }

    /// Handle gossip activity from a node
    pub async fn handle_gossip_activity(&self, node_id: &NodeId) -> Result<()> {
        self.node_monitor.record_gossip_activity(node_id).await;
        Ok(())
    }

    /// Get churn statistics
    pub async fn get_stats(&self) -> ChurnStats {
        self.stats.read().await.clone()
    }

    /// Clone for spawning tasks
    fn clone_for_task(&self) -> Self {
        Self {
            node_id: self.node_id.clone(),
            predictor: self.predictor.clone(),
            node_monitor: self.node_monitor.clone(),
            recovery_manager: self.recovery_manager.clone(),
            trust_provider: self.trust_provider.clone(),
            replication_manager: self.replication_manager.clone(),
            router: self.router.clone(),
            gossip: self.gossip.clone(),
            config: self.config.clone(),
            stats: self.stats.clone(),
        }
    }
}

impl NodeMonitor {
    /// Create a new node monitor
    pub fn new(config: ChurnConfig) -> Self {
        Self {
            node_status: Arc::new(RwLock::new(HashMap::new())),
            heartbeats: Arc::new(RwLock::new(HashMap::new())),
            config,
        }
    }

    /// Get all monitored nodes
    pub async fn get_all_nodes(&self) -> Vec<NodeId> {
        self.node_status.read().await.keys().cloned().collect()
    }

    /// Get node status
    pub async fn get_node_status(&self, node_id: &NodeId) -> NodeStatus {
        self.node_status
            .read()
            .await
            .get(node_id)
            .cloned()
            .unwrap_or(NodeStatus {
                node_id: node_id.clone(),
                last_seen: Instant::now(),
                last_heartbeat: None,
                last_gossip: None,
                status: NodeState::Failed,
                reliability: 0.0,
                stored_content: HashSet::new(),
            })
    }

    /// Update node state
    pub async fn update_node_state(&self, node_id: &NodeId, state: NodeState) {
        if let Some(status) = self.node_status.write().await.get_mut(node_id) {
            status.status = state;
        }
    }

    /// Record heartbeat from node
    pub async fn record_heartbeat(&self, node_id: &NodeId) {
        let now = Instant::now();
        self.heartbeats.write().await.insert(node_id.clone(), now);

        // Start tracking the node on first contact, then refresh its liveness fields.
        let mut node_status = self.node_status.write().await;
        let status = node_status.entry(node_id.clone()).or_insert_with(|| NodeStatus {
            node_id: node_id.clone(),
            last_seen: now,
            last_heartbeat: None,
            last_gossip: None,
            status: NodeState::Active,
            reliability: 1.0,
            stored_content: HashSet::new(),
        });
        status.last_heartbeat = Some(now);
        status.last_seen = now;
        status.status = NodeState::Active;
    }

    /// Record gossip activity
    pub async fn record_gossip_activity(&self, node_id: &NodeId) {
        let now = Instant::now();

        if let Some(status) = self.node_status.write().await.get_mut(node_id) {
            status.last_gossip = Some(now);
            status.last_seen = now;
        }
    }

    /// Check if node is alive
    pub async fn is_alive(&self, node_id: &NodeId) -> bool {
        if let Some(last_heartbeat) = self.heartbeats.read().await.get(node_id) {
            last_heartbeat.elapsed() < self.config.heartbeat_timeout
        } else {
            false
        }
    }
}

impl Default for RecoveryManager {
    fn default() -> Self {
        Self::new()
    }
}

impl RecoveryManager {
    /// Create a new recovery manager
    pub fn new() -> Self {
        Self {
            content_tracker: Arc::new(RwLock::new(HashMap::new())),
            recovery_queue: Arc::new(RwLock::new(Vec::new())),
            _active_recoveries: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Increase replication for content
    pub async fn increase_replication(
        &self,
        content_hash: &ContentHash,
        priority: RecoveryPriority,
    ) -> Result<()> {
        // Queue a replication task
        self.queue_recovery(*content_hash, vec![], priority).await
    }

    /// Queue content for recovery
    pub async fn queue_recovery(
        &self,
        content_hash: ContentHash,
        failed_nodes: Vec<NodeId>,
        priority: RecoveryPriority,
    ) -> Result<()> {
        let task = RecoveryTask {
            content_hash,
            failed_nodes,
            priority,
            created_at: Instant::now(),
        };

        let mut queue = self.recovery_queue.write().await;
        queue.push(task);

        // Sort by priority
        queue.sort_by_key(|task| std::cmp::Reverse(task.priority));

        Ok(())
    }

    /// Get remaining replicas for content
    pub async fn get_remaining_replicas(
        &self,
        content_hash: &ContentHash,
        exclude_node: &NodeId,
    ) -> Result<u32> {
        if let Some(tracker) = self.content_tracker.read().await.get(content_hash) {
            let remaining = tracker
                .storing_nodes
                .iter()
                .filter(|&n| n != exclude_node)
                .count() as u32;
            Ok(remaining)
        } else {
            Ok(0)
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::adaptive::som::{GridSize, SomConfig};
    use crate::adaptive::trust::MockTrustProvider;
    use rand::RngCore;

    async fn create_test_churn_handler() -> ChurnHandler {
        let predictor = Arc::new(ChurnPredictor::new());
        let trust_provider = Arc::new(MockTrustProvider::new());
        let replication_manager = Arc::new(ReplicationManager::new(
            Default::default(),
            trust_provider.clone(),
            predictor.clone(),
            Arc::new(AdaptiveRouter::new(
                trust_provider.clone(),
                Arc::new(crate::adaptive::hyperbolic::HyperbolicSpace::new()),
                Arc::new(crate::adaptive::som::SelfOrganizingMap::new(SomConfig {
                    initial_learning_rate: 0.3,
                    initial_radius: 5.0,
                    iterations: 100,
                    grid_size: GridSize::Fixed(10, 10),
                })),
            )),
        ));
        let router = Arc::new(AdaptiveRouter::new(
            trust_provider.clone(),
            Arc::new(crate::adaptive::hyperbolic::HyperbolicSpace::new()),
            Arc::new(crate::adaptive::som::SelfOrganizingMap::new(SomConfig {
                initial_learning_rate: 0.3,
                initial_radius: 5.0,
                iterations: 100,
                grid_size: GridSize::Fixed(10, 10),
            })),
        ));
        // Create a test NodeId
        use crate::peer_record::UserId;
        let mut hash = [0u8; 32];
        rand::thread_rng().fill_bytes(&mut hash);
        let node_id = UserId::from_bytes(hash);

        let gossip = Arc::new(AdaptiveGossipSub::new(
            node_id.clone(),
            trust_provider.clone(),
        ));

        // Create a test ChurnConfig with a faster monitoring interval and a stricter churn limit
        let config = ChurnConfig {
            heartbeat_timeout: Duration::from_secs(30),
            gossip_timeout: Duration::from_secs(300),
            monitoring_interval: Duration::from_secs(5),
            prediction_threshold: 0.7,
            max_churn_rate: 0.2,
        };

        ChurnHandler::new(
            node_id,
            predictor,
            trust_provider,
            replication_manager,
            router,
            gossip,
            config,
        )
    }

    #[tokio::test]
    async fn test_node_monitoring() {
        let handler = create_test_churn_handler().await;
        let node_id = NodeId { hash: [1u8; 32] };

        // Record heartbeat
        handler.handle_heartbeat(&node_id).await.unwrap();

        // Check node is alive
        assert!(handler.node_monitor.is_alive(&node_id).await);

        // Check status
        let status = handler.node_monitor.get_node_status(&node_id).await;
        assert_eq!(status.status, NodeState::Active);
        assert!(status.last_heartbeat.is_some());
    }

    #[tokio::test]
    async fn test_failure_detection() {
        // Use a short heartbeat timeout so the test runs quickly; build the
        // monitor directly so the custom config is actually applied.
        let config = ChurnConfig {
            heartbeat_timeout: Duration::from_millis(100),
            ..ChurnConfig::default()
        };
        let monitor = NodeMonitor::new(config);
        let node_id = NodeId { hash: [1u8; 32] };

        // Record initial heartbeat
        monitor.record_heartbeat(&node_id).await;

        // Wait for the timeout to elapse
        tokio::time::sleep(Duration::from_millis(150)).await;

        // Node should no longer be considered alive
        assert!(!monitor.is_alive(&node_id).await);
    }

    #[tokio::test]
    async fn test_proactive_replication() {
        let handler = create_test_churn_handler().await;
        let node_id = NodeId { hash: [1u8; 32] };

        // Register the node with the monitor so state updates apply to it
        handler.handle_heartbeat(&node_id).await.unwrap();

        // Add node with high churn probability
        handler
            .predictor
            .update_node_features(
                &node_id,
                vec![
                    0.1, // Low online duration
                    0.9, // High response time
                    0.1, // Low resource contribution
                    0.1, // Low message frequency
                    0.0, // Time of day
                    0.0, // Day of week
                    0.1, // Low historical reliability
                    0.0, 0.0, 0.0,
                ],
            )
            .await
            .unwrap();

        // Handle imminent departure
        handler.handle_imminent_departure(&node_id).await.unwrap();

        // Check node marked as departing
        let status = handler.node_monitor.get_node_status(&node_id).await;
        assert_eq!(status.status, NodeState::Departing);
    }

    #[tokio::test]
    async fn test_churn_rate_calculation() {
        let handler = create_test_churn_handler().await;

        // Add some nodes
        for i in 0..10 {
            let node_id = NodeId { hash: [i; 32] };
            handler.handle_heartbeat(&node_id).await.unwrap();
        }

        // Mark some as failed
        for i in 0..3 {
            let node_id = NodeId { hash: [i; 32] };
            handler
                .node_monitor
                .update_node_state(&node_id, NodeState::Failed)
                .await;
        }

        // Run monitoring cycle
        handler.monitor_cycle().await.unwrap();

        // Check stats
        let stats = handler.get_stats().await;
        assert_eq!(stats.total_nodes, 10);
        assert_eq!(stats.failed_nodes, 3);
        assert!((stats.churn_rate - 0.3).abs() < 0.01); // 30% churn rate
    }
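
    // Illustrative addition (not in the original test suite): exercises
    // `record_gossip_activity` and `is_alive` directly on `NodeMonitor`,
    // using only APIs defined in this module.
    #[tokio::test]
    async fn test_gossip_activity_tracking() {
        let monitor = NodeMonitor::new(ChurnConfig::default());
        let node_id = NodeId { hash: [2u8; 32] };

        // With no heartbeat on record, the node is not considered alive
        assert!(!monitor.is_alive(&node_id).await);

        // A heartbeat starts tracking the node; gossip activity is then recorded
        monitor.record_heartbeat(&node_id).await;
        monitor.record_gossip_activity(&node_id).await;

        let status = monitor.get_node_status(&node_id).await;
        assert!(status.last_gossip.is_some());
        assert!(monitor.is_alive(&node_id).await);
    }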
}