saorsa_core/adaptive/churn.rs

// Copyright 2024 Saorsa Labs Limited
//
// This software is dual-licensed under:
// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
// - Commercial License
//
// For AGPL-3.0 license, see LICENSE-AGPL-3.0
// For commercial licensing, contact: saorsalabs@gmail.com
//
// Unless required by applicable law or agreed to in writing, software
// distributed under these licenses is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

//! Churn handling and recovery system
//!
//! This module implements churn detection and recovery mechanisms:
//! - Node failure detection with 30-second heartbeat timeout
//! - Proactive content replication based on churn predictions
//! - Routing table repair and maintenance
//! - Trust score updates for unexpected departures
//! - Topology rebalancing for network health
//! - Graceful degradation under high churn
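//!
//! A minimal wiring sketch (mirroring the constructors used in this module's
//! tests; how the dependencies are built will vary by deployment):
//!
//! ```ignore
//! // `node_id`, `predictor`, `trust_provider`, `replication_manager`,
//! // `router`, and `gossip` are assumed to be constructed elsewhere.
//! let handler = ChurnHandler::new(
//!     node_id,
//!     predictor,
//!     trust_provider,
//!     replication_manager,
//!     router,
//!     gossip,
//!     ChurnConfig::default(),
//! );
//!
//! // Spawns a background task that runs one monitoring cycle per
//! // `monitoring_interval` and reacts to failures and predicted departures.
//! handler.start_monitoring().await;
//! ```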

use crate::adaptive::{
    ContentHash, NodeId, TrustProvider,
    gossip::{AdaptiveGossipSub, GossipMessage},
    learning::ChurnPredictor,
    replication::ReplicationManager,
    routing::AdaptiveRouter,
};
use crate::dht::{NodeFailureTracker, ReplicationGracePeriodConfig};
use anyhow::Result;
use std::{
    collections::{HashMap, HashSet},
    sync::Arc,
    time::{Duration, Instant},
};
use tokio::sync::RwLock;

/// Churn detection and recovery system
pub struct ChurnHandler {
    /// Local node ID
    node_id: NodeId,

    /// Churn predictor for proactive measures
    predictor: Arc<ChurnPredictor>,

    /// Node monitoring system
    node_monitor: Arc<NodeMonitor>,

    /// Recovery manager for content
    recovery_manager: Arc<RecoveryManager>,

    /// Trust provider for reputation updates
    trust_provider: Arc<dyn TrustProvider>,

    /// Replication manager
    replication_manager: Arc<ReplicationManager>,

    /// Routing system
    router: Arc<AdaptiveRouter>,

    /// Gossip system for announcements
    gossip: Arc<AdaptiveGossipSub>,

    /// Configuration
    config: ChurnConfig,

    /// Churn statistics
    stats: Arc<RwLock<ChurnStats>>,
}

/// Configuration for churn handling
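///
/// The defaults below match the timeouts described in the module docs. A
/// minimal sketch of overriding individual fields (field names as defined in
/// this struct):
///
/// ```ignore
/// let config = ChurnConfig {
///     heartbeat_timeout: Duration::from_secs(10),
///     ..Default::default()
/// };
/// ```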
#[derive(Debug, Clone)]
pub struct ChurnConfig {
    /// Heartbeat timeout (default: 30 seconds)
    pub heartbeat_timeout: Duration,

    /// Gossip absence timeout (default: 5 minutes)
    pub gossip_timeout: Duration,

    /// Prediction threshold for proactive measures (default: 0.7)
    pub prediction_threshold: f64,

    /// Monitoring interval (default: 30 seconds)
    pub monitoring_interval: Duration,

    /// Maximum acceptable churn rate (default: 30%)
    pub max_churn_rate: f64,
}

impl Default for ChurnConfig {
    fn default() -> Self {
        Self {
            heartbeat_timeout: Duration::from_secs(30),
            gossip_timeout: Duration::from_secs(300),
            prediction_threshold: 0.7,
            monitoring_interval: Duration::from_secs(30),
            max_churn_rate: 0.3,
        }
    }
}

/// Node monitoring system
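///
/// Heartbeats drive liveness. A test-style sketch using the methods defined
/// below (the `NodeId` literal is illustrative only):
///
/// ```ignore
/// let monitor = NodeMonitor::new(ChurnConfig::default());
/// let node_id = NodeId { hash: [1u8; 32] };
/// monitor.record_heartbeat(&node_id).await;
/// assert!(monitor.is_alive(&node_id).await);
/// ```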
pub struct NodeMonitor {
    /// Node status tracking
    node_status: Arc<RwLock<HashMap<NodeId, NodeStatus>>>,

    /// Heartbeat tracking
    heartbeats: Arc<RwLock<HashMap<NodeId, Instant>>>,

    /// Configuration
    config: ChurnConfig,
}

/// Status of a monitored node
#[derive(Debug, Clone)]
pub struct NodeStatus {
    /// Node identifier
    pub node_id: NodeId,

    /// Last seen timestamp
    pub last_seen: Instant,

    /// Last heartbeat received
    pub last_heartbeat: Option<Instant>,

    /// Last gossip activity
    pub last_gossip: Option<Instant>,

    /// Current status
    pub status: NodeState,

    /// Reliability score (0.0-1.0)
    pub reliability: f64,

    /// Content stored by this node
    pub stored_content: HashSet<ContentHash>,
}

/// Node state in the network
#[derive(Debug, Clone, PartialEq)]
pub enum NodeState {
    /// Node is active and healthy
    Active,

    /// Node is suspicious (missed heartbeats)
    Suspicious,

    /// Node is departing (predicted or announced)
    Departing,

    /// Node has failed
    Failed,
}

/// Recovery manager for handling failures
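///
/// Recovery work is queued and ordered by [`RecoveryPriority`]. A sketch
/// (`content_hash` and `failed_node` are assumed to come from the surrounding
/// DHT layer):
///
/// ```ignore
/// let recovery = RecoveryManager::new();
/// recovery
///     .queue_recovery(content_hash, vec![failed_node], RecoveryPriority::High)
///     .await?;
/// ```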
pub struct RecoveryManager {
    /// Content tracking
    content_tracker: Arc<RwLock<HashMap<ContentHash, ContentTracker>>>,

    /// Recovery queue
    recovery_queue: Arc<RwLock<Vec<RecoveryTask>>>,

    /// Active recoveries
    _active_recoveries: Arc<RwLock<HashMap<ContentHash, RecoveryStatus>>>,

    /// Node failure tracker for grace period management
    node_failure_tracker: Arc<RwLock<Option<Arc<dyn NodeFailureTracker>>>>,
}

/// Content tracking information
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct ContentTracker {
    /// Content hash
    hash: ContentHash,

    /// Nodes storing this content
    storing_nodes: HashSet<NodeId>,

    /// Target replication factor
    target_replicas: u32,

    /// Last verification time
    last_verified: Instant,
}

/// Recovery task
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct RecoveryTask {
    /// Content to recover
    content_hash: ContentHash,

    /// Failed nodes
    failed_nodes: Vec<NodeId>,

    /// Priority level
    priority: RecoveryPriority,

    /// Creation time
    created_at: Instant,
}

/// Recovery priority levels
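///
/// The derived `Ord` follows declaration order, so later variants outrank
/// earlier ones; the recovery queue's priority sort relies on this:
///
/// ```ignore
/// assert!(RecoveryPriority::Critical > RecoveryPriority::High);
/// assert!(RecoveryPriority::Normal > RecoveryPriority::Low);
/// ```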
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum RecoveryPriority {
    /// Low priority - can wait
    Low,

    /// Normal priority
    Normal,

    /// High priority - important content
    High,

    /// Critical - immediate action needed
    Critical,
}

/// Recovery status
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct RecoveryStatus {
    /// Start time
    started_at: Instant,

    /// Nodes contacted for recovery
    contacted_nodes: Vec<NodeId>,

    /// Successful recoveries
    successful_nodes: Vec<NodeId>,

    /// Failed attempts
    failed_attempts: u32,
}

/// Churn handling statistics
#[derive(Debug, Default, Clone)]
pub struct ChurnStats {
    /// Total nodes monitored
    pub total_nodes: u64,

    /// Active nodes
    pub active_nodes: u64,

    /// Failed nodes
    pub failed_nodes: u64,

    /// Suspicious nodes
    pub suspicious_nodes: u64,

    /// Current churn rate
    pub churn_rate: f64,

    /// Successful recoveries
    pub successful_recoveries: u64,

    /// Failed recoveries
    pub failed_recoveries: u64,

    /// Proactive replications
    pub proactive_replications: u64,

    /// Average detection time
    pub avg_detection_time_ms: f64,

    /// Grace period prevented replications
    pub grace_period_preventions: u64,

    /// Successful node re-registrations during grace period
    pub successful_reregistrations: u64,

    /// Average grace period duration for re-registered nodes
    pub avg_grace_period_duration_ms: f64,
}

impl ChurnHandler {
    /// Create a new churn handler
    pub fn new(
        node_id: NodeId,
        predictor: Arc<ChurnPredictor>,
        trust_provider: Arc<dyn TrustProvider>,
        replication_manager: Arc<ReplicationManager>,
        router: Arc<AdaptiveRouter>,
        gossip: Arc<AdaptiveGossipSub>,
        config: ChurnConfig,
    ) -> Self {
        let node_monitor = Arc::new(NodeMonitor::new(config.clone()));
        let recovery_manager = Arc::new(RecoveryManager::new());

        Self {
            node_id,
            predictor,
            node_monitor,
            recovery_manager,
            trust_provider,
            replication_manager,
            router,
            gossip,
            config,
            stats: Arc::new(RwLock::new(ChurnStats::default())),
        }
    }

    /// Create a new churn handler with node failure tracker for grace periods
    pub fn with_failure_tracker(
        node_id: NodeId,
        predictor: Arc<ChurnPredictor>,
        trust_provider: Arc<dyn TrustProvider>,
        replication_manager: Arc<ReplicationManager>,
        router: Arc<AdaptiveRouter>,
        gossip: Arc<AdaptiveGossipSub>,
        config: ChurnConfig,
        failure_tracker: Arc<dyn NodeFailureTracker>,
    ) -> Self {
        let node_monitor = Arc::new(NodeMonitor::new(config.clone()));
        let recovery_manager = Arc::new(RecoveryManager::with_failure_tracker(failure_tracker));

        Self {
            node_id,
            predictor,
            node_monitor,
            recovery_manager,
            trust_provider,
            replication_manager,
            router,
            gossip,
            config,
            stats: Arc::new(RwLock::new(ChurnStats::default())),
        }
    }

    /// Set the node failure tracker for grace period management
    pub async fn set_failure_tracker(&self, failure_tracker: Arc<dyn NodeFailureTracker>) {
        self.recovery_manager.set_failure_tracker(failure_tracker).await;
    }

    /// Start monitoring network for churn
    pub async fn start_monitoring(&self) {
        let monitoring_interval = self.config.monitoring_interval;
        let handler = self.clone_for_task();

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(monitoring_interval);

            loop {
                interval.tick().await;

                if let Err(e) = handler.monitor_cycle().await {
                    tracing::error!("Churn monitoring error: {}", e);
                }
            }
        });
    }

    /// Perform one monitoring cycle
    async fn monitor_cycle(&self) -> Result<()> {
        let nodes = self.node_monitor.get_all_nodes().await;
        let mut stats = self.stats.write().await;

        stats.total_nodes = nodes.len() as u64;
        stats.active_nodes = 0;
        stats.suspicious_nodes = 0;
        stats.failed_nodes = 0;

        // Collect nodes that need action; the handlers invoked after this loop
        // take the stats lock themselves, so it must be released before calling them.
        let mut departing_nodes = Vec::new();
        let mut timed_out_nodes = Vec::new();

        for node_id in nodes {
            let node_status = self.node_monitor.get_node_status(&node_id).await;

            match node_status.status {
                NodeState::Active => {
                    stats.active_nodes += 1;

                    // Check churn prediction
                    let prediction = self.predictor.predict(&node_id).await;

                    if prediction.probability_1h > self.config.prediction_threshold {
                        stats.proactive_replications += 1;
                        departing_nodes.push(node_id);
                    }
                }
                NodeState::Suspicious => {
                    stats.suspicious_nodes += 1;

                    // Check if node should be marked as failed
                    if node_status.last_seen.elapsed() > self.config.heartbeat_timeout {
                        timed_out_nodes.push(node_id);
                    }
                }
                NodeState::Failed => {
                    stats.failed_nodes += 1;
                }
                _ => {}
            }
        }

        // Calculate churn rate
        if stats.total_nodes > 0 {
            stats.churn_rate = stats.failed_nodes as f64 / stats.total_nodes as f64;
        }
        let churn_rate = stats.churn_rate;

        // Release the stats lock before invoking handlers that also lock it
        drop(stats);

        // Proactive replication for nodes predicted to depart soon
        for node_id in &departing_nodes {
            self.handle_imminent_departure(node_id).await?;
        }

        // Mark timed-out suspicious nodes as failed
        for node_id in &timed_out_nodes {
            self.handle_node_failure(node_id).await?;
        }

        // Check if network is experiencing high churn
        if churn_rate > self.config.max_churn_rate {
            self.handle_high_churn().await?;
        }

        Ok(())
    }

    /// Handle imminent node departure (predicted)
    async fn handle_imminent_departure(&self, node_id: &NodeId) -> Result<()> {
        tracing::info!("Handling imminent departure for node {:?}", node_id);

        // 1. Mark node as departing
        self.node_monitor
            .update_node_state(node_id, NodeState::Departing)
            .await;

        // 2. Get content stored by this node
        let stored_content = self.get_content_stored_by(node_id).await?;

        // 3. Start aggressive replication
        for content_hash in stored_content {
            self.recovery_manager
                .increase_replication(&content_hash, RecoveryPriority::High)
                .await?;
        }

        // 4. Reroute ongoing connections
        self.router.mark_node_unreliable(node_id).await;

        // 5. Notify network via gossip
        let message = GossipMessage {
            topic: "node_departing".to_string(),
            data: bincode::serialize(&node_id)
                .map_err(|e| anyhow::anyhow!("Serialization error: {}", e))?,
            from: self.node_id.clone(),
            seqno: 0, // Will be set by gossip subsystem
            timestamp: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.as_secs())
                .unwrap_or(0),
        };
        self.gossip.publish("node_departing", message).await?;

        Ok(())
    }

    /// Handle confirmed node failure
    async fn handle_node_failure(&self, node_id: &NodeId) -> Result<()> {
        let start_time = Instant::now();
        tracing::warn!("Handling node failure for {:?}", node_id);

        // 1. Mark node as failed
        self.node_monitor
            .update_node_state(node_id, NodeState::Failed)
            .await;

        // 2. Remove from all routing structures
        self.remove_from_routing_tables(node_id).await?;

        // 3. Identify lost content
        let lost_content = self.identify_lost_content(node_id).await?;

        // 4. Queue recovery tasks with grace period consideration
        let grace_config = ReplicationGracePeriodConfig::default();
        tracing::info!(
            "Node {} failed, queuing recovery for {} content items with {}s grace period",
            node_id,
            lost_content.len(),
            grace_config.grace_period_duration.as_secs()
        );

        for content_hash in lost_content {
            self.recovery_manager
                .queue_recovery_with_grace_period(
                    content_hash,
                    vec![node_id.clone()],
                    RecoveryPriority::Critical,
                    &grace_config,
                )
                .await?;
        }

        // 5. Update trust scores
        self.penalize_unexpected_departure(node_id).await;

        // 6. Trigger topology rebalancing
        self.trigger_topology_rebalance().await?;

        // Update stats (incremental running mean of detection latency)
        let mut stats = self.stats.write().await;
        stats.failed_nodes += 1;
        let detection_time = start_time.elapsed().as_millis() as f64;
        stats.avg_detection_time_ms =
            (stats.avg_detection_time_ms * (stats.failed_nodes - 1) as f64 + detection_time)
                / stats.failed_nodes as f64;

        // Update grace period metrics
        if self.recovery_manager.node_failure_tracker.read().await.is_some() {
            stats.grace_period_preventions += 1; // Assuming this failure used grace period
        }

        Ok(())
    }

    /// Handle high churn conditions
    async fn handle_high_churn(&self) -> Result<()> {
        tracing::warn!("Network experiencing high churn, entering defensive mode");

        // 1. Increase replication factors globally
        self.replication_manager
            .increase_global_replication(1.5)
            .await;

        // 2. Reduce gossip fanout to conserve bandwidth
        self.gossip.reduce_fanout(0.75).await;

        // 3. Enable aggressive caching
        self.router.enable_aggressive_caching().await;

        // 4. Notify applications of degraded conditions
        let churn_rate = self.stats.read().await.churn_rate;
        let message = GossipMessage {
            topic: "high_churn_alert".to_string(),
            data: bincode::serialize(&churn_rate)
                .map_err(|e| anyhow::anyhow!("Serialization error: {}", e))?,
            from: self.node_id.clone(),
            seqno: 0, // Will be set by gossip subsystem
            timestamp: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.as_secs())
                .unwrap_or(0),
        };
        self.gossip.publish("high_churn_alert", message).await?;

        Ok(())
    }

    /// Remove failed node from routing tables
    async fn remove_from_routing_tables(&self, node_id: &NodeId) -> Result<()> {
        // Remove from Kademlia routing table
        self.router.remove_node(node_id).await;

        // Remove from hyperbolic space
        self.router.remove_hyperbolic_coordinate(node_id).await;

        // Remove from SOM
        self.router.remove_from_som(node_id).await;

        // Remove from trust system
        self.trust_provider.remove_node(node_id);

        Ok(())
    }

    /// Get content stored by a specific node
    async fn get_content_stored_by(&self, node_id: &NodeId) -> Result<Vec<ContentHash>> {
        let status = self.node_monitor.get_node_status(node_id).await;
        Ok(status.stored_content.into_iter().collect())
    }

    /// Identify content that may be lost due to node failure
    async fn identify_lost_content(&self, failed_node: &NodeId) -> Result<Vec<ContentHash>> {
        let all_content = self.get_content_stored_by(failed_node).await?;
        let mut at_risk_content = Vec::new();

        for content_hash in all_content {
            let remaining_replicas = self
                .recovery_manager
                .get_remaining_replicas(&content_hash, failed_node)
                .await?;

            // If below minimum replication factor, mark as at risk
            if remaining_replicas < 5 {
                at_risk_content.push(content_hash);
            }
        }

        Ok(at_risk_content)
    }

    /// Penalize node for unexpected departure
    async fn penalize_unexpected_departure(&self, node_id: &NodeId) {
        self.trust_provider.update_trust(
            &NodeId { hash: [0u8; 32] }, // System node
            node_id,
            false, // Negative interaction
        );
    }

    /// Trigger topology rebalancing after failures
    async fn trigger_topology_rebalance(&self) -> Result<()> {
        // Adjust hyperbolic coordinates
        self.router.rebalance_hyperbolic_space().await;

        // Update SOM grid if needed
        self.router.update_som_grid().await;

        // Recompute trust scores
        self.router.trigger_trust_recomputation().await;

        Ok(())
    }

    /// Handle heartbeat from a node
    pub async fn handle_heartbeat(&self, node_id: &NodeId) -> Result<()> {
        self.node_monitor.record_heartbeat(node_id).await;
        Ok(())
    }

    /// Handle gossip activity from a node
    pub async fn handle_gossip_activity(&self, node_id: &NodeId) -> Result<()> {
        self.node_monitor.record_gossip_activity(node_id).await;
        Ok(())
    }

    /// Get churn statistics
    pub async fn get_stats(&self) -> ChurnStats {
        self.stats.read().await.clone()
    }

    /// Clone for spawning tasks
    fn clone_for_task(&self) -> Self {
        Self {
            node_id: self.node_id.clone(),
            predictor: self.predictor.clone(),
            node_monitor: self.node_monitor.clone(),
            recovery_manager: self.recovery_manager.clone(),
            trust_provider: self.trust_provider.clone(),
            replication_manager: self.replication_manager.clone(),
            router: self.router.clone(),
            gossip: self.gossip.clone(),
            config: self.config.clone(),
            stats: self.stats.clone(),
        }
    }
}

impl NodeMonitor {
    /// Create a new node monitor
    pub fn new(config: ChurnConfig) -> Self {
        Self {
            node_status: Arc::new(RwLock::new(HashMap::new())),
            heartbeats: Arc::new(RwLock::new(HashMap::new())),
            config,
        }
    }

    /// Get all monitored nodes
    pub async fn get_all_nodes(&self) -> Vec<NodeId> {
        self.node_status.read().await.keys().cloned().collect()
    }

    /// Get node status
    pub async fn get_node_status(&self, node_id: &NodeId) -> NodeStatus {
        self.node_status
            .read()
            .await
            .get(node_id)
            .cloned()
            .unwrap_or(NodeStatus {
                node_id: node_id.clone(),
                last_seen: Instant::now(),
                last_heartbeat: None,
                last_gossip: None,
                status: NodeState::Failed,
                reliability: 0.0,
                stored_content: HashSet::new(),
            })
    }

    /// Update node state
    pub async fn update_node_state(&self, node_id: &NodeId, state: NodeState) {
        if let Some(status) = self.node_status.write().await.get_mut(node_id) {
            status.status = state;
        }
    }

    /// Record heartbeat from node
    pub async fn record_heartbeat(&self, node_id: &NodeId) {
        let now = Instant::now();
        self.heartbeats.write().await.insert(node_id.clone(), now);

        let mut status_map = self.node_status.write().await;
        let status = status_map
            .entry(node_id.clone())
            .or_insert_with(|| NodeStatus {
                node_id: node_id.clone(),
                last_seen: now,
                last_heartbeat: None,
                last_gossip: None,
                status: NodeState::Active,
                reliability: 1.0,
                stored_content: HashSet::new(),
            });
        status.last_heartbeat = Some(now);
        status.last_seen = now;
        status.status = NodeState::Active;
    }

    /// Record gossip activity
    pub async fn record_gossip_activity(&self, node_id: &NodeId) {
        let now = Instant::now();

        if let Some(status) = self.node_status.write().await.get_mut(node_id) {
            status.last_gossip = Some(now);
            status.last_seen = now;
        }
    }

    /// Check if node is alive
    pub async fn is_alive(&self, node_id: &NodeId) -> bool {
        if let Some(last_heartbeat) = self.heartbeats.read().await.get(node_id) {
            last_heartbeat.elapsed() < self.config.heartbeat_timeout
        } else {
            false
        }
    }
}

impl Default for RecoveryManager {
    fn default() -> Self {
        Self::new()
    }
}

impl RecoveryManager {
    /// Create a new recovery manager
    pub fn new() -> Self {
        Self {
            content_tracker: Arc::new(RwLock::new(HashMap::new())),
            recovery_queue: Arc::new(RwLock::new(Vec::new())),
            _active_recoveries: Arc::new(RwLock::new(HashMap::new())),
            node_failure_tracker: Arc::new(RwLock::new(None)),
        }
    }

    /// Create a new recovery manager with node failure tracker
    pub fn with_failure_tracker(failure_tracker: Arc<dyn NodeFailureTracker>) -> Self {
        Self {
            content_tracker: Arc::new(RwLock::new(HashMap::new())),
            recovery_queue: Arc::new(RwLock::new(Vec::new())),
            _active_recoveries: Arc::new(RwLock::new(HashMap::new())),
            node_failure_tracker: Arc::new(RwLock::new(Some(failure_tracker))),
        }
    }

    /// Set the node failure tracker
    pub async fn set_failure_tracker(&self, failure_tracker: Arc<dyn NodeFailureTracker>) {
        *self.node_failure_tracker.write().await = Some(failure_tracker);
    }

    /// Increase replication for content
    pub async fn increase_replication(
        &self,
        content_hash: &ContentHash,
        priority: RecoveryPriority,
    ) -> Result<()> {
        // Queue a replication task
        self.queue_recovery(*content_hash, vec![], priority).await
    }

    /// Queue content for recovery
    pub async fn queue_recovery(
        &self,
        content_hash: ContentHash,
        failed_nodes: Vec<NodeId>,
        priority: RecoveryPriority,
    ) -> Result<()> {
        let task = RecoveryTask {
            content_hash,
            failed_nodes,
            priority,
            created_at: Instant::now(),
        };

        let mut queue = self.recovery_queue.write().await;
        queue.push(task);

        // Sort by priority
        queue.sort_by_key(|task| std::cmp::Reverse(task.priority));

        Ok(())
    }

    /// Get remaining replicas for content
    pub async fn get_remaining_replicas(
        &self,
        content_hash: &ContentHash,
        exclude_node: &NodeId,
    ) -> Result<u32> {
        if let Some(tracker) = self.content_tracker.read().await.get(content_hash) {
            let remaining = tracker
                .storing_nodes
                .iter()
                .filter(|&n| n != exclude_node)
                .count() as u32;
            Ok(remaining)
        } else {
            Ok(0)
        }
    }

    /// Queue recovery with grace period consideration
    pub async fn queue_recovery_with_grace_period(
        &self,
        content_hash: ContentHash,
        failed_nodes: Vec<NodeId>,
        priority: RecoveryPriority,
        config: &ReplicationGracePeriodConfig,
    ) -> Result<()> {
        if failed_nodes.is_empty() {
            return self.queue_recovery(content_hash, failed_nodes, priority).await;
        }

        if let Some(ref failure_tracker) = *self.node_failure_tracker.read().await {
            // Record failures and check grace periods
            for node_id in &failed_nodes {
                failure_tracker.record_node_failure(
                    node_id.clone(),
                    crate::dht::replication_grace_period::NodeFailureReason::NetworkTimeout,
                    config,
                ).await?;
            }

            let mut immediate_recovery_nodes = Vec::new();
            let mut delayed_recovery_nodes = Vec::new();

            for node_id in &failed_nodes {
                if failure_tracker.should_start_replication(node_id).await {
                    immediate_recovery_nodes.push(node_id.clone());
                } else {
                    delayed_recovery_nodes.push(node_id.clone());
                }
            }

            // Queue immediate recovery for nodes past grace period
            if !immediate_recovery_nodes.is_empty() {
                tracing::info!(
                    "Queuing immediate recovery for {} nodes (past grace period) for content {:?}",
                    immediate_recovery_nodes.len(),
                    content_hash
                );
                self.queue_recovery(content_hash, immediate_recovery_nodes, priority).await?;
            }

            // Schedule delayed checks for nodes in grace period
            if !delayed_recovery_nodes.is_empty() {
                tracing::info!(
                    "Scheduling delayed recovery check for {} nodes (in grace period) for content {:?}",
                    delayed_recovery_nodes.len(),
                    content_hash
                );
                self.schedule_grace_period_check(
                    content_hash,
                    delayed_recovery_nodes,
                    priority,
                    failure_tracker.clone(),
                ).await?;
            }

            Ok(())
        } else {
            // No failure tracker, use immediate recovery
            self.queue_recovery(content_hash, failed_nodes, priority).await
        }
    }

    /// Schedule a delayed recovery check for multiple nodes
    async fn schedule_grace_period_check(
        &self,
        content_hash: ContentHash,
        failed_nodes: Vec<NodeId>,
        _priority: RecoveryPriority,
        failure_tracker: Arc<dyn NodeFailureTracker>,
    ) -> Result<()> {
        // Hold only a weak reference so the background task does not keep the
        // content tracker alive after the recovery manager is dropped.
        let content_tracker_weak = Arc::downgrade(&self.content_tracker);

        tokio::spawn(async move {
            // Wait for grace period to potentially expire (5 minutes + 10 second buffer)
            tokio::time::sleep(Duration::from_secs(310)).await;

            if let Some(_tracker) = content_tracker_weak.upgrade() {
                // Check again if replication should start for any nodes
                let mut nodes_to_recover = Vec::new();

                for node_id in &failed_nodes {
                    if failure_tracker.should_start_replication(node_id).await {
                        nodes_to_recover.push(node_id.clone());
                    }
                }

                // In practice, the owning ChurnHandler would queue the actual
                // recovery tasks here.
                if !nodes_to_recover.is_empty() {
                    tracing::info!(
                        "Grace period expired for {} nodes, queuing recovery for content {:?}",
                        nodes_to_recover.len(),
                        content_hash
                    );
                } else {
                    tracing::debug!(
                        "Grace period check completed for content {:?}, no nodes require recovery",
                        content_hash
                    );
                }
            }
        });

        Ok(())
    }

    /// Check if a node should be recovered immediately or wait for grace period
    pub async fn should_recover_node(&self, node_id: &NodeId) -> bool {
        if let Some(ref failure_tracker) = *self.node_failure_tracker.read().await {
            failure_tracker.should_start_replication(node_id).await
        } else {
            // No failure tracker, always recover immediately
            true
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::adaptive::trust::MockTrustProvider;
    use rand::RngCore;

    async fn create_test_churn_handler() -> ChurnHandler {
        let predictor = Arc::new(ChurnPredictor::new());
        let trust_provider = Arc::new(MockTrustProvider::new());
        let replication_manager = Arc::new(ReplicationManager::new(
            Default::default(),
            trust_provider.clone(),
            predictor.clone(),
            Arc::new(AdaptiveRouter::new(trust_provider.clone())),
        ));
        let router = Arc::new(AdaptiveRouter::new(trust_provider.clone()));
        // Create a test NodeId
        use crate::peer_record::UserId;
        let mut hash = [0u8; 32];
        rand::thread_rng().fill_bytes(&mut hash);
        let node_id = UserId::from_bytes(hash);

        let gossip = Arc::new(AdaptiveGossipSub::new(
            node_id.clone(),
            trust_provider.clone(),
        ));

        // Test ChurnConfig: shorter monitoring interval and stricter churn
        // threshold than ChurnConfig::default()
        let config = ChurnConfig {
            heartbeat_timeout: Duration::from_secs(30),
            gossip_timeout: Duration::from_secs(300),
            monitoring_interval: Duration::from_secs(5),
            prediction_threshold: 0.7,
            max_churn_rate: 0.2,
        };

        ChurnHandler::new(
            node_id,
            predictor,
            trust_provider,
            replication_manager,
            router,
            gossip,
            config,
        )
    }

    #[tokio::test]
    async fn test_node_monitoring() {
        let handler = create_test_churn_handler().await;
        let node_id = NodeId { hash: [1u8; 32] };

        // Record heartbeat
        handler.node_monitor.record_heartbeat(&node_id).await;

        // Check node is alive
        assert!(handler.node_monitor.is_alive(&node_id).await);

        // Check status
        let status = handler.node_monitor.get_node_status(&node_id).await;
        assert_eq!(status.status, NodeState::Active);
        assert!(status.last_heartbeat.is_some());
    }

    #[tokio::test]
    async fn test_failure_detection() {
        let mut handler = create_test_churn_handler().await;
        // Short timeout for testing
        handler.config.heartbeat_timeout = Duration::from_millis(100);
        // Ensure NodeMonitor uses the same short timeout
        if let Some(nm) = Arc::get_mut(&mut handler.node_monitor) {
            nm.config.heartbeat_timeout = Duration::from_millis(100);
        }
        let node_id = NodeId { hash: [1u8; 32] };

        // Record initial heartbeat
        handler.handle_heartbeat(&node_id).await.unwrap();

        // Wait for timeout
        tokio::time::sleep(Duration::from_millis(150)).await;

        // Node should no longer be alive
        assert!(!handler.node_monitor.is_alive(&node_id).await);
    }

    #[tokio::test]
    async fn test_proactive_replication() {
        let handler = create_test_churn_handler().await;
        let node_id = NodeId { hash: [1u8; 32] };

        // Add node with high churn probability
        // Ensure node is tracked as active first
        handler.node_monitor.record_heartbeat(&node_id).await;
        handler
            .predictor
            .update_node_features(
                &node_id,
                vec![
                    0.1, // Low online duration
                    0.9, // High response time
                    0.1, // Low resource contribution
                    0.1, // Low message frequency
                    0.0, // Time of day
                    0.0, // Day of week
                    0.1, // Low historical reliability
                    0.0, 0.0, 0.0,
                ],
            )
            .await
            .unwrap();

        // Handle imminent departure
        handler.handle_imminent_departure(&node_id).await.unwrap();

        // Check node marked as departing
        let status = handler.node_monitor.get_node_status(&node_id).await;
        assert_eq!(status.status, NodeState::Departing);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_churn_rate_calculation() {
        let mut handler = create_test_churn_handler().await;
        // Avoid triggering high-churn handling which can slow tests
        handler.config.max_churn_rate = 1.0;

        // Add some nodes
        for i in 0..10 {
            let node_id = NodeId { hash: [i; 32] };
            handler.handle_heartbeat(&node_id).await.unwrap();
        }

        // Mark some as failed
        for i in 0..3 {
            let node_id = NodeId { hash: [i; 32] };
            handler
                .node_monitor
                .update_node_state(&node_id, NodeState::Failed)
                .await;
        }

        // Run monitoring cycle with a strict timeout to avoid hangs
        let res =
            tokio::time::timeout(std::time::Duration::from_secs(30), handler.monitor_cycle()).await;
        assert!(res.is_ok(), "monitor_cycle timed out");
        res.unwrap().unwrap();

        // Check stats
        let stats = handler.get_stats().await;
        assert_eq!(stats.total_nodes, 10);
        assert_eq!(stats.failed_nodes, 3);
        assert!((stats.churn_rate - 0.3).abs() < 0.01); // 30% churn rate
    }
}