use crate::adaptive::{
    ContentHash, NodeId, TrustProvider,
    gossip::{AdaptiveGossipSub, GossipMessage},
    learning::ChurnPredictor,
    replication::ReplicationManager,
    routing::AdaptiveRouter,
};
use anyhow::Result;
use std::{
    collections::{HashMap, HashSet},
    sync::Arc,
    time::{Duration, Instant},
};
use tokio::sync::RwLock;

/// Handles node churn: detection, prediction, and content recovery.
pub struct ChurnHandler {
    /// Identity of the local node.
    node_id: NodeId,

    /// Predictor for imminent node departures.
    predictor: Arc<ChurnPredictor>,

    /// Tracks liveness and state of known nodes.
    node_monitor: Arc<NodeMonitor>,

    /// Queues and tracks content recovery work.
    recovery_manager: Arc<RecoveryManager>,

    /// Trust scores for peers.
    trust_provider: Arc<dyn TrustProvider>,

    /// Controls replica placement and counts.
    replication_manager: Arc<ReplicationManager>,

    /// Adaptive routing layer.
    router: Arc<AdaptiveRouter>,

    /// Gossip layer used to broadcast churn-related events.
    gossip: Arc<AdaptiveGossipSub>,

    /// Configuration parameters.
    config: ChurnConfig,

    /// Aggregated churn statistics.
    stats: Arc<RwLock<ChurnStats>>,
}

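/// Tuning parameters for churn detection and recovery.
///
/// A minimal sketch of overriding one field while keeping the other defaults
/// (values as in `ChurnConfig::default` below):
///
/// ```ignore
/// let config = ChurnConfig {
///     max_churn_rate: 0.5,
///     ..ChurnConfig::default()
/// };
/// ```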
#[derive(Debug, Clone)]
pub struct ChurnConfig {
    /// A node is considered failed if no heartbeat arrives within this window.
    pub heartbeat_timeout: Duration,

    /// Maximum time without gossip activity before a node is treated as inactive.
    pub gossip_timeout: Duration,

    /// Departure probability above which proactive replication is triggered.
    pub prediction_threshold: f64,

    /// How often the monitoring cycle runs.
    pub monitoring_interval: Duration,

    /// Churn rate above which network-wide mitigation kicks in.
    pub max_churn_rate: f64,
}

impl Default for ChurnConfig {
    fn default() -> Self {
        Self {
            heartbeat_timeout: Duration::from_secs(30),
            gossip_timeout: Duration::from_secs(300),
            prediction_threshold: 0.7,
            monitoring_interval: Duration::from_secs(30),
            max_churn_rate: 0.3,
        }
    }
}

/// Tracks liveness and activity of known nodes.
pub struct NodeMonitor {
    /// Last known status per node.
    node_status: Arc<RwLock<HashMap<NodeId, NodeStatus>>>,

    /// Timestamp of the most recent heartbeat per node.
    heartbeats: Arc<RwLock<HashMap<NodeId, Instant>>>,

    /// Configuration (timeouts and intervals).
    config: ChurnConfig,
}

#[derive(Debug, Clone)]
pub struct NodeStatus {
    /// Identity of the node.
    pub node_id: NodeId,

    /// Last time any activity was observed from this node.
    pub last_seen: Instant,

    /// Last heartbeat received, if any.
    pub last_heartbeat: Option<Instant>,

    /// Last gossip activity observed, if any.
    pub last_gossip: Option<Instant>,

    /// Current lifecycle state.
    pub status: NodeState,

    /// Estimated reliability score.
    pub reliability: f64,

    /// Content this node is known to store.
    pub stored_content: HashSet<ContentHash>,
}

#[derive(Debug, Clone, PartialEq)]
pub enum NodeState {
    /// Responding normally.
    Active,

    /// Missed recent activity; may be failing.
    Suspicious,

    /// Expected to leave soon (predicted or announced).
    Departing,

    /// Confirmed failed or departed.
    Failed,
}

/// Manages recovery of content affected by node departures.
pub struct RecoveryManager {
    /// Tracks which nodes store which content.
    content_tracker: Arc<RwLock<HashMap<ContentHash, ContentTracker>>>,

    /// Pending recovery tasks, ordered by priority.
    recovery_queue: Arc<RwLock<Vec<RecoveryTask>>>,

    /// Recoveries currently in progress.
    _active_recoveries: Arc<RwLock<HashMap<ContentHash, RecoveryStatus>>>,
}

#[derive(Debug, Clone)]
#[allow(dead_code)]
struct ContentTracker {
    hash: ContentHash,
    storing_nodes: HashSet<NodeId>,
    target_replicas: u32,
    last_verified: Instant,
}

#[derive(Debug, Clone)]
#[allow(dead_code)]
struct RecoveryTask {
    content_hash: ContentHash,
    failed_nodes: Vec<NodeId>,
    priority: RecoveryPriority,
    created_at: Instant,
}

/// Priority of a recovery task; later variants compare greater, so `Critical` sorts highest.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum RecoveryPriority {
    Low,
    Normal,
    High,
    Critical,
}

#[derive(Debug, Clone)]
#[allow(dead_code)]
struct RecoveryStatus {
    started_at: Instant,
    contacted_nodes: Vec<NodeId>,
    successful_nodes: Vec<NodeId>,
    failed_attempts: u32,
}

/// Aggregated churn statistics.
#[derive(Debug, Default, Clone)]
pub struct ChurnStats {
    pub total_nodes: u64,
    pub active_nodes: u64,
    pub failed_nodes: u64,
    pub suspicious_nodes: u64,
    /// Fraction of known nodes currently marked failed.
    pub churn_rate: f64,
    pub successful_recoveries: u64,
    pub failed_recoveries: u64,
    /// Replications triggered by departure predictions rather than observed failures.
    pub proactive_replications: u64,
    /// Running average time to process a node failure, in milliseconds.
    pub avg_detection_time_ms: f64,
}

impl ChurnHandler {
    /// Creates a new churn handler from its collaborating components.
    pub fn new(
        node_id: NodeId,
        predictor: Arc<ChurnPredictor>,
        trust_provider: Arc<dyn TrustProvider>,
        replication_manager: Arc<ReplicationManager>,
        router: Arc<AdaptiveRouter>,
        gossip: Arc<AdaptiveGossipSub>,
        config: ChurnConfig,
    ) -> Self {
        let node_monitor = Arc::new(NodeMonitor::new(config.clone()));
        let recovery_manager = Arc::new(RecoveryManager::new());

        Self {
            node_id,
            predictor,
            node_monitor,
            recovery_manager,
            trust_provider,
            replication_manager,
            router,
            gossip,
            config,
            stats: Arc::new(RwLock::new(ChurnStats::default())),
        }
    }

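    /// Spawns the periodic background monitoring loop.
    ///
    /// A hedged usage sketch; `handler` and `peer_id` are illustrative names only:
    ///
    /// ```ignore
    /// handler.start_monitoring().await;
    /// // Liveness signals are then fed in as they arrive from the network:
    /// handler.handle_heartbeat(&peer_id).await?;
    /// handler.handle_gossip_activity(&peer_id).await?;
    /// ```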
    pub async fn start_monitoring(&self) {
        let monitoring_interval = self.config.monitoring_interval;
        let handler = self.clone_for_task();

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(monitoring_interval);

            loop {
                interval.tick().await;

                if let Err(_e) = handler.monitor_cycle().await {
                    // Monitoring errors are non-fatal; skip this cycle and retry on the next tick.
                }
            }
        });
    }

    async fn monitor_cycle(&self) -> Result<()> {
        let nodes = self.node_monitor.get_all_nodes().await;

        // Reset the counters in a short scope so the stats lock is not held
        // across calls below that also take it (which would deadlock).
        {
            let mut stats = self.stats.write().await;
            stats.total_nodes = nodes.len() as u64;
            stats.active_nodes = 0;
            stats.suspicious_nodes = 0;
            stats.failed_nodes = 0;
        }

        for node_id in nodes {
            let node_status = self.node_monitor.get_node_status(&node_id).await;

            match node_status.status {
                NodeState::Active => {
                    self.stats.write().await.active_nodes += 1;

                    // Proactively replicate if the node is predicted to leave soon.
                    let prediction = self.predictor.predict(&node_id).await;

                    if prediction.probability_1h > self.config.prediction_threshold {
                        self.handle_imminent_departure(&node_id).await?;
                        self.stats.write().await.proactive_replications += 1;
                    }
                }
                NodeState::Suspicious => {
                    self.stats.write().await.suspicious_nodes += 1;

                    // Escalate to failure once the heartbeat timeout has elapsed.
                    if node_status.last_seen.elapsed() > self.config.heartbeat_timeout {
                        self.handle_node_failure(&node_id).await?;
                    }
                }
                NodeState::Failed => {
                    self.stats.write().await.failed_nodes += 1;
                }
                _ => {}
            }
        }

        // Recompute the churn rate, releasing the lock before the high-churn
        // handler runs since it reads the stats itself.
        let churn_rate = {
            let mut stats = self.stats.write().await;
            if stats.total_nodes > 0 {
                stats.churn_rate = stats.failed_nodes as f64 / stats.total_nodes as f64;
            }
            stats.churn_rate
        };

        if churn_rate > self.config.max_churn_rate {
            self.handle_high_churn().await?;
        }

        Ok(())
    }

    async fn handle_imminent_departure(&self, node_id: &NodeId) -> Result<()> {
        self.node_monitor
            .update_node_state(node_id, NodeState::Departing)
            .await;

        // Re-replicate everything this node stores before it leaves.
        let stored_content = self.get_content_stored_by(node_id).await?;

        for content_hash in stored_content {
            self.recovery_manager
                .increase_replication(&content_hash, RecoveryPriority::High)
                .await?;
        }

        self.router.mark_node_unreliable(node_id).await;

        // Announce the expected departure to the rest of the network.
        let message = GossipMessage {
            topic: "node_departing".to_string(),
            data: bincode::serialize(&node_id)
                .map_err(|e| anyhow::anyhow!("Serialization error: {}", e))?,
            from: self.node_id.clone(),
            seqno: 0,
            timestamp: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.as_secs())
                .unwrap_or(0),
        };
        self.gossip.publish("node_departing", message).await?;

        Ok(())
    }

    async fn handle_node_failure(&self, node_id: &NodeId) -> Result<()> {
        let start_time = Instant::now();
        self.node_monitor
            .update_node_state(node_id, NodeState::Failed)
            .await;

        self.remove_from_routing_tables(node_id).await?;

        // Queue recovery for any content left with too few replicas.
        let lost_content = self.identify_lost_content(node_id).await?;

        for content_hash in lost_content {
            self.recovery_manager
                .queue_recovery(
                    content_hash,
                    vec![node_id.clone()],
                    RecoveryPriority::Critical,
                )
                .await?;
        }

        self.penalize_unexpected_departure(node_id).await;

        self.trigger_topology_rebalance().await?;

        // Update failure statistics and the running average handling time.
        let mut stats = self.stats.write().await;
        stats.failed_nodes += 1;
        let detection_time = start_time.elapsed().as_millis() as f64;
        stats.avg_detection_time_ms =
            (stats.avg_detection_time_ms * (stats.failed_nodes - 1) as f64 + detection_time)
                / stats.failed_nodes as f64;

        Ok(())
    }

    async fn handle_high_churn(&self) -> Result<()> {
        // Increase redundancy and reduce gossip load while churn is elevated.
        self.replication_manager
            .increase_global_replication(1.5)
            .await;

        self.gossip.reduce_fanout(0.75).await;

        self.router.enable_aggressive_caching().await;

        // Alert peers so they can take the same measures.
        let churn_rate = self.stats.read().await.churn_rate;
        let message = GossipMessage {
            topic: "high_churn_alert".to_string(),
            data: bincode::serialize(&churn_rate)
                .map_err(|e| anyhow::anyhow!("Serialization error: {}", e))?,
            from: self.node_id.clone(),
            seqno: 0,
            timestamp: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.as_secs())
                .unwrap_or(0),
        };
        self.gossip.publish("high_churn_alert", message).await?;

        Ok(())
    }

    async fn remove_from_routing_tables(&self, node_id: &NodeId) -> Result<()> {
        self.router.remove_node(node_id).await;

        self.router.remove_hyperbolic_coordinate(node_id).await;

        self.router.remove_from_som(node_id).await;

        self.trust_provider.remove_node(node_id);

        Ok(())
    }

    async fn get_content_stored_by(&self, node_id: &NodeId) -> Result<Vec<ContentHash>> {
        let status = self.node_monitor.get_node_status(node_id).await;
        Ok(status.stored_content.into_iter().collect())
    }

    async fn identify_lost_content(&self, failed_node: &NodeId) -> Result<Vec<ContentHash>> {
        let all_content = self.get_content_stored_by(failed_node).await?;
        let mut at_risk_content = Vec::new();

        for content_hash in all_content {
            let remaining_replicas = self
                .recovery_manager
                .get_remaining_replicas(&content_hash, failed_node)
                .await?;

            // Treat content with fewer than 5 surviving replicas as at risk.
            if remaining_replicas < 5 {
                at_risk_content.push(content_hash);
            }
        }

        Ok(at_risk_content)
    }

    async fn penalize_unexpected_departure(&self, node_id: &NodeId) {
        // Record a negative interaction against the departed node; the all-zero
        // NodeId serves as the system-level reporting identity here.
        self.trust_provider.update_trust(
            &NodeId { hash: [0u8; 32] },
            node_id,
            false,
        );
    }

    async fn trigger_topology_rebalance(&self) -> Result<()> {
        self.router.rebalance_hyperbolic_space().await;

        self.router.update_som_grid().await;

        self.router.trigger_trust_recomputation().await;

        Ok(())
    }

    /// Records a heartbeat received from `node_id`.
    pub async fn handle_heartbeat(&self, node_id: &NodeId) -> Result<()> {
        self.node_monitor.record_heartbeat(node_id).await;
        Ok(())
    }

    /// Records gossip activity observed from `node_id`.
    pub async fn handle_gossip_activity(&self, node_id: &NodeId) -> Result<()> {
        self.node_monitor.record_gossip_activity(node_id).await;
        Ok(())
    }

    /// Returns a snapshot of the current churn statistics.
    pub async fn get_stats(&self) -> ChurnStats {
        self.stats.read().await.clone()
    }

    /// Builds a cheap clone for the spawned monitoring task; every field is
    /// either an `Arc` or inexpensive to clone.
    fn clone_for_task(&self) -> Self {
        Self {
            node_id: self.node_id.clone(),
            predictor: self.predictor.clone(),
            node_monitor: self.node_monitor.clone(),
            recovery_manager: self.recovery_manager.clone(),
            trust_provider: self.trust_provider.clone(),
            replication_manager: self.replication_manager.clone(),
            router: self.router.clone(),
            gossip: self.gossip.clone(),
            config: self.config.clone(),
            stats: self.stats.clone(),
        }
    }
}

impl NodeMonitor {
    pub fn new(config: ChurnConfig) -> Self {
        Self {
            node_status: Arc::new(RwLock::new(HashMap::new())),
            heartbeats: Arc::new(RwLock::new(HashMap::new())),
            config,
        }
    }

    pub async fn get_all_nodes(&self) -> Vec<NodeId> {
        self.node_status.read().await.keys().cloned().collect()
    }

    pub async fn get_node_status(&self, node_id: &NodeId) -> NodeStatus {
        self.node_status
            .read()
            .await
            .get(node_id)
            .cloned()
            .unwrap_or(NodeStatus {
                node_id: node_id.clone(),
                last_seen: Instant::now(),
                last_heartbeat: None,
                last_gossip: None,
                status: NodeState::Failed,
                reliability: 0.0,
                stored_content: HashSet::new(),
            })
    }

    pub async fn update_node_state(&self, node_id: &NodeId, state: NodeState) {
        if let Some(status) = self.node_status.write().await.get_mut(node_id) {
            status.status = state;
        }
    }

    pub async fn record_heartbeat(&self, node_id: &NodeId) {
        let now = Instant::now();
        self.heartbeats.write().await.insert(node_id.clone(), now);

        // Register previously unseen nodes so a heartbeat alone is enough to track them.
        let mut statuses = self.node_status.write().await;
        let status = statuses.entry(node_id.clone()).or_insert_with(|| NodeStatus {
            node_id: node_id.clone(),
            last_seen: now,
            last_heartbeat: None,
            last_gossip: None,
            status: NodeState::Active,
            reliability: 1.0,
            stored_content: HashSet::new(),
        });
        status.last_heartbeat = Some(now);
        status.last_seen = now;
        status.status = NodeState::Active;
    }

    pub async fn record_gossip_activity(&self, node_id: &NodeId) {
        let now = Instant::now();

        if let Some(status) = self.node_status.write().await.get_mut(node_id) {
            status.last_gossip = Some(now);
            status.last_seen = now;
        }
    }

    pub async fn is_alive(&self, node_id: &NodeId) -> bool {
        if let Some(last_heartbeat) = self.heartbeats.read().await.get(node_id) {
            last_heartbeat.elapsed() < self.config.heartbeat_timeout
        } else {
            false
        }
    }
}

impl Default for RecoveryManager {
    fn default() -> Self {
        Self::new()
    }
}

impl RecoveryManager {
    pub fn new() -> Self {
        Self {
            content_tracker: Arc::new(RwLock::new(HashMap::new())),
            recovery_queue: Arc::new(RwLock::new(Vec::new())),
            _active_recoveries: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    pub async fn increase_replication(
        &self,
        content_hash: &ContentHash,
        priority: RecoveryPriority,
    ) -> Result<()> {
        // Proactive replication is modelled as a recovery task with no failed nodes.
        self.queue_recovery(*content_hash, vec![], priority).await
    }

    pub async fn queue_recovery(
        &self,
        content_hash: ContentHash,
        failed_nodes: Vec<NodeId>,
        priority: RecoveryPriority,
    ) -> Result<()> {
        let task = RecoveryTask {
            content_hash,
            failed_nodes,
            priority,
            created_at: Instant::now(),
        };

        let mut queue = self.recovery_queue.write().await;
        queue.push(task);

        // Keep the highest-priority tasks at the front of the queue.
        queue.sort_by_key(|task| std::cmp::Reverse(task.priority));

        Ok(())
    }

    pub async fn get_remaining_replicas(
        &self,
        content_hash: &ContentHash,
        exclude_node: &NodeId,
    ) -> Result<u32> {
        if let Some(tracker) = self.content_tracker.read().await.get(content_hash) {
            let remaining = tracker
                .storing_nodes
                .iter()
                .filter(|&n| n != exclude_node)
                .count() as u32;
            Ok(remaining)
        } else {
            // Unknown content is treated as having no surviving replicas.
            Ok(0)
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::adaptive::som::{GridSize, SomConfig};
    use crate::adaptive::trust::MockTrustProvider;
    use rand::RngCore;

    async fn create_test_churn_handler() -> ChurnHandler {
        let predictor = Arc::new(ChurnPredictor::new());
        let trust_provider = Arc::new(MockTrustProvider::new());
        let replication_manager = Arc::new(ReplicationManager::new(
            Default::default(),
            trust_provider.clone(),
            predictor.clone(),
            Arc::new(AdaptiveRouter::new(
                trust_provider.clone(),
                Arc::new(crate::adaptive::hyperbolic::HyperbolicSpace::new()),
                Arc::new(crate::adaptive::som::SelfOrganizingMap::new(SomConfig {
                    initial_learning_rate: 0.3,
                    initial_radius: 5.0,
                    iterations: 100,
                    grid_size: GridSize::Fixed(10, 10),
                })),
            )),
        ));
        let router = Arc::new(AdaptiveRouter::new(
            trust_provider.clone(),
            Arc::new(crate::adaptive::hyperbolic::HyperbolicSpace::new()),
            Arc::new(crate::adaptive::som::SelfOrganizingMap::new(SomConfig {
                initial_learning_rate: 0.3,
                initial_radius: 5.0,
                iterations: 100,
                grid_size: GridSize::Fixed(10, 10),
            })),
        ));

        use crate::peer_record::UserId;
        let mut hash = [0u8; 32];
        rand::thread_rng().fill_bytes(&mut hash);
        let node_id = UserId::from_bytes(hash);

        let gossip = Arc::new(AdaptiveGossipSub::new(
            node_id.clone(),
            trust_provider.clone(),
        ));

        let config = ChurnConfig {
            heartbeat_timeout: Duration::from_secs(30),
            gossip_timeout: Duration::from_secs(300),
            monitoring_interval: Duration::from_secs(5),
            prediction_threshold: 0.7,
            max_churn_rate: 0.2,
        };

        ChurnHandler::new(
            node_id,
            predictor,
            trust_provider,
            replication_manager,
            router,
            gossip,
            config,
        )
    }

    #[tokio::test]
    async fn test_node_monitoring() {
        let handler = create_test_churn_handler().await;
        let node_id = NodeId { hash: [1u8; 32] };

        // Record a heartbeat for the node.
        handler.handle_heartbeat(&node_id).await.unwrap();

        // The node should now be tracked as alive.
        assert!(handler.node_monitor.is_alive(&node_id).await);

        // And its status should reflect the heartbeat.
        let status = handler.node_monitor.get_node_status(&node_id).await;
        assert_eq!(status.status, NodeState::Active);
        assert!(status.last_heartbeat.is_some());
    }
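
    // The monitor reports a node it has never observed as `Failed`, with no
    // recorded heartbeat; this exercises the fallback path in `get_node_status`
    // using only types defined in this module.
    #[tokio::test]
    async fn test_unknown_node_defaults_to_failed() {
        let monitor = NodeMonitor::new(ChurnConfig::default());
        let node_id = NodeId { hash: [9u8; 32] };

        assert!(!monitor.is_alive(&node_id).await);

        let status = monitor.get_node_status(&node_id).await;
        assert_eq!(status.status, NodeState::Failed);
        assert!(status.last_heartbeat.is_none());
    }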

    #[tokio::test]
    async fn test_failure_detection() {
        // Use a short heartbeat timeout so the expiry can be observed quickly.
        let config = ChurnConfig {
            heartbeat_timeout: Duration::from_millis(100),
            ..ChurnConfig::default()
        };
        let monitor = NodeMonitor::new(config);
        let node_id = NodeId { hash: [1u8; 32] };

        monitor.record_heartbeat(&node_id).await;

        // Wait past the timeout so the last heartbeat is considered stale.
        tokio::time::sleep(Duration::from_millis(150)).await;

        assert!(!monitor.is_alive(&node_id).await);
    }
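
    // Gossip activity only updates nodes the monitor already tracks, so the node
    // is registered with a heartbeat first; afterwards `last_gossip` is recorded.
    #[tokio::test]
    async fn test_gossip_activity_updates_status() {
        let monitor = NodeMonitor::new(ChurnConfig::default());
        let node_id = NodeId { hash: [2u8; 32] };

        monitor.record_heartbeat(&node_id).await;
        monitor.record_gossip_activity(&node_id).await;

        let status = monitor.get_node_status(&node_id).await;
        assert!(status.last_gossip.is_some());
        assert_eq!(status.status, NodeState::Active);
    }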

    #[tokio::test]
    async fn test_proactive_replication() {
        let handler = create_test_churn_handler().await;
        let node_id = NodeId { hash: [1u8; 32] };

        // Register the node with the monitor before simulating its departure.
        handler.handle_heartbeat(&node_id).await.unwrap();

        // Record feature values for the node with the predictor.
        handler
            .predictor
            .update_node_features(
                &node_id,
                vec![0.1, 0.9, 0.1, 0.1, 0.0, 0.0, 0.1, 0.0, 0.0, 0.0],
            )
            .await
            .unwrap();

        handler.handle_imminent_departure(&node_id).await.unwrap();

        // The node should now be marked as departing.
        let status = handler.node_monitor.get_node_status(&node_id).await;
        assert_eq!(status.status, NodeState::Departing);
    }
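
    // A quick check that the defaults set in `ChurnConfig::default` hold.
    #[test]
    fn test_default_config_values() {
        let config = ChurnConfig::default();
        assert_eq!(config.heartbeat_timeout, Duration::from_secs(30));
        assert_eq!(config.gossip_timeout, Duration::from_secs(300));
        assert!((config.prediction_threshold - 0.7).abs() < f64::EPSILON);
        assert_eq!(config.monitoring_interval, Duration::from_secs(30));
        assert!((config.max_churn_rate - 0.3).abs() < f64::EPSILON);
    }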

    #[tokio::test]
    async fn test_churn_rate_calculation() {
        let handler = create_test_churn_handler().await;

        // Register 10 nodes via heartbeats.
        for i in 0..10 {
            let node_id = NodeId { hash: [i; 32] };
            handler.handle_heartbeat(&node_id).await.unwrap();
        }

        // Mark 3 of them as failed.
        for i in 0..3 {
            let node_id = NodeId { hash: [i; 32] };
            handler
                .node_monitor
                .update_node_state(&node_id, NodeState::Failed)
                .await;
        }

        // Run one monitoring cycle to refresh the statistics.
        handler.monitor_cycle().await.unwrap();

        let stats = handler.get_stats().await;
        assert_eq!(stats.total_nodes, 10);
        assert_eq!(stats.failed_nodes, 3);
        // 3 of 10 nodes failed, so the churn rate should be about 0.3.
        assert!((stats.churn_rate - 0.3).abs() < 0.01);
    }
}