saorsa_core/dht/
reed_solomon.rs

1//! Post-quantum Reed-Solomon erasure coding for fault-tolerant DHT storage
2//!
3//! Provides configurable redundancy with dynamic adjustment based on network conditions.
4//! Uses saorsa-fec for post-quantum security.
5
6use anyhow::{Result, anyhow};
7use saorsa_fec::{FecCodec, FecParams};
8use serde::{Deserialize, Serialize};
9use std::sync::Arc;
10use tokio::sync::RwLock;
11
/// Configuration for Reed-Solomon encoding.
///
/// Describes a `k + m` layout: `k` data chunks plus `m` parity chunks.
/// Prefer [`RSConfig::new`], which validates `k` and `m`. All fields are
/// public and the type is deserializable, so a value built by hand or read
/// from serialized input does not go through that validation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RSConfig {
    /// Number of data chunks
    pub k: usize,
    /// Number of parity chunks
    pub m: usize,
    // NOTE(review): no code visible in this file reads this limit — confirm
    // where (or whether) it is enforced.
    /// Maximum chunk size in bytes
    pub max_chunk_size: usize,
}
22
23impl RSConfig {
24    /// Create a new configuration
25    pub fn new(k: usize, m: usize) -> Result<Self> {
26        if k == 0 || m == 0 || k > 128 || m > 128 {
27            return Err(anyhow!("Invalid Reed-Solomon parameters"));
28        }
29        Ok(Self {
30            k,
31            m,
32            max_chunk_size: 1_048_576, // 1MB default
33        })
34    }
35
36    /// Get total number of chunks (data + parity)
37    pub fn n(&self) -> usize {
38        self.k + self.m
39    }
40
41    /// Calculate redundancy overhead percentage
42    pub fn overhead(&self) -> f64 {
43        (self.m as f64 / self.k as f64) * 100.0
44    }
45}
46
/// Reed-Solomon encoder with configurable parameters.
///
/// Pairs an [`RSConfig`] with the `saorsa_fec` codec built from the same
/// `k`/`m` values.
pub struct ReedSolomonEncoder {
    /// Active `k`/`m` layout; `decode` and `can_recover` validate against it.
    pub(crate) config: RSConfig,
    /// Codec behind an async `RwLock`: `encode`/`decode` take read guards,
    /// `adjust_redundancy` takes the write guard to swap in a new codec.
    encoder: Arc<RwLock<FecCodec>>,
}
52
53impl ReedSolomonEncoder {
54    /// Create new encoder with specified configuration
55    pub fn new(k: usize, m: usize) -> Result<Self> {
56        let config = RSConfig::new(k, m)?;
57        let fec_params = FecParams::new(k as u16, m as u16)
58            .map_err(|e| anyhow!("Failed to create FEC params: {:?}", e))?;
59        let encoder = FecCodec::new(fec_params)
60            .map_err(|e| anyhow!("Failed to create FEC encoder: {:?}", e))?;
61
62        Ok(Self {
63            config,
64            encoder: Arc::new(RwLock::new(encoder)),
65        })
66    }
67
68    /// Encode data into data + parity chunks
69    pub async fn encode(&self, data: Vec<u8>) -> Result<Vec<Vec<u8>>> {
70        if data.is_empty() {
71            return Err(anyhow!("Cannot encode empty data"));
72        }
73
74        // saorsa-fec takes the raw data and handles chunking internally
75        let encoder = self.encoder.read().await;
76        let encoded_chunks = encoder
77            .encode(&data)
78            .map_err(|e| anyhow!("Encoding failed: {:?}", e))?;
79
80        Ok(encoded_chunks)
81    }
82
83    /// Decode original data from available chunks
84    pub async fn decode(&self, chunks: Vec<Option<Vec<u8>>>) -> Result<Vec<u8>> {
85        if chunks.len() != self.config.n() {
86            return Err(anyhow!("Invalid number of chunks"));
87        }
88
89        // Check if we have enough chunks
90        let available_count = chunks.iter().filter(|c| c.is_some()).count();
91        if available_count < self.config.k {
92            return Err(anyhow!(
93                "Insufficient chunks for recovery: {} < {}",
94                available_count,
95                self.config.k
96            ));
97        }
98
99        // Convert to the format expected by saorsa-fec
100        let encoder = self.encoder.read().await;
101        let decoded_data = encoder
102            .decode(&chunks)
103            .map_err(|e| anyhow!("Decoding failed: {:?}", e))?;
104
105        Ok(decoded_data)
106    }
107
108    /// Check if recovery is possible with available chunks
109    pub fn can_recover(&self, available_chunks: &[bool]) -> bool {
110        available_chunks.iter().filter(|&&x| x).count() >= self.config.k
111    }
112
113    /// Adjust redundancy based on network reliability
114    pub async fn adjust_redundancy(&mut self, network_reliability: f64) -> Result<()> {
115        let (new_k, new_m) = if network_reliability < 0.7 {
116            (4, 4) // Conservative: 50% overhead
117        } else if network_reliability < 0.9 {
118            (6, 3) // Balanced: 33% overhead
119        } else {
120            (8, 2) // Aggressive: 25% overhead
121        };
122
123        if new_k != self.config.k || new_m != self.config.m {
124            self.config = RSConfig::new(new_k, new_m)?;
125            let fec_params = FecParams::new(new_k as u16, new_m as u16)
126                .map_err(|e| anyhow!("Failed to create FEC params: {:?}", e))?;
127            let new_encoder = FecCodec::new(fec_params)
128                .map_err(|e| anyhow!("Failed to adjust encoder: {:?}", e))?;
129            *self.encoder.write().await = new_encoder;
130        }
131
132        Ok(())
133    }
134}
135
/// Manager for adaptive redundancy based on network conditions.
///
/// Holds a shared encoder together with the most recently reported network
/// reliability, each behind its own async `RwLock`.
pub struct AdaptiveRedundancyManager {
    /// Encoder whose redundancy is re-tuned by `update_reliability`.
    encoder: Arc<RwLock<ReedSolomonEncoder>>,
    /// Last reliability value stored by `update_reliability`; starts at 0.9.
    network_reliability: Arc<RwLock<f64>>,
}
141
142impl AdaptiveRedundancyManager {
143    /// Create new manager with initial configuration
144    pub fn new(k: usize, m: usize) -> Result<Self> {
145        Ok(Self {
146            encoder: Arc::new(RwLock::new(ReedSolomonEncoder::new(k, m)?)),
147            network_reliability: Arc::new(RwLock::new(0.9)),
148        })
149    }
150
151    /// Update network reliability metric
152    pub async fn update_reliability(&self, reliability: f64) -> Result<()> {
153        *self.network_reliability.write().await = reliability.clamp(0.0, 1.0);
154
155        // Adjust encoder if needed
156        let mut encoder = self.encoder.write().await;
157        encoder.adjust_redundancy(reliability).await?;
158
159        Ok(())
160    }
161
162    /// Get current configuration
163    pub async fn get_config(&self) -> RSConfig {
164        let encoder = self.encoder.read().await;
165        encoder.config.clone()
166    }
167}
168
#[cfg(test)]
mod tests {
    use super::*;

