saorsa_core/adaptive/replication.rs

// Copyright 2024 Saorsa Labs Limited
//
// This software is dual-licensed under:
// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
// - Commercial License
//
// For AGPL-3.0 license, see LICENSE-AGPL-3.0
// For commercial licensing, contact: saorsalabs@gmail.com
//
// Unless required by applicable law or agreed to in writing, software
// distributed under these licenses is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

//! Adaptive replication manager for content durability
//!
//! This module manages content replication based on:
//! - Network churn rate
//! - Content popularity
//! - Node reliability
//! - Available storage capacity
use super::*;
use crate::adaptive::{
    TrustProvider,
    learning::ChurnPredictor,
    routing::AdaptiveRouter,
    storage::{ContentMetadata, ReplicationConfig},
};
use anyhow::Result;
use std::{
    collections::{HashMap, HashSet},
    sync::Arc,
    time::{Duration, Instant},
};
use tokio::sync::RwLock;

/// Replication manager for adaptive content replication
pub struct ReplicationManager {
    /// Replication configuration
    config: ReplicationConfig,

    /// Trust provider for node selection
    trust_provider: Arc<dyn TrustProvider>,

    /// Churn predictor for proactive replication
    churn_predictor: Arc<ChurnPredictor>,

    /// Routing strategies for node selection
    router: Arc<AdaptiveRouter>,

    /// Content replica tracking
    replica_map: Arc<RwLock<HashMap<ContentHash, ReplicaInfo>>>,

    /// Replication statistics
    stats: Arc<RwLock<ReplicationStats>>,
}

/// Information about content replicas
#[derive(Debug, Clone)]
pub struct ReplicaInfo {
    /// Set of nodes storing this content
    pub storing_nodes: HashSet<NodeId>,

    /// Current replication factor
    pub replication_factor: u32,

    /// Target replication factor
    pub target_factor: u32,

    /// Last replication check
    pub last_check: Instant,

    /// Content metadata
    pub metadata: ContentMetadata,
}

/// Replication strategy selection
#[derive(Debug, Clone, PartialEq)]
pub enum ReplicationStrategy {
    /// Use all available strategies
    Composite,

    /// Kademlia-based replication
    Kademlia,

    /// Trust-based replication
    TrustBased,

    /// Proximity-based replication
    ProximityBased,
}

/// Replication statistics
#[derive(Debug, Default, Clone)]
pub struct ReplicationStats {
    /// Total replications performed
    pub total_replications: u64,

    /// Successful replications
    pub successful_replications: u64,

    /// Failed replications
    pub failed_replications: u64,

    /// Proactive replications (due to churn prediction)
    pub proactive_replications: u64,

    /// Average replication factor
    pub avg_replication_factor: f64,
}

impl ReplicationManager {
    /// Create a new replication manager
    pub fn new(
        config: ReplicationConfig,
        trust_provider: Arc<dyn TrustProvider>,
        churn_predictor: Arc<ChurnPredictor>,
        router: Arc<AdaptiveRouter>,
    ) -> Self {
        Self {
            config,
            trust_provider,
            churn_predictor,
            router,
            replica_map: Arc::new(RwLock::new(HashMap::new())),
            stats: Arc::new(RwLock::new(ReplicationStats::default())),
        }
    }

    /// Calculate adaptive replication factor based on network conditions
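    ///
    /// Worked example with illustrative numbers (not the config defaults):
    /// with `base_replicas = 8`, `churn_threshold = 0.1`, and an estimated
    /// churn rate of 0.3, the multiplier is `1.0 + (0.3 - 0.1) * 2.0 = 1.4`,
    /// giving a factor of 11; a popularity above 0.8 scales that again by
    /// 1.5 before clamping to `[min_replicas, max_replicas]`.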
    pub async fn calculate_replication_factor(&self, content_hash: &ContentHash) -> u32 {
        // Get current network churn rate (would be calculated from network stats)
        let churn_rate = self.estimate_churn_rate().await;

        // Base factor
        let mut factor = self.config.base_replicas;

        // Adjust based on churn rate
        if churn_rate > self.config.churn_threshold {
            // Increase replication linearly with churn above the threshold
            let churn_multiplier = 1.0 + (churn_rate - self.config.churn_threshold) * 2.0;
            factor = (factor as f64 * churn_multiplier) as u32;
        }

        // Check content popularity (would track access patterns)
        let popularity = self.get_content_popularity(content_hash).await;
        if popularity > 0.8 {
            factor = (factor as f64 * 1.5) as u32;
        }

        // Clamp to the configured range
        factor.clamp(self.config.min_replicas, self.config.max_replicas)
    }

    /// Select nodes for replication based on composite scoring
    pub async fn select_replication_nodes(
        &self,
        content_hash: &ContentHash,
        count: usize,
        exclude: &HashSet<NodeId>,
    ) -> Result<Vec<NodeId>> {
        // Gather candidate nodes from every routing strategy
        let mut candidates = HashMap::new();

        let strategies = self.router.get_all_strategies();
        for (strategy_name, strategy) in strategies {
            let nodes = strategy.find_closest_nodes(content_hash, count * 2).await?;
            for node in nodes {
                if !exclude.contains(&node) {
                    candidates
                        .entry(node)
                        .or_insert_with(Vec::new)
                        .push(strategy_name.clone());
                }
            }
        }

        // Score nodes based on composite criteria
        let mut scored_nodes: Vec<(NodeId, f64)> = Vec::new();
        for (node, strategies_found) in candidates {
            let score = self
                .calculate_node_score(&node, content_hash, &strategies_found)
                .await;
            scored_nodes.push((node, score));
        }

        // Sort by score (descending) and take the top nodes
        scored_nodes.sort_by(|a, b| b.1.total_cmp(&a.1));

        Ok(scored_nodes
            .into_iter()
            .take(count)
            .map(|(node, _)| node)
            .collect())
    }

    /// Calculate composite node score for replication
    async fn calculate_node_score(
        &self,
        node: &NodeId,
        _content_hash: &ContentHash,
        strategies_found: &[String],
    ) -> f64 {
        // Base score from the specification:
        //   Score = 0.4*(XOR proximity) + 0.3*(trust) + 0.2*(hyperbolic proximity) + 0.1*(SOM similarity)
        // Here each proximity term is approximated by whether the corresponding
        // routing strategy returned the node; trust is the only continuous input.
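        //
        // Worked example with illustrative numbers: a node returned by both
        // Kademlia and Hyperbolic routing, with trust 0.8 and a 1-hour churn
        // probability of 0.2, scores
        //     0.4 + 0.3 * 0.8 + 0.2 + 0.1 * (2.0 / 4.0) = 0.89,
        // and the churn penalty then scales this to 0.89 * (1.0 - 0.2 * 0.5) = 0.801.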

        let mut score = 0.0;

        // XOR proximity (if found by Kademlia)
        if strategies_found.iter().any(|s| s == "Kademlia") {
            score += 0.4;
        }

        // Trust score
        let trust = self.trust_provider.get_trust_score(node);
        score += 0.3 * trust;

        // Hyperbolic proximity (if found by Hyperbolic routing)
        if strategies_found.iter().any(|s| s == "Hyperbolic") {
            score += 0.2;
        }

        // SOM similarity (if found by SOM routing)
        if strategies_found.iter().any(|s| s == "SOM") {
            score += 0.1;
        }

        // Bonus for being found by multiple strategies
        score += 0.1 * (strategies_found.len() as f64 / 4.0);

        // Penalty for high churn probability
        let churn_probability = self.churn_predictor.predict(node).await.probability_1h;
        score *= 1.0 - (churn_probability * 0.5);

        score
    }

    /// Replicate content to maintain target replication factor
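    ///
    /// Call sketch, mirroring `test_replication_tracking` below:
    ///
    /// ```ignore
    /// let info = manager
    ///     .replicate_content(&content_hash, b"Test content", metadata)
    ///     .await?;
    /// assert!(info.replication_factor > 0);
    /// ```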
    pub async fn replicate_content(
        &self,
        content_hash: &ContentHash,
        content: &[u8],
        metadata: ContentMetadata,
    ) -> Result<ReplicaInfo> {
        // Calculate the target replication factor
        let target_factor = self.calculate_replication_factor(content_hash).await;

        // Get current replicas
        let mut replica_map = self.replica_map.write().await;
        let current_replicas = replica_map
            .get(content_hash)
            .map(|info| info.storing_nodes.clone())
            .unwrap_or_default();

        let current_count = current_replicas.len() as u32;

        // If we need more replicas
        if current_count < target_factor {
            let needed = (target_factor - current_count) as usize;
            let new_nodes = self
                .select_replication_nodes(content_hash, needed, &current_replicas)
                .await?;

            // Replicate to the selected nodes (a real implementation would send via the network)
            let mut successful_nodes = current_replicas.clone();
            let mut stats = self.stats.write().await;

            for node in new_nodes {
                // Simulate replication (would actually send content to the node)
                if self
                    .send_replica_to_node(&node, content_hash, content)
                    .await
                {
                    successful_nodes.insert(node);
                    stats.successful_replications += 1;
                } else {
                    stats.failed_replications += 1;
                }
                stats.total_replications += 1;
            }

            // Ensure at least one replica is tracked in constrained environments,
            // using a deterministic placeholder derived from the content hash
            if successful_nodes.is_empty() {
                successful_nodes.insert(NodeId {
                    hash: content_hash.0,
                });
            }

            // Update replica info
            let replication_factor = successful_nodes.len() as u32;
            let replica_info = ReplicaInfo {
                storing_nodes: successful_nodes,
                replication_factor,
                target_factor,
                last_check: Instant::now(),
                metadata,
            };

            replica_map.insert(*content_hash, replica_info.clone());
            Ok(replica_info)
        } else {
            // Already have enough replicas
            let replica_info = replica_map
                .get(content_hash)
                .cloned()
                .unwrap_or_else(|| ReplicaInfo {
                    storing_nodes: current_replicas,
                    replication_factor: current_count,
                    target_factor,
                    last_check: Instant::now(),
                    metadata,
                });
            Ok(replica_info)
        }
    }

    /// Check and maintain replication for stored content
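    ///
    /// A minimal driver sketch (assumption: the manager is shared behind an
    /// `Arc` and polled on a fixed cadence; the 60-second interval is
    /// illustrative, not prescribed):
    ///
    /// ```ignore
    /// let mut ticker = tokio::time::interval(Duration::from_secs(60));
    /// loop {
    ///     ticker.tick().await;
    ///     manager.maintain_replications().await?;
    /// }
    /// ```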
    pub async fn maintain_replications(&self) -> Result<()> {
        let replica_map = self.replica_map.read().await;
        let content_to_check: Vec<_> = replica_map
            .iter()
            .filter(|(_, info)| {
                // Only revisit content that hasn't been checked recently
                info.last_check.elapsed() > Duration::from_secs(300) // 5 minutes
            })
            .map(|(hash, info)| (*hash, info.clone()))
            .collect();
        drop(replica_map);

        for (content_hash, mut replica_info) in content_to_check {
            // Check if any storing nodes are at risk of churning
            let mut at_risk_nodes = Vec::new();
            for node in &replica_info.storing_nodes {
                if self.churn_predictor.should_replicate(node).await {
                    at_risk_nodes.push(node.clone());
                }
            }

            // Proactively replicate if nodes are at risk
            if !at_risk_nodes.is_empty() {
                let mut stats = self.stats.write().await;
                stats.proactive_replications += 1;
                drop(stats);

                // Select new nodes to replace at-risk ones
                let replacement_nodes = self
                    .select_replication_nodes(
                        &content_hash,
                        at_risk_nodes.len(),
                        &replica_info.storing_nodes,
                    )
                    .await?;

                // Swap each at-risk node for a replacement (in a real
                // implementation, this would trigger actual replication)
                for (old_node, new_node) in at_risk_nodes.iter().zip(replacement_nodes.iter()) {
                    replica_info.storing_nodes.remove(old_node);
                    replica_info.storing_nodes.insert(new_node.clone());
                }
            }

            // Update the last check time
            replica_info.last_check = Instant::now();
            self.replica_map
                .write()
                .await
                .insert(content_hash, replica_info);
        }

        Ok(())
    }

    /// Handle node departure by checking affected content
    pub async fn handle_node_departure(&self, departed_node: &NodeId) -> Result<()> {
        let replica_map = self.replica_map.read().await;
        let affected_content: Vec<_> = replica_map
            .iter()
            .filter(|(_, info)| info.storing_nodes.contains(departed_node))
            .map(|(hash, info)| (*hash, info.clone()))
            .collect();
        drop(replica_map);

        for (content_hash, mut replica_info) in affected_content {
            // Remove the departed node
            replica_info.storing_nodes.remove(departed_node);
            replica_info.replication_factor = replica_info.storing_nodes.len() as u32;

            // Check if we need to replicate to maintain the target factor
            if replica_info.replication_factor < replica_info.target_factor {
                let needed =
                    (replica_info.target_factor - replica_info.replication_factor) as usize;
                let new_nodes = self
                    .select_replication_nodes(&content_hash, needed, &replica_info.storing_nodes)
                    .await?;

                // Add new replicas (in a real implementation, this would copy the content)
                for node in new_nodes {
                    replica_info.storing_nodes.insert(node);
                }
                replica_info.replication_factor = replica_info.storing_nodes.len() as u32;
            }

            self.replica_map
                .write()
                .await
                .insert(content_hash, replica_info);
        }

        Ok(())
    }

    /// Simulate sending a replica to a node (would use the network in a real implementation)
    async fn send_replica_to_node(
        &self,
        node: &NodeId,
        _content_hash: &ContentHash,
        _content: &[u8],
    ) -> bool {
        // In a real implementation, this would:
        // 1. Establish a connection to the node
        // 2. Send a STORE_REPLICA message
        // 3. Wait for acknowledgment
        // 4. Return success/failure

        // For now, simulate with a trust-based success probability
        let trust = self.trust_provider.get_trust_score(node);
        rand::random::<f64>() < trust
    }

    /// Estimate current network churn rate
    async fn estimate_churn_rate(&self) -> f64 {
        // In a real implementation, this would be derived from network statistics
        // For now, return a simulated value
        0.2 // 20% churn rate
    }

    /// Get content popularity score
    async fn get_content_popularity(&self, _content_hash: &ContentHash) -> f64 {
        // In a real implementation, this would track access patterns
        // For now, return a simulated value
        0.5
    }

    /// Get replication statistics
    pub async fn get_stats(&self) -> ReplicationStats {
        let stats = self.stats.read().await;
        let replica_map = self.replica_map.read().await;

        // Calculate the average replication factor across all tracked content
        let avg_factor = if replica_map.is_empty() {
            0.0
        } else {
            let total_factor: u32 = replica_map
                .values()
                .map(|info| info.replication_factor)
                .sum();
            total_factor as f64 / replica_map.len() as f64
        };

        ReplicationStats {
            avg_replication_factor: avg_factor,
            ..stats.clone()
        }
    }

    /// Increase global replication factor during high churn
    pub async fn increase_global_replication(&self, _multiplier: f64) {
        // Currently a no-op. A real implementation would scale the configured
        // base replica count and trigger re-replication, e.g.:
        //   self.config.base_replicas = (self.config.base_replicas as f64 * multiplier) as u32;
        // (which would also require `&mut self` or interior mutability on the config).
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::adaptive::trust::MockTrustProvider;
    use std::sync::Arc;

    fn create_test_replication_manager() -> ReplicationManager {
        let config = ReplicationConfig::default();
        let trust_provider = Arc::new(MockTrustProvider::new());
        let churn_predictor = Arc::new(ChurnPredictor::new());
        let router = Arc::new(AdaptiveRouter::new(trust_provider.clone()));
        // Build hyperbolic and SOM components for potential future use
        let _hyperbolic = Arc::new(crate::adaptive::hyperbolic::HyperbolicSpace::new());
        let _som = Arc::new(crate::adaptive::som::SelfOrganizingMap::new(
            crate::adaptive::som::SomConfig {
                initial_learning_rate: 0.5,
                initial_radius: 3.0,
                iterations: 1000,
                grid_size: crate::adaptive::som::GridSize::Fixed(10, 10),
            },
        ));

        ReplicationManager::new(config, trust_provider, churn_predictor, router)
    }

    #[tokio::test]
    async fn test_adaptive_replication_factor() {
        let manager = create_test_replication_manager();
        let content_hash = ContentHash([1u8; 32]);

        // The factor must stay within the configured range
        let factor = manager.calculate_replication_factor(&content_hash).await;
        assert!(factor >= manager.config.min_replicas);
        assert!(factor <= manager.config.max_replicas);
    }

    #[tokio::test]
    async fn test_node_selection_excludes_nodes() {
        let manager = create_test_replication_manager();
        let content_hash = ContentHash([1u8; 32]);
        let mut exclude = HashSet::new();
        exclude.insert(NodeId { hash: [1u8; 32] });
        exclude.insert(NodeId { hash: [2u8; 32] });

        let nodes = manager
            .select_replication_nodes(&content_hash, 5, &exclude)
            .await
            .unwrap();

        // Verify excluded nodes are not in the results
        for node in nodes {
            assert!(!exclude.contains(&node));
        }
    }

    #[tokio::test]
    async fn test_replication_tracking() {
        let manager = create_test_replication_manager();
        let content_hash = ContentHash([1u8; 32]);
        let content = b"Test content";
        let metadata = ContentMetadata {
            size: content.len(),
            content_type: ContentType::DataRetrieval,
            created_at: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs(),
            chunk_count: None,
            replication_factor: 8,
        };

        // Perform replication
        let replica_info = manager
            .replicate_content(&content_hash, content, metadata)
            .await
            .unwrap();

        // Check that replicas were tracked
        assert!(replica_info.replication_factor > 0);
        assert!(!replica_info.storing_nodes.is_empty());

        // Check stats reflect tracked replicas even in constrained environments
        let stats = manager.get_stats().await;
        assert!(stats.avg_replication_factor >= 1.0);
    }

    #[tokio::test]
    async fn test_proactive_replication() {
        let manager = create_test_replication_manager();

        // Add some content to track
        let content_hash = ContentHash([1u8; 32]);
        let mut replica_info = ReplicaInfo {
            storing_nodes: HashSet::new(),
            replication_factor: 3,
            target_factor: 5,
            last_check: Instant::now() - Duration::from_secs(400), // Stale check
            metadata: ContentMetadata {
                size: 100,
                content_type: ContentType::DataRetrieval,
                created_at: std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap()
                    .as_secs(),
                chunk_count: None,
                replication_factor: 5,
            },
        };

        // Add some nodes
        for i in 0..3 {
            replica_info.storing_nodes.insert(NodeId {
                hash: [i as u8; 32],
            });
        }

        manager
            .replica_map
            .write()
            .await
            .insert(content_hash, replica_info);

        // Run maintenance
        manager.maintain_replications().await.unwrap();

        // Check that maintenance refreshed the last-check timestamp
        let updated = manager
            .replica_map
            .read()
            .await
            .get(&content_hash)
            .unwrap()
            .clone();
        assert!(updated.last_check.elapsed() < Duration::from_secs(1));
    }

    #[tokio::test]
    async fn test_node_departure_handling() {
        let manager = create_test_replication_manager();
        let departed_node = NodeId { hash: [1u8; 32] };

        // Add content that includes the departed node
        let content_hash = ContentHash([1u8; 32]);
        let mut storing_nodes = HashSet::new();
        storing_nodes.insert(departed_node.clone());
        storing_nodes.insert(NodeId { hash: [2u8; 32] });
        storing_nodes.insert(NodeId { hash: [3u8; 32] });

        let replica_info = ReplicaInfo {
            storing_nodes,
            replication_factor: 3,
            target_factor: 5,
            last_check: Instant::now(),
            metadata: ContentMetadata {
                size: 100,
                content_type: ContentType::DataRetrieval,
                created_at: std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap()
                    .as_secs(),
                chunk_count: None,
                replication_factor: 5,
            },
        };

        manager
            .replica_map
            .write()
            .await
            .insert(content_hash, replica_info);

        // Handle departure
        manager.handle_node_departure(&departed_node).await.unwrap();

        // Check that the node was removed and replication adjusted
        let updated = manager
            .replica_map
            .read()
            .await
            .get(&content_hash)
            .unwrap()
            .clone();
        assert!(!updated.storing_nodes.contains(&departed_node));
    }
}