saorsa_core/dht/
reed_solomon.rs1use anyhow::{Result, anyhow};
7use saorsa_fec::{FecCodec, FecParams};
8use serde::{Deserialize, Serialize};
9use std::sync::Arc;
10use tokio::sync::RwLock;
11
12#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct RSConfig {
15 pub k: usize,
17 pub m: usize,
19 pub max_chunk_size: usize,
21}
22
23impl RSConfig {
24 pub fn new(k: usize, m: usize) -> Result<Self> {
26 if k == 0 || m == 0 || k > 128 || m > 128 {
27 return Err(anyhow!("Invalid Reed-Solomon parameters"));
28 }
29 Ok(Self {
30 k,
31 m,
32 max_chunk_size: 1_048_576, })
34 }
35
36 pub fn n(&self) -> usize {
38 self.k + self.m
39 }
40
41 pub fn overhead(&self) -> f64 {
43 (self.m as f64 / self.k as f64) * 100.0
44 }
45}
46
47pub struct ReedSolomonEncoder {
49 pub(crate) config: RSConfig,
50 encoder: Arc<RwLock<FecCodec>>,
51}
52
53impl ReedSolomonEncoder {
54 pub fn new(k: usize, m: usize) -> Result<Self> {
56 let config = RSConfig::new(k, m)?;
57 let fec_params = FecParams::new(k as u16, m as u16)
58 .map_err(|e| anyhow!("Failed to create FEC params: {:?}", e))?;
59 let encoder = FecCodec::new(fec_params)
60 .map_err(|e| anyhow!("Failed to create FEC encoder: {:?}", e))?;
61
62 Ok(Self {
63 config,
64 encoder: Arc::new(RwLock::new(encoder)),
65 })
66 }
67
68 pub async fn encode(&self, data: Vec<u8>) -> Result<Vec<Vec<u8>>> {
70 if data.is_empty() {
71 return Err(anyhow!("Cannot encode empty data"));
72 }
73
74 let encoder = self.encoder.read().await;
76 let encoded_chunks = encoder
77 .encode(&data)
78 .map_err(|e| anyhow!("Encoding failed: {:?}", e))?;
79
80 Ok(encoded_chunks)
81 }
82
83 pub async fn decode(&self, chunks: Vec<Option<Vec<u8>>>) -> Result<Vec<u8>> {
85 if chunks.len() != self.config.n() {
86 return Err(anyhow!("Invalid number of chunks"));
87 }
88
89 let available_count = chunks.iter().filter(|c| c.is_some()).count();
91 if available_count < self.config.k {
92 return Err(anyhow!(
93 "Insufficient chunks for recovery: {} < {}",
94 available_count,
95 self.config.k
96 ));
97 }
98
99 let encoder = self.encoder.read().await;
101 let decoded_data = encoder
102 .decode(&chunks)
103 .map_err(|e| anyhow!("Decoding failed: {:?}", e))?;
104
105 Ok(decoded_data)
106 }
107
108 pub fn can_recover(&self, available_chunks: &[bool]) -> bool {
110 available_chunks.iter().filter(|&&x| x).count() >= self.config.k
111 }
112
113 pub async fn adjust_redundancy(&mut self, network_reliability: f64) -> Result<()> {
115 let (new_k, new_m) = if network_reliability < 0.7 {
116 (4, 4) } else if network_reliability < 0.9 {
118 (6, 3) } else {
120 (8, 2) };
122
123 if new_k != self.config.k || new_m != self.config.m {
124 self.config = RSConfig::new(new_k, new_m)?;
125 let fec_params = FecParams::new(new_k as u16, new_m as u16)
126 .map_err(|e| anyhow!("Failed to create FEC params: {:?}", e))?;
127 let new_encoder = FecCodec::new(fec_params)
128 .map_err(|e| anyhow!("Failed to adjust encoder: {:?}", e))?;
129 *self.encoder.write().await = new_encoder;
130 }
131
132 Ok(())
133 }
134}
135
136pub struct AdaptiveRedundancyManager {
138 encoder: Arc<RwLock<ReedSolomonEncoder>>,
139 network_reliability: Arc<RwLock<f64>>,
140}
141
142impl AdaptiveRedundancyManager {
143 pub fn new(k: usize, m: usize) -> Result<Self> {
145 Ok(Self {
146 encoder: Arc::new(RwLock::new(ReedSolomonEncoder::new(k, m)?)),
147 network_reliability: Arc::new(RwLock::new(0.9)),
148 })
149 }
150
151 pub async fn update_reliability(&self, reliability: f64) -> Result<()> {
153 *self.network_reliability.write().await = reliability.clamp(0.0, 1.0);
154
155 let mut encoder = self.encoder.write().await;
157 encoder.adjust_redundancy(reliability).await?;
158
159 Ok(())
160 }
161
162 pub async fn get_config(&self) -> RSConfig {
164 let encoder = self.encoder.read().await;
165 encoder.config.clone()
166 }
167}
168
169#[cfg(test)]
170mod tests {
171 use super::*;
172
173 #[tokio::test]
174 async fn test_encoding_decoding_roundtrip() -> Result<()> {
175 let encoder = ReedSolomonEncoder::new(4, 2)?;
176 let original_data = vec![1u8; 1000];
177
178 let encoded = encoder.encode(original_data.clone()).await?;
179 assert_eq!(encoded.len(), 6); let mut corrupted = encoded.into_iter().map(Some).collect::<Vec<_>>();
183 let k = encoder.config.k;
184 for i in 0..encoder.config.m {
185 corrupted[k + i] = None;
186 }
187
188 let decoded = encoder.decode(corrupted).await?;
189 assert_eq!(decoded.len(), original_data.len());
190
191 Ok(())
192 }
193
194 #[tokio::test]
195 async fn test_maximum_recoverable_failures() -> Result<()> {
196 let encoder = ReedSolomonEncoder::new(4, 2)?;
197 let data = vec![42u8; 1000];
198
199 let encoded = encoder.encode(data.clone()).await?;
200
201 let mut chunks = encoded.clone().into_iter().map(Some).collect::<Vec<_>>();
203 let k = encoder.config.k;
204 for i in 0..encoder.config.m {
205 chunks[k + i] = None;
206 }
207 assert!(encoder.decode(chunks).await.is_ok());
208
209 let mut chunks = encoded.into_iter().map(Some).collect::<Vec<_>>();
211 for i in 0..encoder.config.m {
212 chunks[k + i] = None;
213 }
214 chunks[0] = None; assert!(encoder.decode(chunks).await.is_err());
216
217 Ok(())
218 }
219
220 #[tokio::test]
221 async fn test_redundancy_adjustment() -> Result<()> {
222 let mut encoder = ReedSolomonEncoder::new(6, 3)?;
223
224 encoder.adjust_redundancy(0.5).await?;
226 assert_eq!(encoder.config.k, 4);
227 assert_eq!(encoder.config.m, 4);
228
229 encoder.adjust_redundancy(0.95).await?;
231 assert_eq!(encoder.config.k, 8);
232 assert_eq!(encoder.config.m, 2);
233
234 Ok(())
235 }
236}