// nodedb_array/codec/coord_rle.rs
use crate::codec::coord_delta::{decode_coord_axis, encode_coord_axis};
19use crate::codec::limits::{
20 MAX_CELLS_PER_TILE, MAX_DICT_CARDINALITY, MAX_RLE_RUN_LEN, MAX_RLE_RUNS, check_decoded_size,
21};
22use crate::error::{ArrayError, ArrayResult};
23use crate::tile::sparse_tile::DimDict;
24use crate::types::coord::value::CoordValue;
25
/// RLE is chosen only when `runs / len < NUMERATOR / DENOMINATOR`, i.e.
/// with the current constants: fewer than half as many runs as entries.
const RLE_BENEFIT_NUMERATOR: usize = 1;
const RLE_BENEFIT_DENOMINATOR: usize = 2;

/// Returns `true` when run-length encoding `indices` is expected to beat
/// the plain delta encoding.
///
/// Streams shorter than 4 entries never amortize the RLE framing
/// overhead and always report `false`.
fn should_use_rle(indices: &[u32]) -> bool {
    if indices.len() < 4 {
        return false;
    }
    // The first element opens a run; every adjacent unequal pair opens
    // another one.
    let run_count = 1 + indices.windows(2).filter(|pair| pair[0] != pair[1]).count();
    run_count * RLE_BENEFIT_DENOMINATOR < indices.len() * RLE_BENEFIT_NUMERATOR
}
46
/// Mode discriminator written as the first little-endian u32 of an
/// RLE-encoded axis. The delta path is selected whenever the leading
/// word differs — NOTE(review): this assumes the plain delta encoding
/// (`encode_coord_axis`) can never begin with `u32::MAX`; confirm that
/// invariant in `coord_delta.rs`.
const RLE_MARKER: u32 = u32::MAX;

