use crate::adaptive::{
    ContentHash, NodeId, TrustProvider,
    gossip::{AdaptiveGossipSub, GossipMessage},
    learning::ChurnPredictor,
    replication::ReplicationManager,
    routing::AdaptiveRouter,
};
use crate::dht::{NodeFailureTracker, ReplicationGracePeriodConfig};
use anyhow::Result;
use std::{
    collections::{HashMap, HashSet},
    sync::Arc,
    time::{Duration, Instant},
};
use tokio::sync::RwLock;

/// Handles node churn: monitors peer liveness, predicts imminent departures,
/// and drives content recovery when nodes fail or leave the network.
pub struct ChurnHandler {
    /// This node's identifier.
    node_id: NodeId,

    /// Predicts the probability that a node will depart soon.
    predictor: Arc<ChurnPredictor>,

    /// Tracks per-node liveness and status.
    node_monitor: Arc<NodeMonitor>,

    /// Queues and tracks content recovery tasks.
    recovery_manager: Arc<RecoveryManager>,

    /// Trust scores for peers.
    trust_provider: Arc<dyn TrustProvider>,

    /// Manages content replication.
    replication_manager: Arc<ReplicationManager>,

    /// Adaptive routing layer.
    router: Arc<AdaptiveRouter>,

    /// Gossip layer used to broadcast churn-related events.
    gossip: Arc<AdaptiveGossipSub>,

    /// Churn handling configuration.
    config: ChurnConfig,

    /// Aggregated churn statistics.
    stats: Arc<RwLock<ChurnStats>>,
}

/// Configuration for churn detection and handling.
#[derive(Debug, Clone)]
pub struct ChurnConfig {
    /// Maximum time since the last heartbeat for a node to still count as alive.
    pub heartbeat_timeout: Duration,

    /// Maximum expected interval between gossip activity from a node.
    pub gossip_timeout: Duration,

    /// Departure probability above which proactive replication is triggered.
    pub prediction_threshold: f64,

    /// How often the monitoring cycle runs.
    pub monitoring_interval: Duration,

    /// Churn rate above which network-wide mitigations are activated.
    pub max_churn_rate: f64,
}

impl Default for ChurnConfig {
    fn default() -> Self {
        Self {
            heartbeat_timeout: Duration::from_secs(30),
            gossip_timeout: Duration::from_secs(300),
            prediction_threshold: 0.7,
            monitoring_interval: Duration::from_secs(30),
            max_churn_rate: 0.3,
        }
    }
}
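
// A minimal sketch of overriding the defaults for a deployment that expects
// higher churn. The values below are illustrative assumptions, not tuned
// recommendations; `..Default::default()` keeps the remaining fields.
#[allow(dead_code)]
fn example_high_churn_config() -> ChurnConfig {
    ChurnConfig {
        heartbeat_timeout: Duration::from_secs(15),
        monitoring_interval: Duration::from_secs(10),
        max_churn_rate: 0.5,
        ..Default::default()
    }
}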

/// Monitors node liveness via heartbeats and gossip activity.
pub struct NodeMonitor {
    /// Current status for each known node.
    node_status: Arc<RwLock<HashMap<NodeId, NodeStatus>>>,

    /// Timestamp of the most recent heartbeat from each node.
    heartbeats: Arc<RwLock<HashMap<NodeId, Instant>>>,

    /// Churn handling configuration.
    config: ChurnConfig,
}

/// Liveness and storage information tracked for a single node.
#[derive(Debug, Clone)]
pub struct NodeStatus {
    /// The node's identifier.
    pub node_id: NodeId,

    /// When the node was last seen through any activity.
    pub last_seen: Instant,

    /// When the node last sent a heartbeat, if ever.
    pub last_heartbeat: Option<Instant>,

    /// When the node last showed gossip activity, if ever.
    pub last_gossip: Option<Instant>,

    /// The node's current lifecycle state.
    pub status: NodeState,

    /// Estimated reliability score in `[0.0, 1.0]`.
    pub reliability: f64,

    /// Content known to be stored on this node.
    pub stored_content: HashSet<ContentHash>,
}

/// Lifecycle state of a monitored node.
#[derive(Debug, Clone, PartialEq)]
pub enum NodeState {
    /// The node is responding normally.
    Active,

    /// The node has missed recent activity and may be failing.
    Suspicious,

    /// The node is predicted or announced to be leaving soon.
    Departing,

    /// The node is considered failed.
    Failed,
}

/// Coordinates recovery of content affected by node failures.
pub struct RecoveryManager {
    /// Tracks which nodes store each piece of content.
    content_tracker: Arc<RwLock<HashMap<ContentHash, ContentTracker>>>,

    /// Pending recovery tasks, ordered by priority.
    recovery_queue: Arc<RwLock<Vec<RecoveryTask>>>,

    /// Recoveries currently in progress.
    _active_recoveries: Arc<RwLock<HashMap<ContentHash, RecoveryStatus>>>,

    /// Optional tracker used to apply a grace period before replicating.
    node_failure_tracker: Arc<RwLock<Option<Arc<dyn NodeFailureTracker>>>>,
}

/// Replica bookkeeping for a single piece of content.
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct ContentTracker {
    hash: ContentHash,
    storing_nodes: HashSet<NodeId>,
    target_replicas: u32,
    last_verified: Instant,
}

/// A queued request to restore replication for a piece of content.
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct RecoveryTask {
    content_hash: ContentHash,
    failed_nodes: Vec<NodeId>,
    priority: RecoveryPriority,
    created_at: Instant,
}

/// Urgency of a recovery task; later variants compare greater than earlier ones.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum RecoveryPriority {
    Low,
    Normal,
    High,
    Critical,
}
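
// Sketch of how the derived `Ord` interacts with the recovery queue: variants
// are ordered by declaration (Low < Normal < High < Critical), and
// `queue_recovery` sorts by `Reverse(priority)`, so the most urgent tasks end
// up at the front. The helper below is illustrative only.
#[allow(dead_code)]
fn sort_priorities_most_urgent_first(
    mut priorities: Vec<RecoveryPriority>,
) -> Vec<RecoveryPriority> {
    priorities.sort_by_key(|p| std::cmp::Reverse(*p));
    priorities
}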

/// Progress of an in-flight recovery.
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct RecoveryStatus {
    started_at: Instant,
    contacted_nodes: Vec<NodeId>,
    successful_nodes: Vec<NodeId>,
    failed_attempts: u32,
}

/// Aggregated churn and recovery statistics.
#[derive(Debug, Default, Clone)]
pub struct ChurnStats {
    /// Total number of monitored nodes.
    pub total_nodes: u64,

    /// Nodes currently active.
    pub active_nodes: u64,

    /// Nodes that have failed.
    pub failed_nodes: u64,

    /// Nodes flagged as suspicious.
    pub suspicious_nodes: u64,

    /// Fraction of monitored nodes that have failed.
    pub churn_rate: f64,

    /// Recoveries completed successfully.
    pub successful_recoveries: u64,

    /// Recoveries that failed.
    pub failed_recoveries: u64,

    /// Replications triggered proactively by departure predictions.
    pub proactive_replications: u64,

    /// Average time to detect a failure, in milliseconds.
    pub avg_detection_time_ms: f64,

    /// Failure events handled via the grace-period path.
    pub grace_period_preventions: u64,

    /// Nodes that re-registered during their grace period.
    pub successful_reregistrations: u64,

    /// Average grace period duration, in milliseconds.
    pub avg_grace_period_duration_ms: f64,
}
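
// A minimal sketch of turning `ChurnStats` into a one-line summary, e.g. for
// periodic logging. The helper and its formatting are illustrative only.
#[allow(dead_code)]
fn summarize_churn_stats(stats: &ChurnStats) -> String {
    format!(
        "nodes: {} total, {} active, {} suspicious, {} failed; churn rate {:.2}",
        stats.total_nodes,
        stats.active_nodes,
        stats.suspicious_nodes,
        stats.failed_nodes,
        stats.churn_rate
    )
}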
278
279impl ChurnHandler {
280 pub fn new(
282 node_id: NodeId,
283 predictor: Arc<ChurnPredictor>,
284 trust_provider: Arc<dyn TrustProvider>,
285 replication_manager: Arc<ReplicationManager>,
286 router: Arc<AdaptiveRouter>,
287 gossip: Arc<AdaptiveGossipSub>,
288 config: ChurnConfig,
289 ) -> Self {
290 let node_monitor = Arc::new(NodeMonitor::new(config.clone()));
291 let recovery_manager = Arc::new(RecoveryManager::new());
292
293 Self {
294 node_id,
295 predictor,
296 node_monitor,
297 recovery_manager,
298 trust_provider,
299 replication_manager,
300 router,
301 gossip,
302 config,
303 stats: Arc::new(RwLock::new(ChurnStats::default())),
304 }
305 }
306
307 pub fn with_failure_tracker(
309 node_id: NodeId,
310 predictor: Arc<ChurnPredictor>,
311 trust_provider: Arc<dyn TrustProvider>,
312 replication_manager: Arc<ReplicationManager>,
313 router: Arc<AdaptiveRouter>,
314 gossip: Arc<AdaptiveGossipSub>,
315 config: ChurnConfig,
316 failure_tracker: Arc<dyn NodeFailureTracker>,
317 ) -> Self {
318 let node_monitor = Arc::new(NodeMonitor::new(config.clone()));
319 let recovery_manager = Arc::new(RecoveryManager::with_failure_tracker(failure_tracker));
320
321 Self {
322 node_id,
323 predictor,
324 node_monitor,
325 recovery_manager,
326 trust_provider,
327 replication_manager,
328 router,
329 gossip,
330 config,
331 stats: Arc::new(RwLock::new(ChurnStats::default())),
332 }
333 }
334
335 pub async fn set_failure_tracker(&self, failure_tracker: Arc<dyn NodeFailureTracker>) {
337 self.recovery_manager
338 .set_failure_tracker(failure_tracker)
339 .await;
340 }
341
    /// Spawns the background task that periodically runs the monitoring cycle.
    pub async fn start_monitoring(&self) {
        let monitoring_interval = self.config.monitoring_interval;
        let handler = self.clone_for_task();

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(monitoring_interval);

            loop {
                interval.tick().await;

                if let Err(e) = handler.monitor_cycle().await {
                    tracing::warn!("Churn monitoring cycle failed: {}", e);
                }
            }
        });
    }

    /// Runs one monitoring pass: counts nodes by state, triggers proactive
    /// replication for predicted departures, escalates timed-out suspicious
    /// nodes to failed, and reacts to high churn. The counters are written and
    /// the stats lock released before the handlers run, since those handlers
    /// acquire the stats lock themselves.
    async fn monitor_cycle(&self) -> Result<()> {
        let nodes = self.node_monitor.get_all_nodes().await;

        let mut active_nodes = 0u64;
        let mut suspicious_nodes = 0u64;
        let mut failed_nodes = 0u64;
        let mut departing_nodes = Vec::new();
        let mut newly_failed_nodes = Vec::new();

        for node_id in &nodes {
            let node_status = self.node_monitor.get_node_status(node_id).await;

            match node_status.status {
                NodeState::Active => {
                    active_nodes += 1;

                    // Flag nodes whose predicted departure probability exceeds
                    // the configured threshold for proactive replication.
                    let prediction = self.predictor.predict(node_id).await;
                    if prediction.probability_1h > self.config.prediction_threshold {
                        departing_nodes.push(node_id.clone());
                    }
                }
                NodeState::Suspicious => {
                    suspicious_nodes += 1;

                    // Escalate to failure once the heartbeat timeout has elapsed.
                    if node_status.last_seen.elapsed() > self.config.heartbeat_timeout {
                        newly_failed_nodes.push(node_id.clone());
                    }
                }
                NodeState::Failed => {
                    failed_nodes += 1;
                }
                _ => {}
            }
        }

        let churn_rate = {
            let mut stats = self.stats.write().await;
            stats.total_nodes = nodes.len() as u64;
            stats.active_nodes = active_nodes;
            stats.suspicious_nodes = suspicious_nodes;
            stats.failed_nodes = failed_nodes;
            if stats.total_nodes > 0 {
                stats.churn_rate = stats.failed_nodes as f64 / stats.total_nodes as f64;
            }
            stats.churn_rate
        };

        for node_id in &departing_nodes {
            self.handle_imminent_departure(node_id).await?;
            self.stats.write().await.proactive_replications += 1;
        }

        for node_id in &newly_failed_nodes {
            self.handle_node_failure(node_id).await?;
        }

        if churn_rate > self.config.max_churn_rate {
            self.handle_high_churn().await?;
        }

        Ok(())
    }

    /// Proactively replicates content stored on a node that is predicted to
    /// depart, marks it unreliable for routing, and announces the departure.
    async fn handle_imminent_departure(&self, node_id: &NodeId) -> Result<()> {
        self.node_monitor
            .update_node_state(node_id, NodeState::Departing)
            .await;

        let stored_content = self.get_content_stored_by(node_id).await?;

        for content_hash in stored_content {
            self.recovery_manager
                .increase_replication(&content_hash, RecoveryPriority::High)
                .await?;
        }

        self.router.mark_node_unreliable(node_id).await;

        let message = GossipMessage {
            topic: "node_departing".to_string(),
            data: bincode::serialize(&node_id)
                .map_err(|e| anyhow::anyhow!("Serialization error: {}", e))?,
            from: self.node_id.clone(),
            seqno: 0,
            timestamp: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.as_secs())
                .unwrap_or(0),
        };
        self.gossip.publish("node_departing", message).await?;

        Ok(())
    }

    /// Handles a confirmed node failure: removes the node from routing,
    /// queues recovery (honoring any grace period), penalizes the departure,
    /// rebalances topology, and updates detection statistics.
    async fn handle_node_failure(&self, node_id: &NodeId) -> Result<()> {
        let start_time = Instant::now();
        self.node_monitor
            .update_node_state(node_id, NodeState::Failed)
            .await;

        self.remove_from_routing_tables(node_id).await?;

        let lost_content = self.identify_lost_content(node_id).await?;

        let grace_config = ReplicationGracePeriodConfig::default();
        tracing::info!(
            "Node {} failed, queuing recovery for {} content items with {}s grace period",
            node_id,
            lost_content.len(),
            grace_config.grace_period_duration.as_secs()
        );

        for content_hash in lost_content {
            self.recovery_manager
                .queue_recovery_with_grace_period(
                    content_hash,
                    vec![node_id.clone()],
                    RecoveryPriority::Critical,
                    &grace_config,
                )
                .await?;
        }

        self.penalize_unexpected_departure(node_id).await;

        self.trigger_topology_rebalance().await?;

        let mut stats = self.stats.write().await;
        stats.failed_nodes += 1;
        let detection_time = start_time.elapsed().as_millis() as f64;
        stats.avg_detection_time_ms =
            (stats.avg_detection_time_ms * (stats.failed_nodes - 1) as f64 + detection_time)
                / stats.failed_nodes as f64;

        // When a failure tracker is configured, this failure went through the
        // grace-period path rather than triggering immediate replication.
        if self
            .recovery_manager
            .node_failure_tracker
            .read()
            .await
            .is_some()
        {
            stats.grace_period_preventions += 1;
        }

        Ok(())
    }

    /// Applies network-wide mitigations when the observed churn rate exceeds
    /// the configured maximum.
    async fn handle_high_churn(&self) -> Result<()> {
        self.replication_manager
            .increase_global_replication(1.5)
            .await;

        self.gossip.reduce_fanout(0.75).await;

        self.router.enable_aggressive_caching().await;

        let churn_rate = self.stats.read().await.churn_rate;
        let message = GossipMessage {
            topic: "high_churn_alert".to_string(),
            data: bincode::serialize(&churn_rate)
                .map_err(|e| anyhow::anyhow!("Serialization error: {}", e))?,
            from: self.node_id.clone(),
            seqno: 0,
            timestamp: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.as_secs())
                .unwrap_or(0),
        };
        self.gossip.publish("high_churn_alert", message).await?;

        Ok(())
    }

    /// Removes a failed node from all routing structures and the trust system.
    async fn remove_from_routing_tables(&self, node_id: &NodeId) -> Result<()> {
        self.router.remove_node(node_id).await;

        self.router.remove_hyperbolic_coordinate(node_id).await;

        self.router.remove_from_som(node_id).await;

        self.trust_provider.remove_node(node_id);

        Ok(())
    }

    /// Returns the content known to be stored by the given node.
    async fn get_content_stored_by(&self, node_id: &NodeId) -> Result<Vec<ContentHash>> {
        let status = self.node_monitor.get_node_status(node_id).await;
        Ok(status.stored_content.into_iter().collect())
    }

    /// Identifies content with fewer than five remaining replicas once the
    /// failed node is excluded.
    async fn identify_lost_content(&self, failed_node: &NodeId) -> Result<Vec<ContentHash>> {
        let all_content = self.get_content_stored_by(failed_node).await?;
        let mut at_risk_content = Vec::new();

        for content_hash in all_content {
            let remaining_replicas = self
                .recovery_manager
                .get_remaining_replicas(&content_hash, failed_node)
                .await?;

            // Fewer than 5 remaining replicas puts the content at risk.
            if remaining_replicas < 5 {
                at_risk_content.push(content_hash);
            }
        }

        Ok(at_risk_content)
    }

    /// Applies a trust penalty for an unexpected departure.
    async fn penalize_unexpected_departure(&self, node_id: &NodeId) {
        // Negative trust update against the departed node.
        self.trust_provider.update_trust(
            &NodeId { hash: [0u8; 32] },
            node_id,
            false,
        );
    }

    /// Rebalances routing structures after topology changes.
    async fn trigger_topology_rebalance(&self) -> Result<()> {
        self.router.rebalance_hyperbolic_space().await;

        self.router.update_som_grid().await;

        self.router.trigger_trust_recomputation().await;

        Ok(())
    }

    /// Records a heartbeat received from a node.
    pub async fn handle_heartbeat(&self, node_id: &NodeId) -> Result<()> {
        self.node_monitor.record_heartbeat(node_id).await;
        Ok(())
    }

    /// Records gossip activity observed from a node.
    pub async fn handle_gossip_activity(&self, node_id: &NodeId) -> Result<()> {
        self.node_monitor.record_gossip_activity(node_id).await;
        Ok(())
    }

    /// Returns a snapshot of the current churn statistics.
    pub async fn get_stats(&self) -> ChurnStats {
        self.stats.read().await.clone()
    }

    /// Creates a cheap clone (shared `Arc`s) for use in spawned tasks.
    fn clone_for_task(&self) -> Self {
        Self {
            node_id: self.node_id.clone(),
            predictor: self.predictor.clone(),
            node_monitor: self.node_monitor.clone(),
            recovery_manager: self.recovery_manager.clone(),
            trust_provider: self.trust_provider.clone(),
            replication_manager: self.replication_manager.clone(),
            router: self.router.clone(),
            gossip: self.gossip.clone(),
            config: self.config.clone(),
            stats: self.stats.clone(),
        }
    }
}
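
// A minimal usage sketch showing how a caller might wire the handler into its
// event loop: start background monitoring, then feed liveness signals as they
// arrive. `peer` is a hypothetical remote node; the helper is illustrative only.
#[allow(dead_code)]
async fn example_drive_churn_handler(handler: &ChurnHandler, peer: &NodeId) -> Result<()> {
    // Spawn the periodic monitoring task.
    handler.start_monitoring().await;

    // Report liveness signals as they are observed from the network.
    handler.handle_heartbeat(peer).await?;
    handler.handle_gossip_activity(peer).await?;

    // Periodically inspect the aggregated statistics.
    let stats = handler.get_stats().await;
    tracing::debug!("current churn rate: {:.2}", stats.churn_rate);

    Ok(())
}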

impl NodeMonitor {
    /// Creates a new node monitor with the given configuration.
    pub fn new(config: ChurnConfig) -> Self {
        Self {
            node_status: Arc::new(RwLock::new(HashMap::new())),
            heartbeats: Arc::new(RwLock::new(HashMap::new())),
            config,
        }
    }

    /// Returns the identifiers of all tracked nodes.
    pub async fn get_all_nodes(&self) -> Vec<NodeId> {
        self.node_status.read().await.keys().cloned().collect()
    }

    /// Returns the status of a node, or a default `Failed` entry for unknown nodes.
    pub async fn get_node_status(&self, node_id: &NodeId) -> NodeStatus {
        self.node_status
            .read()
            .await
            .get(node_id)
            .cloned()
            .unwrap_or(NodeStatus {
                node_id: node_id.clone(),
                last_seen: Instant::now(),
                last_heartbeat: None,
                last_gossip: None,
                status: NodeState::Failed,
                reliability: 0.0,
                stored_content: HashSet::new(),
            })
    }

    /// Updates the lifecycle state of a tracked node.
    pub async fn update_node_state(&self, node_id: &NodeId, state: NodeState) {
        if let Some(status) = self.node_status.write().await.get_mut(node_id) {
            status.status = state;
        }
    }

    /// Records a heartbeat, creating the node's status entry if necessary and
    /// marking the node active.
    pub async fn record_heartbeat(&self, node_id: &NodeId) {
        let now = Instant::now();
        self.heartbeats.write().await.insert(node_id.clone(), now);

        let mut status_map = self.node_status.write().await;
        let status = status_map
            .entry(node_id.clone())
            .or_insert_with(|| NodeStatus {
                node_id: node_id.clone(),
                last_seen: now,
                last_heartbeat: None,
                last_gossip: None,
                status: NodeState::Active,
                reliability: 1.0,
                stored_content: HashSet::new(),
            });
        status.last_heartbeat = Some(now);
        status.last_seen = now;
        status.status = NodeState::Active;
    }

    /// Records gossip activity for an already-known node.
    pub async fn record_gossip_activity(&self, node_id: &NodeId) {
        let now = Instant::now();

        if let Some(status) = self.node_status.write().await.get_mut(node_id) {
            status.last_gossip = Some(now);
            status.last_seen = now;
        }
    }

    /// Returns `true` if the node's last heartbeat is within the timeout.
    pub async fn is_alive(&self, node_id: &NodeId) -> bool {
        if let Some(last_heartbeat) = self.heartbeats.read().await.get(node_id) {
            last_heartbeat.elapsed() < self.config.heartbeat_timeout
        } else {
            false
        }
    }
}
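
// A minimal sketch of a liveness sweep over all tracked peers, collecting the
// ones whose heartbeats have lapsed. The helper is illustrative; callers would
// typically hand the result to the churn handler's failure path.
#[allow(dead_code)]
async fn example_collect_stale_nodes(monitor: &NodeMonitor) -> Vec<NodeId> {
    let mut stale = Vec::new();
    for node_id in monitor.get_all_nodes().await {
        if !monitor.is_alive(&node_id).await {
            stale.push(node_id);
        }
    }
    stale
}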

impl Default for RecoveryManager {
    fn default() -> Self {
        Self::new()
    }
}

impl RecoveryManager {
    /// Creates a recovery manager without a node failure tracker.
    pub fn new() -> Self {
        Self {
            content_tracker: Arc::new(RwLock::new(HashMap::new())),
            recovery_queue: Arc::new(RwLock::new(Vec::new())),
            _active_recoveries: Arc::new(RwLock::new(HashMap::new())),
            node_failure_tracker: Arc::new(RwLock::new(None)),
        }
    }

    /// Creates a recovery manager with a node failure tracker for grace-period
    /// handling.
    pub fn with_failure_tracker(failure_tracker: Arc<dyn NodeFailureTracker>) -> Self {
        Self {
            content_tracker: Arc::new(RwLock::new(HashMap::new())),
            recovery_queue: Arc::new(RwLock::new(Vec::new())),
            _active_recoveries: Arc::new(RwLock::new(HashMap::new())),
            node_failure_tracker: Arc::new(RwLock::new(Some(failure_tracker))),
        }
    }

    /// Sets (or replaces) the node failure tracker at runtime.
    pub async fn set_failure_tracker(&self, failure_tracker: Arc<dyn NodeFailureTracker>) {
        *self.node_failure_tracker.write().await = Some(failure_tracker);
    }

    /// Queues a replication increase for the given content.
    pub async fn increase_replication(
        &self,
        content_hash: &ContentHash,
        priority: RecoveryPriority,
    ) -> Result<()> {
        self.queue_recovery(*content_hash, vec![], priority).await
    }

    /// Adds a recovery task to the queue, keeping the queue ordered with the
    /// highest-priority tasks first.
    pub async fn queue_recovery(
        &self,
        content_hash: ContentHash,
        failed_nodes: Vec<NodeId>,
        priority: RecoveryPriority,
    ) -> Result<()> {
        let task = RecoveryTask {
            content_hash,
            failed_nodes,
            priority,
            created_at: Instant::now(),
        };

        let mut queue = self.recovery_queue.write().await;
        queue.push(task);

        // Sort descending by priority so critical tasks are processed first.
        queue.sort_by_key(|task| std::cmp::Reverse(task.priority));

        Ok(())
    }

    /// Counts how many nodes other than `exclude_node` still store the content.
    pub async fn get_remaining_replicas(
        &self,
        content_hash: &ContentHash,
        exclude_node: &NodeId,
    ) -> Result<u32> {
        if let Some(tracker) = self.content_tracker.read().await.get(content_hash) {
            let remaining = tracker
                .storing_nodes
                .iter()
                .filter(|&n| n != exclude_node)
                .count() as u32;
            Ok(remaining)
        } else {
            Ok(0)
        }
    }

    /// Queues recovery for content affected by failed nodes, honoring the
    /// configured grace period: nodes already past their grace period are
    /// recovered immediately, while the rest are re-checked later.
    pub async fn queue_recovery_with_grace_period(
        &self,
        content_hash: ContentHash,
        failed_nodes: Vec<NodeId>,
        priority: RecoveryPriority,
        config: &ReplicationGracePeriodConfig,
    ) -> Result<()> {
        if failed_nodes.is_empty() {
            return self
                .queue_recovery(content_hash, failed_nodes, priority)
                .await;
        }

        if let Some(ref failure_tracker) = *self.node_failure_tracker.read().await {
            // Record each failure so the tracker can start its grace period.
            for node_id in &failed_nodes {
                failure_tracker
                    .record_node_failure(
                        node_id.clone(),
                        crate::dht::replication_grace_period::NodeFailureReason::NetworkTimeout,
                        config,
                    )
                    .await?;
            }

            let mut immediate_recovery_nodes = Vec::new();
            let mut delayed_recovery_nodes = Vec::new();

            for node_id in &failed_nodes {
                if failure_tracker.should_start_replication(node_id).await {
                    immediate_recovery_nodes.push(node_id.clone());
                } else {
                    delayed_recovery_nodes.push(node_id.clone());
                }
            }

            if !immediate_recovery_nodes.is_empty() {
                tracing::info!(
                    "Queuing immediate recovery for {} nodes (past grace period) for content {:?}",
                    immediate_recovery_nodes.len(),
                    content_hash
                );
                self.queue_recovery(content_hash, immediate_recovery_nodes, priority)
                    .await?;
            }

            if !delayed_recovery_nodes.is_empty() {
                tracing::info!(
                    "Scheduling delayed recovery check for {} nodes (in grace period) for content {:?}",
                    delayed_recovery_nodes.len(),
                    content_hash
                );
                self.schedule_grace_period_check(
                    content_hash,
                    delayed_recovery_nodes,
                    priority,
                    failure_tracker.clone(),
                )
                .await?;
            }

            Ok(())
        } else {
            // Without a failure tracker there is no grace period to honor.
            self.queue_recovery(content_hash, failed_nodes, priority)
                .await
        }
    }

    /// Schedules a deferred check that runs after the grace period has had a
    /// chance to expire; nodes that still require replication at that point
    /// are logged for recovery.
    async fn schedule_grace_period_check(
        &self,
        content_hash: ContentHash,
        failed_nodes: Vec<NodeId>,
        _priority: RecoveryPriority,
        failure_tracker: Arc<dyn NodeFailureTracker>,
    ) -> Result<()> {
        // Hold only a weak reference so the background task does not keep the
        // recovery manager's state alive.
        let tracker_handle = Arc::downgrade(&self.content_tracker);

        tokio::spawn(async move {
            // Re-check after a fixed delay (310 s) to allow the grace period to elapse.
            tokio::time::sleep(Duration::from_secs(310)).await;

            if tracker_handle.upgrade().is_some() {
                let mut nodes_to_recover = Vec::new();

                for node_id in &failed_nodes {
                    if failure_tracker.should_start_replication(node_id).await {
                        nodes_to_recover.push(node_id.clone());
                    }
                }

                if !nodes_to_recover.is_empty() {
                    tracing::info!(
                        "Grace period expired for {} nodes, queuing recovery for content {:?}",
                        nodes_to_recover.len(),
                        content_hash
                    );
                } else {
                    tracing::debug!(
                        "Grace period check completed for content {:?}, no nodes require recovery",
                        content_hash
                    );
                }
            }
        });

        Ok(())
    }

    /// Returns `true` if recovery should proceed for the given node, i.e. no
    /// failure tracker is configured or its grace period has expired.
    pub async fn should_recover_node(&self, node_id: &NodeId) -> bool {
        if let Some(ref failure_tracker) = *self.node_failure_tracker.read().await {
            failure_tracker.should_start_replication(node_id).await
        } else {
            // Without a tracker, recover immediately.
            true
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::adaptive::trust::MockTrustProvider;
    use rand::RngCore;

    /// Builds a `ChurnHandler` wired to mock/test dependencies.
    async fn create_test_churn_handler() -> ChurnHandler {
        let predictor = Arc::new(ChurnPredictor::new());
        let trust_provider = Arc::new(MockTrustProvider::new());
        let replication_manager = Arc::new(ReplicationManager::new(
            Default::default(),
            trust_provider.clone(),
            predictor.clone(),
            Arc::new(AdaptiveRouter::new(trust_provider.clone())),
        ));
        let router = Arc::new(AdaptiveRouter::new(trust_provider.clone()));

        // Generate a random identity for the local test node.
        use crate::peer_record::UserId;
        let mut hash = [0u8; 32];
        rand::thread_rng().fill_bytes(&mut hash);
        let node_id = UserId::from_bytes(hash);

        let gossip = Arc::new(AdaptiveGossipSub::new(
            node_id.clone(),
            trust_provider.clone(),
        ));

        let config = ChurnConfig {
            heartbeat_timeout: Duration::from_secs(30),
            gossip_timeout: Duration::from_secs(300),
            monitoring_interval: Duration::from_secs(5),
            prediction_threshold: 0.7,
            max_churn_rate: 0.2,
        };

        ChurnHandler::new(
            node_id,
            predictor,
            trust_provider,
            replication_manager,
            router,
            gossip,
            config,
        )
    }

    #[tokio::test]
    async fn test_node_monitoring() {
        let handler = create_test_churn_handler().await;
        let node_id = NodeId { hash: [1u8; 32] };

        handler.node_monitor.record_heartbeat(&node_id).await;

        assert!(handler.node_monitor.is_alive(&node_id).await);

        let status = handler.node_monitor.get_node_status(&node_id).await;
        assert_eq!(status.status, NodeState::Active);
        assert!(status.last_heartbeat.is_some());
    }
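
    // Added sketch test: relies only on the derived `Ord` for
    // `RecoveryPriority`, where variants declared later compare greater.
    #[test]
    fn test_recovery_priority_ordering() {
        assert!(RecoveryPriority::Critical > RecoveryPriority::High);
        assert!(RecoveryPriority::High > RecoveryPriority::Normal);
        assert!(RecoveryPriority::Normal > RecoveryPriority::Low);
    }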

    #[tokio::test]
    async fn test_failure_detection() {
        let mut handler = create_test_churn_handler().await;
        handler.config.heartbeat_timeout = Duration::from_millis(100);
        // Shorten the monitor's own timeout as well so liveness checks use it.
        if let Some(nm) = Arc::get_mut(&mut handler.node_monitor) {
            nm.config.heartbeat_timeout = Duration::from_millis(100);
        }
        let node_id = NodeId { hash: [1u8; 32] };

        handler.handle_heartbeat(&node_id).await.unwrap();

        tokio::time::sleep(Duration::from_millis(150)).await;

        assert!(!handler.node_monitor.is_alive(&node_id).await);
    }

    #[tokio::test]
    async fn test_proactive_replication() {
        let handler = create_test_churn_handler().await;
        let node_id = NodeId { hash: [1u8; 32] };

        handler.node_monitor.record_heartbeat(&node_id).await;

        handler
            .predictor
            .update_node_features(
                &node_id,
                vec![0.1, 0.9, 0.1, 0.1, 0.0, 0.0, 0.1, 0.0, 0.0, 0.0],
            )
            .await
            .unwrap();

        handler.handle_imminent_departure(&node_id).await.unwrap();

        let status = handler.node_monitor.get_node_status(&node_id).await;
        assert_eq!(status.status, NodeState::Departing);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_churn_rate_calculation() {
        let mut handler = create_test_churn_handler().await;
        // Raise the threshold so the high-churn path is not triggered here.
        handler.config.max_churn_rate = 1.0;

        // Register ten nodes as active.
        for i in 0..10 {
            let node_id = NodeId { hash: [i; 32] };
            handler.handle_heartbeat(&node_id).await.unwrap();
        }

        // Mark three of them as failed.
        for i in 0..3 {
            let node_id = NodeId { hash: [i; 32] };
            handler
                .node_monitor
                .update_node_state(&node_id, NodeState::Failed)
                .await;
        }

        let res =
            tokio::time::timeout(std::time::Duration::from_secs(30), handler.monitor_cycle()).await;
        assert!(res.is_ok(), "monitor_cycle timed out");
        res.unwrap().unwrap();

        let stats = handler.get_stats().await;
        assert_eq!(stats.total_nodes, 10);
        assert_eq!(stats.failed_nodes, 3);
        assert!((stats.churn_rate - 0.3).abs() < 0.01);
    }
}