use std::collections::{HashMap, VecDeque};
/// Geometry of an XOR-parity FEC scheme.
///
/// All fields are public and are not validated here; the constructors simply
/// fill them in from their arguments.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FecConfig {
    /// Number of source packets per row group. The encoder in this file emits
    /// one row parity packet after every `k` source packets.
    pub k: usize,
    /// Set to `k + 1` by [`FecConfig::one_dimensional`] and to `k + l + 1` by
    /// [`FecConfig::two_dimensional`]. It is stored only; nothing in this file
    /// reads it.
    // NOTE(review): presumably a "total packets per block" figure — confirm intended meaning.
    pub n: usize,
    /// Number of rows accumulated before column parity becomes available.
    /// Values greater than 1 select two-dimensional mode (see [`FecConfig::is_2d`]).
    pub l: usize,
}

impl Default for FecConfig {
    /// One-dimensional configuration with `k = 5` (so `n = 6`, `l = 1`).
    fn default() -> Self {
        Self::one_dimensional(5)
    }
}

impl FecConfig {
    /// Row-parity-only configuration: `n = k + 1`, `l = 1`.
    #[must_use]
    pub fn one_dimensional(k: usize) -> Self {
        let n = k + 1;
        Self { k, n, l: 1 }
    }

    /// Row + column configuration: `n = k + l + 1`.
    ///
    /// The result only reports [`FecConfig::is_2d`] as `true` when `l > 1`.
    #[must_use]
    pub fn two_dimensional(k: usize, l: usize) -> Self {
        let n = k + l + 1;
        Self { k, n, l }
    }

    /// Returns `true` when more than one row is configured (`l > 1`).
    #[must_use]
    pub fn is_2d(&self) -> bool {
        self.l >= 2
    }
}
/// A single parity (FEC) packet.
#[derive(Debug, Clone)]
pub struct FecPacket {
    /// Sequence number of the parity packet itself. The encoder in this file
    /// draws it from its own wrapping counter, separate from source sequence
    /// numbers.
    pub sequence_number: u16,
    /// The encoder in this file writes `first_source_seq * 90` for row parity
    /// and `0` for column parity.
    pub timestamp: u32,
    /// The encoder in this file writes `0` for row parity and `k + column`
    /// for column parity.
    pub fec_index: u32,
    /// Byte-wise XOR of the covered payloads.
    pub payload: Vec<u8>,
    /// Coverage bitmap: bit `i` is set for the `i`-th packet folded into `payload`.
    pub mask: u64,
}

impl FecPacket {
    /// Number of set bits in `mask`, i.e. how many packets this parity covers.
    #[must_use]
    pub fn covered_count(&self) -> u32 {
        let coverage = self.mask;
        coverage.count_ones()
    }
}
/// Incremental XOR-parity encoder.
///
/// Source packets are pushed one at a time through [`FecEncoder::feed_packet`];
/// each time `config.k` packets have accumulated a row parity packet is
/// returned. In two-dimensional mode the payloads are additionally kept per
/// column so that [`FecEncoder::take_column_fec`] can produce column parity.
pub struct FecEncoder {
    /// Scheme geometry (`k` packets per row, `l` rows).
    config: FecConfig,
    /// Packets of the row currently being filled; cleared when the row parity is emitted.
    // NOTE(review): only the length of this buffer is ever read — the stored
    // payload copies look redundant; consider replacing with a counter.
    row_buffer: VecDeque<(u16, Vec<u8>)>,
    /// Running XOR of the payloads in the current row.
    row_xor: Vec<u8>,
    /// Wrapping sequence counter shared by row and column parity packets.
    fec_seq: u16,
    /// Timestamp assigned to the current row's parity packet.
    group_timestamp: u32,
    /// Coverage bitmap of the current row.
    group_mask: u64,
    /// One queue per column (`k` of them); only filled in two-dimensional mode.
    col_buffers: Vec<VecDeque<Vec<u8>>>,
    /// Rows completed since column parity was last taken.
    row_count: usize,
}

