use super::*;

use crate::adaptive::{
    TrustProvider,
    learning::ChurnPredictor,
    routing::AdaptiveRouter,
    storage::{ContentMetadata, ReplicationConfig},
};
use anyhow::Result;
use std::{
    collections::{HashMap, HashSet},
    sync::Arc,
    time::{Duration, Instant},
};
use tokio::sync::RwLock;

/// Manages content replication, adapting the number and placement of replicas
/// to observed churn, node trust, and content popularity.
pub struct ReplicationManager {
    /// Replication configuration (base, minimum, and maximum replica counts).
    config: ReplicationConfig,

    /// Provider of per-node trust scores.
    trust_provider: Arc<dyn TrustProvider>,

    /// Predictor for the probability that a node will go offline.
    churn_predictor: Arc<ChurnPredictor>,

    /// Adaptive router exposing the available routing strategies.
    router: Arc<AdaptiveRouter>,

    /// Tracks which nodes currently store each piece of content.
    replica_map: Arc<RwLock<HashMap<ContentHash, ReplicaInfo>>>,

    /// Aggregate replication statistics.
    stats: Arc<RwLock<ReplicationStats>>,
}

/// Replication state of a single piece of content.
#[derive(Debug, Clone)]
pub struct ReplicaInfo {
    /// Nodes currently storing a replica.
    pub storing_nodes: HashSet<NodeId>,

    /// Number of replicas currently known to exist.
    pub replication_factor: u32,

    /// Number of replicas this content should have.
    pub target_factor: u32,

    /// When the replication state was last checked.
    pub last_check: Instant,

    /// Metadata describing the content.
    pub metadata: ContentMetadata,
}

/// Strategy for choosing where replicas are placed.
#[derive(Debug, Clone, PartialEq)]
pub enum ReplicationStrategy {
    /// Combine all available strategies when selecting nodes.
    Composite,

    /// Prefer nodes closest to the content hash in Kademlia XOR space.
    Kademlia,

    /// Prefer nodes with high trust scores.
    TrustBased,

    /// Prefer nodes that are close by a proximity metric.
    ProximityBased,
}

/// Counters describing replication activity.
#[derive(Debug, Default, Clone)]
pub struct ReplicationStats {
    /// Total replication attempts.
    pub total_replications: u64,

    /// Replication attempts that succeeded.
    pub successful_replications: u64,

    /// Replication attempts that failed.
    pub failed_replications: u64,

    /// Replications triggered proactively for at-risk nodes.
    pub proactive_replications: u64,

    /// Average replication factor across all tracked content.
    pub avg_replication_factor: f64,
}

impl ReplicationManager {
    /// Creates a new replication manager.
    pub fn new(
        config: ReplicationConfig,
        trust_provider: Arc<dyn TrustProvider>,
        churn_predictor: Arc<ChurnPredictor>,
        router: Arc<AdaptiveRouter>,
    ) -> Self {
        Self {
            config,
            trust_provider,
            churn_predictor,
            router,
            replica_map: Arc::new(RwLock::new(HashMap::new())),
            stats: Arc::new(RwLock::new(ReplicationStats::default())),
        }
    }

    /// Calculates the target replication factor for a piece of content,
    /// scaling the base factor up under high churn or high popularity and
    /// clamping the result to the configured bounds.
    pub async fn calculate_replication_factor(&self, content_hash: &ContentHash) -> u32 {
        let churn_rate = self.estimate_churn_rate().await;

        let mut factor = self.config.base_replicas;

        // Scale up when churn exceeds the configured threshold.
        if churn_rate > self.config.churn_threshold {
            let churn_multiplier = 1.0 + (churn_rate - self.config.churn_threshold) * 2.0;
            factor = (factor as f64 * churn_multiplier) as u32;
        }

        // Popular content gets extra replicas.
        let popularity = self.get_content_popularity(content_hash).await;
        if popularity > 0.8 {
            factor = (factor as f64 * 1.5) as u32;
        }

        factor
            .max(self.config.min_replicas)
            .min(self.config.max_replicas)
    }

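    // Worked example of the factor calculation above (illustrative numbers only,
    // assuming base_replicas = 3, churn_threshold = 0.3, min_replicas = 3,
    // max_replicas = 10):
    //   churn_rate = 0.5   -> multiplier = 1.0 + (0.5 - 0.3) * 2.0 = 1.4
    //                         factor     = (3.0 * 1.4) as u32      = 4
    //   popularity = 0.9   -> factor     = (4.0 * 1.5) as u32      = 6
    //   clamped to [3, 10] -> target factor = 6
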
    /// Selects nodes to hold new replicas by querying every routing strategy,
    /// merging the candidate sets, and ranking candidates by a composite score.
    pub async fn select_replication_nodes(
        &self,
        content_hash: &ContentHash,
        count: usize,
        exclude: &HashSet<NodeId>,
    ) -> Result<Vec<NodeId>> {
        let mut candidates = HashMap::new();

        // Gather candidates from every strategy, remembering which strategies
        // proposed each node.
        let strategies = self.router.get_all_strategies();
        for (strategy_name, strategy) in strategies {
            let nodes = strategy
                .find_closest_nodes(content_hash, count * 2)
                .await?;
            for node in nodes {
                if !exclude.contains(&node) {
                    candidates
                        .entry(node)
                        .or_insert(Vec::new())
                        .push(strategy_name.clone());
                }
            }
        }

        // Score and rank the candidates.
        let mut scored_nodes: Vec<(NodeId, f64)> = Vec::new();
        for (node, strategies_found) in candidates {
            let score = self
                .calculate_node_score(&node, content_hash, &strategies_found)
                .await;
            scored_nodes.push((node, score));
        }

        scored_nodes.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));

        Ok(scored_nodes
            .into_iter()
            .take(count)
            .map(|(node, _)| node)
            .collect())
    }

    /// Scores a candidate node for replica placement by combining which
    /// strategies proposed it, its trust score, and its predicted churn risk.
    async fn calculate_node_score(
        &self,
        node: &NodeId,
        _content_hash: &ContentHash,
        strategies_found: &[String],
    ) -> f64 {
        let mut score = 0.0;

        // Kademlia closeness to the content hash.
        if strategies_found.contains(&"Kademlia".to_string()) {
            score += 0.4;
        }

        // Trust score of the node.
        let trust = self.trust_provider.get_trust_score(node);
        score += 0.3 * trust;

        // Hyperbolic routing proximity.
        if strategies_found.contains(&"Hyperbolic".to_string()) {
            score += 0.2;
        }

        // SOM cluster affinity.
        if strategies_found.contains(&"SOM".to_string()) {
            score += 0.1;
        }

        // Bonus for being proposed by several strategies.
        score += 0.1 * (strategies_found.len() as f64 / 4.0);

        // Penalize nodes that are predicted to churn within the next hour.
        let churn_probability = self.churn_predictor.predict(node).await.probability_1h;
        score *= 1.0 - (churn_probability * 0.5);

        score
    }

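    // Illustrative scoring walk-through (hypothetical inputs): a node proposed
    // by both the Kademlia and Hyperbolic strategies, with trust 0.8 and a
    // 1-hour churn probability of 0.2, scores:
    //   0.4 (Kademlia) + 0.3 * 0.8 (trust) + 0.2 (Hyperbolic)
    //       + 0.1 * (2.0 / 4.0) (multi-strategy bonus)  = 0.89
    //   0.89 * (1.0 - 0.2 * 0.5) (churn discount)       = 0.801
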
    /// Replicates content to enough nodes to meet its target replication
    /// factor, recording the outcome in the replica map and statistics.
    pub async fn replicate_content(
        &self,
        content_hash: &ContentHash,
        content: &[u8],
        metadata: ContentMetadata,
    ) -> Result<ReplicaInfo> {
        let target_factor = self.calculate_replication_factor(content_hash).await;

        let mut replica_map = self.replica_map.write().await;
        let current_replicas = replica_map
            .get(content_hash)
            .map(|info| info.storing_nodes.clone())
            .unwrap_or_default();

        let current_count = current_replicas.len() as u32;

        if current_count < target_factor {
            // Under-replicated: pick additional nodes and push replicas to them.
            let needed = (target_factor - current_count) as usize;
            let new_nodes = self
                .select_replication_nodes(content_hash, needed, &current_replicas)
                .await?;

            let mut successful_nodes = current_replicas.clone();
            let mut stats = self.stats.write().await;

            for node in new_nodes {
                if self
                    .send_replica_to_node(&node, content_hash, content)
                    .await
                {
                    successful_nodes.insert(node);
                    stats.successful_replications += 1;
                } else {
                    stats.failed_replications += 1;
                }
                stats.total_replications += 1;
            }

            let replication_factor = successful_nodes.len() as u32;
            let replica_info = ReplicaInfo {
                storing_nodes: successful_nodes,
                replication_factor,
                target_factor,
                last_check: Instant::now(),
                metadata,
            };

            replica_map.insert(*content_hash, replica_info.clone());
            Ok(replica_info)
        } else {
            // Already at or above the target factor: return the current state.
            let replica_info = replica_map
                .get(content_hash)
                .cloned()
                .unwrap_or(ReplicaInfo {
                    storing_nodes: current_replicas,
                    replication_factor: current_count,
                    target_factor,
                    last_check: Instant::now(),
                    metadata,
                });
            Ok(replica_info)
        }
    }

    /// Periodic maintenance pass: re-checks content that has not been examined
    /// recently and proactively moves replicas away from nodes that are
    /// predicted to churn.
    pub async fn maintain_replications(&self) -> Result<()> {
        // Snapshot the entries that are due for a check (older than 5 minutes).
        let replica_map = self.replica_map.read().await;
        let content_to_check: Vec<_> = replica_map
            .iter()
            .filter(|(_, info)| info.last_check.elapsed() > Duration::from_secs(300))
            .map(|(hash, info)| (*hash, info.clone()))
            .collect();
        drop(replica_map);

        for (content_hash, mut replica_info) in content_to_check {
            // Find storing nodes that are at risk of going offline.
            let mut at_risk_nodes = Vec::new();
            for node in &replica_info.storing_nodes {
                if self.churn_predictor.should_replicate(node).await {
                    at_risk_nodes.push(node.clone());
                }
            }

            if !at_risk_nodes.is_empty() {
                let mut stats = self.stats.write().await;
                stats.proactive_replications += 1;
                drop(stats);

                // Pick replacements and swap them in for the at-risk nodes.
                let replacement_nodes = self
                    .select_replication_nodes(
                        &content_hash,
                        at_risk_nodes.len(),
                        &replica_info.storing_nodes,
                    )
                    .await?;

                for (old_node, new_node) in at_risk_nodes.iter().zip(replacement_nodes.iter()) {
                    replica_info.storing_nodes.remove(old_node);
                    replica_info.storing_nodes.insert(new_node.clone());
                }
            }

            replica_info.last_check = Instant::now();
            self.replica_map
                .write()
                .await
                .insert(content_hash, replica_info);
        }

        Ok(())
    }

    /// Handles a node leaving the network: removes it from every replica set it
    /// belonged to and re-replicates any content that drops below its target.
    pub async fn handle_node_departure(&self, departed_node: &NodeId) -> Result<()> {
        // Snapshot the content that was stored on the departed node.
        let replica_map = self.replica_map.read().await;
        let affected_content: Vec<_> = replica_map
            .iter()
            .filter(|(_, info)| info.storing_nodes.contains(departed_node))
            .map(|(hash, info)| (*hash, info.clone()))
            .collect();
        drop(replica_map);

        for (content_hash, mut replica_info) in affected_content {
            replica_info.storing_nodes.remove(departed_node);
            replica_info.replication_factor = replica_info.storing_nodes.len() as u32;

            if replica_info.replication_factor < replica_info.target_factor {
                // Under-replicated after the departure: select replacement nodes.
                let needed =
                    (replica_info.target_factor - replica_info.replication_factor) as usize;
                let new_nodes = self
                    .select_replication_nodes(&content_hash, needed, &replica_info.storing_nodes)
                    .await?;

                for node in new_nodes {
                    replica_info.storing_nodes.insert(node);
                }
                replica_info.replication_factor = replica_info.storing_nodes.len() as u32;
            }

            self.replica_map
                .write()
                .await
                .insert(content_hash, replica_info);
        }

        Ok(())
    }

    /// Sends a replica to a node. Currently a placeholder that simulates
    /// success with probability equal to the node's trust score.
    async fn send_replica_to_node(
        &self,
        node: &NodeId,
        _content_hash: &ContentHash,
        _content: &[u8],
    ) -> bool {
        let trust = self.trust_provider.get_trust_score(node);
        rand::random::<f64>() < trust
    }

    /// Estimates the current network churn rate. Placeholder: returns a fixed
    /// moderate churn rate.
    async fn estimate_churn_rate(&self) -> f64 {
        0.2
    }

    /// Estimates how popular a piece of content is. Placeholder: returns a
    /// fixed medium popularity.
    async fn get_content_popularity(&self, _content_hash: &ContentHash) -> f64 {
        0.5
    }

    /// Returns a snapshot of replication statistics, recomputing the average
    /// replication factor from the current replica map.
    pub async fn get_stats(&self) -> ReplicationStats {
        let stats = self.stats.read().await;
        let replica_map = self.replica_map.read().await;

        let avg_factor = if replica_map.is_empty() {
            0.0
        } else {
            let total_factor: u32 = replica_map
                .values()
                .map(|info| info.replication_factor)
                .sum();
            total_factor as f64 / replica_map.len() as f64
        };

        ReplicationStats {
            avg_replication_factor: avg_factor,
            ..stats.clone()
        }
    }

    /// Intended to raise replication targets across all tracked content by the
    /// given multiplier; currently a no-op placeholder.
    pub async fn increase_global_replication(&self, _multiplier: f64) {
        // Intentionally left unimplemented for now.
    }
}

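// Hypothetical wiring sketch (not part of this module's API): a caller owning
// an `Arc<ReplicationManager>` could drive periodic maintenance with a
// background task; names and the interval below are illustrative only.
//
//     let manager = Arc::new(ReplicationManager::new(config, trust, churn, router));
//     let maintenance = manager.clone();
//     tokio::spawn(async move {
//         let mut interval = tokio::time::interval(Duration::from_secs(300));
//         loop {
//             interval.tick().await;
//             if let Err(e) = maintenance.maintain_replications().await {
//                 eprintln!("replication maintenance failed: {e}");
//             }
//         }
//     });
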
#[cfg(test)]
mod tests {
    use super::*;
    use crate::adaptive::trust::MockTrustProvider;
    use std::sync::Arc;

    fn create_test_replication_manager() -> ReplicationManager {
        let config = ReplicationConfig::default();
        let trust_provider = Arc::new(MockTrustProvider::new());
        let churn_predictor = Arc::new(ChurnPredictor::new());
        let router = Arc::new(AdaptiveRouter::new(
            trust_provider.clone(),
            Arc::new(crate::adaptive::hyperbolic::HyperbolicSpace::new()),
            Arc::new(crate::adaptive::som::SelfOrganizingMap::new(
                crate::adaptive::som::SomConfig {
                    initial_learning_rate: 0.5,
                    initial_radius: 3.0,
                    iterations: 1000,
                    grid_size: crate::adaptive::som::GridSize::Fixed(10, 10),
                },
            )),
        ));

        ReplicationManager::new(config, trust_provider, churn_predictor, router)
    }

    #[tokio::test]
    async fn test_adaptive_replication_factor() {
        let manager = create_test_replication_manager();
        let content_hash = ContentHash([1u8; 32]);

        let factor = manager.calculate_replication_factor(&content_hash).await;
        assert!(factor >= manager.config.min_replicas);
        assert!(factor <= manager.config.max_replicas);
    }

    #[tokio::test]
    async fn test_node_selection_excludes_nodes() {
        let manager = create_test_replication_manager();
        let content_hash = ContentHash([1u8; 32]);
        let mut exclude = HashSet::new();
        exclude.insert(NodeId { hash: [1u8; 32] });
        exclude.insert(NodeId { hash: [2u8; 32] });

        let nodes = manager
            .select_replication_nodes(&content_hash, 5, &exclude)
            .await
            .unwrap();

        for node in nodes {
            assert!(!exclude.contains(&node));
        }
    }

    #[tokio::test]
    async fn test_replication_tracking() {
        let manager = create_test_replication_manager();
        let content_hash = ContentHash([1u8; 32]);
        let content = b"Test content";
        let metadata = ContentMetadata {
            size: content.len(),
            content_type: ContentType::DataRetrieval,
            created_at: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs(),
            chunk_count: None,
            replication_factor: 8,
        };

        let replica_info = manager
            .replicate_content(&content_hash, content, metadata)
            .await
            .unwrap();

        assert!(replica_info.replication_factor > 0);
        assert!(!replica_info.storing_nodes.is_empty());

        let stats = manager.get_stats().await;
        assert!(stats.total_replications > 0);
    }

    #[tokio::test]
    async fn test_proactive_replication() {
        let manager = create_test_replication_manager();

        let content_hash = ContentHash([1u8; 32]);
        let mut replica_info = ReplicaInfo {
            storing_nodes: HashSet::new(),
            replication_factor: 3,
            target_factor: 5,
            last_check: Instant::now() - Duration::from_secs(400),
            metadata: ContentMetadata {
                size: 100,
                content_type: ContentType::DataRetrieval,
                created_at: std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap()
                    .as_secs(),
                chunk_count: None,
                replication_factor: 5,
            },
        };

        for i in 0..3 {
            replica_info.storing_nodes.insert(NodeId {
                hash: [i as u8; 32],
            });
        }

        manager
            .replica_map
            .write()
            .await
            .insert(content_hash, replica_info);

        manager.maintain_replications().await.unwrap();

        let updated = manager
            .replica_map
            .read()
            .await
            .get(&content_hash)
            .unwrap()
            .clone();
        assert!(updated.last_check.elapsed() < Duration::from_secs(1));
    }

    #[tokio::test]
    async fn test_node_departure_handling() {
        let manager = create_test_replication_manager();
        let departed_node = NodeId { hash: [1u8; 32] };

        let content_hash = ContentHash([1u8; 32]);
        let mut storing_nodes = HashSet::new();
        storing_nodes.insert(departed_node.clone());
        storing_nodes.insert(NodeId { hash: [2u8; 32] });
        storing_nodes.insert(NodeId { hash: [3u8; 32] });

        let replica_info = ReplicaInfo {
            storing_nodes,
            replication_factor: 3,
            target_factor: 5,
            last_check: Instant::now(),
            metadata: ContentMetadata {
                size: 100,
                content_type: ContentType::DataRetrieval,
                created_at: std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap()
                    .as_secs(),
                chunk_count: None,
                replication_factor: 5,
            },
        };

        manager
            .replica_map
            .write()
            .await
            .insert(content_hash.clone(), replica_info);

        manager.handle_node_departure(&departed_node).await.unwrap();

        let updated = manager
            .replica_map
            .read()
            .await
            .get(&content_hash)
            .unwrap()
            .clone();
        assert!(!updated.storing_nodes.contains(&departed_node));
    }
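
    // Additional check (a minimal sketch added here): a freshly constructed
    // manager tracks no content, so get_stats() should report zeroed counters
    // and an average replication factor of 0.0.
    #[tokio::test]
    async fn test_stats_on_empty_manager() {
        let manager = create_test_replication_manager();

        let stats = manager.get_stats().await;
        assert_eq!(stats.total_replications, 0);
        assert_eq!(stats.successful_replications, 0);
        assert_eq!(stats.failed_replications, 0);
        assert_eq!(stats.proactive_replications, 0);
        assert_eq!(stats.avg_replication_factor, 0.0);
    }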
}