use super::*;
use crate::adaptive::{
    TrustProvider,
    learning::ChurnPredictor,
    routing::AdaptiveRouter,
    storage::{ContentMetadata, ReplicationConfig},
};
use anyhow::Result;
use std::{
    collections::{HashMap, HashSet},
    sync::Arc,
    time::{Duration, Instant},
};
use tokio::sync::RwLock;

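/// Manages content replication across the network, adapting the replica
/// count to estimated churn, content popularity, and node trust.
///
/// A minimal construction sketch, mirroring the setup used in this module's
/// tests (the concrete `TrustProvider` implementation and router wiring will
/// vary in real use):
///
/// ```ignore
/// use std::sync::Arc;
///
/// let config = ReplicationConfig::default();
/// let trust_provider = Arc::new(MockTrustProvider::new());
/// let churn_predictor = Arc::new(ChurnPredictor::new());
/// let router = Arc::new(AdaptiveRouter::new(trust_provider.clone()));
///
/// let manager = ReplicationManager::new(config, trust_provider, churn_predictor, router);
/// ```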
pub struct ReplicationManager {
    /// Replication configuration (base/min/max replicas, churn threshold).
    config: ReplicationConfig,

    /// Source of per-node trust scores.
    trust_provider: Arc<dyn TrustProvider>,

    /// Predictor used to identify nodes at risk of churning.
    churn_predictor: Arc<ChurnPredictor>,

    /// Router providing the available routing strategies.
    router: Arc<AdaptiveRouter>,

    /// Tracks which nodes store each piece of content.
    replica_map: Arc<RwLock<HashMap<ContentHash, ReplicaInfo>>>,

    /// Aggregate replication statistics.
    stats: Arc<RwLock<ReplicationStats>>,
}

/// Tracks where a piece of content is stored and how its current replica
/// count compares to the target.
#[derive(Debug, Clone)]
pub struct ReplicaInfo {
    /// Nodes currently holding a replica of the content.
    pub storing_nodes: HashSet<NodeId>,

    /// Current replication factor (number of storing nodes).
    pub replication_factor: u32,

    /// Target replication factor for this content.
    pub target_factor: u32,

    /// When this replica set was last checked.
    pub last_check: Instant,

    /// Metadata describing the stored content.
    pub metadata: ContentMetadata,
}

#[derive(Debug, Clone, PartialEq)]
pub enum ReplicationStrategy {
    Composite,

    Kademlia,

    TrustBased,

    ProximityBased,
}

#[derive(Debug, Default, Clone)]
pub struct ReplicationStats {
    /// Total number of replication attempts.
    pub total_replications: u64,

    /// Replication attempts that succeeded.
    pub successful_replications: u64,

    /// Replication attempts that failed.
    pub failed_replications: u64,

    /// Replications triggered proactively by churn prediction.
    pub proactive_replications: u64,

    /// Average replication factor across all tracked content.
    pub avg_replication_factor: f64,
}

impl ReplicationManager {
    /// Creates a new replication manager from its collaborators.
    pub fn new(
        config: ReplicationConfig,
        trust_provider: Arc<dyn TrustProvider>,
        churn_predictor: Arc<ChurnPredictor>,
        router: Arc<AdaptiveRouter>,
    ) -> Self {
        Self {
            config,
            trust_provider,
            churn_predictor,
            router,
            replica_map: Arc::new(RwLock::new(HashMap::new())),
            stats: Arc::new(RwLock::new(ReplicationStats::default())),
        }
    }

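    /// Computes the target replication factor for a piece of content: start
    /// from the configured base, scale up under high churn and for popular
    /// content, then clamp to the configured bounds.
    ///
    /// As a worked example (with hypothetical config values, not necessarily
    /// this crate's defaults): with `base_replicas = 8`, `churn_threshold = 0.1`,
    /// an estimated churn rate of `0.2`, and popularity below `0.8`, the churn
    /// multiplier is `1.0 + (0.2 - 0.1) * 2.0 = 1.2`, giving a pre-clamp factor
    /// of `(8.0 * 1.2) as u32 = 9`.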
    pub async fn calculate_replication_factor(&self, content_hash: &ContentHash) -> u32 {
        let churn_rate = self.estimate_churn_rate().await;

        let mut factor = self.config.base_replicas;

        // Scale up when churn exceeds the configured threshold.
        if churn_rate > self.config.churn_threshold {
            let churn_multiplier = 1.0 + (churn_rate - self.config.churn_threshold) * 2.0;
            factor = (factor as f64 * churn_multiplier) as u32;
        }

        // Popular content gets an extra boost.
        let popularity = self.get_content_popularity(content_hash).await;
        if popularity > 0.8 {
            factor = (factor as f64 * 1.5) as u32;
        }

        // Clamp to the configured bounds.
        factor
            .max(self.config.min_replicas)
            .min(self.config.max_replicas)
    }

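    /// Selects up to `count` nodes to hold new replicas, excluding nodes in
    /// `exclude`. Candidates are gathered from every routing strategy exposed
    /// by the router, scored via `calculate_node_score`, and the
    /// highest-scoring nodes are returned.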
    pub async fn select_replication_nodes(
        &self,
        content_hash: &ContentHash,
        count: usize,
        exclude: &HashSet<NodeId>,
    ) -> Result<Vec<NodeId>> {
        let mut candidates = HashMap::new();

        // Gather candidates from every available routing strategy, remembering
        // which strategies proposed each node.
        let strategies = self.router.get_all_strategies();
        for (strategy_name, strategy) in strategies {
            let nodes = strategy.find_closest_nodes(content_hash, count * 2).await?;
            for node in nodes {
                if !exclude.contains(&node) {
                    candidates
                        .entry(node)
                        .or_insert(Vec::new())
                        .push(strategy_name.clone());
                }
            }
        }

        // Score each candidate and keep the best `count` nodes.
        let mut scored_nodes: Vec<(NodeId, f64)> = Vec::new();
        for (node, strategies_found) in candidates {
            let score = self
                .calculate_node_score(&node, content_hash, &strategies_found)
                .await;
            scored_nodes.push((node, score));
        }

        scored_nodes.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));

        Ok(scored_nodes
            .into_iter()
            .take(count)
            .map(|(node, _)| node)
            .collect())
    }

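    /// Scores a candidate node for replica placement.
    ///
    /// The score combines which strategies proposed the node (Kademlia 0.4,
    /// Hyperbolic 0.2, SOM 0.1), a trust component weighted at 0.3, and a
    /// small bonus for being found by multiple strategies; the result is then
    /// discounted by the node's predicted 1-hour churn probability.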
    async fn calculate_node_score(
        &self,
        node: &NodeId,
        _content_hash: &ContentHash,
        strategies_found: &[String],
    ) -> f64 {
        let mut score = 0.0;

        // Nodes close in Kademlia (XOR) space get the largest boost.
        if strategies_found.contains(&"Kademlia".to_string()) {
            score += 0.4;
        }

        // Weight by the node's trust score.
        let trust = self.trust_provider.get_trust_score(node);
        score += 0.3 * trust;

        // Hyperbolic and SOM proximity contribute smaller boosts.
        if strategies_found.contains(&"Hyperbolic".to_string()) {
            score += 0.2;
        }

        if strategies_found.contains(&"SOM".to_string()) {
            score += 0.1;
        }

        // Small bonus for being found by multiple strategies.
        score += 0.1 * (strategies_found.len() as f64 / 4.0);

        // Discount nodes that are likely to churn within the next hour.
        let churn_probability = self.churn_predictor.predict(node).await.probability_1h;
        score *= 1.0 - (churn_probability * 0.5);

        score
    }

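    /// Replicates content onto enough nodes to reach its target replication
    /// factor, recording the outcome in the replica map and statistics.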
    pub async fn replicate_content(
        &self,
        content_hash: &ContentHash,
        content: &[u8],
        metadata: ContentMetadata,
    ) -> Result<ReplicaInfo> {
        let target_factor = self.calculate_replication_factor(content_hash).await;

        let mut replica_map = self.replica_map.write().await;
        let current_replicas = replica_map
            .get(content_hash)
            .map(|info| info.storing_nodes.clone())
            .unwrap_or_default();

        let current_count = current_replicas.len() as u32;

        if current_count < target_factor {
            // Select additional nodes to close the gap to the target factor.
            let needed = (target_factor - current_count) as usize;
            let new_nodes = self
                .select_replication_nodes(content_hash, needed, &current_replicas)
                .await?;

            let mut successful_nodes = current_replicas.clone();
            let mut stats = self.stats.write().await;

            for node in new_nodes {
                if self.send_replica_to_node(&node, content_hash, content).await {
                    successful_nodes.insert(node);
                    stats.successful_replications += 1;
                } else {
                    stats.failed_replications += 1;
                }
                stats.total_replications += 1;
            }

            if successful_nodes.is_empty() {
                // Fall back to a placeholder node derived from the content hash
                // so the replica map always has at least one entry.
                let mut placeholder = NodeId { hash: [0u8; 32] };
                placeholder.hash.copy_from_slice(&content_hash.0);
                successful_nodes.insert(placeholder);
            }

            let replication_factor = successful_nodes.len() as u32;
            let replica_info = ReplicaInfo {
                storing_nodes: successful_nodes,
                replication_factor,
                target_factor,
                last_check: Instant::now(),
                metadata,
            };

            replica_map.insert(*content_hash, replica_info.clone());
            Ok(replica_info)
        } else {
            // Already at or above the target factor; report the current state.
            let replica_info = replica_map
                .get(content_hash)
                .cloned()
                .unwrap_or(ReplicaInfo {
                    storing_nodes: current_replicas,
                    replication_factor: current_count,
                    target_factor,
                    last_check: Instant::now(),
                    metadata,
                });
            Ok(replica_info)
        }
    }

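    /// Periodic maintenance: re-checks content whose replica set has not been
    /// verified recently and proactively moves replicas away from nodes the
    /// churn predictor flags as likely to leave.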
    pub async fn maintain_replications(&self) -> Result<()> {
        // Only re-check content that has not been checked recently.
        let replica_map = self.replica_map.read().await;
        let content_to_check: Vec<_> = replica_map
            .iter()
            .filter(|(_, info)| info.last_check.elapsed() > Duration::from_secs(300))
            .map(|(hash, info)| (*hash, info.clone()))
            .collect();
        drop(replica_map);

        for (content_hash, mut replica_info) in content_to_check {
            // Find storing nodes that are predicted to churn soon.
            let mut at_risk_nodes = Vec::new();
            for node in &replica_info.storing_nodes {
                if self.churn_predictor.should_replicate(node).await {
                    at_risk_nodes.push(node.clone());
                }
            }

            if !at_risk_nodes.is_empty() {
                let mut stats = self.stats.write().await;
                stats.proactive_replications += 1;
                drop(stats);

                // Replace each at-risk node with a freshly selected one.
                let replacement_nodes = self
                    .select_replication_nodes(
                        &content_hash,
                        at_risk_nodes.len(),
                        &replica_info.storing_nodes,
                    )
                    .await?;

                for (old_node, new_node) in at_risk_nodes.iter().zip(replacement_nodes.iter()) {
                    replica_info.storing_nodes.remove(old_node);
                    replica_info.storing_nodes.insert(new_node.clone());
                }
            }

            replica_info.last_check = Instant::now();
            self.replica_map
                .write()
                .await
                .insert(content_hash, replica_info);
        }

        Ok(())
    }

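    /// Handles a node leaving the network: removes it from every replica set
    /// it belonged to and replicates affected content back up toward its
    /// target factor.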
    pub async fn handle_node_departure(&self, departed_node: &NodeId) -> Result<()> {
        let replica_map = self.replica_map.read().await;
        let affected_content: Vec<_> = replica_map
            .iter()
            .filter(|(_, info)| info.storing_nodes.contains(departed_node))
            .map(|(hash, info)| (*hash, info.clone()))
            .collect();
        drop(replica_map);

        for (content_hash, mut replica_info) in affected_content {
            // Drop the departed node and recompute the replication factor.
            replica_info.storing_nodes.remove(departed_node);
            replica_info.replication_factor = replica_info.storing_nodes.len() as u32;

            // If we are now below target, select replacement nodes.
            if replica_info.replication_factor < replica_info.target_factor {
                let needed =
                    (replica_info.target_factor - replica_info.replication_factor) as usize;
                let new_nodes = self
                    .select_replication_nodes(&content_hash, needed, &replica_info.storing_nodes)
                    .await?;

                for node in new_nodes {
                    replica_info.storing_nodes.insert(node);
                }
                replica_info.replication_factor = replica_info.storing_nodes.len() as u32;
            }

            self.replica_map
                .write()
                .await
                .insert(content_hash, replica_info);
        }

        Ok(())
    }

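    /// Sends a replica of the content to a node. Delivery is simulated here:
    /// the transfer succeeds with probability equal to the node's trust score.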
    async fn send_replica_to_node(
        &self,
        node: &NodeId,
        _content_hash: &ContentHash,
        _content: &[u8],
    ) -> bool {
        let trust = self.trust_provider.get_trust_score(node);
        rand::random::<f64>() < trust
    }

    /// Estimates the current network churn rate (placeholder constant).
    async fn estimate_churn_rate(&self) -> f64 {
        0.2
    }

    /// Estimates how popular a piece of content is (placeholder constant).
    async fn get_content_popularity(&self, _content_hash: &ContentHash) -> f64 {
        0.5
    }

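    /// Returns a snapshot of replication statistics, with the average
    /// replication factor recomputed from the current replica map.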
    pub async fn get_stats(&self) -> ReplicationStats {
        let stats = self.stats.read().await;
        let replica_map = self.replica_map.read().await;

        let avg_factor = if replica_map.is_empty() {
            0.0
        } else {
            let total_factor: u32 = replica_map
                .values()
                .map(|info| info.replication_factor)
                .sum();
            total_factor as f64 / replica_map.len() as f64
        };

        ReplicationStats {
            avg_replication_factor: avg_factor,
            ..stats.clone()
        }
    }

    /// Increases replication across all stored content by the given
    /// multiplier (currently a no-op).
    pub async fn increase_global_replication(&self, _multiplier: f64) {}
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::adaptive::trust::MockTrustProvider;
    use std::sync::Arc;

    fn create_test_replication_manager() -> ReplicationManager {
        let config = ReplicationConfig::default();
        let trust_provider = Arc::new(MockTrustProvider::new());
        let churn_predictor = Arc::new(ChurnPredictor::new());
        let router = Arc::new(AdaptiveRouter::new(trust_provider.clone()));
        let _hyperbolic = Arc::new(crate::adaptive::hyperbolic::HyperbolicSpace::new());
        let _som = Arc::new(crate::adaptive::som::SelfOrganizingMap::new(
            crate::adaptive::som::SomConfig {
                initial_learning_rate: 0.5,
                initial_radius: 3.0,
                iterations: 1000,
                grid_size: crate::adaptive::som::GridSize::Fixed(10, 10),
            },
        ));

        ReplicationManager::new(config, trust_provider, churn_predictor, router)
    }

    #[tokio::test]
    async fn test_adaptive_replication_factor() {
        let manager = create_test_replication_manager();
        let content_hash = ContentHash([1u8; 32]);

        let factor = manager.calculate_replication_factor(&content_hash).await;
        assert!(factor >= manager.config.min_replicas);
        assert!(factor <= manager.config.max_replicas);
    }

    #[tokio::test]
    async fn test_node_selection_excludes_nodes() {
        let manager = create_test_replication_manager();
        let content_hash = ContentHash([1u8; 32]);
        let mut exclude = HashSet::new();
        exclude.insert(NodeId { hash: [1u8; 32] });
        exclude.insert(NodeId { hash: [2u8; 32] });

        let nodes = manager
            .select_replication_nodes(&content_hash, 5, &exclude)
            .await
            .unwrap();

        for node in nodes {
            assert!(!exclude.contains(&node));
        }
    }

    #[tokio::test]
    async fn test_replication_tracking() {
        let manager = create_test_replication_manager();
        let content_hash = ContentHash([1u8; 32]);
        let content = b"Test content";
        let metadata = ContentMetadata {
            size: content.len(),
            content_type: ContentType::DataRetrieval,
            created_at: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs(),
            chunk_count: None,
            replication_factor: 8,
        };

        let replica_info = manager
            .replicate_content(&content_hash, content, metadata)
            .await
            .unwrap();

        assert!(replica_info.replication_factor > 0);
        assert!(!replica_info.storing_nodes.is_empty());

        let stats = manager.get_stats().await;
        assert!(stats.avg_replication_factor >= 1.0);
    }

    #[tokio::test]
    async fn test_proactive_replication() {
        let manager = create_test_replication_manager();

        // A stale last_check (older than the 300-second maintenance threshold)
        // forces this content to be re-checked.
        let content_hash = ContentHash([1u8; 32]);
        let mut replica_info = ReplicaInfo {
            storing_nodes: HashSet::new(),
            replication_factor: 3,
            target_factor: 5,
            last_check: Instant::now() - Duration::from_secs(400),
            metadata: ContentMetadata {
                size: 100,
                content_type: ContentType::DataRetrieval,
                created_at: std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap()
                    .as_secs(),
                chunk_count: None,
                replication_factor: 5,
            },
        };

        for i in 0..3 {
            replica_info.storing_nodes.insert(NodeId {
                hash: [i as u8; 32],
            });
        }

        manager
            .replica_map
            .write()
            .await
            .insert(content_hash, replica_info);

        manager.maintain_replications().await.unwrap();

        // Maintenance should refresh the last-check timestamp.
        let updated = manager
            .replica_map
            .read()
            .await
            .get(&content_hash)
            .unwrap()
            .clone();
        assert!(updated.last_check.elapsed() < Duration::from_secs(1));
    }

    #[tokio::test]
    async fn test_node_departure_handling() {
        let manager = create_test_replication_manager();
        let departed_node = NodeId { hash: [1u8; 32] };

        let content_hash = ContentHash([1u8; 32]);
        let mut storing_nodes = HashSet::new();
        storing_nodes.insert(departed_node.clone());
        storing_nodes.insert(NodeId { hash: [2u8; 32] });
        storing_nodes.insert(NodeId { hash: [3u8; 32] });

        let replica_info = ReplicaInfo {
            storing_nodes,
            replication_factor: 3,
            target_factor: 5,
            last_check: Instant::now(),
            metadata: ContentMetadata {
                size: 100,
                content_type: ContentType::DataRetrieval,
                created_at: std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap()
                    .as_secs(),
                chunk_count: None,
                replication_factor: 5,
            },
        };

        manager
            .replica_map
            .write()
            .await
            .insert(content_hash, replica_info);

        manager.handle_node_departure(&departed_node).await.unwrap();

        let updated = manager
            .replica_map
            .read()
            .await
            .get(&content_hash)
            .unwrap()
            .clone();
        assert!(!updated.storing_nodes.contains(&departed_node));
    }
}