impl FecEncoder {
    /// Creates an encoder with empty state for the given geometry.
    #[must_use]
    pub fn new(config: FecConfig) -> Self {
        let col_buffers = vec![VecDeque::with_capacity(config.l); config.k];
        Self {
            config,
            row_buffer: VecDeque::new(),
            row_xor: Vec::new(),
            fec_seq: 0,
            group_timestamp: 0,
            group_mask: 0,
            col_buffers,
            row_count: 0,
        }
    }

    /// Folds one source packet into the current row.
    ///
    /// Returns `Some(row_parity)` when this packet completes a row of
    /// `config.k` packets, otherwise `None`. The parity packet's timestamp is
    /// `seq * 90` of the first packet of the row, its `fec_index` is `0`, and
    /// its mask has one bit per packet of the row (positions above 63 cannot be
    /// represented in the 64-bit mask and are left out).
    ///
    /// With `config.k == 0` every packet completes a row on its own.
    pub fn feed_packet(&mut self, seq: u16, payload: &[u8]) -> Option<FecPacket> {
        if self.row_buffer.is_empty() {
            // First packet of a new row: stamp the group.
            self.group_timestamp = u32::from(seq) * 90;
            self.group_mask = 0;
        }
        xor_into(&mut self.row_xor, payload);

        let slot = self.row_buffer.len();
        self.group_mask |= Self::mask_bit(slot);
        self.row_buffer.push_back((seq, payload.to_vec()));

        if self.config.is_2d() {
            // `checked_rem` is `None` for `k == 0`, where there are no columns
            // to store into; a plain `%` would panic on the zero divisor.
            if let Some(col) = slot.checked_rem(self.config.k) {
                // `col < k == col_buffers.len()`, so the index is in range.
                self.col_buffers[col].push_back(payload.to_vec());
            }
        }

        if self.row_buffer.len() >= self.config.k {
            let fec_pkt = self.emit_row_fec();
            self.row_count += 1;
            return Some(fec_pkt);
        }
        None
    }

    /// Emits one column parity packet per column and resets the column state.
    ///
    /// Returns an empty vector (and changes nothing) unless the encoder is in
    /// two-dimensional mode and at least `config.l` rows have completed since
    /// the previous call. Column parity packets carry `timestamp == 0`,
    /// `fec_index == k + column`, and a mask with one bit per buffered row
    /// (rows above 63 cannot be represented and are left out of the mask).
    pub fn take_column_fec(&mut self) -> Vec<FecPacket> {
        if !self.config.is_2d() || self.row_count < self.config.l {
            return Vec::new();
        }
        let k = self.config.k;
        let mut out = Vec::with_capacity(k);
        for (col, column) in self.col_buffers.iter_mut().enumerate() {
            let mut col_xor = Vec::new();
            let mut mask = 0u64;
            // Draining both reads the payloads and leaves the column empty.
            for (row, payload) in column.drain(..).enumerate() {
                xor_into(&mut col_xor, &payload);
                mask |= Self::mask_bit(row);
            }
            out.push(FecPacket {
                sequence_number: self.fec_seq,
                timestamp: 0,
                // Truncates if `k + col` exceeds `u32::MAX`, as before.
                fec_index: (k + col) as u32,
                payload: col_xor,
                mask,
            });
            self.fec_seq = self.fec_seq.wrapping_add(1);
        }
        self.row_count = 0;
        out
    }

    /// Packages the accumulated row XOR into a parity packet and resets the row state.
    fn emit_row_fec(&mut self) -> FecPacket {
        let fec_pkt = FecPacket {
            sequence_number: self.fec_seq,
            timestamp: self.group_timestamp,
            fec_index: 0,
            payload: std::mem::take(&mut self.row_xor),
            mask: self.group_mask,
        };
        self.fec_seq = self.fec_seq.wrapping_add(1);
        self.row_buffer.clear();
        self.group_mask = 0;
        fec_pkt
    }

