use reed_solomon_erasure::galois_8::ReedSolomon;
/// Shard layout for one forward-error-correction group.
///
/// The two counts are handed to the Reed-Solomon codec as its data and
/// parity shard counts when an encoder is built from this configuration.
#[derive(Debug, Clone, Copy)]
pub struct FecConfig {
    /// Number of data shards per group.
    pub data_shards: usize,
    /// Number of parity shards per group.
    pub parity_shards: usize,
}

impl FecConfig {
    /// Picks a shard layout for the observed loss rate.
    ///
    /// `loss` is a fraction (e.g. `0.02` for 2 %). Each tier applies to loss
    /// rates strictly below its upper bound:
    ///
    /// | loss     | data | parity |
    /// |----------|------|--------|
    /// | < 0.005  | 20   | 1      |
    /// | < 0.01   | 10   | 2      |
    /// | < 0.03   | 10   | 4      |
    /// | < 0.05   | 8    | 4      |
    /// | otherwise| 6    | 4      |
    ///
    /// Negative values fall into the first tier. `NaN` compares false against
    /// every bound and therefore selects the last (most redundant) tier.
    pub fn for_loss_rate(loss: f64) -> Self {
        // (exclusive upper bound, data shards, parity shards), in ascending order.
        const TIERS: [(f64, usize, usize); 4] = [
            (0.005, 20, 1),
            (0.01, 10, 2),
            (0.03, 10, 4),
            (0.05, 8, 4),
        ];

        let (data_shards, parity_shards) = TIERS
            .iter()
            .find(|tier| loss < tier.0)
            .map(|tier| (tier.1, tier.2))
            .unwrap_or((6, 4));

        Self {
            data_shards,
            parity_shards,
        }
    }

    /// Total number of shards in a group: data plus parity.
    pub fn total_shards(&self) -> usize {
        let Self {
            data_shards,
            parity_shards,
        } = *self;
        data_shards + parity_shards
    }

    /// Ratio of parity shards to data shards (e.g. `0.2` for 10 data + 2 parity).
    ///
    /// This is a plain floating-point division, so a configuration with zero
    /// data shards yields infinity or `NaN` rather than panicking.
    pub fn overhead(&self) -> f64 {
        let parity = self.parity_shards as f64;
        let data = self.data_shards as f64;
        parity / data
    }
}
pub struct FecEncoder {
rs: ReedSolomon,
pub config: FecConfig,
}
impl FecEncoder {
pub fn new(config: FecConfig) -> Self {
let rs =
ReedSolomon::new(config.data_shards, config.parity_shards).expect("invalid FEC config");
Self { rs, config }
}
pub fn encode(&self, shards: &mut Vec<Vec<u8>>) {
let shard_len = shards[0].len();
while shards.len() < self.config.total_shards() {
shards.push(vec![0u8; shard_len]);
}
self.rs.encode(shards).expect("FEC encode failed");
}
pub fn reconstruct(&self, shards: &mut [Option<Vec<u8>>]) -> Result<(), FecError> {
self.rs
.reconstruct(shards)
.map_err(|_| FecError::TooManyLost)
}
}
/// Sliding-window packet loss tracker backed by a fixed-size ring buffer.
///
/// Every slot starts out as "received", so until `window_size` outcomes have
/// been recorded the reported loss rate is diluted by those initial slots.
#[derive(Debug, Clone)]
pub struct LossTracker {
    /// Ring buffer of outcomes: `true` = received, `false` = lost.
    window: Vec<bool>,
    /// Index of the slot the next `record` call overwrites.
    pos: usize,
    /// Number of `false` (lost) entries currently held in `window`.
    count: usize,
}

impl LossTracker {
    /// Creates a tracker that remembers the last `window_size` outcomes.
    ///
    /// A `window_size` of `0` produces an inert tracker: `record` does
    /// nothing and `loss_rate` reports `0.0`.
    pub fn new(window_size: usize) -> Self {
        Self {
            window: vec![true; window_size],
            pos: 0,
            count: 0,
        }
    }

    /// Records one outcome, evicting the oldest entry in the window.
    ///
    /// `received` is `true` for a delivered packet and `false` for a lost one.
    pub fn record(&mut self, received: bool) {
        // A zero-sized window has no slot to write and no modulus to wrap by.
        if self.window.is_empty() {
            return;
        }

        let evicted = std::mem::replace(&mut self.window[self.pos], received);
        if !evicted {
            // The outcome leaving the window was a loss; stop counting it.
            self.count = self.count.saturating_sub(1);
        }
        if !received {
            self.count += 1;
        }
        self.pos = (self.pos + 1) % self.window.len();
    }

    /// Fraction of slots in the window that currently hold a loss, in `0.0..=1.0`.
    ///
    /// Returns `0.0` for a zero-sized window instead of dividing by zero.
    pub fn loss_rate(&self) -> f64 {
        if self.window.is_empty() {
            return 0.0;
        }
        self.count as f64 / self.window.len() as f64
    }

    /// Returns the [`FecConfig`] that `FecConfig::for_loss_rate` selects for
    /// the current [`loss_rate`](Self::loss_rate).
    pub fn recommended_config(&self) -> FecConfig {
        FecConfig::for_loss_rate(self.loss_rate())
    }
}
/// Errors reported by the FEC layer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FecError {
    /// Reconstruction failed. `FecEncoder::reconstruct` returns this for any
    /// error the underlying codec reports.
    TooManyLost,
}

