saorsa_core/placement/
mod.rs

1// Copyright (c) 2025 Saorsa Labs Limited
2//
3// This file is part of the Saorsa P2P network.
4//
5// Licensed under the AGPL-3.0 license:
6// <https://www.gnu.org/licenses/agpl-3.0.html>
7
8//! Placement Loop & Storage Orchestration System
9//!
10//! This module implements the core placement system for optimal distribution
11//! of erasure-coded shards across the network, integrating EigenTrust reputation,
12//! churn prediction, capacity constraints, and diversity rules.
13//!
14//! ## Core Concepts
15//!
16//! ### Weighted Selection Algorithm
17//!
18//! The placement system uses Efraimidis-Spirakis weighted sampling with the formula:
19//!
20//! ```text
21//! w_i = (τ_i^α) * (p_i^β) * (c_i^γ) * d_i
22//! ```
23//!
24//! Where:
25//! - `τ_i`: EigenTrust reputation score (0.0-1.0)
26//! - `p_i`: Node performance score (0.0-1.0)
27//! - `c_i`: Available capacity score (0.0-1.0)
28//! - `d_i`: Geographic/network diversity bonus (1.0-2.0)
29//! - `α, β, γ`: Configurable weight exponents
30//!
31//! ### Byzantine Fault Tolerance
32//!
33//! Implements configurable f-out-of-3f+1 Byzantine fault tolerance:
34//! - Tolerates up to f Byzantine (malicious) nodes
35//! - Requires minimum 3f+1 nodes for safety
36//! - Automatically adjusts replication based on network size
37//!
38//! ### Geographic Diversity
39//!
40//! Ensures optimal shard distribution across:
41//! - Geographic regions (7 major regions)
42//! - Autonomous System Numbers (ASNs)
43//! - Network operators and data centers
44//!
45//! ## Usage Examples
46//!
47//! ### Basic Placement
48//!
49//! ```rust,no_run
50//! use saorsa_core::placement::{PlacementEngine, PlacementConfig};
51//! use std::time::Duration;
52//!
53//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
54//! let config = PlacementConfig {
55//!     replication_factor: (3, 8).into(),
56//!     byzantine_tolerance: 2.into(),
57//!     placement_timeout: Duration::from_secs(30),
58//!     geographic_diversity: true,
59//!     ..Default::default()
60//! };
61//!
62//! let mut engine = PlacementEngine::new(config);
63//!
64//! // Select optimal nodes for shard placement
65//! let decision = engine.select_nodes(
66//!     &available_nodes,
67//!     8, // replication factor
68//!     &trust_system,
69//!     &performance_monitor,
70//!     &node_metadata,
71//! ).await?;
72//!
73//! println!("Selected {} nodes with {:.2}% reliability",
74//!          decision.selected_nodes.len(),
75//!          decision.estimated_reliability * 100.0);
76//! # Ok(())
77//! # }
78//! ```
79//!
80//! ### Advanced Configuration
81//!
82//! ```rust,no_run
83//! use saorsa_core::placement::{
84//!     PlacementConfig, OptimizationWeights, PlacementConstraint
85//! };
86//! use std::time::Duration;
87//!
88//! let config = PlacementConfig {
89//!     weights: OptimizationWeights {
90//!         trust_weight: 0.5,      // High trust emphasis
91//!         performance_weight: 0.25,
92//!         capacity_weight: 0.15,
93//!         diversity_bonus: 0.1,
94//!     },
95//!     constraints: vec![
96//!         PlacementConstraint::MinimumTrustScore(0.7),
97//!         PlacementConstraint::MaximumLatency(Duration::from_millis(500)),
98//!         PlacementConstraint::RequireGeographicDiversity,
99//!     ],
100//!     ..Default::default()
101//! };
102//! ```
103//!
104//! ### Storage Orchestration
105//!
106//! ```rust,no_run
107//! use saorsa_core::placement::PlacementOrchestrator;
108//!
109//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
110//! let orchestrator = PlacementOrchestrator::new(
111//!     config,
112//!     dht_engine,
113//!     trust_system,
114//!     performance_monitor,
115//!     churn_predictor,
116//! ).await?;
117//!
118//! // Start audit and repair systems
119//! orchestrator.start().await?;
120//!
121//! // Place data with optimal distribution
122//! let decision = orchestrator.place_data(
123//!     data,
124//!     8, // replication factor
125//!     Some(NetworkRegion::Europe),
126//! ).await?;
127//! # Ok(())
128//! # }
129//! ```
130//!
131//! ## Architecture
132//!
133//! The placement system consists of several key components:
134//!
135//! - **PlacementEngine**: Main orchestrator for placement decisions
136//! - **WeightedPlacementStrategy**: Implements the weighted selection algorithm
137//! - **StorageOrchestrator**: Manages shard storage and retrieval
138//! - **AuditSystem**: Continuous monitoring of shard health
139//! - **RepairSystem**: Automatic repair with hysteresis control
140//! - **DiversityEnforcer**: Geographic and network diversity constraints
141//!
142//! ## Performance Characteristics
143//!
144//! - **Selection Speed**: <1 second for 8-node selection from 1000+ candidates
145//! - **Memory Usage**: O(n) where n is candidate node count
146//! - **Audit Frequency**: Every 5 minutes with concurrent limits
147//! - **Repair Latency**: <1 hour detection, immediate repair initiation
148//!
149//! ## Security Features
150//!
151//! - EigenTrust integration for reputation-based selection
152//! - Byzantine fault tolerance with configurable parameters
153//! - Proof-of-work for DHT records (~18 bits difficulty)
154//! - Cryptographic verification of all operations
155//! - Secure random selection with cryptographic entropy
156
157pub mod traits;
158pub mod types;
159pub mod errors;
160pub mod dht_records;
161pub mod algorithms;
162pub mod orchestrator;
163
164// Re-export core types for convenience
165pub use errors::{PlacementError, PlacementResult};
166pub use traits::{
167    PlacementStrategy, NetworkTopology, PerformanceEstimator,
168    PlacementConstraint, PlacementValidator, NodePerformanceMetrics,
169};
170pub use types::{
171    PlacementConfig, PlacementDecision, PlacementMetrics,
172    GeographicLocation, NetworkRegion,
173    ReplicationFactor, ByzantineTolerance, OptimizationWeights,
174};
175pub use dht_records::{
176    DhtRecord, NodeAd, GroupBeacon, DataPointer, RegisterPointer,
177    NodeCapabilities, NatType, OsSignature,
178};
179pub use algorithms::{
180    WeightedSampler, DiversityEnforcer, WeightedPlacementStrategy,
181};
182pub use orchestrator::{
183    PlacementOrchestrator, StorageOrchestrator, AuditSystem, RepairSystem,
184};
185
186use std::collections::HashSet;
187use std::time::Instant;
188
189use crate::adaptive::{NodeId, trust::EigenTrustEngine, performance::PerformanceMonitor};
190
191/// Main placement engine that orchestrates the entire placement process
192#[derive(Debug)]
193pub struct PlacementEngine {
194    config: PlacementConfig,
195    strategy: Box<dyn PlacementStrategy + Send + Sync>,
196}
197
198impl PlacementEngine {
199    /// Create new placement engine with default weighted strategy
200    pub fn new(config: PlacementConfig) -> Self {
201        let strategy = Box::new(algorithms::WeightedPlacementStrategy::new(config.clone()));
202        
203        Self {
204            config,
205            strategy,
206        }
207    }
208
209    /// Create placement engine with custom strategy
210    pub fn with_strategy(
211        config: PlacementConfig,
212        strategy: Box<dyn PlacementStrategy + Send + Sync>,
213    ) -> Self {
214        Self {
215            config,
216            strategy,
217        }
218    }
219
220    /// Select optimal nodes for shard placement
221    pub async fn select_nodes(
222        &mut self,
223        available_nodes: &HashSet<NodeId>,
224        replication_factor: u8,
225        trust_system: &EigenTrustEngine,
226        performance_monitor: &PerformanceMonitor,
227        node_metadata: &std::collections::HashMap<NodeId, (GeographicLocation, u32, NetworkRegion)>,
228    ) -> PlacementResult<PlacementDecision> {
229        let start_time = Instant::now();
230
231        // Validate inputs
232        if available_nodes.is_empty() {
233            return Err(PlacementError::InsufficientNodes {
234                required: replication_factor as usize,
235                available: 0,
236            });
237        }
238
239        if replication_factor < self.config.replication_factor.min_value() {
240            return Err(PlacementError::InvalidReplicationFactor(replication_factor));
241        }
242
243        // Apply placement timeout
244        let timeout_future = async {
245            tokio::time::sleep(self.config.placement_timeout).await;
246            Err(PlacementError::PlacementTimeout)
247        };
248
249        let placement_future = self.strategy.select_nodes(
250            available_nodes,
251            replication_factor,
252            trust_system,
253            performance_monitor,
254            node_metadata,
255        );
256
257        // Race placement against timeout
258        let mut decision = match tokio::select! {
259            result = placement_future => result?,
260            timeout_result = timeout_future => timeout_result?,
261        } {
262            result => result,
263        };
264
265        // Update timing information
266        decision.selection_time = start_time.elapsed();
267
268        // Validate against configuration constraints
269        self.validate_decision(&decision)?;
270
271        Ok(decision)
272    }
273
274    /// Validate placement decision against configuration constraints
275    fn validate_decision(&self, decision: &PlacementDecision) -> PlacementResult<()> {
276        // Check minimum nodes
277        if decision.selected_nodes.len() < self.config.replication_factor.min_value() as usize {
278            return Err(PlacementError::InsufficientNodes {
279                required: self.config.replication_factor.min_value() as usize,
280                available: decision.selected_nodes.len(),
281            });
282        }
283
284        // Check Byzantine fault tolerance
285        let required_for_byzantine = self.config.byzantine_tolerance.required_nodes();
286        if decision.selected_nodes.len() < required_for_byzantine {
287            return Err(PlacementError::ByzantineToleranceViolation {
288                required: required_for_byzantine,
289                available: decision.selected_nodes.len(),
290            });
291        }
292
293        // Check reliability threshold
294        if decision.estimated_reliability < 0.8 {
295            return Err(PlacementError::ReliabilityTooLow {
296                estimated: decision.estimated_reliability,
297                minimum: 0.8,
298            });
299        }
300
301        Ok(())
302    }
303
304    /// Get current configuration
305    pub fn config(&self) -> &PlacementConfig {
306        &self.config
307    }
308
309    /// Update configuration
310    pub fn update_config(&mut self, config: PlacementConfig) {
311        self.config = config;
312    }
313
314    /// Get strategy name
315    pub fn strategy_name(&self) -> &str {
316        self.strategy.name()
317    }
318}