    /// Bit for `position` in a 64-bit coverage mask, or `0` when the position
    /// does not fit (an unchecked `1u64 << position` would overflow there).
    fn mask_bit(position: usize) -> u64 {
        if position < u64::BITS as usize {
            1u64 << position
        } else {
            0
        }
    }
}
/// Single-loss XOR-parity decoder.
///
/// Source and parity packets are stored as they arrive;
/// [`FecDecoder::try_recover`] then rebuilds the one missing packet of every
/// group for which all other packets and the matching row parity are present.
pub struct FecDecoder {
    /// Scheme geometry; only `k` is used by the decoder.
    config: FecConfig,
    /// Source payloads received so far, keyed by sequence number.
    received_source: HashMap<u16, Vec<u8>>,
    /// Parity packets keyed by `fec_index`; a newer packet with the same index
    /// replaces the older one.
    received_fec: HashMap<u32, FecPacket>,
    /// Packets rebuilt during the current `try_recover` call (drained on return).
    recovered: HashMap<u16, Vec<u8>>,
    /// Group bases announced through `register_group_base`, in insertion order.
    group_bases: Vec<u16>,
    /// Bases of groups that have already produced a recovered packet.
    recovered_groups: std::collections::HashSet<u16>,
}

impl FecDecoder {
    /// `fec_index` that the encoder in this file assigns to row parity packets.
    const ROW_FEC_INDEX: u32 = 0;
    /// Factor the encoder in this file applies to a row's first sequence
    /// number to form the row parity timestamp.
    const TIMESTAMP_PER_SEQ: u32 = 90;

    /// Creates a decoder with no stored packets.
    #[must_use]
    pub fn new(config: FecConfig) -> Self {
        Self {
            config,
            received_source: HashMap::new(),
            received_fec: HashMap::new(),
            recovered: HashMap::new(),
            group_bases: Vec::new(),
            recovered_groups: std::collections::HashSet::new(),
        }
    }

    /// Stores a received source packet, replacing any earlier payload for `seq`.
    pub fn feed_source(&mut self, seq: u16, payload: Vec<u8>) {
        self.received_source.insert(seq, payload);
    }

    /// Stores a received parity packet under its `fec_index`, replacing any
    /// earlier packet with the same index.
    pub fn feed_fec(&mut self, pkt: FecPacket) {
        self.received_fec.insert(pkt.fec_index, pkt);
    }

    /// Announces the first sequence number of a group so that recovery is
    /// attempted for it even if none of its source packets arrived.
    pub fn register_group_base(&mut self, base_seq: u16) {
        if !self.group_bases.contains(&base_seq) {
            self.group_bases.push(base_seq);
        }
    }

    /// Attempts recovery for every known group and returns the packets rebuilt
    /// by this call as `(sequence_number, payload)` pairs, in no particular order.
    ///
    /// Known groups are the registered bases plus, for every received source
    /// packet, the multiple of `k` at or below its sequence number. A group
    /// yields at most one recovered packet over the decoder's lifetime.
    ///
    /// A recovered payload has the length of the parity payload, so a lost
    /// packet that was shorter than the longest packet of its group comes back
    /// zero-padded.
    pub fn try_recover(&mut self) -> Vec<(u16, Vec<u8>)> {
        let mut bases: Vec<u16> = self.group_bases.clone();
        // Set-based de-duplication keeps this linear in the number of stored packets.
        let mut seen: std::collections::HashSet<u16> = bases.iter().copied().collect();
        for &seq in self.received_source.keys() {
            // `None` means `k` is unusable (0 or > u16::MAX): no base can be derived.
            if let Some(base) = self.group_base_for(seq) {
                if seen.insert(base) {
                    bases.push(base);
                }
            }
        }
        for base in bases {
            self.attempt_group_recovery(base);
        }
        self.recovered.drain().collect()
    }

