saorsa_core/adaptive/churn.rs

// Copyright 2024 Saorsa Labs Limited
//
// This software is dual-licensed under:
// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
// - Commercial License
//
// For AGPL-3.0 license, see LICENSE-AGPL-3.0
// For commercial licensing, contact: david@saorsalabs.com
//
// Unless required by applicable law or agreed to in writing, software
// distributed under these licenses is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

//! Churn handling and recovery system
//!
//! This module implements churn detection and recovery mechanisms:
//! - Node failure detection with 30-second heartbeat timeout
//! - Proactive content replication based on churn predictions
//! - Routing table repair and maintenance
//! - Trust score updates for unexpected departures
//! - Topology rebalancing for network health
//! - Graceful degradation under high churn
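//!
//! A minimal usage sketch (not compiled as a doctest; it assumes the
//! collaborators such as the predictor, trust provider, replication manager,
//! router, and gossip layer are constructed elsewhere, and `peer_id` stands in
//! for a remote node's ID):
//!
//! ```ignore
//! let handler = ChurnHandler::new(
//!     node_id,
//!     predictor,
//!     trust_provider,
//!     replication_manager,
//!     router,
//!     gossip,
//!     ChurnConfig::default(),
//! );
//! handler.start_monitoring().await;
//! handler.handle_heartbeat(&peer_id).await?;
//! ```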

use crate::adaptive::{
    ContentHash, NodeId, TrustProvider,
    gossip::{AdaptiveGossipSub, GossipMessage},
    learning::ChurnPredictor,
    replication::ReplicationManager,
    routing::AdaptiveRouter,
};
use crate::dht::{NodeFailureTracker, ReplicationGracePeriodConfig};
use anyhow::Result;
use std::{
    collections::{HashMap, HashSet},
    sync::Arc,
    time::{Duration, Instant},
};
use tokio::sync::RwLock;

/// Churn detection and recovery system
pub struct ChurnHandler {
    /// Local node ID
    node_id: NodeId,

    /// Churn predictor for proactive measures
    predictor: Arc<ChurnPredictor>,

    /// Node monitoring system
    node_monitor: Arc<NodeMonitor>,

    /// Recovery manager for content
    recovery_manager: Arc<RecoveryManager>,

    /// Trust provider for reputation updates
    trust_provider: Arc<dyn TrustProvider>,

    /// Replication manager
    replication_manager: Arc<ReplicationManager>,

    /// Routing system
    router: Arc<AdaptiveRouter>,

    /// Gossip system for announcements
    gossip: Arc<AdaptiveGossipSub>,

    /// Configuration
    config: ChurnConfig,

    /// Churn statistics
    stats: Arc<RwLock<ChurnStats>>,
}

/// Configuration for churn handling
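///
/// A short sketch of overriding one default (the values shown are
/// illustrative, not recommendations):
///
/// ```ignore
/// let config = ChurnConfig {
///     heartbeat_timeout: Duration::from_secs(15),
///     ..ChurnConfig::default()
/// };
/// ```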
#[derive(Debug, Clone)]
pub struct ChurnConfig {
    /// Heartbeat timeout (default: 30 seconds)
    pub heartbeat_timeout: Duration,

    /// Gossip absence timeout (default: 5 minutes)
    pub gossip_timeout: Duration,

    /// Prediction threshold for proactive measures (default: 0.7)
    pub prediction_threshold: f64,

    /// Monitoring interval (default: 30 seconds)
    pub monitoring_interval: Duration,

    /// Maximum acceptable churn rate (default: 30%)
    pub max_churn_rate: f64,
}

impl Default for ChurnConfig {
    fn default() -> Self {
        Self {
            heartbeat_timeout: Duration::from_secs(30),
            gossip_timeout: Duration::from_secs(300),
            prediction_threshold: 0.7,
            monitoring_interval: Duration::from_secs(30),
            max_churn_rate: 0.3,
        }
    }
}

/// Node monitoring system
pub struct NodeMonitor {
    /// Node status tracking
    node_status: Arc<RwLock<HashMap<NodeId, NodeStatus>>>,

    /// Heartbeat tracking
    heartbeats: Arc<RwLock<HashMap<NodeId, Instant>>>,

    /// Configuration
    config: ChurnConfig,
}

/// Status of a monitored node
#[derive(Debug, Clone)]
pub struct NodeStatus {
    /// Node identifier
    pub node_id: NodeId,

    /// Last seen timestamp
    pub last_seen: Instant,

    /// Last heartbeat received
    pub last_heartbeat: Option<Instant>,

    /// Last gossip activity
    pub last_gossip: Option<Instant>,

    /// Current status
    pub status: NodeState,

    /// Reliability score (0.0-1.0)
    pub reliability: f64,

    /// Content stored by this node
    pub stored_content: HashSet<ContentHash>,
}

/// Node state in the network
#[derive(Debug, Clone, PartialEq)]
pub enum NodeState {
    /// Node is active and healthy
    Active,

    /// Node is suspicious (missed heartbeats)
    Suspicious,

    /// Node is departing (predicted or announced)
    Departing,

    /// Node has failed
    Failed,
}

/// Recovery manager for handling failures
pub struct RecoveryManager {
    /// Content tracking
    content_tracker: Arc<RwLock<HashMap<ContentHash, ContentTracker>>>,

    /// Recovery queue
    recovery_queue: Arc<RwLock<Vec<RecoveryTask>>>,

    /// Active recoveries
    _active_recoveries: Arc<RwLock<HashMap<ContentHash, RecoveryStatus>>>,

    /// Node failure tracker for grace period management
    node_failure_tracker: Arc<RwLock<Option<Arc<dyn NodeFailureTracker>>>>,
}

/// Content tracking information
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct ContentTracker {
    /// Content hash
    hash: ContentHash,

    /// Nodes storing this content
    storing_nodes: HashSet<NodeId>,

    /// Target replication factor
    target_replicas: u32,

    /// Last verification time
    last_verified: Instant,
}

/// Recovery task
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct RecoveryTask {
    /// Content to recover
    content_hash: ContentHash,

    /// Failed nodes
    failed_nodes: Vec<NodeId>,

    /// Priority level
    priority: RecoveryPriority,

    /// Creation time
    created_at: Instant,
}

/// Recovery priority levels
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum RecoveryPriority {
    /// Low priority - can wait
    Low,

    /// Normal priority
    Normal,

    /// High priority - important content
    High,

    /// Critical - immediate action needed
    Critical,
}

/// Recovery status
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct RecoveryStatus {
    /// Start time
    started_at: Instant,

    /// Nodes contacted for recovery
    contacted_nodes: Vec<NodeId>,

    /// Successful recoveries
    successful_nodes: Vec<NodeId>,

    /// Failed attempts
    failed_attempts: u32,
}

/// Churn handling statistics
#[derive(Debug, Default, Clone)]
pub struct ChurnStats {
    /// Total nodes monitored
    pub total_nodes: u64,

    /// Active nodes
    pub active_nodes: u64,

    /// Failed nodes
    pub failed_nodes: u64,

    /// Suspicious nodes
    pub suspicious_nodes: u64,

    /// Current churn rate
    pub churn_rate: f64,

    /// Successful recoveries
    pub successful_recoveries: u64,

    /// Failed recoveries
    pub failed_recoveries: u64,

    /// Proactive replications
    pub proactive_replications: u64,

    /// Average detection time
    pub avg_detection_time_ms: f64,

    /// Grace period prevented replications
    pub grace_period_preventions: u64,

    /// Successful node re-registrations during grace period
    pub successful_reregistrations: u64,

    /// Average grace period duration for re-registered nodes
    pub avg_grace_period_duration_ms: f64,
}

impl ChurnHandler {
    /// Create a new churn handler
    pub fn new(
        node_id: NodeId,
        predictor: Arc<ChurnPredictor>,
        trust_provider: Arc<dyn TrustProvider>,
        replication_manager: Arc<ReplicationManager>,
        router: Arc<AdaptiveRouter>,
        gossip: Arc<AdaptiveGossipSub>,
        config: ChurnConfig,
    ) -> Self {
        let node_monitor = Arc::new(NodeMonitor::new(config.clone()));
        let recovery_manager = Arc::new(RecoveryManager::new());

        Self {
            node_id,
            predictor,
            node_monitor,
            recovery_manager,
            trust_provider,
            replication_manager,
            router,
            gossip,
            config,
            stats: Arc::new(RwLock::new(ChurnStats::default())),
        }
    }

    /// Create a new churn handler with node failure tracker for grace periods
    pub fn with_failure_tracker(
        node_id: NodeId,
        predictor: Arc<ChurnPredictor>,
        trust_provider: Arc<dyn TrustProvider>,
        replication_manager: Arc<ReplicationManager>,
        router: Arc<AdaptiveRouter>,
        gossip: Arc<AdaptiveGossipSub>,
        config: ChurnConfig,
        failure_tracker: Arc<dyn NodeFailureTracker>,
    ) -> Self {
        let node_monitor = Arc::new(NodeMonitor::new(config.clone()));
        let recovery_manager = Arc::new(RecoveryManager::with_failure_tracker(failure_tracker));

        Self {
            node_id,
            predictor,
            node_monitor,
            recovery_manager,
            trust_provider,
            replication_manager,
            router,
            gossip,
            config,
            stats: Arc::new(RwLock::new(ChurnStats::default())),
        }
    }

    /// Set the node failure tracker for grace period management
    pub async fn set_failure_tracker(&self, failure_tracker: Arc<dyn NodeFailureTracker>) {
        self.recovery_manager
            .set_failure_tracker(failure_tracker)
            .await;
    }

    /// Start monitoring network for churn
    pub async fn start_monitoring(&self) {
        let monitoring_interval = self.config.monitoring_interval;
        let handler = self.clone_for_task();

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(monitoring_interval);

            loop {
                interval.tick().await;

                if let Err(e) = handler.monitor_cycle().await {
                    tracing::error!("Churn monitoring error: {}", e);
                }
            }
        });
    }

    /// Perform one monitoring cycle
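    ///
    /// Each cycle tallies node states, triggers proactive replication when a
    /// node's predicted departure probability exceeds the configured
    /// threshold, escalates suspicious nodes to failed once the heartbeat
    /// timeout has elapsed, and switches to defensive measures when the churn
    /// rate exceeds `max_churn_rate`.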
    async fn monitor_cycle(&self) -> Result<()> {
        let nodes = self.node_monitor.get_all_nodes().await;
        let mut stats = self.stats.write().await;

        stats.total_nodes = nodes.len() as u64;
        stats.active_nodes = 0;
        stats.suspicious_nodes = 0;
        stats.failed_nodes = 0;

        // Defer failure handling until the stats lock is released:
        // handle_node_failure and handle_high_churn take the stats lock
        // themselves, so calling them while holding the write guard would
        // deadlock.
        let mut timed_out_nodes = Vec::new();

        for node_id in nodes {
            let node_status = self.node_monitor.get_node_status(&node_id).await;

            match node_status.status {
                NodeState::Active => {
                    stats.active_nodes += 1;

                    // Check churn prediction
                    let prediction = self.predictor.predict(&node_id).await;

                    if prediction.probability_1h > self.config.prediction_threshold {
                        self.handle_imminent_departure(&node_id).await?;
                        stats.proactive_replications += 1;
                    }
                }
                NodeState::Suspicious => {
                    stats.suspicious_nodes += 1;

                    // Check if node should be marked as failed
                    if node_status.last_seen.elapsed() > self.config.heartbeat_timeout {
                        timed_out_nodes.push(node_id.clone());
                    }
                }
                NodeState::Failed => {
                    stats.failed_nodes += 1;
                }
                _ => {}
            }
        }

        // Calculate churn rate
        if stats.total_nodes > 0 {
            stats.churn_rate = stats.failed_nodes as f64 / stats.total_nodes as f64;
        }
        let high_churn = stats.churn_rate > self.config.max_churn_rate;
        drop(stats);

        // Handle confirmed failures detected this cycle
        for node_id in &timed_out_nodes {
            self.handle_node_failure(node_id).await?;
        }

        // Check if network is experiencing high churn
        if high_churn {
            self.handle_high_churn().await?;
        }

        Ok(())
    }

    /// Handle imminent node departure (predicted)
    async fn handle_imminent_departure(&self, node_id: &NodeId) -> Result<()> {
        tracing::info!("Handling imminent departure for node {:?}", node_id);

        // 1. Mark node as departing
        self.node_monitor
            .update_node_state(node_id, NodeState::Departing)
            .await;

        // 2. Get content stored by this node
        let stored_content = self.get_content_stored_by(node_id).await?;

        // 3. Start aggressive replication
        for content_hash in stored_content {
            self.recovery_manager
                .increase_replication(&content_hash, RecoveryPriority::High)
                .await?;
        }

        // 4. Reroute ongoing connections
        self.router.mark_node_unreliable(node_id).await;

        // 5. Notify network via gossip
        let message = GossipMessage {
            topic: "node_departing".to_string(),
            data: bincode::serialize(&node_id)
                .map_err(|e| anyhow::anyhow!("Serialization error: {}", e))?,
            from: self.node_id.clone(),
            seqno: 0, // Will be set by gossip subsystem
            timestamp: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.as_secs())
                .unwrap_or(0),
        };
        self.gossip.publish("node_departing", message).await?;

        Ok(())
    }

    /// Handle confirmed node failure
    async fn handle_node_failure(&self, node_id: &NodeId) -> Result<()> {
        let start_time = Instant::now();
        tracing::warn!("Handling node failure for {:?}", node_id);

        // 1. Mark node as failed
        self.node_monitor
            .update_node_state(node_id, NodeState::Failed)
            .await;

        // 2. Remove from all routing structures
        self.remove_from_routing_tables(node_id).await?;

        // 3. Identify lost content
        let lost_content = self.identify_lost_content(node_id).await?;

        // 4. Queue recovery tasks with grace period consideration
        let grace_config = ReplicationGracePeriodConfig::default();
        tracing::info!(
            "Node {} failed, queuing recovery for {} content items with {}s grace period",
            node_id,
            lost_content.len(),
            grace_config.grace_period_duration.as_secs()
        );

        for content_hash in lost_content {
            self.recovery_manager
                .queue_recovery_with_grace_period(
                    content_hash,
                    vec![node_id.clone()],
                    RecoveryPriority::Critical,
                    &grace_config,
                )
                .await?;
        }

        // 5. Update trust scores
        self.penalize_unexpected_departure(node_id).await;

        // 6. Trigger topology rebalancing
        self.trigger_topology_rebalance().await?;

        // Update stats
        let mut stats = self.stats.write().await;
        stats.failed_nodes += 1;
        let detection_time = start_time.elapsed().as_millis() as f64;
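        // Fold the new sample into a running mean:
        // avg_n = (avg_{n-1} * (n - 1) + sample) / n, where n is the updated
        // failed_nodes count.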
        stats.avg_detection_time_ms =
            (stats.avg_detection_time_ms * (stats.failed_nodes - 1) as f64 + detection_time)
                / stats.failed_nodes as f64;

        // Update grace period metrics
        if self
            .recovery_manager
            .node_failure_tracker
            .read()
            .await
            .is_some()
        {
            stats.grace_period_preventions += 1; // Assuming this failure used grace period
        }

        Ok(())
    }

    /// Handle high churn conditions
    async fn handle_high_churn(&self) -> Result<()> {
        tracing::warn!("Network experiencing high churn, entering defensive mode");

        // 1. Increase replication factors globally
        self.replication_manager
            .increase_global_replication(1.5)
            .await;

        // 2. Reduce gossip fanout to conserve bandwidth
        self.gossip.reduce_fanout(0.75).await;

        // 3. Enable aggressive caching
        self.router.enable_aggressive_caching().await;

        // 4. Notify applications of degraded conditions
        let churn_rate = self.stats.read().await.churn_rate;
        let message = GossipMessage {
            topic: "high_churn_alert".to_string(),
            data: bincode::serialize(&churn_rate)
                .map_err(|e| anyhow::anyhow!("Serialization error: {}", e))?,
            from: self.node_id.clone(),
            seqno: 0, // Will be set by gossip subsystem
            timestamp: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.as_secs())
                .unwrap_or(0),
        };
        self.gossip.publish("high_churn_alert", message).await?;

        Ok(())
    }

    /// Remove failed node from routing tables
    async fn remove_from_routing_tables(&self, node_id: &NodeId) -> Result<()> {
        // Remove from Kademlia routing table
        self.router.remove_node(node_id).await;

        // Remove from hyperbolic space
        self.router.remove_hyperbolic_coordinate(node_id).await;

        // Remove from SOM
        self.router.remove_from_som(node_id).await;

        // Remove from trust system
        self.trust_provider.remove_node(node_id);

        Ok(())
    }

    /// Get content stored by a specific node
    async fn get_content_stored_by(&self, node_id: &NodeId) -> Result<Vec<ContentHash>> {
        let status = self.node_monitor.get_node_status(node_id).await;
        Ok(status.stored_content.into_iter().collect())
    }

    /// Identify content that may be lost due to node failure
    async fn identify_lost_content(&self, failed_node: &NodeId) -> Result<Vec<ContentHash>> {
        let all_content = self.get_content_stored_by(failed_node).await?;
        let mut at_risk_content = Vec::new();

        for content_hash in all_content {
            let remaining_replicas = self
                .recovery_manager
                .get_remaining_replicas(&content_hash, failed_node)
                .await?;

            // If below minimum replication factor, mark as at risk
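            // NOTE: the threshold of 5 replicas is hard-coded here and is
            // assumed to match the network's target replication factor.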
            if remaining_replicas < 5 {
                at_risk_content.push(content_hash);
            }
        }

        Ok(at_risk_content)
    }

    /// Penalize node for unexpected departure
    async fn penalize_unexpected_departure(&self, node_id: &NodeId) {
        self.trust_provider.update_trust(
            &NodeId { hash: [0u8; 32] }, // System node
            node_id,
            false, // Negative interaction
        );
    }

    /// Trigger topology rebalancing after failures
    async fn trigger_topology_rebalance(&self) -> Result<()> {
        // Adjust hyperbolic coordinates
        self.router.rebalance_hyperbolic_space().await;

        // Update SOM grid if needed
        self.router.update_som_grid().await;

        // Recompute trust scores
        self.router.trigger_trust_recomputation().await;

        Ok(())
    }

    /// Handle heartbeat from a node
    pub async fn handle_heartbeat(&self, node_id: &NodeId) -> Result<()> {
        self.node_monitor.record_heartbeat(node_id).await;
        Ok(())
    }

    /// Handle gossip activity from a node
    pub async fn handle_gossip_activity(&self, node_id: &NodeId) -> Result<()> {
        self.node_monitor.record_gossip_activity(node_id).await;
        Ok(())
    }

    /// Get churn statistics
    pub async fn get_stats(&self) -> ChurnStats {
        self.stats.read().await.clone()
    }

    /// Clone for spawning tasks
    fn clone_for_task(&self) -> Self {
        Self {
            node_id: self.node_id.clone(),
            predictor: self.predictor.clone(),
            node_monitor: self.node_monitor.clone(),
            recovery_manager: self.recovery_manager.clone(),
            trust_provider: self.trust_provider.clone(),
            replication_manager: self.replication_manager.clone(),
            router: self.router.clone(),
            gossip: self.gossip.clone(),
            config: self.config.clone(),
            stats: self.stats.clone(),
        }
    }
}

impl NodeMonitor {
    /// Create a new node monitor
    pub fn new(config: ChurnConfig) -> Self {
        Self {
            node_status: Arc::new(RwLock::new(HashMap::new())),
            heartbeats: Arc::new(RwLock::new(HashMap::new())),
            config,
        }
    }

    /// Get all monitored nodes
    pub async fn get_all_nodes(&self) -> Vec<NodeId> {
        self.node_status.read().await.keys().cloned().collect()
    }

    /// Get node status
    pub async fn get_node_status(&self, node_id: &NodeId) -> NodeStatus {
        self.node_status
            .read()
            .await
            .get(node_id)
            .cloned()
            .unwrap_or(NodeStatus {
                node_id: node_id.clone(),
                last_seen: Instant::now(),
                last_heartbeat: None,
                last_gossip: None,
                status: NodeState::Failed,
                reliability: 0.0,
                stored_content: HashSet::new(),
            })
    }

    /// Update node state
    pub async fn update_node_state(&self, node_id: &NodeId, state: NodeState) {
        if let Some(status) = self.node_status.write().await.get_mut(node_id) {
            status.status = state;
        }
    }

    /// Record heartbeat from node
    pub async fn record_heartbeat(&self, node_id: &NodeId) {
        let now = Instant::now();
        self.heartbeats.write().await.insert(node_id.clone(), now);

        let mut status_map = self.node_status.write().await;
        let status = status_map
            .entry(node_id.clone())
            .or_insert_with(|| NodeStatus {
                node_id: node_id.clone(),
                last_seen: now,
                last_heartbeat: None,
                last_gossip: None,
                status: NodeState::Active,
                reliability: 1.0,
                stored_content: HashSet::new(),
            });
        status.last_heartbeat = Some(now);
        status.last_seen = now;
        status.status = NodeState::Active;
    }

    /// Record gossip activity
    pub async fn record_gossip_activity(&self, node_id: &NodeId) {
        let now = Instant::now();

        if let Some(status) = self.node_status.write().await.get_mut(node_id) {
            status.last_gossip = Some(now);
            status.last_seen = now;
        }
    }

    /// Check if node is alive
    pub async fn is_alive(&self, node_id: &NodeId) -> bool {
        if let Some(last_heartbeat) = self.heartbeats.read().await.get(node_id) {
            last_heartbeat.elapsed() < self.config.heartbeat_timeout
        } else {
            false
        }
    }
}

impl Default for RecoveryManager {
    fn default() -> Self {
        Self::new()
    }
}

impl RecoveryManager {
    /// Create a new recovery manager
    pub fn new() -> Self {
        Self {
            content_tracker: Arc::new(RwLock::new(HashMap::new())),
            recovery_queue: Arc::new(RwLock::new(Vec::new())),
            _active_recoveries: Arc::new(RwLock::new(HashMap::new())),
            node_failure_tracker: Arc::new(RwLock::new(None)),
        }
    }

    /// Create a new recovery manager with node failure tracker
    pub fn with_failure_tracker(failure_tracker: Arc<dyn NodeFailureTracker>) -> Self {
        Self {
            content_tracker: Arc::new(RwLock::new(HashMap::new())),
            recovery_queue: Arc::new(RwLock::new(Vec::new())),
            _active_recoveries: Arc::new(RwLock::new(HashMap::new())),
            node_failure_tracker: Arc::new(RwLock::new(Some(failure_tracker))),
        }
    }

    /// Set the node failure tracker
    pub async fn set_failure_tracker(&self, failure_tracker: Arc<dyn NodeFailureTracker>) {
        *self.node_failure_tracker.write().await = Some(failure_tracker);
    }

    /// Increase replication for content
    pub async fn increase_replication(
        &self,
        content_hash: &ContentHash,
        priority: RecoveryPriority,
    ) -> Result<()> {
        // Queue a replication task
        self.queue_recovery(*content_hash, vec![], priority).await
    }

    /// Queue content for recovery
    pub async fn queue_recovery(
        &self,
        content_hash: ContentHash,
        failed_nodes: Vec<NodeId>,
        priority: RecoveryPriority,
    ) -> Result<()> {
        let task = RecoveryTask {
            content_hash,
            failed_nodes,
            priority,
            created_at: Instant::now(),
        };

        let mut queue = self.recovery_queue.write().await;
        queue.push(task);

        // Sort by priority
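        // `RecoveryPriority` derives `Ord` with `Low < Normal < High < Critical`,
        // so sorting by `Reverse(priority)` puts the most urgent tasks first.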
        queue.sort_by_key(|task| std::cmp::Reverse(task.priority));

        Ok(())
    }

    /// Get remaining replicas for content
    pub async fn get_remaining_replicas(
        &self,
        content_hash: &ContentHash,
        exclude_node: &NodeId,
    ) -> Result<u32> {
        if let Some(tracker) = self.content_tracker.read().await.get(content_hash) {
            let remaining = tracker
                .storing_nodes
                .iter()
                .filter(|&n| n != exclude_node)
                .count() as u32;
            Ok(remaining)
        } else {
            Ok(0)
        }
    }

    /// Queue recovery with grace period consideration
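    ///
    /// Nodes whose grace period has already expired are queued for immediate
    /// recovery; nodes still inside their grace period get a delayed re-check,
    /// so a node that re-registers in time avoids triggering re-replication.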
    pub async fn queue_recovery_with_grace_period(
        &self,
        content_hash: ContentHash,
        failed_nodes: Vec<NodeId>,
        priority: RecoveryPriority,
        config: &ReplicationGracePeriodConfig,
    ) -> Result<()> {
        if failed_nodes.is_empty() {
            return self
                .queue_recovery(content_hash, failed_nodes, priority)
                .await;
        }

        if let Some(ref failure_tracker) = *self.node_failure_tracker.read().await {
            // Record failures and check grace periods
            for node_id in &failed_nodes {
                failure_tracker
                    .record_node_failure(
                        node_id.clone(),
                        crate::dht::replication_grace_period::NodeFailureReason::NetworkTimeout,
                        config,
                    )
                    .await?;
            }

            let mut immediate_recovery_nodes = Vec::new();
            let mut delayed_recovery_nodes = Vec::new();

            for node_id in &failed_nodes {
                if failure_tracker.should_start_replication(node_id).await {
                    immediate_recovery_nodes.push(node_id.clone());
                } else {
                    delayed_recovery_nodes.push(node_id.clone());
                }
            }

            // Queue immediate recovery for nodes past grace period
            if !immediate_recovery_nodes.is_empty() {
                tracing::info!(
                    "Queuing immediate recovery for {} nodes (past grace period) for content {:?}",
                    immediate_recovery_nodes.len(),
                    content_hash
                );
                self.queue_recovery(content_hash, immediate_recovery_nodes, priority)
                    .await?;
            }

            // Schedule delayed checks for nodes in grace period
            if !delayed_recovery_nodes.is_empty() {
                tracing::info!(
                    "Scheduling delayed recovery check for {} nodes (in grace period) for content {:?}",
                    delayed_recovery_nodes.len(),
                    content_hash
                );
                self.schedule_grace_period_check(
                    content_hash,
                    delayed_recovery_nodes,
                    priority,
                    failure_tracker.clone(),
                )
                .await?;
            }

            Ok(())
        } else {
            // No failure tracker, use immediate recovery
            self.queue_recovery(content_hash, failed_nodes, priority)
                .await
        }
    }

    /// Schedule a delayed recovery check for multiple nodes
    async fn schedule_grace_period_check(
        &self,
        content_hash: ContentHash,
        failed_nodes: Vec<NodeId>,
        _priority: RecoveryPriority,
        failure_tracker: Arc<dyn NodeFailureTracker>,
    ) -> Result<()> {
        // Hold only a weak reference so the background check does not keep the
        // recovery manager's state alive after it has been dropped.
        let content_tracker = Arc::downgrade(&self.content_tracker);

        tokio::spawn(async move {
            // Wait for the grace period to potentially expire (5 minutes + 10 second buffer)
            tokio::time::sleep(Duration::from_secs(310)).await;

            if content_tracker.upgrade().is_some() {
                // Check again whether replication should start for any nodes
                let mut nodes_to_recover = Vec::new();

                for node_id in &failed_nodes {
                    if failure_tracker.should_start_replication(node_id).await {
                        nodes_to_recover.push(node_id.clone());
                    }
                }

                if !nodes_to_recover.is_empty() {
                    // In practice, the actual re-queue is handled by the owning ChurnHandler
                    tracing::info!(
                        "Grace period expired for {} nodes, queuing recovery for content {:?}",
                        nodes_to_recover.len(),
                        content_hash
                    );
                } else {
                    tracing::debug!(
                        "Grace period check completed for content {:?}, no nodes require recovery",
                        content_hash
                    );
                }
            }
        });

        Ok(())
    }

    /// Check if a node should be recovered immediately or wait for grace period
    pub async fn should_recover_node(&self, node_id: &NodeId) -> bool {
        if let Some(ref failure_tracker) = *self.node_failure_tracker.read().await {
            failure_tracker.should_start_replication(node_id).await
        } else {
            // No failure tracker, always recover immediately
            true
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::adaptive::trust::MockTrustProvider;
    use rand::RngCore;

    async fn create_test_churn_handler() -> ChurnHandler {
        let predictor = Arc::new(ChurnPredictor::new());
        let trust_provider = Arc::new(MockTrustProvider::new());
        let replication_manager = Arc::new(ReplicationManager::new(
            Default::default(),
            trust_provider.clone(),
            predictor.clone(),
            Arc::new(AdaptiveRouter::new(trust_provider.clone())),
        ));
        let router = Arc::new(AdaptiveRouter::new(trust_provider.clone()));
        // Create a test NodeId
        use crate::peer_record::UserId;
        let mut hash = [0u8; 32];
        rand::thread_rng().fill_bytes(&mut hash);
        let node_id = UserId::from_bytes(hash);

        let gossip = Arc::new(AdaptiveGossipSub::new(
            node_id.clone(),
            trust_provider.clone(),
        ));

        // Build a test ChurnConfig (shorter monitoring interval, stricter churn limit)
        let config = ChurnConfig {
            heartbeat_timeout: Duration::from_secs(30),
            gossip_timeout: Duration::from_secs(300),
            monitoring_interval: Duration::from_secs(5),
            prediction_threshold: 0.7,
            max_churn_rate: 0.2,
        };

        ChurnHandler::new(
            node_id,
            predictor,
            trust_provider,
            replication_manager,
            router,
            gossip,
            config,
        )
    }

    #[tokio::test]
    async fn test_node_monitoring() {
        let handler = create_test_churn_handler().await;
        let node_id = NodeId { hash: [1u8; 32] };

        // Record heartbeat
        handler.node_monitor.record_heartbeat(&node_id).await;

        // Check node is alive
        assert!(handler.node_monitor.is_alive(&node_id).await);

        // Check status
        let status = handler.node_monitor.get_node_status(&node_id).await;
        assert_eq!(status.status, NodeState::Active);
        assert!(status.last_heartbeat.is_some());
    }

    #[tokio::test]
    async fn test_failure_detection() {
        let mut handler = create_test_churn_handler().await;
        // Short timeout for testing
        handler.config.heartbeat_timeout = Duration::from_millis(100);
        // Ensure NodeMonitor uses the same short timeout
        if let Some(nm) = Arc::get_mut(&mut handler.node_monitor) {
            nm.config.heartbeat_timeout = Duration::from_millis(100);
        }
        let node_id = NodeId { hash: [1u8; 32] };

        // Record initial heartbeat
        handler.handle_heartbeat(&node_id).await.unwrap();

        // Wait for timeout
        tokio::time::sleep(Duration::from_millis(150)).await;

        // Node should no longer be alive
        assert!(!handler.node_monitor.is_alive(&node_id).await);
    }

    #[tokio::test]
    async fn test_proactive_replication() {
        let handler = create_test_churn_handler().await;
        let node_id = NodeId { hash: [1u8; 32] };

        // Add node with high churn probability
        // Ensure node is tracked as active first
        handler.node_monitor.record_heartbeat(&node_id).await;
        handler
            .predictor
            .update_node_features(
                &node_id,
                vec![
                    0.1, // Low online duration
                    0.9, // High response time
                    0.1, // Low resource contribution
                    0.1, // Low message frequency
                    0.0, // Time of day
                    0.0, // Day of week
                    0.1, // Low historical reliability
                    0.0, 0.0, 0.0,
                ],
            )
            .await
            .unwrap();

        // Handle imminent departure
        handler.handle_imminent_departure(&node_id).await.unwrap();

        // Check node marked as departing
        let status = handler.node_monitor.get_node_status(&node_id).await;
        assert_eq!(status.status, NodeState::Departing);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_churn_rate_calculation() {
        let mut handler = create_test_churn_handler().await;
        // Avoid triggering high-churn handling which can slow tests
        handler.config.max_churn_rate = 1.0;

        // Add some nodes
        for i in 0..10 {
            let node_id = NodeId { hash: [i; 32] };
            handler.handle_heartbeat(&node_id).await.unwrap();
        }

        // Mark some as failed
        for i in 0..3 {
            let node_id = NodeId { hash: [i; 32] };
            handler
                .node_monitor
                .update_node_state(&node_id, NodeState::Failed)
                .await;
        }

        // Run monitoring cycle with a strict timeout to avoid hangs
        let res =
            tokio::time::timeout(std::time::Duration::from_secs(30), handler.monitor_cycle()).await;
        assert!(res.is_ok(), "monitor_cycle timed out");
        res.unwrap().unwrap();

        // Check stats
        let stats = handler.get_stats().await;
        assert_eq!(stats.total_nodes, 10);
        assert_eq!(stats.failed_nodes, 3);
        assert!((stats.churn_rate - 0.3).abs() < 0.01); // 30% churn rate
    }
}