saorsa_core/adaptive/replication.rs

// Copyright 2024 Saorsa Labs Limited
//
// This software is dual-licensed under:
// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
// - Commercial License
//
// For AGPL-3.0 license, see LICENSE-AGPL-3.0
// For commercial licensing, contact: saorsalabs@gmail.com
//
// Unless required by applicable law or agreed to in writing, software
// distributed under these licenses is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

//! Adaptive replication manager for content durability
//!
//! This module manages content replication based on:
//! - Network churn rate
//! - Content popularity
//! - Node reliability
//! - Available storage capacity
use super::*;
use crate::adaptive::{
    TrustProvider,
    learning::ChurnPredictor,
    routing::AdaptiveRouter,
    storage::{ContentMetadata, ReplicationConfig},
};
use anyhow::Result;
use std::{
    collections::{HashMap, HashSet},
    sync::Arc,
    time::{Duration, Instant},
};
use tokio::sync::RwLock;

/// Replication manager for adaptive content replication
pub struct ReplicationManager {
    /// Replication configuration
    config: ReplicationConfig,

    /// Trust provider for node selection
    trust_provider: Arc<dyn TrustProvider>,

    /// Churn predictor for proactive replication
    churn_predictor: Arc<ChurnPredictor>,

    /// Routing strategies for node selection
    router: Arc<AdaptiveRouter>,

    /// Content replica tracking
    replica_map: Arc<RwLock<HashMap<ContentHash, ReplicaInfo>>>,

    /// Replication statistics
    stats: Arc<RwLock<ReplicationStats>>,
}

/// Information about content replicas
#[derive(Debug, Clone)]
pub struct ReplicaInfo {
    /// Set of nodes storing this content
    pub storing_nodes: HashSet<NodeId>,

    /// Current replication factor
    pub replication_factor: u32,

    /// Target replication factor
    pub target_factor: u32,

    /// Last replication check
    pub last_check: Instant,

    /// Content metadata
    pub metadata: ContentMetadata,
}

/// Replication strategy selection
#[derive(Debug, Clone, PartialEq)]
pub enum ReplicationStrategy {
    /// Use all available strategies
    Composite,

    /// Kademlia-based replication
    Kademlia,

    /// Trust-based replication
    TrustBased,

    /// Proximity-based replication
    ProximityBased,
}

/// Replication statistics
#[derive(Debug, Default, Clone)]
pub struct ReplicationStats {
    /// Total replications performed
    pub total_replications: u64,

    /// Successful replications
    pub successful_replications: u64,

    /// Failed replications
    pub failed_replications: u64,

    /// Proactive replications (due to churn prediction)
    pub proactive_replications: u64,

    /// Average replication factor
    pub avg_replication_factor: f64,
}

impl ReplicationManager {
    /// Create a new replication manager
    pub fn new(
        config: ReplicationConfig,
        trust_provider: Arc<dyn TrustProvider>,
        churn_predictor: Arc<ChurnPredictor>,
        router: Arc<AdaptiveRouter>,
    ) -> Self {
        Self {
            config,
            trust_provider,
            churn_predictor,
            router,
            replica_map: Arc::new(RwLock::new(HashMap::new())),
            stats: Arc::new(RwLock::new(ReplicationStats::default())),
        }
    }

    /// Calculate adaptive replication factor based on network conditions
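    ///
    /// Illustrative arithmetic (assuming `base_replicas = 8` and
    /// `churn_threshold = 0.2`): an observed churn rate of 0.3 gives a
    /// multiplier of 1.0 + (0.3 - 0.2) * 2.0 = 1.2, i.e. 9 replicas; a
    /// popularity above 0.8 raises that to 13 before clamping to
    /// `[min_replicas, max_replicas]`.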
    pub async fn calculate_replication_factor(&self, content_hash: &ContentHash) -> u32 {
        // Get current network churn rate (would be calculated from network stats)
        let churn_rate = self.estimate_churn_rate().await;

        // Base factor
        let mut factor = self.config.base_replicas;

        // Adjust based on churn rate
        if churn_rate > self.config.churn_threshold {
            // Increase replication linearly with churn rate
            let churn_multiplier = 1.0 + (churn_rate - self.config.churn_threshold) * 2.0;
            factor = (factor as f64 * churn_multiplier) as u32;
        }

        // Check content popularity (would track access patterns)
        let popularity = self.get_content_popularity(content_hash).await;
        if popularity > 0.8 {
            factor = (factor as f64 * 1.5) as u32;
        }

        // Clamp to configured range
        factor.clamp(self.config.min_replicas, self.config.max_replicas)
    }

    /// Select nodes for replication based on composite scoring
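    ///
    /// Example call (illustrative):
    ///
    /// ```ignore
    /// let exclude: HashSet<NodeId> = HashSet::new();
    /// let nodes = manager
    ///     .select_replication_nodes(&content_hash, 8, &exclude)
    ///     .await?;
    /// ```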
    pub async fn select_replication_nodes(
        &self,
        content_hash: &ContentHash,
        count: usize,
        exclude: &HashSet<NodeId>,
    ) -> Result<Vec<NodeId>> {
        // Get candidate nodes using different strategies
        let mut candidates: HashMap<NodeId, Vec<String>> = HashMap::new();

        // Get nodes from each routing strategy
        let strategies = self.router.get_all_strategies();
        for (strategy_name, strategy) in strategies {
            let nodes = strategy
                .find_closest_nodes(content_hash, count * 2)
                .await?;
            for node in nodes {
                if !exclude.contains(&node) {
                    candidates
                        .entry(node)
                        .or_default()
                        .push(strategy_name.clone());
                }
            }
        }

        // Score nodes based on composite criteria
        let mut scored_nodes: Vec<(NodeId, f64)> = Vec::new();
        for (node, strategies_found) in candidates {
            let score = self
                .calculate_node_score(&node, content_hash, &strategies_found)
                .await;
            scored_nodes.push((node, score));
        }

        // Sort by score (descending) and take top nodes
        scored_nodes.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));

        Ok(scored_nodes
            .into_iter()
            .take(count)
            .map(|(node, _)| node)
            .collect())
    }

    /// Calculate composite node score for replication
    async fn calculate_node_score(
        &self,
        node: &NodeId,
        _content_hash: &ContentHash,
        strategies_found: &[String],
    ) -> f64 {
        // Base score from specification:
        // Score = 0.4×(XOR_proximity) + 0.3×(trust) + 0.2×(hyperbolic_distance) + 0.1×(SOM_similarity)
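        //
        // Illustrative example: a node returned by both the Kademlia and
        // Hyperbolic strategies, with trust 0.9 and a 20% one-hour churn
        // probability, scores
        // (0.4 + 0.3 * 0.9 + 0.2 + 0.1 * (2.0 / 4.0)) * (1.0 - 0.2 * 0.5) ≈ 0.83.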

        let mut score = 0.0;

        // XOR proximity (if found by Kademlia)
        if strategies_found.contains(&"Kademlia".to_string()) {
            score += 0.4;
        }

        // Trust score
        let trust = self.trust_provider.get_trust_score(node);
        score += 0.3 * trust;

        // Hyperbolic proximity (if found by Hyperbolic routing)
        if strategies_found.contains(&"Hyperbolic".to_string()) {
            score += 0.2;
        }

        // SOM similarity (if found by SOM routing)
        if strategies_found.contains(&"SOM".to_string()) {
            score += 0.1;
        }

        // Bonus for being found by multiple strategies
        score += 0.1 * (strategies_found.len() as f64 / 4.0);

        // Penalty for high churn probability
        let churn_probability = self.churn_predictor.predict(node).await.probability_1h;
        score *= 1.0 - (churn_probability * 0.5);

        score
    }

    /// Replicate content to maintain target replication factor
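    ///
    /// Illustrative call (mirroring `test_replication_tracking` below):
    ///
    /// ```ignore
    /// let info = manager.replicate_content(&content_hash, content, metadata).await?;
    /// assert!(info.replication_factor <= info.target_factor);
    /// ```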
    pub async fn replicate_content(
        &self,
        content_hash: &ContentHash,
        content: &[u8],
        metadata: ContentMetadata,
    ) -> Result<ReplicaInfo> {
        // Calculate target replication factor
        let target_factor = self.calculate_replication_factor(content_hash).await;

        // Get current replicas
        let mut replica_map = self.replica_map.write().await;
        let current_replicas = replica_map
            .get(content_hash)
            .map(|info| info.storing_nodes.clone())
            .unwrap_or_default();

        let current_count = current_replicas.len() as u32;

        // If we need more replicas
        if current_count < target_factor {
            let needed = (target_factor - current_count) as usize;
            let new_nodes = self
                .select_replication_nodes(content_hash, needed, &current_replicas)
                .await?;

            // Replicate to selected nodes (in real implementation, would send via network)
            let mut successful_nodes = current_replicas.clone();
            let mut stats = self.stats.write().await;

            for node in new_nodes {
                // Simulate replication (would actually send content to node)
                if self
                    .send_replica_to_node(&node, content_hash, content)
                    .await
                {
                    successful_nodes.insert(node);
                    stats.successful_replications += 1;
                } else {
                    stats.failed_replications += 1;
                }
                stats.total_replications += 1;
            }

            // Update replica info
            let replication_factor = successful_nodes.len() as u32;
            let replica_info = ReplicaInfo {
                storing_nodes: successful_nodes,
                replication_factor,
                target_factor,
                last_check: Instant::now(),
                metadata,
            };

            replica_map.insert(*content_hash, replica_info.clone());
            Ok(replica_info)
        } else {
            // Already have enough replicas
            let replica_info = replica_map
                .get(content_hash)
                .cloned()
                .unwrap_or_else(|| ReplicaInfo {
                    storing_nodes: current_replicas,
                    replication_factor: current_count,
                    target_factor,
                    last_check: Instant::now(),
                    metadata,
                });
            Ok(replica_info)
        }
    }

    /// Check and maintain replication for stored content
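    ///
    /// Intended to be driven periodically by the caller, e.g. (illustrative
    /// sketch, not part of this module):
    ///
    /// ```ignore
    /// let mut ticker = tokio::time::interval(Duration::from_secs(300));
    /// loop {
    ///     ticker.tick().await;
    ///     manager.maintain_replications().await?;
    /// }
    /// ```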
    pub async fn maintain_replications(&self) -> Result<()> {
        let replica_map = self.replica_map.read().await;
        let content_to_check: Vec<_> = replica_map
            .iter()
            .filter(|(_, info)| {
                // Check content that hasn't been checked recently
                info.last_check.elapsed() > Duration::from_secs(300) // 5 minutes
            })
            .map(|(hash, info)| (*hash, info.clone()))
            .collect();
        drop(replica_map);

        for (content_hash, mut replica_info) in content_to_check {
            // Check if any storing nodes are at risk of churning
            let mut at_risk_nodes = Vec::new();
            for node in &replica_info.storing_nodes {
                if self.churn_predictor.should_replicate(node).await {
                    at_risk_nodes.push(node.clone());
                }
            }

            // Proactively replicate if nodes are at risk
            if !at_risk_nodes.is_empty() {
                let mut stats = self.stats.write().await;
                stats.proactive_replications += 1;
                drop(stats);

                // Select new nodes to replace at-risk ones
                let replacement_nodes = self
                    .select_replication_nodes(
                        &content_hash,
                        at_risk_nodes.len(),
                        &replica_info.storing_nodes,
                    )
                    .await?;

                // Update storing nodes (in real implementation, would trigger actual replication)
                for (old_node, new_node) in at_risk_nodes.iter().zip(replacement_nodes.iter()) {
                    replica_info.storing_nodes.remove(old_node);
                    replica_info.storing_nodes.insert(new_node.clone());
                }
            }

            // Update last check time
            replica_info.last_check = Instant::now();
            self.replica_map
                .write()
                .await
                .insert(content_hash, replica_info);
        }

        Ok(())
    }

    /// Handle node departure by checking affected content
    pub async fn handle_node_departure(&self, departed_node: &NodeId) -> Result<()> {
        let replica_map = self.replica_map.read().await;
        let affected_content: Vec<_> = replica_map
            .iter()
            .filter(|(_, info)| info.storing_nodes.contains(departed_node))
            .map(|(hash, info)| (*hash, info.clone()))
            .collect();
        drop(replica_map);

        for (content_hash, mut replica_info) in affected_content {
            // Remove departed node
            replica_info.storing_nodes.remove(departed_node);
            replica_info.replication_factor = replica_info.storing_nodes.len() as u32;

            // Check if we need to replicate to maintain factor
            if replica_info.replication_factor < replica_info.target_factor {
                let needed =
                    (replica_info.target_factor - replica_info.replication_factor) as usize;
                let new_nodes = self
                    .select_replication_nodes(&content_hash, needed, &replica_info.storing_nodes)
                    .await?;

                // Add new replicas (in real implementation)
                for node in new_nodes {
                    replica_info.storing_nodes.insert(node);
                }
                replica_info.replication_factor = replica_info.storing_nodes.len() as u32;
            }

            self.replica_map
                .write()
                .await
                .insert(content_hash, replica_info);
        }

        Ok(())
    }

    /// Simulate sending replica to a node (would use network in real implementation)
    async fn send_replica_to_node(
        &self,
        node: &NodeId,
        _content_hash: &ContentHash,
        _content: &[u8],
    ) -> bool {
        // In real implementation, this would:
        // 1. Establish connection to node
        // 2. Send STORE_REPLICA message
        // 3. Wait for acknowledgment
        // 4. Return success/failure

        // For now, simulate with trust-based success probability
        let trust = self.trust_provider.get_trust_score(node);
        rand::random::<f64>() < trust
    }

    /// Estimate current network churn rate
    async fn estimate_churn_rate(&self) -> f64 {
        // In real implementation, would calculate from network statistics
        // For now, return a simulated value
        0.2 // 20% churn rate
    }

    /// Get content popularity score
    async fn get_content_popularity(&self, _content_hash: &ContentHash) -> f64 {
        // In real implementation, would track access patterns
        // For now, return a simulated value
        0.5
    }

    /// Get replication statistics
    pub async fn get_stats(&self) -> ReplicationStats {
        let stats = self.stats.read().await;
        let replica_map = self.replica_map.read().await;

        // Calculate average replication factor
        let avg_factor = if replica_map.is_empty() {
            0.0
        } else {
            let total_factor: u32 = replica_map
                .values()
                .map(|info| info.replication_factor)
                .sum();
            total_factor as f64 / replica_map.len() as f64
        };

        ReplicationStats {
            avg_replication_factor: avg_factor,
            ..stats.clone()
        }
    }

    /// Increase global replication factor during high churn
    pub async fn increase_global_replication(&self, _multiplier: f64) {
        // This would increase the replication factor for all content
        // For now, just log the action
        // log::info!("Increasing global replication by {:.2}x due to high churn", multiplier);

        // In a real implementation, would update config and trigger re-replication
        // self.config.base_replicas = (self.config.base_replicas as f64 * multiplier) as u32;
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::adaptive::trust::MockTrustProvider;
    use std::sync::Arc;

    fn create_test_replication_manager() -> ReplicationManager {
        let config = ReplicationConfig::default();
        let trust_provider = Arc::new(MockTrustProvider::new());
        let churn_predictor = Arc::new(ChurnPredictor::new());
        let router = Arc::new(AdaptiveRouter::new(
            trust_provider.clone(),
            Arc::new(crate::adaptive::hyperbolic::HyperbolicSpace::new()),
            Arc::new(crate::adaptive::som::SelfOrganizingMap::new(
                crate::adaptive::som::SomConfig {
                    initial_learning_rate: 0.5,
                    initial_radius: 3.0,
                    iterations: 1000,
                    grid_size: crate::adaptive::som::GridSize::Fixed(10, 10),
                },
            )),
        ));

        ReplicationManager::new(config, trust_provider, churn_predictor, router)
    }

    #[tokio::test]
    async fn test_adaptive_replication_factor() {
        let manager = create_test_replication_manager();
        let content_hash = ContentHash([1u8; 32]);

        // Test base replication factor
        let factor = manager.calculate_replication_factor(&content_hash).await;
        assert!(factor >= manager.config.min_replicas);
        assert!(factor <= manager.config.max_replicas);
    }

    #[tokio::test]
    async fn test_node_selection_excludes_nodes() {
        let manager = create_test_replication_manager();
        let content_hash = ContentHash([1u8; 32]);
        let mut exclude = HashSet::new();
        exclude.insert(NodeId { hash: [1u8; 32] });
        exclude.insert(NodeId { hash: [2u8; 32] });

        let nodes = manager
            .select_replication_nodes(&content_hash, 5, &exclude)
            .await
            .unwrap();

        // Verify excluded nodes are not in results
        for node in nodes {
            assert!(!exclude.contains(&node));
        }
    }

    #[tokio::test]
    async fn test_replication_tracking() {
        let manager = create_test_replication_manager();
        let content_hash = ContentHash([1u8; 32]);
        let content = b"Test content";
        let metadata = ContentMetadata {
            size: content.len(),
            content_type: ContentType::DataRetrieval,
            created_at: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs(),
            chunk_count: None,
            replication_factor: 8,
        };

        // Perform replication
        let replica_info = manager
            .replicate_content(&content_hash, content, metadata)
            .await
            .unwrap();

        // Check that replicas were tracked
        assert!(replica_info.replication_factor > 0);
        assert!(!replica_info.storing_nodes.is_empty());

        // Check stats
        let stats = manager.get_stats().await;
        assert!(stats.total_replications > 0);
    }

    #[tokio::test]
    async fn test_proactive_replication() {
        let manager = create_test_replication_manager();

        // Add some content to track
        let content_hash = ContentHash([1u8; 32]);
        let mut replica_info = ReplicaInfo {
            storing_nodes: HashSet::new(),
            replication_factor: 3,
            target_factor: 5,
            last_check: Instant::now() - Duration::from_secs(400), // Old check
            metadata: ContentMetadata {
                size: 100,
                content_type: ContentType::DataRetrieval,
                created_at: std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap()
                    .as_secs(),
                chunk_count: None,
                replication_factor: 5,
            },
        };

        // Add some nodes
        for i in 0..3 {
            replica_info.storing_nodes.insert(NodeId {
                hash: [i as u8; 32],
            });
        }

        manager
            .replica_map
            .write()
            .await
            .insert(content_hash, replica_info);

        // Run maintenance
        manager.maintain_replications().await.unwrap();

        // Check that maintenance was performed
        let updated = manager
            .replica_map
            .read()
            .await
            .get(&content_hash)
            .unwrap()
            .clone();
        assert!(updated.last_check.elapsed() < Duration::from_secs(1));
    }

    #[tokio::test]
    async fn test_node_departure_handling() {
        let manager = create_test_replication_manager();
        let departed_node = NodeId { hash: [1u8; 32] };

        // Add content that includes the departed node
        let content_hash = ContentHash([1u8; 32]);
        let mut storing_nodes = HashSet::new();
        storing_nodes.insert(departed_node.clone());
        storing_nodes.insert(NodeId { hash: [2u8; 32] });
        storing_nodes.insert(NodeId { hash: [3u8; 32] });

        let replica_info = ReplicaInfo {
            storing_nodes,
            replication_factor: 3,
            target_factor: 5,
            last_check: Instant::now(),
            metadata: ContentMetadata {
                size: 100,
                content_type: ContentType::DataRetrieval,
                created_at: std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap()
                    .as_secs(),
                chunk_count: None,
                replication_factor: 5,
            },
        };

        manager
            .replica_map
            .write()
            .await
            .insert(content_hash.clone(), replica_info);

        // Handle departure
        manager.handle_node_departure(&departed_node).await.unwrap();

        // Check that node was removed and replication adjusted
        let updated = manager
            .replica_map
            .read()
            .await
            .get(&content_hash)
            .unwrap()
            .clone();
        assert!(!updated.storing_nodes.contains(&departed_node));
    }
}