Skip to main content

fsqlite_core/
lrc.rs

//! Local Reconstruction Codes (§1.4) for distributed repair.
//!
//! LRC extends standard erasure coding with *locality groups*: source
//! symbols are partitioned into consecutive groups of size `r` (the last
//! group may be smaller), and each group gets its own local parity symbol.
//! When a single symbol is lost within a group, it can be repaired by reading
//! at most `r` symbols (the surviving group members plus the local parity)
//! instead of all `k` source symbols.
//!
//! For single-failure repair this cuts I/O by roughly a factor of `k/r`
//! compared to standard Reed-Solomon or RaptorQ codes.
//!
//! # Design
//!
//! - `LrcCodec` wraps a locality group size `r` and produces local +
//!   global parity symbols.
//! - Local parity: XOR of all symbols in a locality group.
//! - Global parity: XOR of all source symbols, which is also the XOR of all
//!   local parities. It therefore cannot repair anything the local parities
//!   cannot; it only helps when a local parity itself is unavailable.
//! - Repair: try local repair first (O(r)), fall back to global (O(k)).
19
20use std::fmt;
21use std::sync::atomic::{AtomicU64, Ordering};
22
// ── Metrics ──────────────────────────────────────────────────────────────
24
/// Number of `LrcCodec::encode` calls.
static LRC_ENCODE_TOTAL: AtomicU64 = AtomicU64::new(0);
/// Number of symbols rebuilt from a locality group's local parity.
static LRC_LOCAL_REPAIRS_TOTAL: AtomicU64 = AtomicU64::new(0);
/// Number of symbols rebuilt from the global parity.
static LRC_GLOBAL_REPAIRS_TOTAL: AtomicU64 = AtomicU64::new(0);
28
/// Point-in-time copy of the LRC counters, as returned by [`lrc_metrics_snapshot`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct LrcMetricsSnapshot {
    /// Symbols repaired from a local parity (reading only their locality group).
    pub local_repairs_total: u64,
    /// Symbols repaired from the global parity (reading every other source symbol).
    pub global_repairs_total: u64,
    /// Calls to [`LrcCodec::encode`].
    pub encode_total: u64,
}
39
40impl fmt::Display for LrcMetricsSnapshot {
41    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42        write!(
43            f,
44            "lrc_local_repairs={} lrc_global_repairs={} lrc_encodes={}",
45            self.local_repairs_total, self.global_repairs_total, self.encode_total,
46        )
47    }
48}
49
50/// Return a snapshot of LRC metrics.
51#[must_use]
52pub fn lrc_metrics_snapshot() -> LrcMetricsSnapshot {
53    LrcMetricsSnapshot {
54        local_repairs_total: LRC_LOCAL_REPAIRS_TOTAL.load(Ordering::Relaxed),
55        global_repairs_total: LRC_GLOBAL_REPAIRS_TOTAL.load(Ordering::Relaxed),
56        encode_total: LRC_ENCODE_TOTAL.load(Ordering::Relaxed),
57    }
58}
59
60/// Reset LRC metrics.
61pub fn reset_lrc_metrics() {
62    LRC_LOCAL_REPAIRS_TOTAL.store(0, Ordering::Relaxed);
63    LRC_GLOBAL_REPAIRS_TOTAL.store(0, Ordering::Relaxed);
64    LRC_ENCODE_TOTAL.store(0, Ordering::Relaxed);
65}
66
// ── LRC Configuration ───────────────────────────────────────────────────
68
/// Tuning knobs for [`LrcCodec`].
#[derive(Clone, Copy, Debug)]
pub struct LrcConfig {
    /// Locality group size `r`: how many source symbols share one local parity.
    ///
    /// A smaller `r` makes single-symbol repair cheaper (fewer reads) at the
    /// cost of more local parity symbols. [`LrcCodec::new`] rejects values
    /// below 2.
    pub locality: usize,
}
77
78impl Default for LrcConfig {
79    fn default() -> Self {
80        Self { locality: 4 }
81    }
82}
83
// ── Encode / Repair Types ───────────────────────────────────────────────
85
/// Output of [`LrcCodec::encode`]: the padded source symbols and their parities.
#[derive(Clone, Debug)]
pub struct LrcEncodeResult {
    /// `(index, data)` for every source symbol, in index order. Every symbol
    /// is `symbol_size` bytes; the last one is zero-padded when needed.
    pub source_symbols: Vec<(u32, Vec<u8>)>,
    /// `(group_index, data)` for each locality group, where `data` is the XOR
    /// of that group's source symbols.
    pub local_parities: Vec<(u32, Vec<u8>)>,
    /// XOR of every source symbol.
    pub global_parity: Vec<u8>,
    /// Number of source symbols (`k`).
    pub k_source: u32,
    /// Locality group size (`r`) the symbols were grouped by.
    pub locality: usize,
    /// Number of locality groups, `ceil(k / r)`; the final group may hold
    /// fewer than `r` symbols.
    pub num_groups: usize,
}
103
/// Result of trying to repair one requested symbol via [`LrcCodec::repair`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum LrcRepairOutcome {
    /// Rebuilt from its locality group's local parity and surviving members.
    LocalRepair {
        /// Source symbol index that was rebuilt.
        symbol_index: u32,
        /// Locality group whose local parity was used.
        group_index: u32,
        /// Symbols read to rebuild it, the local parity included.
        symbols_read: usize,
        /// Reconstructed symbol bytes.
        data: Vec<u8>,
    },
    /// Rebuilt from the global parity and every other source symbol.
    GlobalRepair {
        /// Source symbol index that was rebuilt.
        symbol_index: u32,
        /// Symbols read to rebuild it, the global parity included.
        symbols_read: usize,
        /// Reconstructed symbol bytes.
        data: Vec<u8>,
    },
    /// The symbol could not be rebuilt.
    Unrecoverable {
        /// Missing symbol indices reported with the failure.
        missing: Vec<u32>,
        /// Human-readable explanation of the failure.
        reason: String,
    },
}
135
// ── LRC Codec ────────────────────────────────────────────────────────────
137
138/// Local Reconstruction Codes codec.
139///
140/// Produces local parity symbols (one per locality group) plus a global
141/// parity symbol. Single failures within a group can be repaired locally.
142pub struct LrcCodec {
143    config: LrcConfig,
144}
145
146impl LrcCodec {
147    /// Create a new LRC codec with the given configuration.
148    pub fn new(config: LrcConfig) -> Self {
149        assert!(
150            config.locality >= 2,
151            "locality must be >= 2, got {}",
152            config.locality
153        );
154        Self { config }
155    }
156
157    /// Return the locality group size.
158    #[must_use]
159    pub fn locality(&self) -> usize {
160        self.config.locality
161    }
162
163    /// Encode source data into source symbols + local/global parities.
164    ///
165    /// `source_data` is split into `symbol_size`-byte symbols.
166    /// Each locality group of `r` symbols gets a local parity (XOR).
167    /// A global parity (XOR of all source symbols) is also computed.
168    #[allow(clippy::cast_possible_truncation)]
169    pub fn encode(&self, source_data: &[u8], symbol_size: usize) -> LrcEncodeResult {
170        assert!(symbol_size > 0, "symbol_size must be > 0");
171
172        LRC_ENCODE_TOTAL.fetch_add(1, Ordering::Relaxed);
173
174        // Split source into symbols, padding the last one if needed.
175        let k = source_data.len().div_ceil(symbol_size);
176        let mut source_symbols: Vec<(u32, Vec<u8>)> = Vec::with_capacity(k);
177
178        for i in 0..k {
179            let start = i * symbol_size;
180            let end = (start + symbol_size).min(source_data.len());
181            let mut sym = vec![0u8; symbol_size];
182            sym[..end - start].copy_from_slice(&source_data[start..end]);
183            source_symbols.push((i as u32, sym));
184        }
185
186        // Compute locality groups and local parities.
187        let r = self.config.locality;
188        let num_groups = k.div_ceil(r);
189        let mut local_parities: Vec<(u32, Vec<u8>)> = Vec::with_capacity(num_groups);
190
191        for g in 0..num_groups {
192            let group_start = g * r;
193            let group_end = ((g + 1) * r).min(k);
194
195            let mut parity = vec![0u8; symbol_size];
196            for (_, sym) in source_symbols.iter().take(group_end).skip(group_start) {
197                xor_into(&mut parity, sym);
198            }
199            local_parities.push((g as u32, parity));
200        }
201
202        // Compute global parity (XOR of all source symbols).
203        let mut global_parity = vec![0u8; symbol_size];
204        for (_, sym) in &source_symbols {
205            xor_into(&mut global_parity, sym);
206        }
207
208        LrcEncodeResult {
209            source_symbols,
210            local_parities,
211            global_parity,
212            k_source: k as u32,
213            locality: r,
214            num_groups,
215        }
216    }
217
218    /// Attempt to repair missing symbols using local and global parities.
219    ///
220    /// `available` maps symbol index -> data for symbols that are present.
221    /// `missing` lists the indices of symbols that need repair.
222    /// Returns the repair outcome.
223    #[allow(clippy::cast_possible_truncation)]
224    pub fn repair(
225        &self,
226        encode_result: &LrcEncodeResult,
227        available: &std::collections::HashMap<u32, Vec<u8>>,
228        missing: &[u32],
229    ) -> Vec<LrcRepairOutcome> {
230        let r = encode_result.locality;
231        let k = encode_result.k_source as usize;
232        let mut outcomes = Vec::with_capacity(missing.len());
233
234        // Track which symbols have been repaired (so we can use them for
235        // subsequent repairs within the same call).
236        let mut repaired: std::collections::HashMap<u32, Vec<u8>> =
237            std::collections::HashMap::new();
238
239        for &miss_idx in missing {
240            // Determine which locality group this symbol belongs to.
241            let group_idx = miss_idx as usize / r;
242            let group_start = group_idx * r;
243            let group_end = ((group_idx + 1) * r).min(k);
244
245            // Count how many symbols are missing in this group.
246            let group_missing: Vec<u32> = (group_start as u32..group_end as u32)
247                .filter(|&i| !available.contains_key(&i) && !repaired.contains_key(&i))
248                .collect();
249
250            if group_missing.len() == 1 && group_missing[0] == miss_idx {
251                // Single missing symbol in the group -> local repair.
252                let local_parity = &encode_result.local_parities[group_idx].1;
253                let mut restored = local_parity.clone();
254
255                let mut syms_read = 1; // local parity
256                for i in group_start as u32..group_end as u32 {
257                    if i != miss_idx {
258                        let sym = available
259                            .get(&i)
260                            .or_else(|| repaired.get(&i))
261                            .expect("non-missing symbol should be available");
262                        xor_into(&mut restored, sym);
263                        syms_read += 1;
264                    }
265                }
266
267                LRC_LOCAL_REPAIRS_TOTAL.fetch_add(1, Ordering::Relaxed);
268                repaired.insert(miss_idx, restored.clone());
269                outcomes.push(LrcRepairOutcome::LocalRepair {
270                    symbol_index: miss_idx,
271                    group_index: group_idx as u32,
272                    symbols_read: syms_read,
273                    data: restored,
274                });
275            } else if missing.len() == 1 {
276                // Only one symbol missing total -> global repair.
277                let mut restored = encode_result.global_parity.clone();
278                let mut syms_read = 1; // global parity
279
280                for i in 0..k as u32 {
281                    if i != miss_idx {
282                        let sym = available
283                            .get(&i)
284                            .or_else(|| repaired.get(&i))
285                            .expect("non-missing symbol should be available");
286                        xor_into(&mut restored, sym);
287                        syms_read += 1;
288                    }
289                }
290
291                LRC_GLOBAL_REPAIRS_TOTAL.fetch_add(1, Ordering::Relaxed);
292                repaired.insert(miss_idx, restored.clone());
293                outcomes.push(LrcRepairOutcome::GlobalRepair {
294                    symbol_index: miss_idx,
295                    symbols_read: syms_read,
296                    data: restored,
297                });
298            } else {
299                // Multiple missing in the same group -> unrecoverable with
300                // simple XOR-based LRC.
301                outcomes.push(LrcRepairOutcome::Unrecoverable {
302                    missing: group_missing,
303                    reason: format!(
304                        "multiple erasures in locality group {group_idx}: need more advanced repair"
305                    ),
306                });
307            }
308        }
309
310        outcomes
311    }
312}
313
314impl fmt::Debug for LrcCodec {
315    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
316        f.debug_struct("LrcCodec")
317            .field("locality", &self.config.locality)
318            .finish()
319    }
320}
321
/// XOR every byte of `src` into the matching byte of `dst`, in place.
///
/// # Panics
///
/// Panics if the two slices differ in length.
fn xor_into(dst: &mut [u8], src: &[u8]) {
    assert_eq!(dst.len(), src.len());
    dst.iter_mut()
        .zip(src)
        .for_each(|(out, &byte)| *out ^= byte);
}
329
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Build an `available` map holding every source symbol except those in `lost`.
    fn available_without(symbols: &[(u32, Vec<u8>)], lost: &[u32]) -> HashMap<u32, Vec<u8>> {
        symbols
            .iter()
            .filter(|(idx, _)| !lost.contains(idx))
            .cloned()
            .collect()
    }

    #[test]
    fn basic_encode_decode() {
        let codec = LrcCodec::new(LrcConfig { locality: 2 });
        let data = b"Hello, LRC world! This is a test of local reconstruction codes.";
        let result = codec.encode(data, 16);

        assert_eq!(data.len(), 63);
        assert_eq!(result.k_source, 4); // ceil(63 / 16) = 4 symbols
        assert_eq!(result.num_groups, 2); // 4 symbols / 2 per group = 2 groups
        assert_eq!(result.local_parities.len(), 2);
        assert_eq!(result.global_parity.len(), 16);
        // The final symbol holds the last 15 input bytes plus one zero pad byte.
        assert_eq!(&result.source_symbols[3].1[..15], &data[48..]);
        assert_eq!(result.source_symbols[3].1[15], 0);
    }

    #[test]
    fn parities_are_xor_of_group_members() {
        let codec = LrcCodec::new(LrcConfig { locality: 2 });
        let data: Vec<u8> = (1..=10).collect();
        let result = codec.encode(&data, 4); // symbols [1..=4], [5..=8], [9, 10, 0, 0]

        assert_eq!(result.k_source, 3);
        assert_eq!(result.num_groups, 2);
        assert_eq!(result.local_parities[0], (0, vec![1 ^ 5, 2 ^ 6, 3 ^ 7, 4 ^ 8]));
        // The trailing group holds a single symbol, so its parity is that symbol.
        assert_eq!(result.local_parities[1], (1, vec![9, 10, 0, 0]));
        assert_eq!(result.global_parity, vec![1 ^ 5 ^ 9, 2 ^ 6 ^ 10, 3 ^ 7, 4 ^ 8]);
    }

    #[test]
    fn local_repair_single_failure() {
        let codec = LrcCodec::new(LrcConfig { locality: 2 });
        // Distinct bytes, so a wrong XOR cannot accidentally yield the right answer.
        let data: Vec<u8> = (0..64).collect();
        let result = codec.encode(&data, 16);
        let available = available_without(&result.source_symbols, &[0]);

        let outcomes = codec.repair(&result, &available, &[0]);
        assert_eq!(
            outcomes,
            vec![LrcRepairOutcome::LocalRepair {
                symbol_index: 0,
                group_index: 0,
                symbols_read: 2, // local parity + the other group member
                data: result.source_symbols[0].1.clone(),
            }]
        );
    }

    #[test]
    fn single_failure_in_wide_group_prefers_local_parity() {
        // With locality 4 and k = 4 the only group spans every symbol; local
        // repair is still chosen over the global parity.
        let codec = LrcCodec::new(LrcConfig { locality: 4 });
        let data: Vec<u8> = (0..64).map(|b| b * 3).collect();
        let result = codec.encode(&data, 16);
        let available = available_without(&result.source_symbols, &[2]);

        let outcomes = codec.repair(&result, &available, &[2]);
        match outcomes.as_slice() {
            [LrcRepairOutcome::LocalRepair {
                symbol_index: 2,
                symbols_read: 4,
                data: repaired,
                ..
            }] => assert_eq!(repaired, &result.source_symbols[2].1),
            other => panic!("expected a single LocalRepair, got {other:?}"),
        }
    }

    #[test]
    fn short_trailing_group_repairs_from_its_parity() {
        let codec = LrcCodec::new(LrcConfig { locality: 2 });
        let data: Vec<u8> = (0..20).collect();
        let result = codec.encode(&data, 4); // k = 5; group 2 holds only symbol 4
        let available = available_without(&result.source_symbols, &[4]);

        let outcomes = codec.repair(&result, &available, &[4]);
        assert_eq!(
            outcomes,
            vec![LrcRepairOutcome::LocalRepair {
                symbol_index: 4,
                group_index: 2,
                symbols_read: 1,
                data: vec![16, 17, 18, 19],
            }]
        );
    }

    #[test]
    fn two_erasures_in_one_group_are_unrecoverable() {
        let codec = LrcCodec::new(LrcConfig { locality: 2 });
        let data: Vec<u8> = (0..64).collect();
        let result = codec.encode(&data, 16);
        let available = available_without(&result.source_symbols, &[0, 1]);

        let outcomes = codec.repair(&result, &available, &[0, 1]);
        assert_eq!(outcomes.len(), 2);
        for outcome in &outcomes {
            match outcome {
                LrcRepairOutcome::Unrecoverable { missing, .. } => {
                    assert_eq!(missing, &vec![0, 1]);
                }
                other => panic!("expected Unrecoverable, got {other:?}"),
            }
        }
    }
}