use std::fmt;
use std::sync::atomic::{AtomicU64, Ordering};
// Process-wide counters for LRC activity. They are read by
// `lrc_metrics_snapshot` and zeroed by `reset_lrc_metrics`; every access in
// this file uses `Ordering::Relaxed`.

/// Incremented once for each symbol rebuilt through the local-parity path of `LrcCodec::repair`.
static LRC_LOCAL_REPAIRS_TOTAL: AtomicU64 = AtomicU64::new(0);
/// Incremented once for each symbol rebuilt through the global-parity path of `LrcCodec::repair`.
static LRC_GLOBAL_REPAIRS_TOTAL: AtomicU64 = AtomicU64::new(0);
/// Incremented once per call to `LrcCodec::encode`.
static LRC_ENCODE_TOTAL: AtomicU64 = AtomicU64::new(0);
/// Point-in-time copy of the LRC metric counters.
///
/// The `Default` value has every counter at zero.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct LrcMetricsSnapshot {
    /// Number of local repairs recorded so far.
    pub local_repairs_total: u64,
    /// Number of global repairs recorded so far.
    pub global_repairs_total: u64,
    /// Number of encode operations recorded so far.
    pub encode_total: u64,
}

/// Renders the snapshot as a single line of space-separated `key=value`
/// pairs, in the order local repairs, global repairs, encodes.
impl fmt::Display for LrcMetricsSnapshot {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The struct is `Copy`, so destructuring a copy keeps the argument
        // list below short.
        let Self {
            local_repairs_total,
            global_repairs_total,
            encode_total,
        } = *self;
        f.write_fmt(format_args!(
            "lrc_local_repairs={} lrc_global_repairs={} lrc_encodes={}",
            local_repairs_total, global_repairs_total, encode_total,
        ))
    }
}
#[must_use]
pub fn lrc_metrics_snapshot() -> LrcMetricsSnapshot {
LrcMetricsSnapshot {
local_repairs_total: LRC_LOCAL_REPAIRS_TOTAL.load(Ordering::Relaxed),
global_repairs_total: LRC_GLOBAL_REPAIRS_TOTAL.load(Ordering::Relaxed),
encode_total: LRC_ENCODE_TOTAL.load(Ordering::Relaxed),
}
}
/// Sets every LRC counter back to zero.
///
/// The counters are stored one after another with relaxed ordering; the
/// reset is not a single atomic operation across all three.
pub fn reset_lrc_metrics() {
    for counter in [
        &LRC_LOCAL_REPAIRS_TOTAL,
        &LRC_GLOBAL_REPAIRS_TOTAL,
        &LRC_ENCODE_TOTAL,
    ] {
        counter.store(0, Ordering::Relaxed);
    }
}
/// Tunable parameters for the LRC codec.
#[derive(Debug, Clone, Copy)]
pub struct LrcConfig {
    /// Number of source symbols that share one local parity.
    pub locality: usize,
}

