1use std::fmt;
21use std::sync::atomic::{AtomicU64, Ordering};
22
23static LRC_LOCAL_REPAIRS_TOTAL: AtomicU64 = AtomicU64::new(0);
26static LRC_GLOBAL_REPAIRS_TOTAL: AtomicU64 = AtomicU64::new(0);
27static LRC_ENCODE_TOTAL: AtomicU64 = AtomicU64::new(0);
28
29#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
31pub struct LrcMetricsSnapshot {
32 pub local_repairs_total: u64,
34 pub global_repairs_total: u64,
36 pub encode_total: u64,
38}
39
40impl fmt::Display for LrcMetricsSnapshot {
41 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42 write!(
43 f,
44 "lrc_local_repairs={} lrc_global_repairs={} lrc_encodes={}",
45 self.local_repairs_total, self.global_repairs_total, self.encode_total,
46 )
47 }
48}
49
50#[must_use]
52pub fn lrc_metrics_snapshot() -> LrcMetricsSnapshot {
53 LrcMetricsSnapshot {
54 local_repairs_total: LRC_LOCAL_REPAIRS_TOTAL.load(Ordering::Relaxed),
55 global_repairs_total: LRC_GLOBAL_REPAIRS_TOTAL.load(Ordering::Relaxed),
56 encode_total: LRC_ENCODE_TOTAL.load(Ordering::Relaxed),
57 }
58}
59
60pub fn reset_lrc_metrics() {
62 LRC_LOCAL_REPAIRS_TOTAL.store(0, Ordering::Relaxed);
63 LRC_GLOBAL_REPAIRS_TOTAL.store(0, Ordering::Relaxed);
64 LRC_ENCODE_TOTAL.store(0, Ordering::Relaxed);
65}
66
67#[derive(Debug, Clone, Copy)]
71pub struct LrcConfig {
72 pub locality: usize,
76}
77
78impl Default for LrcConfig {
79 fn default() -> Self {
80 Self { locality: 4 }
81 }
82}
83
84#[derive(Debug, Clone)]
88pub struct LrcEncodeResult {
89 pub source_symbols: Vec<(u32, Vec<u8>)>,
91 pub local_parities: Vec<(u32, Vec<u8>)>,
94 pub global_parity: Vec<u8>,
96 pub k_source: u32,
98 pub locality: usize,
100 pub num_groups: usize,
102}
103
104#[derive(Debug, Clone, PartialEq, Eq)]
106pub enum LrcRepairOutcome {
107 LocalRepair {
109 symbol_index: u32,
111 group_index: u32,
113 symbols_read: usize,
115 data: Vec<u8>,
117 },
118 GlobalRepair {
120 symbol_index: u32,
122 symbols_read: usize,
124 data: Vec<u8>,
126 },
127 Unrecoverable {
129 missing: Vec<u32>,
131 reason: String,
133 },
134}
135
136pub struct LrcCodec {
143 config: LrcConfig,
144}
145
146impl LrcCodec {
147 pub fn new(config: LrcConfig) -> Self {
149 assert!(
150 config.locality >= 2,
151 "locality must be >= 2, got {}",
152 config.locality
153 );
154 Self { config }
155 }
156
157 #[must_use]
159 pub fn locality(&self) -> usize {
160 self.config.locality
161 }
162
163 #[allow(clippy::cast_possible_truncation)]
169 pub fn encode(&self, source_data: &[u8], symbol_size: usize) -> LrcEncodeResult {
170 assert!(symbol_size > 0, "symbol_size must be > 0");
171
172 LRC_ENCODE_TOTAL.fetch_add(1, Ordering::Relaxed);
173
174 let k = source_data.len().div_ceil(symbol_size);
176 let mut source_symbols: Vec<(u32, Vec<u8>)> = Vec::with_capacity(k);
177
178 for i in 0..k {
179 let start = i * symbol_size;
180 let end = (start + symbol_size).min(source_data.len());
181 let mut sym = vec![0u8; symbol_size];
182 sym[..end - start].copy_from_slice(&source_data[start..end]);
183 source_symbols.push((i as u32, sym));
184 }
185
186 let r = self.config.locality;
188 let num_groups = k.div_ceil(r);
189 let mut local_parities: Vec<(u32, Vec<u8>)> = Vec::with_capacity(num_groups);
190
191 for g in 0..num_groups {
192 let group_start = g * r;
193 let group_end = ((g + 1) * r).min(k);
194
195 let mut parity = vec![0u8; symbol_size];
196 for (_, sym) in source_symbols.iter().take(group_end).skip(group_start) {
197 xor_into(&mut parity, sym);
198 }
199 local_parities.push((g as u32, parity));
200 }
201
202 let mut global_parity = vec![0u8; symbol_size];
204 for (_, sym) in &source_symbols {
205 xor_into(&mut global_parity, sym);
206 }
207
208 LrcEncodeResult {
209 source_symbols,
210 local_parities,
211 global_parity,
212 k_source: k as u32,
213 locality: r,
214 num_groups,
215 }
216 }
217
218 #[allow(clippy::cast_possible_truncation)]
224 pub fn repair(
225 &self,
226 encode_result: &LrcEncodeResult,
227 available: &std::collections::HashMap<u32, Vec<u8>>,
228 missing: &[u32],
229 ) -> Vec<LrcRepairOutcome> {
230 let r = encode_result.locality;
231 let k = encode_result.k_source as usize;
232 let mut outcomes = Vec::with_capacity(missing.len());
233
234 let mut repaired: std::collections::HashMap<u32, Vec<u8>> =
237 std::collections::HashMap::new();
238
239 for &miss_idx in missing {
240 let group_idx = miss_idx as usize / r;
242 let group_start = group_idx * r;
243 let group_end = ((group_idx + 1) * r).min(k);
244
245 let group_missing: Vec<u32> = (group_start as u32..group_end as u32)
247 .filter(|&i| !available.contains_key(&i) && !repaired.contains_key(&i))
248 .collect();
249
250 if group_missing.len() == 1 && group_missing[0] == miss_idx {
251 let local_parity = &encode_result.local_parities[group_idx].1;
253 let mut restored = local_parity.clone();
254
255 let mut syms_read = 1; for i in group_start as u32..group_end as u32 {
257 if i != miss_idx {
258 let sym = available
259 .get(&i)
260 .or_else(|| repaired.get(&i))
261 .expect("non-missing symbol should be available");
262 xor_into(&mut restored, sym);
263 syms_read += 1;
264 }
265 }
266
267 LRC_LOCAL_REPAIRS_TOTAL.fetch_add(1, Ordering::Relaxed);
268 repaired.insert(miss_idx, restored.clone());
269 outcomes.push(LrcRepairOutcome::LocalRepair {
270 symbol_index: miss_idx,
271 group_index: group_idx as u32,
272 symbols_read: syms_read,
273 data: restored,
274 });
275 } else if missing.len() == 1 {
276 let mut restored = encode_result.global_parity.clone();
278 let mut syms_read = 1; for i in 0..k as u32 {
281 if i != miss_idx {
282 let sym = available
283 .get(&i)
284 .or_else(|| repaired.get(&i))
285 .expect("non-missing symbol should be available");
286 xor_into(&mut restored, sym);
287 syms_read += 1;
288 }
289 }
290
291 LRC_GLOBAL_REPAIRS_TOTAL.fetch_add(1, Ordering::Relaxed);
292 repaired.insert(miss_idx, restored.clone());
293 outcomes.push(LrcRepairOutcome::GlobalRepair {
294 symbol_index: miss_idx,
295 symbols_read: syms_read,
296 data: restored,
297 });
298 } else {
299 outcomes.push(LrcRepairOutcome::Unrecoverable {
302 missing: group_missing,
303 reason: format!(
304 "multiple erasures in locality group {group_idx}: need more advanced repair"
305 ),
306 });
307 }
308 }
309
310 outcomes
311 }
312}
313
314impl fmt::Debug for LrcCodec {
315 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
316 f.debug_struct("LrcCodec")
317 .field("locality", &self.config.locality)
318 .finish()
319 }
320}
321
322fn xor_into(dst: &mut [u8], src: &[u8]) {
324 assert_eq!(dst.len(), src.len());
325 for (d, s) in dst.iter_mut().zip(src.iter()) {
326 *d ^= s;
327 }
328}
329
330#[cfg(test)]
331mod tests {
332 use super::*;
333 use std::collections::HashMap;
334
335 #[test]
336 fn basic_encode_decode() {
337 let codec = LrcCodec::new(LrcConfig { locality: 2 });
338 let data = b"Hello, LRC world! This is a test of local reconstruction codes.";
339 let result = codec.encode(data, 16);
340
341 assert_eq!(result.k_source, 4); assert_eq!(result.num_groups, 2); assert_eq!(result.local_parities.len(), 2);
344 assert_eq!(result.global_parity.len(), 16);
345 }
346
347 #[test]
348 fn local_repair_single_failure() {
349 let codec = LrcCodec::new(LrcConfig { locality: 2 });
350 let data = vec![0xAA; 64];
351 let result = codec.encode(&data, 16);
352
353 let mut available: HashMap<u32, Vec<u8>> = HashMap::new();
355 for &(idx, ref sym) in &result.source_symbols {
356 if idx != 0 {
357 available.insert(idx, sym.clone());
358 }
359 }
360
361 let outcomes = codec.repair(&result, &available, &[0]);
362 assert_eq!(outcomes.len(), 1);
363 match &outcomes[0] {
364 LrcRepairOutcome::LocalRepair {
365 symbol_index,
366 group_index,
367 data: repaired,
368 ..
369 } => {
370 assert_eq!(*symbol_index, 0);
371 assert_eq!(*group_index, 0);
372 assert_eq!(repaired, &result.source_symbols[0].1);
373 }
374 other => panic!("expected LocalRepair, got {other:?}"),
375 }
376 }
377
378 #[test]
379 fn global_repair_fallback() {
380 let codec = LrcCodec::new(LrcConfig { locality: 4 });
381 let data = vec![0xBB; 64];
382 let result = codec.encode(&data, 16);
383
384 let mut available: HashMap<u32, Vec<u8>> = HashMap::new();
387 for &(idx, ref sym) in &result.source_symbols {
388 if idx != 0 {
389 available.insert(idx, sym.clone());
390 }
391 }
392
393 let outcomes = codec.repair(&result, &available, &[0]);
394 assert_eq!(outcomes.len(), 1);
395 match &outcomes[0] {
396 LrcRepairOutcome::LocalRepair { data: repaired, .. } => {
397 assert_eq!(repaired, &result.source_symbols[0].1);
398 }
399 other => panic!("expected LocalRepair with single missing in group, got {other:?}"),
400 }
401 }
402}