    /// Keep the first `k` (data) chunks and mark every later chunk as missing.
    fn drop_parity(encoded: Vec<Vec<u8>>, k: usize) -> Vec<Option<Vec<u8>>> {
        encoded
            .into_iter()
            .enumerate()
            .map(|(index, chunk)| (index < k).then_some(chunk))
            .collect()
    }

    #[tokio::test]
    async fn test_encoding_decoding_roundtrip() -> Result<()> {
        let encoder = ReedSolomonEncoder::new(4, 2)?;
        let original_data = vec![1u8; 1000];

        let encoded = encoder.encode(original_data.clone()).await?;
        assert_eq!(encoded.len(), 6); // k + m

        // Simulate losing parity chunks (backend may not recover missing data shards)
        let corrupted = drop_parity(encoded, encoder.config.k);

        let decoded = encoder.decode(corrupted).await?;
        // Compare contents, not just lengths: a roundtrip that returns the
        // right number of wrong bytes must fail.
        assert_eq!(decoded, original_data);

        Ok(())
    }

    #[tokio::test]
    async fn test_maximum_recoverable_failures() -> Result<()> {
        let encoder = ReedSolomonEncoder::new(4, 2)?;
        let data = vec![42u8; 1000];
        let k = encoder.config.k;

        let encoded = encoder.encode(data.clone()).await?;

        // Can recover from m lost parity shards
        let chunks = drop_parity(encoded.clone(), k);
        assert!(encoder.decode(chunks).await.is_ok());

        // Cannot recover from m+1 missing pieces (parity + one data shard)
        let mut chunks = drop_parity(encoded, k);
        chunks[0] = None; // also remove one data shard
        assert!(encoder.decode(chunks).await.is_err());

        Ok(())
    }

    #[tokio::test]
    async fn test_redundancy_adjustment() -> Result<()> {
        let mut encoder = ReedSolomonEncoder::new(6, 3)?;

        // Adjust to conservative mode
        encoder.adjust_redundancy(0.5).await?;
        assert_eq!(encoder.config.k, 4);
        assert_eq!(encoder.config.m, 4);

        // Adjust to aggressive mode
        encoder.adjust_redundancy(0.95).await?;
        assert_eq!(encoder.config.k, 8);
        assert_eq!(encoder.config.m, 2);

        // Adjust back to balanced mode
        encoder.adjust_redundancy(0.8).await?;
        assert_eq!(encoder.config.k, 6);
        assert_eq!(encoder.config.m, 3);

        Ok(())
    }
}