impl Default for LrcConfig {
    /// Returns a configuration with a locality of 4.
    fn default() -> Self {
        const DEFAULT_LOCALITY: usize = 4;
        LrcConfig {
            locality: DEFAULT_LOCALITY,
        }
    }
}
/// Output of one LRC encoding pass: the source symbols plus their parities.
#[derive(Debug, Clone)]
pub struct LrcEncodeResult {
    /// `(symbol index, symbol bytes)` pairs for the source data.
    pub source_symbols: Vec<(u32, Vec<u8>)>,
    /// `(group index, parity bytes)` pairs, one per locality group.
    pub local_parities: Vec<(u32, Vec<u8>)>,
    /// Parity computed over all source symbols.
    pub global_parity: Vec<u8>,
    /// Number of source symbols.
    pub k_source: u32,
    /// Number of source symbols per locality group used for this encoding.
    pub locality: usize,
    /// Number of locality groups.
    pub num_groups: usize,
}
/// Result of trying to rebuild one missing source symbol.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LrcRepairOutcome {
    /// The symbol was rebuilt from its locality group's parity.
    LocalRepair {
        /// Index of the rebuilt source symbol.
        symbol_index: u32,
        /// Index of the locality group the symbol belongs to.
        group_index: u32,
        /// How many symbols were read to rebuild it, parity included.
        symbols_read: usize,
        /// The rebuilt symbol bytes.
        data: Vec<u8>,
    },
    /// The symbol was rebuilt from the global parity.
    GlobalRepair {
        /// Index of the rebuilt source symbol.
        symbol_index: u32,
        /// How many symbols were read to rebuild it, parity included.
        symbols_read: usize,
        /// The rebuilt symbol bytes.
        data: Vec<u8>,
    },
    /// The symbol could not be rebuilt.
    Unrecoverable {
        /// Symbol indices reported alongside the failure.
        missing: Vec<u32>,
        /// Human-readable explanation of the failure.
        reason: String,
    },
}
/// Encoder/repairer for a simple XOR-based local reconstruction code.
///
/// The configuration is private; instances are created with `LrcCodec::new`.
pub struct LrcCodec {
    /// Settings supplied at construction time.
    config: LrcConfig,
}
impl LrcCodec {
    /// Creates a codec that groups source symbols into locality groups of
    /// `config.locality` symbols.
    ///
    /// # Panics
    ///
    /// Panics if `config.locality` is less than 2.
    pub fn new(config: LrcConfig) -> Self {
        assert!(
            config.locality >= 2,
            "locality must be >= 2, got {}",
            config.locality
        );
        Self { config }
    }

    /// Returns the number of source symbols per locality group.
    #[must_use]
    pub fn locality(&self) -> usize {
        self.config.locality
    }

    /// Splits `source_data` into `symbol_size`-byte symbols and computes the
    /// XOR parities.
    ///
    /// * The last symbol is zero-padded up to `symbol_size`.
    /// * One local parity is produced per group of `locality` consecutive
    ///   symbols (the final group may be shorter).
    /// * The global parity is the XOR of all source symbols.
    ///
    /// Empty input yields no source symbols, no local parities and an
    /// all-zero global parity. Each call bumps the encode counter.
    ///
    /// # Panics
    ///
    /// Panics if `symbol_size` is 0.
    // Symbol and group indices are stored as `u32`; the narrowing casts are
    // accepted as-is and not range-checked here.
    #[allow(clippy::cast_possible_truncation)]
    pub fn encode(&self, source_data: &[u8], symbol_size: usize) -> LrcEncodeResult {
        assert!(symbol_size > 0, "symbol_size must be > 0");
        LRC_ENCODE_TOTAL.fetch_add(1, Ordering::Relaxed);

        let source_symbols: Vec<(u32, Vec<u8>)> = source_data
            .chunks(symbol_size)
            .enumerate()
            .map(|(i, chunk)| {
                // Start from zeros so a short trailing chunk ends up padded.
                let mut sym = vec![0u8; symbol_size];
                sym[..chunk.len()].copy_from_slice(chunk);
                (i as u32, sym)
            })
            .collect();
        let k = source_symbols.len();

        let r = self.config.locality;
        let local_parities: Vec<(u32, Vec<u8>)> = source_symbols
            .chunks(r)
            .enumerate()
            .map(|(g, group)| {
                let mut parity = vec![0u8; symbol_size];
                for (_, sym) in group {
                    xor_into(&mut parity, sym);
                }
                (g as u32, parity)
            })
            .collect();
        let num_groups = local_parities.len();

        let mut global_parity = vec![0u8; symbol_size];
        for (_, sym) in &source_symbols {
            xor_into(&mut global_parity, sym);
        }

        LrcEncodeResult {
            source_symbols,
            local_parities,
            global_parity,
            k_source: k as u32,
            locality: r,
            num_groups,
        }
    }

