Skip to main content

saorsa_core/placement/
mod.rs

1// Copyright (c) 2025 Saorsa Labs Limited
2//
3// This file is part of the Saorsa P2P network.
4//
5// Licensed under the AGPL-3.0 license:
6// <https://www.gnu.org/licenses/agpl-3.0.html>
7
8//! Placement Loop & Storage Orchestration System
9//!
10//! This module implements the core placement system for optimal distribution
11//! of erasure-coded shards across the network, integrating EigenTrust reputation,
12//! churn prediction, capacity constraints, and diversity rules.
13//!
14//! ## Core Concepts
15//!
16//! ### Weighted Selection Algorithm
17//!
18//! The placement system uses Efraimidis-Spirakis weighted sampling with the formula:
19//!
20//! ```text
21//! w_i = (τ_i^α) * (p_i^β) * (c_i^γ) * d_i
22//! ```
23//!
24//! Where:
25//! - `τ_i`: EigenTrust reputation score (0.0-1.0)
26//! - `p_i`: Node performance score (0.0-1.0)
27//! - `c_i`: Available capacity score (0.0-1.0)
28//! - `d_i`: Geographic/network diversity bonus (1.0-2.0)
29//! - `α, β, γ`: Configurable weight exponents
30//!
31//! ### Byzantine Fault Tolerance
32//!
33//! Implements configurable f-out-of-3f+1 Byzantine fault tolerance:
34//! - Tolerates up to f Byzantine (malicious) nodes
35//! - Requires minimum 3f+1 nodes for safety
36//! - Automatically adjusts replication based on network size
37//!
38//! ### Geographic Diversity
39//!
40//! Ensures optimal shard distribution across:
41//! - Geographic regions (7 major regions)
42//! - Autonomous System Numbers (ASNs)
43//! - Network operators and data centers
44//!
45//! ## Usage Examples
46//!
47//! ### Basic Placement
48//!
49//! ```rust,ignore
50//! use saorsa_core::placement::{PlacementConfig, PlacementOrchestrator};
51//! use std::time::Duration;
52//!
53//! async fn example() -> Result<(), Box<dyn std::error::Error>> {
54//!     // Create placement configuration
55//!     let config = PlacementConfig::default();
56//!
57//!     // Create orchestrator with required components
58//!     let orchestrator = PlacementOrchestrator::new(config).await?;
59//!
60//!     // Start the placement system
61//!     orchestrator.start().await?;
62//!     Ok(())
63//! }
64//! ```
65//!
66//! ### Advanced Configuration
67//!
68//! ```rust,ignore
69//! use saorsa_core::placement::{PlacementConfig, OptimizationWeights};
70//!
71//! // PlacementConfig uses Default for standard setups
72//! // See PlacementConfig documentation for available fields
73//! let config = PlacementConfig::default();
74//! ```
75//!
76//! ### Storage Orchestration
77//!
78//! ```rust,ignore
79//! use saorsa_core::placement::PlacementOrchestrator;
80//!
81//! async fn example() -> Result<(), Box<dyn std::error::Error>> {
82//!     // Create orchestrator with default configuration
83//!     let orchestrator = PlacementOrchestrator::new(Default::default()).await?;
84//!
85//!     // Start audit and repair systems
86//!     orchestrator.start().await?;
87//!     Ok(())
88//! }
89//! ```
90//!
91//! ## Architecture
92//!
93//! The placement system consists of several key components:
94//!
95//! - **PlacementEngine**: Main orchestrator for placement decisions
96//! - **WeightedPlacementStrategy**: Implements the weighted selection algorithm
97//! - **StorageOrchestrator**: Manages shard storage and retrieval
98//! - **AuditSystem**: Continuous monitoring of shard health
99//! - **RepairSystem**: Automatic repair with hysteresis control
100//! - **DiversityEnforcer**: Geographic and network diversity constraints
101//!
102//! ## Performance Characteristics
103//!
104//! - **Selection Speed**: <1 second for 8-node selection from 1000+ candidates
105//! - **Memory Usage**: O(n) where n is candidate node count
106//! - **Audit Frequency**: Every 5 minutes with concurrent limits
107//! - **Repair Latency**: <1 hour detection, immediate repair initiation
108//!
109//! ## Security Features
110//!
111//! - EigenTrust integration for reputation-based selection
112//! - Byzantine fault tolerance with configurable parameters
113//! - Proof-of-work for DHT records (~18 bits difficulty)
114//! - Cryptographic verification of all operations
115//! - Secure random selection with cryptographic entropy
116
117pub mod algorithms;
118pub mod dht_records;
119pub mod errors;
120pub mod orchestrator;
121pub mod traits;
122pub mod types;
123
124// Re-export core types for convenience
125pub use algorithms::{DiversityEnforcer, WeightedPlacementStrategy, WeightedSampler};
126pub use dht_records::{
127    DataPointer, DhtRecord, GroupBeacon, NatType, NodeAd, NodeCapabilities, OsSignature,
128    RegisterPointer,
129};
130pub use errors::{PlacementError, PlacementResult};
131pub use orchestrator::{AuditSystem, PlacementOrchestrator, RepairSystem, StorageOrchestrator};
132pub use traits::{
133    NetworkTopology, NodePerformanceMetrics, PerformanceEstimator, PlacementConstraint,
134    PlacementStrategy, PlacementValidator,
135};
136pub use types::{
137    ByzantineTolerance, GeographicLocation, NetworkRegion, OptimizationWeights, PlacementConfig,
138    PlacementDecision, PlacementMetrics, ReplicationFactor,
139};
140
141use std::collections::HashSet;
142use std::time::Instant;
143
144use crate::adaptive::{NodeId, performance::PerformanceMonitor, trust::EigenTrustEngine};
145
146/// Main placement engine that orchestrates the entire placement process
147#[derive(Debug)]
148pub struct PlacementEngine {
149    config: PlacementConfig,
150    strategy: Box<dyn PlacementStrategy + Send + Sync>,
151}
152
153impl PlacementEngine {
154    /// Create new placement engine with default weighted strategy
155    pub fn new(config: PlacementConfig) -> Self {
156        let strategy = Box::new(algorithms::WeightedPlacementStrategy::new(config.clone()));
157
158        Self { config, strategy }
159    }
160
161    /// Create placement engine with custom strategy
162    pub fn with_strategy(
163        config: PlacementConfig,
164        strategy: Box<dyn PlacementStrategy + Send + Sync>,
165    ) -> Self {
166        Self { config, strategy }
167    }
168
169    /// Select optimal nodes for shard placement
170    pub async fn select_nodes(
171        &mut self,
172        available_nodes: &HashSet<NodeId>,
173        replication_factor: u8,
174        trust_system: &EigenTrustEngine,
175        performance_monitor: &PerformanceMonitor,
176        node_metadata: &std::collections::HashMap<NodeId, (GeographicLocation, u32, NetworkRegion)>,
177    ) -> PlacementResult<PlacementDecision> {
178        let start_time = Instant::now();
179
180        // Validate inputs
181        if available_nodes.is_empty() {
182            return Err(PlacementError::InsufficientNodes {
183                required: replication_factor as usize,
184                available: 0,
185            });
186        }
187
188        if replication_factor < self.config.replication_factor.min_value() {
189            return Err(PlacementError::InvalidReplicationFactor(replication_factor));
190        }
191
192        // Execute placement with timeout using idiomatic tokio::time::timeout
193        let mut decision = tokio::time::timeout(
194            self.config.placement_timeout,
195            self.strategy.select_nodes(
196                available_nodes,
197                replication_factor,
198                trust_system,
199                performance_monitor,
200                node_metadata,
201            ),
202        )
203        .await
204        .map_err(|_| PlacementError::PlacementTimeout)??;
205
206        // Update timing information
207        decision.selection_time = start_time.elapsed();
208
209        // Validate against configuration constraints
210        self.validate_decision(&decision)?;
211
212        Ok(decision)
213    }
214
215    /// Validate placement decision against configuration constraints
216    fn validate_decision(&self, decision: &PlacementDecision) -> PlacementResult<()> {
217        // Check minimum nodes
218        if decision.selected_nodes.len() < self.config.replication_factor.min_value() as usize {
219            return Err(PlacementError::InsufficientNodes {
220                required: self.config.replication_factor.min_value() as usize,
221                available: decision.selected_nodes.len(),
222            });
223        }
224
225        // Check Byzantine fault tolerance
226        let required_for_byzantine = self.config.byzantine_tolerance.required_nodes();
227        if decision.selected_nodes.len() < required_for_byzantine {
228            return Err(PlacementError::ByzantineToleranceViolation {
229                required: required_for_byzantine,
230                available: decision.selected_nodes.len(),
231            });
232        }
233
234        // Check reliability threshold
235        if decision.estimated_reliability < 0.8 {
236            return Err(PlacementError::ReliabilityTooLow {
237                estimated: decision.estimated_reliability,
238                minimum: 0.8,
239            });
240        }
241
242        Ok(())
243    }
244
245    /// Get current configuration
246    pub fn config(&self) -> &PlacementConfig {
247        &self.config
248    }
249
250    /// Update configuration
251    pub fn update_config(&mut self, config: PlacementConfig) {
252        self.config = config;
253    }
254
255    /// Get strategy name
256    pub fn strategy_name(&self) -> &str {
257        self.strategy.name()
258    }
259}