    /// Read-only view of the stored source packets.
    #[must_use]
    pub fn received_source(&self) -> &HashMap<u16, Vec<u8>> {
        &self.received_source
    }

    /// Read-only view of the stored parity packets, keyed by `fec_index`.
    #[must_use]
    pub fn received_fec(&self) -> &HashMap<u32, FecPacket> {
        &self.received_fec
    }

    /// Group size as a sequence-number delta, or `None` when `k` is zero or
    /// does not fit in the 16-bit sequence space.
    fn group_len(&self) -> Option<u16> {
        let k = self.config.k;
        if k == 0 || k > usize::from(u16::MAX) {
            None
        } else {
            Some(k as u16)
        }
    }

    /// Largest multiple of `k` that is `<= seq`, or `None` when `k` is unusable.
    fn group_base_for(&self, seq: u16) -> Option<u16> {
        // `k` is non-zero here and `(seq / k) * k <= seq`, so nothing can overflow.
        self.group_len().map(|k| (seq / k) * k)
    }

    /// Rebuilds the single missing packet of the group starting at `base`, if possible.
    ///
    /// Does nothing when the group was already recovered, when the number of
    /// missing packets is not exactly one, or when no row parity packet
    /// belonging to this group is stored.
    fn attempt_group_recovery(&mut self, base: u16) {
        if self.recovered_groups.contains(&base) {
            return;
        }
        if self.group_len().is_none() {
            // k == 0 has nothing to recover; k > u16::MAX cannot describe a group.
            return;
        }
        let k = self.config.k;
        let group_seqs: Vec<u16> = (0..k).map(|i| base.wrapping_add(i as u16)).collect();

        // Exactly one absent packet is the only case XOR parity can repair.
        let lost = {
            let mut missing = group_seqs.iter().copied().filter(|seq| {
                !self.received_source.contains_key(seq) && !self.recovered.contains_key(seq)
            });
            match (missing.next(), missing.next()) {
                (Some(seq), None) => seq,
                _ => return,
            }
        };

        // Only use the row parity that was built for *this* group: the encoder
        // stamps it with `first_seq * 90` and `fec_index == 0`. Accepting any
        // packet with `k` mask bits would XOR a stale or column parity into the
        // group and hand back a corrupt payload.
        let expected_timestamp = u32::from(base) * Self::TIMESTAMP_PER_SEQ;
        let parity = self.received_fec.values().find(|fec| {
            fec.fec_index == Self::ROW_FEC_INDEX
                && fec.timestamp == expected_timestamp
                && fec.covered_count() as usize == k
        });
        let mut recovered_payload = match parity {
            Some(fec) => fec.payload.clone(),
            None => return,
        };

        // parity ^ (all surviving packets) == the lost packet.
        for seq in group_seqs.iter().filter(|&&seq| seq != lost) {
            let src = self
                .received_source
                .get(seq)
                .or_else(|| self.recovered.get(seq));
            if let Some(payload) = src {
                xor_into(&mut recovered_payload, payload);
            }
        }
        self.recovered.insert(lost, recovered_payload);
        self.recovered_groups.insert(base);
    }
}
/// XORs `src` into the front of `dst`, byte by byte.
///
/// `dst` is first zero-extended to `src.len()` if it is shorter; it is never
/// shortened, so bytes of `dst` beyond `src.len()` are left untouched.
fn xor_into(dst: &mut Vec<u8>, src: &[u8]) {
    let needed = src.len();
    if needed > dst.len() {
        dst.resize(needed, 0);
    }
    // `dst` is now at least `needed` bytes long, so the prefix slice is in range.
    dst[..needed]
        .iter_mut()
        .zip(src)
        .for_each(|(acc, &byte)| *acc ^= byte);
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Deterministic payload: `len` bytes counting up from `seed` (wrapping).
    fn make_payload(seed: u8, len: usize) -> Vec<u8> {
        (0..len).map(|i| seed.wrapping_add(i as u8)).collect()
    }

    /// Feeds `payloads` to `enc` with sequence numbers 0, 1, 2, … and returns
    /// the last parity packet the encoder produced, if any.
    fn encode_group(enc: &mut FecEncoder, payloads: &[Vec<u8>]) -> Option<FecPacket> {
        payloads
            .iter()
            .enumerate()
            .filter_map(|(seq, payload)| enc.feed_packet(seq as u16, payload))
            .last()
    }

    #[test]
    fn test_default_config() {
        let cfg = FecConfig::default();
        assert_eq!(cfg.k, 5);
        assert_eq!(cfg.n, 6);
        assert_eq!(cfg.l, 1);
    }

    #[test]
    fn test_one_dimensional_config() {
        let cfg = FecConfig::one_dimensional(8);
        assert_eq!(cfg.k, 8);
        assert_eq!(cfg.n, 9);
        assert_eq!(cfg.l, 1);
        assert!(!cfg.is_2d());
    }

    #[test]
    fn test_two_dimensional_config() {
        let cfg = FecConfig::two_dimensional(4, 3);
        assert_eq!(cfg.k, 4);
        assert_eq!(cfg.l, 3);
        assert_eq!(cfg.n, 8);
        assert!(cfg.is_2d());
    }

    #[test]
    fn test_encoder_produces_fec_after_k_packets() {
        let mut enc = FecEncoder::new(FecConfig::one_dimensional(3));
        assert!(enc.feed_packet(0, &make_payload(0xAA, 10)).is_none());
        assert!(enc.feed_packet(1, &make_payload(0xBB, 10)).is_none());
        assert!(enc.feed_packet(2, &make_payload(0xCC, 10)).is_some());
    }

    #[test]
    fn test_fec_payload_is_xor() {
        let mut enc = FecEncoder::new(FecConfig::one_dimensional(3));
        let p0 = make_payload(0x11, 4);
        let p1 = make_payload(0x22, 4);
        let p2 = make_payload(0x33, 4);
        enc.feed_packet(0, &p0);
        enc.feed_packet(1, &p1);
        let fec = enc.feed_packet(2, &p2).expect("must produce FEC");
        for i in 0..4 {
            assert_eq!(fec.payload[i], p0[i] ^ p1[i] ^ p2[i]);
        }
    }

    #[test]
    fn test_fec_mask_popcount() {
        let cfg = FecConfig::one_dimensional(5);
        let mut enc = FecEncoder::new(cfg.clone());
        let payloads: Vec<Vec<u8>> = (0..cfg.k).map(|seq| make_payload(seq as u8, 8)).collect();
        let fec = encode_group(&mut enc, &payloads).expect("FEC must be produced");
        assert_eq!(fec.covered_count(), 5);
    }

    #[test]
    fn test_xor_into_extends_dst() {
        let mut dst = vec![0xFFu8; 2];
        let src = [0x0Fu8; 4];
        xor_into(&mut dst, &src);
        assert_eq!(dst.len(), 4);
        assert_eq!(dst[0], 0xFF ^ 0x0F);
        // Bytes beyond the original length start from zero.
        assert_eq!(dst[2], 0x00 ^ 0x0F);
    }

    #[test]
    fn test_decoder_recovers_single_loss() {
        let cfg = FecConfig::one_dimensional(3);
        let mut enc = FecEncoder::new(cfg.clone());
        let payloads: Vec<Vec<u8>> = (0..3).map(|i| make_payload(i * 0x10, 8)).collect();
        let fec = encode_group(&mut enc, &payloads).expect("FEC must be produced");

        // Packet 1 is "lost": only 0, 2 and the parity reach the decoder.
        let mut dec = FecDecoder::new(cfg);
        dec.feed_source(0, payloads[0].clone());
        dec.feed_source(2, payloads[2].clone());
        dec.feed_fec(fec);
        dec.register_group_base(0);

        let recovered = dec.try_recover();
        assert_eq!(recovered.len(), 1);
        let (seq, data) = &recovered[0];
        assert_eq!(*seq, 1);
        assert_eq!(data, &payloads[1]);
    }

    #[test]
    fn test_decoder_cannot_recover_two_losses() {
        let cfg = FecConfig::one_dimensional(3);
        let mut enc = FecEncoder::new(cfg.clone());
        let payloads: Vec<Vec<u8>> = (0..3).map(|i| make_payload(i * 0x10, 8)).collect();
        let fec = encode_group(&mut enc, &payloads).expect("FEC must be produced");

        // Packets 1 and 2 are both "lost".
        let mut dec = FecDecoder::new(cfg);
        dec.feed_source(0, payloads[0].clone());
        dec.feed_fec(fec);
        dec.register_group_base(0);

        let recovered = dec.try_recover();
        assert_eq!(recovered.len(), 0);
    }

    #[test]
    fn test_encoder_second_group() {
        let mut enc = FecEncoder::new(FecConfig::one_dimensional(2));
        enc.feed_packet(0, &make_payload(0xAA, 4));
        let fec1 = enc.feed_packet(1, &make_payload(0xBB, 4)).expect("FEC1");
        enc.feed_packet(2, &make_payload(0xCC, 4));
        let fec2 = enc.feed_packet(3, &make_payload(0xDD, 4)).expect("FEC2");
        assert_ne!(fec1.sequence_number, fec2.sequence_number);
    }

    #[test]
    fn test_fec_packet_covered_count() {
        let pkt = FecPacket {
            sequence_number: 0,
            timestamp: 0,
            fec_index: 0,
            payload: vec![],
            mask: 0b0001_0111,
        };
        assert_eq!(pkt.covered_count(), 4);
    }

    #[test]
    fn test_decoder_no_fec_no_recovery() {
        let mut dec = FecDecoder::new(FecConfig::one_dimensional(3));
        dec.feed_source(0, make_payload(0x11, 8));
        dec.feed_source(2, make_payload(0x33, 8));
        dec.register_group_base(0);
        let recovered = dec.try_recover();
        assert!(recovered.is_empty());
    }

    #[test]
    fn test_2d_config_is_2d() {
        assert!(FecConfig::two_dimensional(4, 2).is_2d());
        assert!(!FecConfig::one_dimensional(4).is_2d());
    }

    #[test]
    fn test_encoder_2d_column_fec() {
        let cfg = FecConfig::two_dimensional(3, 2);
        let mut enc = FecEncoder::new(cfg.clone());

        // After a single row, column parity is not available yet.
        for seq in 0..3u16 {
            enc.feed_packet(seq, &make_payload(seq as u8, 6));
        }
        assert!(enc.take_column_fec().is_empty());

        // The second row completes the matrix: one parity packet per column.
        for seq in 3..6u16 {
            enc.feed_packet(seq, &make_payload(seq as u8, 6));
        }
        let col_fec = enc.take_column_fec();
        assert_eq!(col_fec.len(), cfg.k);
    }

    #[test]
    fn test_recovered_map_drained_after_try_recover() {
        let cfg = FecConfig::one_dimensional(2);
        let mut enc = FecEncoder::new(cfg.clone());
        let p0 = make_payload(0xAA, 4);
        let p1 = make_payload(0xBB, 4);
        enc.feed_packet(0, &p0);
        let fec = enc.feed_packet(1, &p1).expect("FEC");

        let mut dec = FecDecoder::new(cfg);
        dec.feed_source(0, p0);
        dec.feed_fec(fec);
        dec.register_group_base(0);

        let r1 = dec.try_recover();
        assert_eq!(r1.len(), 1);
        // A second call must not hand out the same packet again.
        let r2 = dec.try_recover();
        assert!(r2.is_empty());
    }
}