    /// Attempts to rebuild every index listed in `missing`, in order, and
    /// returns one outcome per entry.
    ///
    /// For each requested symbol:
    ///
    /// 1. **Local repair** – if it is the only member of its locality group
    ///    that is absent from both `available` and the symbols already
    ///    rebuilt earlier in this call, it is recovered by XOR-ing the
    ///    group's local parity with the rest of the group.
    /// 2. **Global repair** – otherwise, if `missing` has exactly one entry
    ///    and every other source symbol can be found, it is recovered by
    ///    XOR-ing the global parity with all other source symbols.
    /// 3. Otherwise the outcome is [`LrcRepairOutcome::Unrecoverable`].
    ///
    /// Indices outside `0..k_source`, an `encode_result` whose `locality` is
    /// 0, a missing local parity entry, and source symbols that are absent
    /// without being listed in `missing` are all reported as
    /// `Unrecoverable` rather than causing a panic or a fabricated repair.
    ///
    /// Successful repairs bump the matching metrics counter.
    ///
    /// # Panics
    ///
    /// Panics if a symbol used during reconstruction does not have the same
    /// length as the parity it is combined with.
    // All indices cast to `u32` below are bounded by `k_source`, which is
    // itself a `u32`.
    #[allow(clippy::cast_possible_truncation)]
    pub fn repair(
        &self,
        encode_result: &LrcEncodeResult,
        available: &std::collections::HashMap<u32, Vec<u8>>,
        missing: &[u32],
    ) -> Vec<LrcRepairOutcome> {
        let r = encode_result.locality;
        let k = encode_result.k_source as usize;
        let mut outcomes = Vec::with_capacity(missing.len());
        // Symbols rebuilt so far; later repairs may read them.
        let mut repaired: std::collections::HashMap<u32, Vec<u8>> =
            std::collections::HashMap::new();

        for &miss_idx in missing {
            let pos = miss_idx as usize;
            // Guard against division by zero and against indices that do not
            // name a source symbol; neither can be repaired meaningfully.
            if r == 0 || pos >= k {
                outcomes.push(LrcRepairOutcome::Unrecoverable {
                    missing: vec![miss_idx],
                    reason: format!(
                        "symbol index {miss_idx} is not addressable (k_source={k}, locality={r})"
                    ),
                });
                continue;
            }

            let group_idx = pos / r;
            let group_start = group_idx * r;
            let group_end = group_start.saturating_add(r).min(k);
            let group = group_start as u32..group_end as u32;
            let group_missing: Vec<u32> = group
                .clone()
                .filter(|i| !available.contains_key(i) && !repaired.contains_key(i))
                .collect();

            // Local path: only valid when this symbol is the group's sole erasure.
            let local = if group_missing == [miss_idx] {
                encode_result
                    .local_parities
                    .get(group_idx)
                    .and_then(|(_, parity)| {
                        Self::xor_reconstruct(
                            parity,
                            group.filter(|&i| i != miss_idx),
                            available,
                            &repaired,
                        )
                    })
            } else {
                None
            };
            if let Some((data, symbols_read)) = local {
                LRC_LOCAL_REPAIRS_TOTAL.fetch_add(1, Ordering::Relaxed);
                repaired.insert(miss_idx, data.clone());
                outcomes.push(LrcRepairOutcome::LocalRepair {
                    symbol_index: miss_idx,
                    group_index: group_idx as u32,
                    symbols_read,
                    data,
                });
                continue;
            }

            // Global path: a single XOR parity can only cover one erasure, and
            // needs every other source symbol to be readable.
            let global = if missing.len() == 1 {
                Self::xor_reconstruct(
                    &encode_result.global_parity,
                    (0..k as u32).filter(|&i| i != miss_idx),
                    available,
                    &repaired,
                )
            } else {
                None
            };
            if let Some((data, symbols_read)) = global {
                LRC_GLOBAL_REPAIRS_TOTAL.fetch_add(1, Ordering::Relaxed);
                repaired.insert(miss_idx, data.clone());
                outcomes.push(LrcRepairOutcome::GlobalRepair {
                    symbol_index: miss_idx,
                    symbols_read,
                    data,
                });
                continue;
            }

            let reason = if group_missing.len() > 1 {
                format!(
                    "multiple erasures in locality group {group_idx}: need more advanced repair"
                )
            } else if group_missing.contains(&miss_idx) {
                format!(
                    "symbol {miss_idx} cannot be rebuilt: local parity for group {group_idx} is missing and global repair is not possible"
                )
            } else {
                format!(
                    "symbol {miss_idx} is not an erasure in locality group {group_idx} and global repair is not possible"
                )
            };
            outcomes.push(LrcRepairOutcome::Unrecoverable {
                missing: group_missing,
                reason,
            });
        }
        outcomes
    }