impl std::fmt::Display for FecError {
    /// Writes the human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            FecError::TooManyLost => "too many shards lost to reconstruct",
        };
        f.write_str(message)
    }
}

// Lets the error flow through `Box<dyn std::error::Error>` and `?`-based
// conversions; there is no underlying source error to expose.
impl std::error::Error for FecError {}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the 4 data + 2 parity encoder shared by the codec tests.
    fn encoder_4_plus_2() -> FecEncoder {
        FecEncoder::new(FecConfig {
            data_shards: 4,
            parity_shards: 2,
        })
    }

    /// Four 100-byte data shards; shard `i` is filled with the byte `i + offset`.
    fn data_shards(offset: u8) -> Vec<Vec<u8>> {
        (0..4u8).map(|i| vec![i + offset; 100]).collect()
    }

    /// Wraps every shard in `Some`, i.e. "nothing lost yet".
    fn all_present(shards: Vec<Vec<u8>>) -> Vec<Option<Vec<u8>>> {
        shards.into_iter().map(Some).collect()
    }

    /// `(data_shards, parity_shards)` chosen for the given loss rate.
    fn layout_for(loss: f64) -> (usize, usize) {
        let cfg = FecConfig::for_loss_rate(loss);
        (cfg.data_shards, cfg.parity_shards)
    }

    #[test]
    fn encode_decode_no_loss() {
        let enc = encoder_4_plus_2();
        let mut shards = data_shards(0);
        enc.encode(&mut shards);
        assert_eq!(shards.len(), 6);

        let mut slots = all_present(shards);
        enc.reconstruct(&mut slots).unwrap();
    }

    #[test]
    fn recover_from_loss() {
        let enc = encoder_4_plus_2();
        let original = data_shards(10);

        let mut encoded = original.clone();
        enc.encode(&mut encoded);

        let mut slots = all_present(encoded);
        let dropped = [0usize, 2];
        for &idx in dropped.iter() {
            slots[idx] = None;
        }
        enc.reconstruct(&mut slots).unwrap();

        for &idx in dropped.iter() {
            assert_eq!(slots[idx].as_ref().unwrap(), &original[idx]);
        }
    }

    #[test]
    fn too_many_lost_fails() {
        let enc = encoder_4_plus_2();
        let mut shards = data_shards(0);
        enc.encode(&mut shards);

        // Three losses exceed what two parity shards can repair.
        let mut slots = all_present(shards);
        for slot in slots.iter_mut().take(3) {
            *slot = None;
        }
        assert!(enc.reconstruct(&mut slots).is_err());
    }

    #[test]
    fn loss_tracker_adapts() {
        // A clean link gets the lightest protection.
        let mut clean = LossTracker::new(100);
        (0..100).for_each(|_| clean.record(true));
        assert_eq!(clean.recommended_config().parity_shards, 1);

        // Every 20th packet lost: 5 losses in a window of 100.
        let mut lossy = LossTracker::new(100);
        for i in 0..100 {
            let received = i % 20 != 0;
            lossy.record(received);
        }
        assert!(lossy.recommended_config().parity_shards >= 4);
    }

    #[test]
    fn adaptive_config_thresholds() {
        let cases: [(f64, usize); 5] = [(0.0, 1), (0.008, 2), (0.02, 4), (0.04, 4), (0.10, 4)];
        for &(loss, parity) in cases.iter() {
            assert_eq!(
                FecConfig::for_loss_rate(loss).parity_shards,
                parity,
                "loss rate {}",
                loss
            );
        }
    }

    #[test]
    fn adaptive_config_all_boundaries() {
        let cases: [(f64, (usize, usize)); 5] = [
            (0.004, (20, 1)),
            (0.005, (10, 2)),
            (0.01, (10, 4)),
            (0.03, (8, 4)),
            (0.05, (6, 4)),
        ];
        for &(loss, expected) in cases.iter() {
            assert_eq!(layout_for(loss), expected, "loss rate {}", loss);
        }
    }

    #[test]
    fn overhead_calculation() {
        let light = FecConfig {
            data_shards: 10,
            parity_shards: 2,
        };
        let heavy = FecConfig {
            data_shards: 6,
            parity_shards: 4,
        };
        assert!((light.overhead() - 0.2).abs() < f64::EPSILON);
        assert!((heavy.overhead() - 4.0 / 6.0).abs() < f64::EPSILON);
    }

    #[test]
    fn total_shards() {
        let cfg = FecConfig {
            data_shards: 10,
            parity_shards: 4,
        };
        assert_eq!(cfg.total_shards(), 14);
    }

    #[test]
    fn loss_tracker_window_wraparound() {
        let mut tracker = LossTracker::new(4);

        // Fill the window: two of four outcomes are losses.
        let first_pass = [true, false, true, false];
        for &received in first_pass.iter() {
            tracker.record(received);
        }
        assert!((tracker.loss_rate() - 0.5).abs() < f64::EPSILON);

        // Wrap around: overwrite slot 0 (received) and slot 1 (lost).
        tracker.record(true);
        tracker.record(true);
        assert!((tracker.loss_rate() - 0.25).abs() < f64::EPSILON);
    }
}