use crate::adaptive::{
    ContentHash, NodeId, TrustProvider,
    gossip::{AdaptiveGossipSub, GossipMessage},
    learning::ChurnPredictor,
    replication::ReplicationManager,
    routing::AdaptiveRouter,
};
use crate::dht::{NodeFailureTracker, ReplicationGracePeriodConfig};
use anyhow::Result;
use std::{
    collections::{HashMap, HashSet},
    sync::Arc,
    time::{Duration, Instant},
};
use tokio::sync::RwLock;

/// Detects node churn, reacts to departures and failures, and coordinates
/// recovery of the content those nodes were storing.
pub struct ChurnHandler {
    /// Identity of the local node.
    node_id: NodeId,

    /// Predictor used to anticipate imminent node departures.
    predictor: Arc<ChurnPredictor>,

    /// Tracks liveness and state of known nodes.
    node_monitor: Arc<NodeMonitor>,

    /// Queues and coordinates content recovery tasks.
    recovery_manager: Arc<RecoveryManager>,

    /// Source of trust scores for nodes.
    trust_provider: Arc<dyn TrustProvider>,

    /// Manages content replication levels.
    replication_manager: Arc<ReplicationManager>,

    /// Adaptive routing layer, updated on topology changes.
    router: Arc<AdaptiveRouter>,

    /// Gossip layer used to broadcast churn-related events.
    gossip: Arc<AdaptiveGossipSub>,

    /// Churn handling configuration.
    config: ChurnConfig,

    /// Aggregated churn statistics.
    stats: Arc<RwLock<ChurnStats>>,
}

/// Configuration for churn detection and response.
#[derive(Debug, Clone)]
pub struct ChurnConfig {
    /// How long a node may go without a heartbeat before it is treated as failed.
    pub heartbeat_timeout: Duration,

    /// How long a node may go without gossip activity before it is considered inactive.
    pub gossip_timeout: Duration,

    /// Departure probability above which proactive replication is triggered.
    pub prediction_threshold: f64,

    /// Interval between monitoring cycles.
    pub monitoring_interval: Duration,

    /// Churn rate above which network-wide mitigation is activated.
    pub max_churn_rate: f64,
}

impl Default for ChurnConfig {
    fn default() -> Self {
        Self {
            heartbeat_timeout: Duration::from_secs(30),
            gossip_timeout: Duration::from_secs(300),
            prediction_threshold: 0.7,
            monitoring_interval: Duration::from_secs(30),
            max_churn_rate: 0.3,
        }
    }
}

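// Configuration sketch (hedged): individual fields can be overridden while
// keeping the remaining defaults, e.g. for faster failure detection:
//
//     let config = ChurnConfig {
//         heartbeat_timeout: Duration::from_secs(10),
//         ..ChurnConfig::default()
//     };
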
/// Tracks per-node liveness via heartbeats and gossip activity.
pub struct NodeMonitor {
    /// Last known status for each node.
    node_status: Arc<RwLock<HashMap<NodeId, NodeStatus>>>,

    /// Timestamp of the most recent heartbeat per node.
    heartbeats: Arc<RwLock<HashMap<NodeId, Instant>>>,

    /// Churn handling configuration (timeouts, thresholds).
    config: ChurnConfig,
}

/// Snapshot of what is known about a node.
#[derive(Debug, Clone)]
pub struct NodeStatus {
    /// The node this status describes.
    pub node_id: NodeId,

    /// Last time any activity was observed from the node.
    pub last_seen: Instant,

    /// Last heartbeat received, if any.
    pub last_heartbeat: Option<Instant>,

    /// Last gossip activity observed, if any.
    pub last_gossip: Option<Instant>,

    /// Current lifecycle state of the node.
    pub status: NodeState,

    /// Estimated reliability of the node in `[0.0, 1.0]`.
    pub reliability: f64,

    /// Content the node is believed to be storing.
    pub stored_content: HashSet<ContentHash>,
}

/// Lifecycle state of a monitored node.
#[derive(Debug, Clone, PartialEq)]
pub enum NodeState {
    /// Responding normally.
    Active,

    /// Missed recent activity; may be failing.
    Suspicious,

    /// Predicted or announced to be leaving soon.
    Departing,

    /// Considered failed and removed from routing.
    Failed,
}

/// Coordinates recovery of content affected by node churn.
pub struct RecoveryManager {
    /// Tracks which nodes store which content.
    content_tracker: Arc<RwLock<HashMap<ContentHash, ContentTracker>>>,

    /// Pending recovery tasks, highest priority first.
    recovery_queue: Arc<RwLock<Vec<RecoveryTask>>>,

    /// Recoveries currently in progress.
    _active_recoveries: Arc<RwLock<HashMap<ContentHash, RecoveryStatus>>>,

    /// Optional tracker that applies a grace period before replication starts.
    node_failure_tracker: Arc<RwLock<Option<Arc<dyn NodeFailureTracker>>>>,
}

/// Replication bookkeeping for a single piece of content.
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct ContentTracker {
    hash: ContentHash,

    storing_nodes: HashSet<NodeId>,

    target_replicas: u32,

    last_verified: Instant,
}

/// A queued request to restore replication for one piece of content.
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct RecoveryTask {
    content_hash: ContentHash,

    failed_nodes: Vec<NodeId>,

    priority: RecoveryPriority,

    created_at: Instant,
}

/// Priority of a recovery task; the derived ordering (`Low` < `Critical`)
/// is used to sort the recovery queue.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum RecoveryPriority {
    Low,

    Normal,

    High,

    Critical,
}

/// Progress of an in-flight recovery.
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct RecoveryStatus {
    started_at: Instant,

    contacted_nodes: Vec<NodeId>,

    successful_nodes: Vec<NodeId>,

    failed_attempts: u32,
}

/// Aggregated churn and recovery statistics.
#[derive(Debug, Default, Clone)]
pub struct ChurnStats {
    pub total_nodes: u64,

    pub active_nodes: u64,

    pub failed_nodes: u64,

    pub suspicious_nodes: u64,

    /// Fraction of known nodes currently considered failed.
    pub churn_rate: f64,

    pub successful_recoveries: u64,

    pub failed_recoveries: u64,

    pub proactive_replications: u64,

    pub avg_detection_time_ms: f64,

    pub grace_period_preventions: u64,

    pub successful_reregistrations: u64,

    pub avg_grace_period_duration_ms: f64,
}

impl ChurnHandler {
    /// Creates a churn handler without a node failure tracker.
    pub fn new(
        node_id: NodeId,
        predictor: Arc<ChurnPredictor>,
        trust_provider: Arc<dyn TrustProvider>,
        replication_manager: Arc<ReplicationManager>,
        router: Arc<AdaptiveRouter>,
        gossip: Arc<AdaptiveGossipSub>,
        config: ChurnConfig,
    ) -> Self {
        let node_monitor = Arc::new(NodeMonitor::new(config.clone()));
        let recovery_manager = Arc::new(RecoveryManager::new());

        Self {
            node_id,
            predictor,
            node_monitor,
            recovery_manager,
            trust_provider,
            replication_manager,
            router,
            gossip,
            config,
            stats: Arc::new(RwLock::new(ChurnStats::default())),
        }
    }

    /// Creates a churn handler whose recovery manager honours replication
    /// grace periods via the given failure tracker.
    pub fn with_failure_tracker(
        node_id: NodeId,
        predictor: Arc<ChurnPredictor>,
        trust_provider: Arc<dyn TrustProvider>,
        replication_manager: Arc<ReplicationManager>,
        router: Arc<AdaptiveRouter>,
        gossip: Arc<AdaptiveGossipSub>,
        config: ChurnConfig,
        failure_tracker: Arc<dyn NodeFailureTracker>,
    ) -> Self {
        let node_monitor = Arc::new(NodeMonitor::new(config.clone()));
        let recovery_manager = Arc::new(RecoveryManager::with_failure_tracker(failure_tracker));

        Self {
            node_id,
            predictor,
            node_monitor,
            recovery_manager,
            trust_provider,
            replication_manager,
            router,
            gossip,
            config,
            stats: Arc::new(RwLock::new(ChurnStats::default())),
        }
    }

    /// Installs a failure tracker after construction.
    pub async fn set_failure_tracker(&self, failure_tracker: Arc<dyn NodeFailureTracker>) {
        self.recovery_manager
            .set_failure_tracker(failure_tracker)
            .await;
    }

    /// Spawns the background task that runs monitoring cycles.
    pub async fn start_monitoring(&self) {
        let monitoring_interval = self.config.monitoring_interval;
        let handler = self.clone_for_task();

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(monitoring_interval);

            loop {
                interval.tick().await;

                if let Err(e) = handler.monitor_cycle().await {
                    tracing::warn!("Churn monitoring cycle failed: {}", e);
                }
            }
        });
    }

    async fn monitor_cycle(&self) -> Result<()> {
        let nodes = self.node_monitor.get_all_nodes().await;

        // Tally node states locally so the stats lock is not held across the
        // awaits below; `handle_node_failure` and `handle_high_churn` take the
        // same lock and would otherwise deadlock.
        let total_nodes = nodes.len() as u64;
        let mut active_nodes = 0u64;
        let mut suspicious_nodes = 0u64;
        let mut failed_nodes = 0u64;
        let mut proactive_replications = 0u64;

        for node_id in nodes {
            let node_status = self.node_monitor.get_node_status(&node_id).await;

            match node_status.status {
                NodeState::Active => {
                    active_nodes += 1;

                    // Replicate proactively if departure within the hour looks likely.
                    let prediction = self.predictor.predict(&node_id).await;
                    if prediction.probability_1h > self.config.prediction_threshold {
                        self.handle_imminent_departure(&node_id).await?;
                        proactive_replications += 1;
                    }
                }
                NodeState::Suspicious => {
                    suspicious_nodes += 1;

                    // Escalate to failure once the heartbeat timeout has elapsed.
                    if node_status.last_seen.elapsed() > self.config.heartbeat_timeout {
                        self.handle_node_failure(&node_id).await?;
                    }
                }
                NodeState::Failed => {
                    failed_nodes += 1;
                }
                _ => {}
            }
        }

        let churn_rate = {
            let mut stats = self.stats.write().await;
            stats.total_nodes = total_nodes;
            stats.active_nodes = active_nodes;
            stats.suspicious_nodes = suspicious_nodes;
            stats.failed_nodes = failed_nodes;
            stats.proactive_replications += proactive_replications;

            if total_nodes > 0 {
                stats.churn_rate = failed_nodes as f64 / total_nodes as f64;
            }
            stats.churn_rate
        };

        if churn_rate > self.config.max_churn_rate {
            self.handle_high_churn().await?;
        }

        Ok(())
    }

    async fn handle_imminent_departure(&self, node_id: &NodeId) -> Result<()> {
        self.node_monitor
            .update_node_state(node_id, NodeState::Departing)
            .await;

        let stored_content = self.get_content_stored_by(node_id).await?;

        for content_hash in stored_content {
            self.recovery_manager
                .increase_replication(&content_hash, RecoveryPriority::High)
                .await?;
        }

        self.router.mark_node_unreliable(node_id).await;

        let message = GossipMessage {
            topic: "node_departing".to_string(),
            data: bincode::serialize(&node_id)
                .map_err(|e| anyhow::anyhow!("Serialization error: {}", e))?,
            from: self.node_id.clone(),
            seqno: 0,
            timestamp: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.as_secs())
                .unwrap_or(0),
        };
        self.gossip.publish("node_departing", message).await?;

        Ok(())
    }

    async fn handle_node_failure(&self, node_id: &NodeId) -> Result<()> {
        let start_time = Instant::now();

        self.node_monitor
            .update_node_state(node_id, NodeState::Failed)
            .await;

        self.remove_from_routing_tables(node_id).await?;

        let lost_content = self.identify_lost_content(node_id).await?;

        let grace_config = ReplicationGracePeriodConfig::default();
        tracing::info!(
            "Node {} failed, queuing recovery for {} content items with {}s grace period",
            node_id,
            lost_content.len(),
            grace_config.grace_period_duration.as_secs()
        );

        for content_hash in lost_content {
            self.recovery_manager
                .queue_recovery_with_grace_period(
                    content_hash,
                    vec![node_id.clone()],
                    RecoveryPriority::Critical,
                    &grace_config,
                )
                .await?;
        }

        self.penalize_unexpected_departure(node_id).await;

        self.trigger_topology_rebalance().await?;

        let mut stats = self.stats.write().await;
        stats.failed_nodes += 1;
        let detection_time = start_time.elapsed().as_millis() as f64;
        stats.avg_detection_time_ms =
            (stats.avg_detection_time_ms * (stats.failed_nodes - 1) as f64 + detection_time)
                / stats.failed_nodes as f64;

        if self
            .recovery_manager
            .node_failure_tracker
            .read()
            .await
            .is_some()
        {
            stats.grace_period_preventions += 1;
        }

        Ok(())
    }

    async fn handle_high_churn(&self) -> Result<()> {
        self.replication_manager
            .increase_global_replication(1.5)
            .await;

        self.gossip.reduce_fanout(0.75).await;

        self.router.enable_aggressive_caching().await;

        let churn_rate = self.stats.read().await.churn_rate;
        let message = GossipMessage {
            topic: "high_churn_alert".to_string(),
            data: bincode::serialize(&churn_rate)
                .map_err(|e| anyhow::anyhow!("Serialization error: {}", e))?,
            from: self.node_id.clone(),
            seqno: 0,
            timestamp: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.as_secs())
                .unwrap_or(0),
        };
        self.gossip.publish("high_churn_alert", message).await?;

        Ok(())
    }

    async fn remove_from_routing_tables(&self, node_id: &NodeId) -> Result<()> {
        self.router.remove_node(node_id).await;

        self.router.remove_hyperbolic_coordinate(node_id).await;

        self.router.remove_from_som(node_id).await;

        self.trust_provider.remove_node(node_id);

        Ok(())
    }

    async fn get_content_stored_by(&self, node_id: &NodeId) -> Result<Vec<ContentHash>> {
        let status = self.node_monitor.get_node_status(node_id).await;
        Ok(status.stored_content.into_iter().collect())
    }

    async fn identify_lost_content(&self, failed_node: &NodeId) -> Result<Vec<ContentHash>> {
        let all_content = self.get_content_stored_by(failed_node).await?;
        let mut at_risk_content = Vec::new();

        for content_hash in all_content {
            let remaining_replicas = self
                .recovery_manager
                .get_remaining_replicas(&content_hash, failed_node)
                .await?;

            // Treat content with fewer than five remaining replicas as at risk.
            if remaining_replicas < 5 {
                at_risk_content.push(content_hash);
            }
        }

        Ok(at_risk_content)
    }

    async fn penalize_unexpected_departure(&self, node_id: &NodeId) {
        // Record a negative interaction against the departed node, attributed
        // to the all-zero node ID.
        self.trust_provider.update_trust(
            &NodeId { hash: [0u8; 32] },
            node_id,
            false,
        );
    }

    async fn trigger_topology_rebalance(&self) -> Result<()> {
        self.router.rebalance_hyperbolic_space().await;

        self.router.update_som_grid().await;

        self.router.trigger_trust_recomputation().await;

        Ok(())
    }

    pub async fn handle_heartbeat(&self, node_id: &NodeId) -> Result<()> {
        self.node_monitor.record_heartbeat(node_id).await;
        Ok(())
    }

    pub async fn handle_gossip_activity(&self, node_id: &NodeId) -> Result<()> {
        self.node_monitor.record_gossip_activity(node_id).await;
        Ok(())
    }

    pub async fn get_stats(&self) -> ChurnStats {
        self.stats.read().await.clone()
    }

    fn clone_for_task(&self) -> Self {
        Self {
            node_id: self.node_id.clone(),
            predictor: self.predictor.clone(),
            node_monitor: self.node_monitor.clone(),
            recovery_manager: self.recovery_manager.clone(),
            trust_provider: self.trust_provider.clone(),
            replication_manager: self.replication_manager.clone(),
            router: self.router.clone(),
            gossip: self.gossip.clone(),
            config: self.config.clone(),
            stats: self.stats.clone(),
        }
    }
}

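// Usage sketch (hedged; mirrors the wiring used by the tests at the bottom of
// this file, whose constructors are taken as given):
//
//     let handler = ChurnHandler::new(
//         node_id,
//         predictor,
//         trust_provider,
//         replication_manager,
//         router,
//         gossip,
//         ChurnConfig::default(),
//     );
//     handler.start_monitoring().await;
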
impl NodeMonitor {
    pub fn new(config: ChurnConfig) -> Self {
        Self {
            node_status: Arc::new(RwLock::new(HashMap::new())),
            heartbeats: Arc::new(RwLock::new(HashMap::new())),
            config,
        }
    }

    pub async fn get_all_nodes(&self) -> Vec<NodeId> {
        self.node_status.read().await.keys().cloned().collect()
    }

    pub async fn get_node_status(&self, node_id: &NodeId) -> NodeStatus {
        self.node_status
            .read()
            .await
            .get(node_id)
            .cloned()
            .unwrap_or(NodeStatus {
                node_id: node_id.clone(),
                last_seen: Instant::now(),
                last_heartbeat: None,
                last_gossip: None,
                status: NodeState::Failed,
                reliability: 0.0,
                stored_content: HashSet::new(),
            })
    }

    pub async fn update_node_state(&self, node_id: &NodeId, state: NodeState) {
        if let Some(status) = self.node_status.write().await.get_mut(node_id) {
            status.status = state;
        }
    }

    pub async fn record_heartbeat(&self, node_id: &NodeId) {
        let now = Instant::now();
        self.heartbeats.write().await.insert(node_id.clone(), now);

        let mut status_map = self.node_status.write().await;
        let status = status_map
            .entry(node_id.clone())
            .or_insert_with(|| NodeStatus {
                node_id: node_id.clone(),
                last_seen: now,
                last_heartbeat: None,
                last_gossip: None,
                status: NodeState::Active,
                reliability: 1.0,
                stored_content: HashSet::new(),
            });
        status.last_heartbeat = Some(now);
        status.last_seen = now;
        status.status = NodeState::Active;
    }

    pub async fn record_gossip_activity(&self, node_id: &NodeId) {
        let now = Instant::now();

        if let Some(status) = self.node_status.write().await.get_mut(node_id) {
            status.last_gossip = Some(now);
            status.last_seen = now;
        }
    }

    pub async fn is_alive(&self, node_id: &NodeId) -> bool {
        if let Some(last_heartbeat) = self.heartbeats.read().await.get(node_id) {
            last_heartbeat.elapsed() < self.config.heartbeat_timeout
        } else {
            false
        }
    }
}
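
// Liveness sketch (hedged): a node counts as alive only while heartbeats keep
// arriving within `heartbeat_timeout`, as exercised in the tests below:
//
//     monitor.record_heartbeat(&node_id).await;
//     assert!(monitor.is_alive(&node_id).await);
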

impl Default for RecoveryManager {
    fn default() -> Self {
        Self::new()
    }
}

impl RecoveryManager {
    pub fn new() -> Self {
        Self {
            content_tracker: Arc::new(RwLock::new(HashMap::new())),
            recovery_queue: Arc::new(RwLock::new(Vec::new())),
            _active_recoveries: Arc::new(RwLock::new(HashMap::new())),
            node_failure_tracker: Arc::new(RwLock::new(None)),
        }
    }

    pub fn with_failure_tracker(failure_tracker: Arc<dyn NodeFailureTracker>) -> Self {
        Self {
            content_tracker: Arc::new(RwLock::new(HashMap::new())),
            recovery_queue: Arc::new(RwLock::new(Vec::new())),
            _active_recoveries: Arc::new(RwLock::new(HashMap::new())),
            node_failure_tracker: Arc::new(RwLock::new(Some(failure_tracker))),
        }
    }

    pub async fn set_failure_tracker(&self, failure_tracker: Arc<dyn NodeFailureTracker>) {
        *self.node_failure_tracker.write().await = Some(failure_tracker);
    }

    pub async fn increase_replication(
        &self,
        content_hash: &ContentHash,
        priority: RecoveryPriority,
    ) -> Result<()> {
        self.queue_recovery(*content_hash, vec![], priority).await
    }

    pub async fn queue_recovery(
        &self,
        content_hash: ContentHash,
        failed_nodes: Vec<NodeId>,
        priority: RecoveryPriority,
    ) -> Result<()> {
        let task = RecoveryTask {
            content_hash,
            failed_nodes,
            priority,
            created_at: Instant::now(),
        };

        let mut queue = self.recovery_queue.write().await;
        queue.push(task);

        // Keep the highest-priority tasks at the front of the queue.
        queue.sort_by_key(|task| std::cmp::Reverse(task.priority));

        Ok(())
    }

    pub async fn get_remaining_replicas(
        &self,
        content_hash: &ContentHash,
        exclude_node: &NodeId,
    ) -> Result<u32> {
        if let Some(tracker) = self.content_tracker.read().await.get(content_hash) {
            let remaining = tracker
                .storing_nodes
                .iter()
                .filter(|&n| n != exclude_node)
                .count() as u32;
            Ok(remaining)
        } else {
            Ok(0)
        }
    }

    pub async fn queue_recovery_with_grace_period(
        &self,
        content_hash: ContentHash,
        failed_nodes: Vec<NodeId>,
        priority: RecoveryPriority,
        config: &ReplicationGracePeriodConfig,
    ) -> Result<()> {
        if failed_nodes.is_empty() {
            return self.queue_recovery(content_hash, failed_nodes, priority).await;
        }

        if let Some(ref failure_tracker) = *self.node_failure_tracker.read().await {
            // Record each failure so its grace period starts ticking.
            for node_id in &failed_nodes {
                failure_tracker
                    .record_node_failure(
                        node_id.clone(),
                        crate::dht::replication_grace_period::NodeFailureReason::NetworkTimeout,
                        config,
                    )
                    .await?;
            }

            // Split nodes into those already past their grace period and those still inside it.
            let mut immediate_recovery_nodes = Vec::new();
            let mut delayed_recovery_nodes = Vec::new();

            for node_id in &failed_nodes {
                if failure_tracker.should_start_replication(node_id).await {
                    immediate_recovery_nodes.push(node_id.clone());
                } else {
                    delayed_recovery_nodes.push(node_id.clone());
                }
            }

            if !immediate_recovery_nodes.is_empty() {
                tracing::info!(
                    "Queuing immediate recovery for {} nodes (past grace period) for content {:?}",
                    immediate_recovery_nodes.len(),
                    content_hash
                );
                self.queue_recovery(content_hash, immediate_recovery_nodes, priority)
                    .await?;
            }

            if !delayed_recovery_nodes.is_empty() {
                tracing::info!(
                    "Scheduling delayed recovery check for {} nodes (in grace period) for content {:?}",
                    delayed_recovery_nodes.len(),
                    content_hash
                );
                self.schedule_grace_period_check(
                    content_hash,
                    delayed_recovery_nodes,
                    priority,
                    failure_tracker.clone(),
                )
                .await?;
            }

            Ok(())
        } else {
            // Without a failure tracker, fall back to immediate recovery.
            self.queue_recovery(content_hash, failed_nodes, priority).await
        }
    }

    async fn schedule_grace_period_check(
        &self,
        content_hash: ContentHash,
        failed_nodes: Vec<NodeId>,
        _priority: RecoveryPriority,
        failure_tracker: Arc<dyn NodeFailureTracker>,
    ) -> Result<()> {
        // Hold only a weak reference so the background check does not keep the
        // recovery manager's state alive.
        let content_tracker = Arc::downgrade(&self.content_tracker);

        tokio::spawn(async move {
            // Wait before re-checking whether the grace period has expired.
            tokio::time::sleep(Duration::from_secs(310)).await;

            if let Some(_tracker) = content_tracker.upgrade() {
                let mut nodes_to_recover = Vec::new();

                for node_id in &failed_nodes {
                    if failure_tracker.should_start_replication(node_id).await {
                        nodes_to_recover.push(node_id.clone());
                    }
                }

                if !nodes_to_recover.is_empty() {
                    tracing::info!(
                        "Grace period expired for {} nodes, queuing recovery for content {:?}",
                        nodes_to_recover.len(),
                        content_hash
                    );
                } else {
                    tracing::debug!(
                        "Grace period check completed for content {:?}, no nodes require recovery",
                        content_hash
                    );
                }
            }
        });

        Ok(())
    }

    pub async fn should_recover_node(&self, node_id: &NodeId) -> bool {
        if let Some(ref failure_tracker) = *self.node_failure_tracker.read().await {
            failure_tracker.should_start_replication(node_id).await
        } else {
            true
        }
    }
}

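// Grace-period flow sketch (hedged; mirrors the call made from
// `handle_node_failure` above): failures are recorded with the tracker first,
// and replication is queued immediately only for nodes already past their
// grace period.
//
//     let grace = ReplicationGracePeriodConfig::default();
//     recovery_manager
//         .queue_recovery_with_grace_period(
//             content_hash,
//             vec![failed_node_id],
//             RecoveryPriority::Critical,
//             &grace,
//         )
//         .await?;
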
#[cfg(test)]
mod tests {
    use super::*;
    use crate::adaptive::trust::MockTrustProvider;
    use rand::RngCore;

    async fn create_test_churn_handler() -> ChurnHandler {
        let predictor = Arc::new(ChurnPredictor::new());
        let trust_provider = Arc::new(MockTrustProvider::new());
        let replication_manager = Arc::new(ReplicationManager::new(
            Default::default(),
            trust_provider.clone(),
            predictor.clone(),
            Arc::new(AdaptiveRouter::new(trust_provider.clone())),
        ));
        let router = Arc::new(AdaptiveRouter::new(trust_provider.clone()));

        use crate::peer_record::UserId;
        let mut hash = [0u8; 32];
        rand::thread_rng().fill_bytes(&mut hash);
        let node_id = UserId::from_bytes(hash);

        let gossip = Arc::new(AdaptiveGossipSub::new(
            node_id.clone(),
            trust_provider.clone(),
        ));

        let config = ChurnConfig {
            heartbeat_timeout: Duration::from_secs(30),
            gossip_timeout: Duration::from_secs(300),
            monitoring_interval: Duration::from_secs(5),
            prediction_threshold: 0.7,
            max_churn_rate: 0.2,
        };

        ChurnHandler::new(
            node_id,
            predictor,
            trust_provider,
            replication_manager,
            router,
            gossip,
            config,
        )
    }

    #[tokio::test]
    async fn test_node_monitoring() {
        let handler = create_test_churn_handler().await;
        let node_id = NodeId { hash: [1u8; 32] };

        handler.node_monitor.record_heartbeat(&node_id).await;

        assert!(handler.node_monitor.is_alive(&node_id).await);

        let status = handler.node_monitor.get_node_status(&node_id).await;
        assert_eq!(status.status, NodeState::Active);
        assert!(status.last_heartbeat.is_some());
    }

    #[tokio::test]
    async fn test_failure_detection() {
        let mut handler = create_test_churn_handler().await;
        handler.config.heartbeat_timeout = Duration::from_millis(100);
        if let Some(nm) = Arc::get_mut(&mut handler.node_monitor) {
            nm.config.heartbeat_timeout = Duration::from_millis(100);
        }
        let node_id = NodeId { hash: [1u8; 32] };

        handler.handle_heartbeat(&node_id).await.unwrap();

        tokio::time::sleep(Duration::from_millis(150)).await;

        assert!(!handler.node_monitor.is_alive(&node_id).await);
    }

    #[tokio::test]
    async fn test_proactive_replication() {
        let handler = create_test_churn_handler().await;
        let node_id = NodeId { hash: [1u8; 32] };

        handler.node_monitor.record_heartbeat(&node_id).await;
        handler
            .predictor
            .update_node_features(
                &node_id,
                vec![0.1, 0.9, 0.1, 0.1, 0.0, 0.0, 0.1, 0.0, 0.0, 0.0],
            )
            .await
            .unwrap();

        handler.handle_imminent_departure(&node_id).await.unwrap();

        let status = handler.node_monitor.get_node_status(&node_id).await;
        assert_eq!(status.status, NodeState::Departing);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_churn_rate_calculation() {
        let mut handler = create_test_churn_handler().await;
        handler.config.max_churn_rate = 1.0;

        for i in 0..10 {
            let node_id = NodeId { hash: [i; 32] };
            handler.handle_heartbeat(&node_id).await.unwrap();
        }

        for i in 0..3 {
            let node_id = NodeId { hash: [i; 32] };
            handler
                .node_monitor
                .update_node_state(&node_id, NodeState::Failed)
                .await;
        }

        let res =
            tokio::time::timeout(std::time::Duration::from_secs(30), handler.monitor_cycle())
                .await;
        assert!(res.is_ok(), "monitor_cycle timed out");
        res.unwrap().unwrap();

        let stats = handler.get_stats().await;
        assert_eq!(stats.total_nodes, 10);
        assert_eq!(stats.failed_nodes, 3);
        assert!((stats.churn_rate - 0.3).abs() < 0.01);
    }
}