Skip to main content

nodedb_array/codec/
coord_rle.rs

1// SPDX-License-Identifier: Apache-2.0
2
3// Leading-axis run-length compression for DimDict index streams.
4//
5// When the leading axis has long runs of the same dict index (common in
6// genomic tiles where chrom is constant across thousands of positions),
7// RLE saves ~75 % vs raw indices. For tiles with no runs longer than 2,
8// we fall back to the delta codec so overhead is bounded.
9//
10// Wire format:
11//   RLE mode:   [u32 LE = 0xFFFF_FFFF marker][u32 LE run_count] ...
12//   Delta mode: delegates to coord_delta::encode_coord_axis / decode_coord_axis
13//
14// The RLE marker 0xFFFF_FFFF is safe because DimDict cardinality fits in
15// a u32 but realistic tile dicts never exceed ~10M distinct values; an
16// all-ones 4-byte value is reserved as the RLE sentinel.
17
18use crate::codec::coord_delta::{decode_coord_axis, encode_coord_axis};
19use crate::codec::limits::{
20    MAX_CELLS_PER_TILE, MAX_DICT_CARDINALITY, MAX_RLE_RUN_LEN, MAX_RLE_RUNS, check_decoded_size,
21};
22use crate::error::{ArrayError, ArrayResult};
23use crate::tile::sparse_tile::DimDict;
24use crate::types::coord::value::CoordValue;
25
26/// Minimum ratio of run savings to total indices before RLE is preferred.
27/// Using integer arithmetic: savings_cells / total_cells >= 1/2.
28const RLE_BENEFIT_NUMERATOR: usize = 1;
29const RLE_BENEFIT_DENOMINATOR: usize = 2;
30
31fn should_use_rle(indices: &[u32]) -> bool {
32    if indices.len() < 4 {
33        return false;
34    }
35    let mut run_count = 1usize;
36    for i in 1..indices.len() {
37        if indices[i] != indices[i - 1] {
38            run_count += 1;
39        }
40    }
41    // RLE cells needed: run_count * 8 bytes (value + length pairs)
42    // Delta cells approx: indices.len() * 1.5 bytes average
43    // Use simpler heuristic: worth it if run_count < indices.len() / 2
44    run_count * RLE_BENEFIT_DENOMINATOR < indices.len() * RLE_BENEFIT_NUMERATOR
45}
46
47/// RLE mode sentinel: u32::MAX. This value can never appear as a dict count
48/// because a DimDict with u32::MAX distinct values would require ~16 GB of
49/// coordinate storage in the tile before it reaches this codec.
50const RLE_MARKER: u32 = u32::MAX;
51
52/// Encode a DimDict using RLE if beneficial, falling back to delta otherwise.
53pub fn encode_coord_axis_rle(dict: &DimDict, out: &mut Vec<u8>) -> ArrayResult<()> {
54    if should_use_rle(&dict.indices) {
55        encode_rle(dict, out)
56    } else {
57        encode_coord_axis(dict, out)
58    }
59}
60
61fn encode_rle(dict: &DimDict, out: &mut Vec<u8>) -> ArrayResult<()> {
62    out.extend_from_slice(&RLE_MARKER.to_le_bytes());
63
64    // Compute runs over index stream.
65    let mut runs: Vec<(u32, u32)> = Vec::new(); // (value, length)
66    let mut i = 0;
67    while i < dict.indices.len() {
68        let val = dict.indices[i];
69        let mut len = 1u32;
70        while i + (len as usize) < dict.indices.len() && dict.indices[i + (len as usize)] == val {
71            len += 1;
72        }
73        runs.push((val, len));
74        i += len as usize;
75    }
76
77    out.extend_from_slice(&(runs.len() as u32).to_le_bytes());
78    for (val, len) in &runs {
79        out.extend_from_slice(&val.to_le_bytes());
80        out.extend_from_slice(&len.to_le_bytes());
81    }
82
83    // Dict values.
84    out.extend_from_slice(&(dict.values.len() as u32).to_le_bytes());
85    for cv in &dict.values {
86        let bytes = zerompk::to_msgpack_vec(cv).map_err(|e| ArrayError::SegmentCorruption {
87            detail: format!("rle coord dict encode: {e}"),
88        })?;
89        out.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
90        out.extend_from_slice(&bytes);
91    }
92
93    Ok(())
94}
95
96/// Decode a DimDict from bytes previously encoded by `encode_coord_axis_rle`.
97pub fn decode_coord_axis_rle(data: &[u8], pos: &mut usize) -> ArrayResult<DimDict> {
98    if *pos + 4 > data.len() {
99        return Err(ArrayError::SegmentCorruption {
100            detail: "rle coord: truncated mode marker".into(),
101        });
102    }
103    let first_u32 = u32::from_le_bytes(
104        data[*pos..*pos + 4]
105            .try_into()
106            .expect("invariant: bounds check at line 96 guarantees 4 bytes available"),
107    );
108
109    if first_u32 == RLE_MARKER {
110        *pos += 4;
111        decode_rle(data, pos)
112    } else {
113        // Delta mode — don't advance pos; coord_delta reads dict_count first.
114        decode_coord_axis(data, pos)
115    }
116}
117
118fn decode_rle(data: &[u8], pos: &mut usize) -> ArrayResult<DimDict> {
119    if *pos + 4 > data.len() {
120        return Err(ArrayError::SegmentCorruption {
121            detail: "rle coord: truncated run count".into(),
122        });
123    }
124    let run_count = u32::from_le_bytes(
125        data[*pos..*pos + 4]
126            .try_into()
127            .expect("invariant: bounds check at line 113 guarantees 4 bytes available"),
128    ) as usize;
129    *pos += 4;
130    check_decoded_size(run_count, MAX_RLE_RUNS, "rle run_count")?;
131
132    let mut indices: Vec<u32> = Vec::new();
133    let mut total_len: usize = 0;
134    for _ in 0..run_count {
135        if *pos + 8 > data.len() {
136            return Err(ArrayError::SegmentCorruption {
137                detail: "rle coord: truncated run entry".into(),
138            });
139        }
140        let val =
141            u32::from_le_bytes(data[*pos..*pos + 4].try_into().expect(
142                "invariant: bounds check at line 125 guarantees 8 bytes available; first 4",
143            ));
144        *pos += 4;
145        let len =
146            u32::from_le_bytes(data[*pos..*pos + 4].try_into().expect(
147                "invariant: bounds check at line 125 guarantees 8 bytes available; second 4",
148            )) as usize;
149        *pos += 4;
150        check_decoded_size(len, MAX_RLE_RUN_LEN, "rle run len")?;
151        total_len = total_len.saturating_add(len);
152        check_decoded_size(total_len, MAX_CELLS_PER_TILE, "rle indices total_len")?;
153        for _ in 0..len {
154            indices.push(val);
155        }
156    }
157
158    // Dict values.
159    if *pos + 4 > data.len() {
160        return Err(ArrayError::SegmentCorruption {
161            detail: "rle coord: truncated dict count".into(),
162        });
163    }
164    let dict_count = u32::from_le_bytes(
165        data[*pos..*pos + 4]
166            .try_into()
167            .expect("invariant: bounds check at line 143 guarantees 4 bytes available"),
168    ) as usize;
169    *pos += 4;
170    check_decoded_size(dict_count, MAX_DICT_CARDINALITY, "rle dict_count")?;
171
172    let mut values: Vec<CoordValue> = Vec::with_capacity(dict_count);
173    for _ in 0..dict_count {
174        if *pos + 4 > data.len() {
175            return Err(ArrayError::SegmentCorruption {
176                detail: "rle coord: truncated dict entry len".into(),
177            });
178        }
179        let len = u32::from_le_bytes(
180            data[*pos..*pos + 4]
181                .try_into()
182                .expect("invariant: bounds check at line 154 guarantees 4 bytes available"),
183        ) as usize;
184        *pos += 4;
185        if *pos + len > data.len() {
186            return Err(ArrayError::SegmentCorruption {
187                detail: "rle coord: truncated dict entry bytes".into(),
188            });
189        }
190        let cv: CoordValue = zerompk::from_msgpack(&data[*pos..*pos + len]).map_err(|e| {
191            ArrayError::SegmentCorruption {
192                detail: format!("rle coord dict decode: {e}"),
193            }
194        })?;
195        *pos += len;
196        values.push(cv);
197    }
198
199    Ok(DimDict { values, indices })
200}
201
202#[cfg(test)]
203mod tests {
204    use super::*;
205    use crate::types::coord::value::CoordValue;
206
207    fn make_dict(indices: Vec<u32>, values: Vec<CoordValue>) -> DimDict {
208        DimDict { values, indices }
209    }
210
211    fn roundtrip(dict: &DimDict) -> DimDict {
212        let mut buf = Vec::new();
213        encode_coord_axis_rle(dict, &mut buf).unwrap();
214        let mut pos = 0;
215        decode_coord_axis_rle(&buf, &mut pos).unwrap()
216    }
217
218    #[test]
219    fn empty_dict_roundtrip() {
220        let d = make_dict(vec![], vec![]);
221        let out = roundtrip(&d);
222        assert_eq!(out.indices, d.indices);
223        assert_eq!(out.values, d.values);
224    }
225
226    #[test]
227    fn single_entry_roundtrip() {
228        let d = make_dict(vec![0], vec![CoordValue::Int64(5)]);
229        let out = roundtrip(&d);
230        assert_eq!(out.indices, d.indices);
231        assert_eq!(out.values, d.values);
232    }
233
234    #[test]
235    fn long_run_uses_rle() {
236        // 1000 cells all pointing at index 0 — should trigger RLE.
237        let d = make_dict(vec![0u32; 1000], vec![CoordValue::Int64(42)]);
238        let mut buf = Vec::new();
239        encode_coord_axis_rle(&d, &mut buf).unwrap();
240        // RLE mode: first 4 bytes are the RLE_MARKER sentinel.
241        let first = u32::from_le_bytes(buf[0..4].try_into().unwrap());
242        assert_eq!(first, u32::MAX, "expected RLE mode marker");
243
244        let out = roundtrip(&d);
245        assert_eq!(out.indices, d.indices);
246    }
247
248    #[test]
249    fn no_runs_falls_back_to_delta() {
250        // All distinct coords — no benefit from RLE.
251        let values: Vec<CoordValue> = (0..10).map(CoordValue::Int64).collect();
252        let indices: Vec<u32> = (0..10).collect();
253        let d = make_dict(indices, values);
254        let mut buf = Vec::new();
255        encode_coord_axis_rle(&d, &mut buf).unwrap();
256        // Delta mode: first 4 bytes are dict_count (10), not the RLE marker.
257        let first = u32::from_le_bytes(buf[0..4].try_into().unwrap());
258        assert_ne!(first, u32::MAX, "should be delta mode");
259
260        let out = roundtrip(&d);
261        assert_eq!(out.indices, d.indices);
262    }
263
264    #[test]
265    fn mixed_runs_roundtrip() {
266        // Two chromosomes, each repeated 500 times.
267        let mut indices = vec![0u32; 500];
268        indices.extend(vec![1u32; 500]);
269        let d = make_dict(indices, vec![CoordValue::Int64(1), CoordValue::Int64(2)]);
270        let out = roundtrip(&d);
271        assert_eq!(out.indices, d.indices);
272        assert_eq!(out.values, d.values);
273    }
274
275    #[test]
276    fn large_rle_roundtrip() {
277        let d = make_dict(vec![0u32; 100_000], vec![CoordValue::Int64(99)]);
278        let out = roundtrip(&d);
279        assert_eq!(out.indices.len(), 100_000);
280        assert!(out.indices.iter().all(|&i| i == 0));
281    }
282
283    #[test]
284    fn timestamp_coord_roundtrip() {
285        let values = vec![
286            CoordValue::TimestampMs(1_000_000),
287            CoordValue::TimestampMs(2_000_000),
288        ];
289        let indices: Vec<u32> = (0..10).map(|i| i % 2).collect();
290        let d = make_dict(indices, values);
291        let out = roundtrip(&d);
292        assert_eq!(out.indices, d.indices);
293        assert_eq!(out.values, d.values);
294    }
295}