    /// XORs `parity` with every symbol named by `peers`, looking each one up
    /// in `available` first and `repaired` second.
    ///
    /// Returns the reconstructed bytes together with the number of symbols
    /// read (the parity counts as one), or `None` if any peer is in neither
    /// map.
    fn xor_reconstruct(
        parity: &[u8],
        peers: impl Iterator<Item = u32>,
        available: &std::collections::HashMap<u32, Vec<u8>>,
        repaired: &std::collections::HashMap<u32, Vec<u8>>,
    ) -> Option<(Vec<u8>, usize)> {
        let mut restored = parity.to_vec();
        let mut symbols_read = 1;
        for idx in peers {
            let sym = available.get(&idx).or_else(|| repaired.get(&idx))?;
            xor_into(&mut restored, sym);
            symbols_read += 1;
        }
        Some((restored, symbols_read))
    }
}
/// Debug output shows the codec as `LrcCodec { locality: .. }`, exposing the
/// locality value directly instead of the nested config struct.
impl fmt::Debug for LrcCodec {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let locality = &self.config.locality;
        let mut builder = f.debug_struct("LrcCodec");
        builder.field("locality", locality);
        builder.finish()
    }
}
/// XORs `src` into `dst` byte by byte, modifying `dst` in place.
///
/// # Panics
///
/// Panics if the two slices differ in length.
fn xor_into(dst: &mut [u8], src: &[u8]) {
    assert_eq!(dst.len(), src.len());
    dst.iter_mut()
        .zip(src)
        .for_each(|(out, byte)| *out ^= *byte);
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Builds an `available` map holding every encoded source symbol except
    /// the one at index `skip`.
    fn available_without(result: &LrcEncodeResult, skip: u32) -> HashMap<u32, Vec<u8>> {
        result
            .source_symbols
            .iter()
            .filter(|(idx, _)| *idx != skip)
            .cloned()
            .collect()
    }

    /// Encoding 63 bytes with 16-byte symbols and locality 2 gives four
    /// symbols in two groups.
    #[test]
    fn basic_encode_decode() {
        let codec = LrcCodec::new(LrcConfig { locality: 2 });
        let data = b"Hello, LRC world! This is a test of local reconstruction codes.";
        let result = codec.encode(data, 16);
        assert_eq!(result.k_source, 4);
        assert_eq!(result.num_groups, 2);
        assert_eq!(result.local_parities.len(), 2);
        assert_eq!(result.global_parity.len(), 16);
    }

    /// A single lost symbol is rebuilt from its group's local parity.
    #[test]
    fn local_repair_single_failure() {
        let codec = LrcCodec::new(LrcConfig { locality: 2 });
        let data = vec![0xAA; 64];
        let result = codec.encode(&data, 16);
        let available = available_without(&result, 0);

        let outcomes = codec.repair(&result, &available, &[0]);
        assert_eq!(outcomes.len(), 1);
        match &outcomes[0] {
            LrcRepairOutcome::LocalRepair {
                symbol_index,
                group_index,
                data: repaired,
                ..
            } => {
                assert_eq!(*symbol_index, 0);
                assert_eq!(*group_index, 0);
                assert_eq!(repaired, &result.source_symbols[0].1);
            }
            other => panic!("expected LocalRepair, got {other:?}"),
        }
    }

    /// With locality 4 all four symbols share one group; despite the test's
    /// name, a single erasure there is still handled by the local path, and
    /// that is what is asserted.
    #[test]
    fn global_repair_fallback() {
        let codec = LrcCodec::new(LrcConfig { locality: 4 });
        let data = vec![0xBB; 64];
        let result = codec.encode(&data, 16);
        let available = available_without(&result, 0);

        let outcomes = codec.repair(&result, &available, &[0]);
        assert_eq!(outcomes.len(), 1);
        match &outcomes[0] {
            LrcRepairOutcome::LocalRepair { data: repaired, .. } => {
                assert_eq!(repaired, &result.source_symbols[0].1);
            }
            other => panic!("expected LocalRepair with single missing in group, got {other:?}"),
        }
    }
}