52pub fn encode_coord_axis_rle(dict: &DimDict, out: &mut Vec<u8>) -> ArrayResult<()> {
54 if should_use_rle(&dict.indices) {
55 encode_rle(dict, out)
56 } else {
57 encode_coord_axis(dict, out)
58 }
59}
60
61fn encode_rle(dict: &DimDict, out: &mut Vec<u8>) -> ArrayResult<()> {
62 out.extend_from_slice(&RLE_MARKER.to_le_bytes());
63
64 let mut runs: Vec<(u32, u32)> = Vec::new(); let mut i = 0;
67 while i < dict.indices.len() {
68 let val = dict.indices[i];
69 let mut len = 1u32;
70 while i + (len as usize) < dict.indices.len() && dict.indices[i + (len as usize)] == val {
71 len += 1;
72 }
73 runs.push((val, len));
74 i += len as usize;
75 }
76
77 out.extend_from_slice(&(runs.len() as u32).to_le_bytes());
78 for (val, len) in &runs {
79 out.extend_from_slice(&val.to_le_bytes());
80 out.extend_from_slice(&len.to_le_bytes());
81 }
82
83 out.extend_from_slice(&(dict.values.len() as u32).to_le_bytes());
85 for cv in &dict.values {
86 let bytes = zerompk::to_msgpack_vec(cv).map_err(|e| ArrayError::SegmentCorruption {
87 detail: format!("rle coord dict encode: {e}"),
88 })?;
89 out.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
90 out.extend_from_slice(&bytes);
91 }
92
93 Ok(())
94}
95
96pub fn decode_coord_axis_rle(data: &[u8], pos: &mut usize) -> ArrayResult<DimDict> {
98 if *pos + 4 > data.len() {
99 return Err(ArrayError::SegmentCorruption {
100 detail: "rle coord: truncated mode marker".into(),
101 });
102 }
103 let first_u32 = u32::from_le_bytes(
104 data[*pos..*pos + 4]
105 .try_into()
106 .expect("invariant: bounds check at line 96 guarantees 4 bytes available"),
107 );
108
109 if first_u32 == RLE_MARKER {
110 *pos += 4;
111 decode_rle(data, pos)
112 } else {
113 decode_coord_axis(data, pos)
115 }
116}
117
118fn decode_rle(data: &[u8], pos: &mut usize) -> ArrayResult<DimDict> {
119 if *pos + 4 > data.len() {
120 return Err(ArrayError::SegmentCorruption {
121 detail: "rle coord: truncated run count".into(),
122 });
123 }
124 let run_count = u32::from_le_bytes(
125 data[*pos..*pos + 4]
126 .try_into()
127 .expect("invariant: bounds check at line 113 guarantees 4 bytes available"),
128 ) as usize;
129 *pos += 4;
130 check_decoded_size(run_count, MAX_RLE_RUNS, "rle run_count")?;
131
132 let mut indices: Vec<u32> = Vec::new();
133 let mut total_len: usize = 0;
134 for _ in 0..run_count {
135 if *pos + 8 > data.len() {
136 return Err(ArrayError::SegmentCorruption {
137 detail: "rle coord: truncated run entry".into(),
138 });
139 }
140 let val =
141 u32::from_le_bytes(data[*pos..*pos + 4].try_into().expect(
142 "invariant: bounds check at line 125 guarantees 8 bytes available; first 4",
143 ));
144 *pos += 4;
145 let len =
146 u32::from_le_bytes(data[*pos..*pos + 4].try_into().expect(
147 "invariant: bounds check at line 125 guarantees 8 bytes available; second 4",
148 )) as usize;
149 *pos += 4;
150 check_decoded_size(len, MAX_RLE_RUN_LEN, "rle run len")?;
151 total_len = total_len.saturating_add(len);
152 check_decoded_size(total_len, MAX_CELLS_PER_TILE, "rle indices total_len")?;
153 for _ in 0..len {
154 indices.push(val);
155 }
156 }
157
158 if *pos + 4 > data.len() {
160 return Err(ArrayError::SegmentCorruption {
161 detail: "rle coord: truncated dict count".into(),
162 });
163 }
164 let dict_count = u32::from_le_bytes(
165 data[*pos..*pos + 4]
166 .try_into()
167 .expect("invariant: bounds check at line 143 guarantees 4 bytes available"),
168 ) as usize;
169 *pos += 4;
170 check_decoded_size(dict_count, MAX_DICT_CARDINALITY, "rle dict_count")?;
171
172 let mut values: Vec<CoordValue> = Vec::with_capacity(dict_count);
173 for _ in 0..dict_count {
174 if *pos + 4 > data.len() {
175 return Err(ArrayError::SegmentCorruption {
176 detail: "rle coord: truncated dict entry len".into(),
177 });
178 }
179 let len = u32::from_le_bytes(
180 data[*pos..*pos + 4]
181 .try_into()
182 .expect("invariant: bounds check at line 154 guarantees 4 bytes available"),
183 ) as usize;
184 *pos += 4;
185 if *pos + len > data.len() {
186 return Err(ArrayError::SegmentCorruption {
187 detail: "rle coord: truncated dict entry bytes".into(),
188 });
189 }
190 let cv: CoordValue = zerompk::from_msgpack(&data[*pos..*pos + len]).map_err(|e| {
191 ArrayError::SegmentCorruption {
192 detail: format!("rle coord dict decode: {e}"),
193 }
194 })?;
195 *pos += len;
196 values.push(cv);
197 }
198
199 Ok(DimDict { values, indices })
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::coord::value::CoordValue;

    /// Builds a `DimDict` from its two fields (note the reversed order).
    fn make_dict(indices: Vec<u32>, values: Vec<CoordValue>) -> DimDict {
        DimDict { values, indices }
    }

    /// Encodes `dict` into a fresh buffer, panicking on any codec error.
    fn encode(dict: &DimDict) -> Vec<u8> {
        let mut buf = Vec::new();
        encode_coord_axis_rle(dict, &mut buf).unwrap();
        buf
    }

    /// Encode-then-decode helper used by the roundtrip tests.
    fn roundtrip(dict: &DimDict) -> DimDict {
        let encoded = encode(dict);
        let mut pos = 0;
        decode_coord_axis_rle(&encoded, &mut pos).unwrap()
    }

    /// First little-endian u32 of an encoded buffer (the mode marker slot).
    fn leading_u32(buf: &[u8]) -> u32 {
        u32::from_le_bytes(buf[0..4].try_into().unwrap())
    }

    #[test]
    fn empty_dict_roundtrip() {
        let d = make_dict(Vec::new(), Vec::new());
        let decoded = roundtrip(&d);
        assert_eq!(decoded.indices, d.indices);
        assert_eq!(decoded.values, d.values);
    }

    #[test]
    fn single_entry_roundtrip() {
        let d = make_dict(vec![0], vec![CoordValue::Int64(5)]);
        let decoded = roundtrip(&d);
        assert_eq!(decoded.indices, d.indices);
        assert_eq!(decoded.values, d.values);
    }

    #[test]
    fn long_run_uses_rle() {
        // One value repeated 1000 times is maximally run-heavy.
        let d = make_dict(vec![0u32; 1000], vec![CoordValue::Int64(42)]);
        assert_eq!(leading_u32(&encode(&d)), u32::MAX, "expected RLE mode marker");
        assert_eq!(roundtrip(&d).indices, d.indices);
    }

    #[test]
    fn no_runs_falls_back_to_delta() {
        // Strictly increasing indices have no runs at all.
        let values: Vec<CoordValue> = (0..10).map(CoordValue::Int64).collect();
        let d = make_dict((0..10).collect(), values);
        assert_ne!(leading_u32(&encode(&d)), u32::MAX, "should be delta mode");
        assert_eq!(roundtrip(&d).indices, d.indices);
    }

    #[test]
    fn mixed_runs_roundtrip() {
        // Two long runs back to back: 500 zeros then 500 ones.
        let indices: Vec<u32> = (0..1000u32).map(|i| u32::from(i >= 500)).collect();
        let d = make_dict(indices, vec![CoordValue::Int64(1), CoordValue::Int64(2)]);
        let decoded = roundtrip(&d);
        assert_eq!(decoded.indices, d.indices);
        assert_eq!(decoded.values, d.values);
    }

    #[test]
    fn large_rle_roundtrip() {
        let d = make_dict(vec![0u32; 100_000], vec![CoordValue::Int64(99)]);
        let decoded = roundtrip(&d);
        assert_eq!(decoded.indices.len(), 100_000);
        assert!(decoded.indices.iter().all(|&i| i == 0));
    }

    #[test]
    fn timestamp_coord_roundtrip() {
        // Alternating indices keep the stream run-poor (delta path).
        let values = vec![
            CoordValue::TimestampMs(1_000_000),
            CoordValue::TimestampMs(2_000_000),
        ];
        let d = make_dict((0..10).map(|i| i % 2).collect(), values);
        let decoded = roundtrip(&d);
        assert_eq!(decoded.indices, d.indices);
        assert_eq!(decoded.values, d.values);
    }
}