Skip to main content

bones_core/cache/
codec.rs

1//! Column codec trait and implementations for the binary cache format.
2//!
3//! Each column type has a dedicated codec that exploits the statistical
4//! properties of that column's data for compact encoding. All codecs
5//! implement the [`ColumnCodec`] trait.
6//!
7//! See `docs/binary-cache-format.md` for byte-level format documentation.
8
9use super::CacheError;
10use crate::event::EventType;
11
12// ---------------------------------------------------------------------------
13// ColumnCodec trait
14// ---------------------------------------------------------------------------
15
16/// Trait for encoding and decoding a single column of event data.
17///
18/// Each column type has its own codec implementation optimised for the
19/// data's statistical properties.
20pub trait ColumnCodec {
21    /// The decoded Rust type for elements in this column.
22    type Item;
23
24    /// Encode a slice of items into bytes.
25    ///
26    /// # Errors
27    ///
28    /// Returns [`CacheError`] if encoding fails.
29    fn encode(items: &[Self::Item], buf: &mut Vec<u8>) -> Result<(), CacheError>;
30
31    /// Decode `count` items from a byte slice.
32    ///
33    /// Returns the decoded items and the number of bytes consumed.
34    ///
35    /// # Errors
36    ///
37    /// Returns [`CacheError`] if decoding fails (truncated data, invalid
38    /// encoding, etc.).
39    fn decode(data: &[u8], count: usize) -> Result<(Vec<Self::Item>, usize), CacheError>;
40}
41
42// ---------------------------------------------------------------------------
43// Varint helpers
44// ---------------------------------------------------------------------------
45
/// Append `value` to `buf` as an unsigned LEB128 varint.
///
/// Seven payload bits are emitted per byte, least-significant group first.
/// Every byte except the last has its high bit (`0x80`) set to signal that
/// more bytes follow, so values below 128 take a single byte.
pub(crate) fn encode_varint(value: u64, buf: &mut Vec<u8>) {
    let mut remaining = value;
    while remaining >= 0x80 {
        buf.push((remaining & 0x7F) as u8 | 0x80);
        remaining >>= 7;
    }
    // `remaining < 0x80` here, so this is the terminating byte.
    buf.push((remaining & 0x7F) as u8);
}
59
60/// Decode a LEB128-encoded unsigned varint from `data`, returning the value
61/// and bytes consumed.
62///
63/// # Errors
64///
65/// Returns [`CacheError::UnexpectedEof`] if the data is truncated.
66pub(crate) fn decode_varint(data: &[u8]) -> Result<(u64, usize), CacheError> {
67    let mut value: u64 = 0;
68    let mut shift = 0u32;
69    for (i, &byte) in data.iter().enumerate() {
70        let low = u64::from(byte & 0x7F);
71        value |= low << shift;
72        if byte & 0x80 == 0 {
73            return Ok((value, i + 1));
74        }
75        shift += 7;
76        if shift >= 64 {
77            return Err(CacheError::DataCorrupted(
78                "varint overflow: more than 9 bytes".into(),
79            ));
80        }
81    }
82    Err(CacheError::UnexpectedEof)
83}
84
/// Zigzag-encode a signed value so that small magnitudes map to small
/// unsigned values: `0 → 0`, `-1 → 1`, `1 → 2`, `-2 → 3`, ...
#[inline]
pub(crate) const fn zigzag_encode(n: i64) -> u64 {
    // Arithmetic shift: all ones for negative inputs, all zeros otherwise.
    let sign_mask = n >> 63;
    let doubled = n << 1;
    (doubled ^ sign_mask).cast_unsigned()
}

/// Invert [`zigzag_encode`], recovering the original signed value.
#[inline]
pub(crate) const fn zigzag_decode(n: u64) -> i64 {
    let magnitude = n >> 1;
    // The low bit carries the sign: 1 becomes an all-ones mask, 0 stays 0.
    let sign_mask = (n & 1).wrapping_neg();
    (magnitude ^ sign_mask).cast_signed()
}
96
97// ---------------------------------------------------------------------------
98// TimestampCodec
99// ---------------------------------------------------------------------------
100
101/// Delta-encoded varint timestamps.
102///
103/// The first value is stored as an absolute little-endian `i64` (8 bytes).
104/// Subsequent values are stored as zigzag-encoded LEB128 varints representing
105/// the delta from the previous timestamp.
106pub struct TimestampCodec;
107
108impl ColumnCodec for TimestampCodec {
109    type Item = i64;
110
111    fn encode(items: &[i64], buf: &mut Vec<u8>) -> Result<(), CacheError> {
112        if items.is_empty() {
113            return Ok(());
114        }
115        // First value: absolute i64 little-endian
116        buf.extend_from_slice(&items[0].to_le_bytes());
117        // Subsequent values: zigzag delta varints
118        let mut prev = items[0];
119        for &ts in &items[1..] {
120            let delta = ts - prev;
121            encode_varint(zigzag_encode(delta), buf);
122            prev = ts;
123        }
124        Ok(())
125    }
126
127    fn decode(data: &[u8], count: usize) -> Result<(Vec<i64>, usize), CacheError> {
128        if count == 0 {
129            return Ok((vec![], 0));
130        }
131        if data.len() < 8 {
132            return Err(CacheError::UnexpectedEof);
133        }
134        let first = i64::from_le_bytes(data[..8].try_into().expect("slice is 8 bytes"));
135        let mut result = Vec::with_capacity(count);
136        result.push(first);
137
138        let mut pos = 8;
139        let mut prev = first;
140        for _ in 1..count {
141            if pos >= data.len() {
142                return Err(CacheError::UnexpectedEof);
143            }
144            let (zz, consumed) = decode_varint(&data[pos..])?;
145            let delta = zigzag_decode(zz);
146            let ts = prev + delta;
147            result.push(ts);
148            prev = ts;
149            pos += consumed;
150        }
151        Ok((result, pos))
152    }
153}
154
155// ---------------------------------------------------------------------------
156// InternedStringCodec
157// ---------------------------------------------------------------------------
158
159/// Interned string table with run-length-encoded `u16` references.
160///
161/// Used for agent IDs and parent hash lists, both of which have low
162/// cardinality and high repetition.
163///
164/// Layout:
165/// ```text
166/// [table_count: u32 LE]
167/// [len_0: u16 LE] [bytes_0...]
168/// ...
169/// [run_count: u32 LE]
170/// [run_len_0: u16 LE] [index_0: u16 LE]
171/// ...
172/// ```
173pub struct InternedStringCodec;
174
175impl ColumnCodec for InternedStringCodec {
176    type Item = String;
177
178    fn encode(items: &[String], buf: &mut Vec<u8>) -> Result<(), CacheError> {
179        // Build string table (insertion order = index)
180        let mut table: Vec<&str> = Vec::new();
181        let mut index_of: std::collections::HashMap<&str, u16> = std::collections::HashMap::new();
182        let mut indices: Vec<u16> = Vec::with_capacity(items.len());
183
184        for item in items {
185            let idx = if let Some(&i) = index_of.get(item.as_str()) {
186                i
187            } else {
188                let i = u16::try_from(table.len()).map_err(|_| {
189                    CacheError::DataCorrupted("string table exceeds 65535 entries".into())
190                })?;
191                table.push(item.as_str());
192                index_of.insert(item.as_str(), i);
193                i
194            };
195            indices.push(idx);
196        }
197
198        // Write string table
199        let table_count = u32::try_from(table.len())
200            .map_err(|_| CacheError::DataCorrupted("table too large".into()))?;
201        buf.extend_from_slice(&table_count.to_le_bytes());
202        for s in &table {
203            let len = u16::try_from(s.len())
204                .map_err(|_| CacheError::DataCorrupted("string too long for u16".into()))?;
205            buf.extend_from_slice(&len.to_le_bytes());
206            buf.extend_from_slice(s.as_bytes());
207        }
208
209        // Write RLE-encoded indices
210        let runs = rle_encode_u16(&indices);
211        let run_count = u32::try_from(runs.len())
212            .map_err(|_| CacheError::DataCorrupted("run count overflow".into()))?;
213        buf.extend_from_slice(&run_count.to_le_bytes());
214        for (run_len, idx) in &runs {
215            buf.extend_from_slice(&run_len.to_le_bytes());
216            buf.extend_from_slice(&idx.to_le_bytes());
217        }
218
219        Ok(())
220    }
221
222    fn decode(data: &[u8], count: usize) -> Result<(Vec<String>, usize), CacheError> {
223        let mut pos = 0;
224
225        // Read string table
226        if data.len() < 4 {
227            return Err(CacheError::UnexpectedEof);
228        }
229        let table_count =
230            u32::from_le_bytes(data[pos..pos + 4].try_into().expect("slice is 4 bytes")) as usize;
231        pos += 4;
232
233        let mut table: Vec<String> = Vec::with_capacity(table_count);
234        for _ in 0..table_count {
235            if pos + 2 > data.len() {
236                return Err(CacheError::UnexpectedEof);
237            }
238            let len = u16::from_le_bytes(data[pos..pos + 2].try_into().expect("slice is 2 bytes"))
239                as usize;
240            pos += 2;
241            if pos + len > data.len() {
242                return Err(CacheError::UnexpectedEof);
243            }
244            let s = std::str::from_utf8(&data[pos..pos + len])
245                .map_err(|e| CacheError::DataCorrupted(format!("invalid UTF-8 in string: {e}")))?
246                .to_string();
247            table.push(s);
248            pos += len;
249        }
250
251        // Read RLE runs
252        if pos + 4 > data.len() {
253            return Err(CacheError::UnexpectedEof);
254        }
255        let run_count =
256            u32::from_le_bytes(data[pos..pos + 4].try_into().expect("slice is 4 bytes")) as usize;
257        pos += 4;
258
259        let mut result = Vec::with_capacity(count);
260        for _ in 0..run_count {
261            if pos + 4 > data.len() {
262                return Err(CacheError::UnexpectedEof);
263            }
264            let run_len =
265                u16::from_le_bytes(data[pos..pos + 2].try_into().expect("slice is 2 bytes"))
266                    as usize;
267            pos += 2;
268            let idx = u16::from_le_bytes(data[pos..pos + 2].try_into().expect("slice is 2 bytes"))
269                as usize;
270            pos += 2;
271            let s = table.get(idx).ok_or_else(|| {
272                CacheError::DataCorrupted(format!("string index {idx} out of range"))
273            })?;
274            for _ in 0..run_len {
275                result.push(s.clone());
276            }
277        }
278
279        if result.len() != count {
280            return Err(CacheError::DataCorrupted(format!(
281                "expected {count} items, got {}",
282                result.len()
283            )));
284        }
285
286        Ok((result, pos))
287    }
288}
289
290// ---------------------------------------------------------------------------
291// EventTypeCodec
292// ---------------------------------------------------------------------------
293
294/// 4-bit RLE-encoded event types.
295///
296/// The 11 event types are mapped to values 0–10, packed two per byte (low
297/// nibble first), then the packed bytes are RLE-compressed with
298/// `[run_length: u8] [packed_byte: u8]` pairs.
299///
300/// Layout:
301/// ```text
302/// [packed_byte_count: u32 LE]
303/// [run_count: u32 LE]
304/// [run_len_0: u8] [packed_byte_0: u8]
305/// ...
306/// ```
307pub struct EventTypeCodec;
308
309impl EventTypeCodec {
310    const fn type_to_nibble(et: EventType) -> u8 {
311        match et {
312            EventType::Create => 0,
313            EventType::Update => 1,
314            EventType::Move => 2,
315            EventType::Assign => 3,
316            EventType::Comment => 4,
317            EventType::Link => 5,
318            EventType::Unlink => 6,
319            EventType::Delete => 7,
320            EventType::Compact => 8,
321            EventType::Snapshot => 9,
322            EventType::Redact => 10,
323        }
324    }
325
326    fn nibble_to_type(nibble: u8) -> Result<EventType, CacheError> {
327        match nibble {
328            0 => Ok(EventType::Create),
329            1 => Ok(EventType::Update),
330            2 => Ok(EventType::Move),
331            3 => Ok(EventType::Assign),
332            4 => Ok(EventType::Comment),
333            5 => Ok(EventType::Link),
334            6 => Ok(EventType::Unlink),
335            7 => Ok(EventType::Delete),
336            8 => Ok(EventType::Compact),
337            9 => Ok(EventType::Snapshot),
338            10 => Ok(EventType::Redact),
339            _ => Err(CacheError::DataCorrupted(format!(
340                "unknown event type nibble: {nibble}"
341            ))),
342        }
343    }
344}
345
346impl ColumnCodec for EventTypeCodec {
347    type Item = EventType;
348
349    fn encode(items: &[EventType], buf: &mut Vec<u8>) -> Result<(), CacheError> {
350        // Pack nibbles: two event types per byte, low nibble first
351        let packed_count = items.len().div_ceil(2);
352        let mut packed: Vec<u8> = Vec::with_capacity(packed_count);
353        for chunk in items.chunks(2) {
354            let lo = Self::type_to_nibble(chunk[0]);
355            let hi = if chunk.len() > 1 {
356                Self::type_to_nibble(chunk[1])
357            } else {
358                0x0F // padding nibble
359            };
360            packed.push(lo | (hi << 4));
361        }
362
363        // RLE over packed bytes
364        let runs = rle_encode_u8(&packed);
365
366        // Write packed_byte_count + RLE runs
367        let packed_count_u32 = u32::try_from(packed_count)
368            .map_err(|_| CacheError::DataCorrupted("too many events".into()))?;
369        buf.extend_from_slice(&packed_count_u32.to_le_bytes());
370
371        let run_count = u32::try_from(runs.len())
372            .map_err(|_| CacheError::DataCorrupted("run count overflow".into()))?;
373        buf.extend_from_slice(&run_count.to_le_bytes());
374        for (run_len, byte) in &runs {
375            buf.push(*run_len);
376            buf.push(*byte);
377        }
378
379        Ok(())
380    }
381
382    fn decode(data: &[u8], count: usize) -> Result<(Vec<EventType>, usize), CacheError> {
383        let mut pos = 0;
384
385        if pos + 4 > data.len() {
386            return Err(CacheError::UnexpectedEof);
387        }
388        let _packed_count =
389            u32::from_le_bytes(data[pos..pos + 4].try_into().expect("slice is 4 bytes")) as usize;
390        pos += 4;
391
392        if pos + 4 > data.len() {
393            return Err(CacheError::UnexpectedEof);
394        }
395        let run_count =
396            u32::from_le_bytes(data[pos..pos + 4].try_into().expect("slice is 4 bytes")) as usize;
397        pos += 4;
398
399        // Decode RLE runs into packed bytes
400        let mut packed: Vec<u8> = Vec::new();
401        for _ in 0..run_count {
402            if pos + 2 > data.len() {
403                return Err(CacheError::UnexpectedEof);
404            }
405            let run_len = data[pos] as usize;
406            let byte = data[pos + 1];
407            pos += 2;
408            for _ in 0..run_len {
409                packed.push(byte);
410            }
411        }
412
413        // Unpack nibbles into event types
414        let mut result = Vec::with_capacity(count);
415        for (i, &byte) in packed.iter().enumerate() {
416            if result.len() >= count {
417                break;
418            }
419            let lo = byte & 0x0F;
420            result.push(Self::nibble_to_type(lo)?);
421            if result.len() >= count {
422                break;
423            }
424            // Don't decode the hi nibble of the last byte if it's padding
425            let is_last = i == packed.len() - 1;
426            if !is_last || count.is_multiple_of(2) {
427                let hi = (byte >> 4) & 0x0F;
428                result.push(Self::nibble_to_type(hi)?);
429            }
430        }
431
432        if result.len() > count {
433            result.truncate(count);
434        }
435
436        if result.len() != count {
437            return Err(CacheError::DataCorrupted(format!(
438                "expected {count} event types, decoded {}",
439                result.len()
440            )));
441        }
442
443        Ok((result, pos))
444    }
445}
446
447// ---------------------------------------------------------------------------
448// ItemIdCodec
449// ---------------------------------------------------------------------------
450
451/// Dictionary-encoded item IDs with RLE-encoded u32 indices.
452///
453/// Similar to `InternedStringCodec` but uses `u32` indices to support
454/// repositories with more than 65535 unique item IDs.
455///
456/// Layout:
457/// ```text
458/// [dict_count: u32 LE]
459/// [len_0: u16 LE] [bytes_0...]
460/// ...
461/// [run_count: u32 LE]
462/// [run_len_0: u16 LE] [index_0: u32 LE]
463/// ...
464/// ```
465pub struct ItemIdCodec;
466
467impl ColumnCodec for ItemIdCodec {
468    type Item = String;
469
470    fn encode(items: &[String], buf: &mut Vec<u8>) -> Result<(), CacheError> {
471        // Build dictionary
472        let mut dict: Vec<&str> = Vec::new();
473        let mut index_of: std::collections::HashMap<&str, u32> = std::collections::HashMap::new();
474        let mut indices: Vec<u32> = Vec::with_capacity(items.len());
475
476        for item in items {
477            let idx = if let Some(&i) = index_of.get(item.as_str()) {
478                i
479            } else {
480                let i = u32::try_from(dict.len()).map_err(|_| {
481                    CacheError::DataCorrupted("item ID dict exceeds u32::MAX entries".into())
482                })?;
483                dict.push(item.as_str());
484                index_of.insert(item.as_str(), i);
485                i
486            };
487            indices.push(idx);
488        }
489
490        // Write dictionary
491        let dict_count = u32::try_from(dict.len())
492            .map_err(|_| CacheError::DataCorrupted("dict too large".into()))?;
493        buf.extend_from_slice(&dict_count.to_le_bytes());
494        for s in &dict {
495            let len = u16::try_from(s.len())
496                .map_err(|_| CacheError::DataCorrupted("item ID string too long".into()))?;
497            buf.extend_from_slice(&len.to_le_bytes());
498            buf.extend_from_slice(s.as_bytes());
499        }
500
501        // Write RLE-encoded u32 indices
502        let runs = rle_encode_u32(&indices);
503        let run_count = u32::try_from(runs.len())
504            .map_err(|_| CacheError::DataCorrupted("run count overflow".into()))?;
505        buf.extend_from_slice(&run_count.to_le_bytes());
506        for (run_len, idx) in &runs {
507            buf.extend_from_slice(&run_len.to_le_bytes());
508            buf.extend_from_slice(&idx.to_le_bytes());
509        }
510
511        Ok(())
512    }
513
514    fn decode(data: &[u8], count: usize) -> Result<(Vec<String>, usize), CacheError> {
515        let mut pos = 0;
516
517        // Read dictionary
518        if pos + 4 > data.len() {
519            return Err(CacheError::UnexpectedEof);
520        }
521        let dict_count =
522            u32::from_le_bytes(data[pos..pos + 4].try_into().expect("slice is 4 bytes")) as usize;
523        pos += 4;
524
525        let mut dict: Vec<String> = Vec::with_capacity(dict_count);
526        for _ in 0..dict_count {
527            if pos + 2 > data.len() {
528                return Err(CacheError::UnexpectedEof);
529            }
530            let len = u16::from_le_bytes(data[pos..pos + 2].try_into().expect("slice is 2 bytes"))
531                as usize;
532            pos += 2;
533            if pos + len > data.len() {
534                return Err(CacheError::UnexpectedEof);
535            }
536            let s = std::str::from_utf8(&data[pos..pos + len])
537                .map_err(|e| CacheError::DataCorrupted(format!("invalid UTF-8 in item ID: {e}")))?
538                .to_string();
539            dict.push(s);
540            pos += len;
541        }
542
543        // Read RLE runs
544        if pos + 4 > data.len() {
545            return Err(CacheError::UnexpectedEof);
546        }
547        let run_count =
548            u32::from_le_bytes(data[pos..pos + 4].try_into().expect("slice is 4 bytes")) as usize;
549        pos += 4;
550
551        let mut result = Vec::with_capacity(count);
552        for _ in 0..run_count {
553            if pos + 6 > data.len() {
554                return Err(CacheError::UnexpectedEof);
555            }
556            let run_len =
557                u16::from_le_bytes(data[pos..pos + 2].try_into().expect("slice is 2 bytes"))
558                    as usize;
559            pos += 2;
560            let idx = u32::from_le_bytes(data[pos..pos + 4].try_into().expect("slice is 4 bytes"))
561                as usize;
562            pos += 4;
563            let s = dict.get(idx).ok_or_else(|| {
564                CacheError::DataCorrupted(format!("item ID index {idx} out of range"))
565            })?;
566            for _ in 0..run_len {
567                result.push(s.clone());
568            }
569        }
570
571        if result.len() != count {
572            return Err(CacheError::DataCorrupted(format!(
573                "expected {count} item IDs, got {}",
574                result.len()
575            )));
576        }
577
578        Ok((result, pos))
579    }
580}
581
582// ---------------------------------------------------------------------------
583// RawBytesCodec
584// ---------------------------------------------------------------------------
585
586/// Length-prefixed raw byte strings.
587///
588/// Used for ITC stamps and other variable-length fields that don't benefit
589/// from string interning or dictionary encoding.
590///
591/// Layout:
592/// ```text
593/// [len_0: u16 LE] [bytes_0...]
594/// [len_1: u16 LE] [bytes_1...]
595/// ...
596/// ```
597pub struct RawBytesCodec;
598
599impl ColumnCodec for RawBytesCodec {
600    type Item = String;
601
602    fn encode(items: &[String], buf: &mut Vec<u8>) -> Result<(), CacheError> {
603        for s in items {
604            let len = u16::try_from(s.len()).map_err(|_| {
605                CacheError::DataCorrupted("string too long for u16 length prefix".into())
606            })?;
607            buf.extend_from_slice(&len.to_le_bytes());
608            buf.extend_from_slice(s.as_bytes());
609        }
610        Ok(())
611    }
612
613    fn decode(data: &[u8], count: usize) -> Result<(Vec<String>, usize), CacheError> {
614        let mut pos = 0;
615        let mut result = Vec::with_capacity(count);
616        for _ in 0..count {
617            if pos + 2 > data.len() {
618                return Err(CacheError::UnexpectedEof);
619            }
620            let len = u16::from_le_bytes(data[pos..pos + 2].try_into().expect("slice is 2 bytes"))
621                as usize;
622            pos += 2;
623            if pos + len > data.len() {
624                return Err(CacheError::UnexpectedEof);
625            }
626            let s = std::str::from_utf8(&data[pos..pos + len])
627                .map_err(|e| CacheError::DataCorrupted(format!("invalid UTF-8 in raw bytes: {e}")))?
628                .to_string();
629            result.push(s);
630            pos += len;
631        }
632        Ok((result, pos))
633    }
634}
635
636// ---------------------------------------------------------------------------
637// ValueCodec
638// ---------------------------------------------------------------------------
639
640/// Type-specific value encoding for event payloads.
641///
642/// Stores each event's JSON payload as a length-prefixed UTF-8 string with
643/// a `u32` length prefix to support payloads larger than 64 KiB.
644///
645/// Layout:
646/// ```text
647/// [len_0: u32 LE] [json_0: UTF-8]
648/// [len_1: u32 LE] [json_1: UTF-8]
649/// ...
650/// ```
651pub struct ValueCodec;
652
653impl ColumnCodec for ValueCodec {
654    type Item = String;
655
656    fn encode(items: &[String], buf: &mut Vec<u8>) -> Result<(), CacheError> {
657        for s in items {
658            let len = u32::try_from(s.len()).map_err(|_| {
659                CacheError::DataCorrupted("payload too large for u32 length prefix".into())
660            })?;
661            buf.extend_from_slice(&len.to_le_bytes());
662            buf.extend_from_slice(s.as_bytes());
663        }
664        Ok(())
665    }
666
667    fn decode(data: &[u8], count: usize) -> Result<(Vec<String>, usize), CacheError> {
668        let mut pos = 0;
669        let mut result = Vec::with_capacity(count);
670        for _ in 0..count {
671            if pos + 4 > data.len() {
672                return Err(CacheError::UnexpectedEof);
673            }
674            let len = u32::from_le_bytes(data[pos..pos + 4].try_into().expect("slice is 4 bytes"))
675                as usize;
676            pos += 4;
677            if pos + len > data.len() {
678                return Err(CacheError::UnexpectedEof);
679            }
680            let s = std::str::from_utf8(&data[pos..pos + len])
681                .map_err(|e| {
682                    CacheError::DataCorrupted(format!("invalid UTF-8 in value payload: {e}"))
683                })?
684                .to_string();
685            result.push(s);
686            pos += len;
687        }
688        Ok((result, pos))
689    }
690}
691
692// ---------------------------------------------------------------------------
693// RLE helpers
694// ---------------------------------------------------------------------------
695
/// Collapse `items` into `(run_length, value)` pairs.
///
/// A new run starts whenever the value changes or the current run has
/// reached `max_run`, so no run length ever exceeds `max_run`. Shared by the
/// fixed-width `rle_encode_*` wrappers that follow.
fn rle_runs<C, T>(items: &[T], max_run: C) -> Vec<(C, T)>
where
    C: Copy + PartialOrd + std::ops::AddAssign + From<u8>,
    T: Copy + PartialEq,
{
    let one = C::from(1u8);
    let mut runs: Vec<(C, T)> = Vec::new();
    for &item in items {
        if let Some((len, value)) = runs.last_mut() {
            if *value == item && *len < max_run {
                *len += one;
                continue;
            }
        }
        runs.push((one, item));
    }
    runs
}

/// Run-length encode a `u8` slice into `(run_length, value)` pairs.
///
/// Runs are capped at `u8::MAX`; longer stretches split into several pairs.
fn rle_encode_u8(items: &[u8]) -> Vec<(u8, u8)> {
    rle_runs(items, u8::MAX)
}

/// Run-length encode a `u16` slice into `(run_length, value)` pairs.
///
/// Runs are capped at `u16::MAX`; longer stretches split into several pairs.
fn rle_encode_u16(items: &[u16]) -> Vec<(u16, u16)> {
    rle_runs(items, u16::MAX)
}

/// Run-length encode a `u32` slice into `(run_length, value)` pairs.
///
/// Run lengths are `u16`, capped at `u16::MAX`; longer stretches split into
/// several pairs.
fn rle_encode_u32(items: &[u32]) -> Vec<(u16, u32)> {
    rle_runs(items, u16::MAX)
}
758
759// ---------------------------------------------------------------------------
760// Tests
761// ---------------------------------------------------------------------------
762
763#[cfg(test)]
764mod tests {
765    use super::*;
766    use crate::event::EventType;
767
768    // === Varint helpers ====================================================
769
770    #[test]
771    fn varint_roundtrip_small() {
772        for n in [0u64, 1, 127, 128, 255, 300, 16383, 16384, u32::MAX as u64] {
773            let mut buf = Vec::new();
774            encode_varint(n, &mut buf);
775            let (decoded, consumed) = decode_varint(&buf).expect("decode");
776            assert_eq!(decoded, n, "roundtrip failed for {n}");
777            assert_eq!(consumed, buf.len(), "consumed all bytes for {n}");
778        }
779    }
780
781    #[test]
782    fn varint_decode_truncated() {
783        // Varint with continuation bit set but no more bytes
784        let truncated = &[0x80u8];
785        assert!(matches!(
786            decode_varint(truncated),
787            Err(CacheError::UnexpectedEof)
788        ));
789    }
790
791    #[test]
792    fn zigzag_roundtrip() {
793        for n in [0i64, 1, -1, i64::MIN, i64::MAX, -1000, 1000] {
794            assert_eq!(zigzag_decode(zigzag_encode(n)), n, "zigzag failed for {n}");
795        }
796    }
797
798    // === TimestampCodec ====================================================
799
800    #[test]
801    fn timestamp_empty() {
802        let mut buf = Vec::new();
803        TimestampCodec::encode(&[], &mut buf).unwrap();
804        assert!(buf.is_empty());
805        let (decoded, consumed) = TimestampCodec::decode(&[], 0).unwrap();
806        assert!(decoded.is_empty());
807        assert_eq!(consumed, 0);
808    }
809
810    #[test]
811    fn timestamp_single() {
812        let ts = [1_708_012_200_000_000i64];
813        let mut buf = Vec::new();
814        TimestampCodec::encode(&ts, &mut buf).unwrap();
815        let (decoded, _) = TimestampCodec::decode(&buf, 1).unwrap();
816        assert_eq!(decoded, ts);
817    }
818
819    #[test]
820    fn timestamp_roundtrip_ascending() {
821        let timestamps: Vec<i64> = (0..100).map(|i| 1_700_000_000_000i64 + i * 1000).collect();
822        let mut buf = Vec::new();
823        TimestampCodec::encode(&timestamps, &mut buf).unwrap();
824        let (decoded, consumed) = TimestampCodec::decode(&buf, timestamps.len()).unwrap();
825        assert_eq!(decoded, timestamps);
826        assert_eq!(consumed, buf.len());
827    }
828
829    #[test]
830    fn timestamp_roundtrip_with_negative_delta() {
831        // Out-of-order timestamps (can happen in multi-writer scenarios)
832        let timestamps: Vec<i64> = vec![
833            1_700_000_000_000,
834            1_700_000_001_000,
835            1_700_000_000_500, // slight regression
836            1_700_000_002_000,
837        ];
838        let mut buf = Vec::new();
839        TimestampCodec::encode(&timestamps, &mut buf).unwrap();
840        let (decoded, _) = TimestampCodec::decode(&buf, timestamps.len()).unwrap();
841        assert_eq!(decoded, timestamps);
842    }
843
844    #[test]
845    fn timestamp_delta_encodes_compactly() {
846        // Ascending timestamps should encode smaller than 8 bytes each after first
847        let base: i64 = 1_700_000_000_000;
848        let timestamps: Vec<i64> = (0..10).map(|i| base + i * 1000).collect();
849        let mut buf = Vec::new();
850        TimestampCodec::encode(&timestamps, &mut buf).unwrap();
851        // 8 bytes for first + 9 delta varints (delta=1000 encodes in 2 bytes)
852        assert!(buf.len() < 8 + 9 * 4, "expected compact encoding");
853    }
854
855    // === InternedStringCodec ==============================================
856
857    #[test]
858    fn interned_string_empty() {
859        let mut buf = Vec::new();
860        InternedStringCodec::encode(&[], &mut buf).unwrap();
861        let (decoded, _) = InternedStringCodec::decode(&buf, 0).unwrap();
862        assert!(decoded.is_empty());
863    }
864
865    #[test]
866    fn interned_string_single() {
867        let items = vec!["claude-abc".to_string()];
868        let mut buf = Vec::new();
869        InternedStringCodec::encode(&items, &mut buf).unwrap();
870        let (decoded, _) = InternedStringCodec::decode(&buf, 1).unwrap();
871        assert_eq!(decoded, items);
872    }
873
874    #[test]
875    fn interned_string_roundtrip_repeated() {
876        let items: Vec<String> = [
877            "claude-abc",
878            "gemini-xyz",
879            "claude-abc",
880            "claude-abc",
881            "gemini-xyz",
882        ]
883        .iter()
884        .map(|s| s.to_string())
885        .collect();
886        let mut buf = Vec::new();
887        InternedStringCodec::encode(&items, &mut buf).unwrap();
888        let (decoded, consumed) = InternedStringCodec::decode(&buf, items.len()).unwrap();
889        assert_eq!(decoded, items);
890        assert_eq!(consumed, buf.len());
891    }
892
893    #[test]
894    fn interned_string_compresses_repeated_values() {
895        // 1000 entries: 500 "agent-one" followed by 500 "agent-two" → 2 RLE runs
896        let items: Vec<String> = (0..1000)
897            .map(|i| {
898                if i < 500 {
899                    "agent-one".to_string()
900                } else {
901                    "agent-two".to_string()
902                }
903            })
904            .collect();
905        let mut buf = Vec::new();
906        InternedStringCodec::encode(&items, &mut buf).unwrap();
907        // 2 strings (9 bytes each ≈ 22 bytes) + table header (4) + 2 runs (8 bytes) + run header (4)
908        // ≈ 38 bytes total vs 9000 bytes raw
909        assert!(
910            buf.len() < 60,
911            "should compress well: got {} bytes",
912            buf.len()
913        );
914        let (decoded, _) = InternedStringCodec::decode(&buf, 1000).unwrap();
915        assert_eq!(decoded, items);
916    }
917
918    // === EventTypeCodec ===================================================
919
920    #[test]
921    fn event_type_empty() {
922        let mut buf = Vec::new();
923        EventTypeCodec::encode(&[], &mut buf).unwrap();
924        let (decoded, _) = EventTypeCodec::decode(&buf, 0).unwrap();
925        assert!(decoded.is_empty());
926    }
927
928    #[test]
929    fn event_type_roundtrip_all_types() {
930        let items: Vec<EventType> = EventType::ALL.to_vec();
931        let mut buf = Vec::new();
932        EventTypeCodec::encode(&items, &mut buf).unwrap();
933        let (decoded, consumed) = EventTypeCodec::decode(&buf, items.len()).unwrap();
934        assert_eq!(decoded, items);
935        assert_eq!(consumed, buf.len());
936    }
937
938    #[test]
939    fn event_type_odd_count() {
940        // Odd count should not confuse padding nibble
941        let items = vec![EventType::Create, EventType::Update, EventType::Move];
942        let mut buf = Vec::new();
943        EventTypeCodec::encode(&items, &mut buf).unwrap();
944        let (decoded, _) = EventTypeCodec::decode(&buf, 3).unwrap();
945        assert_eq!(decoded, items);
946    }
947
948    #[test]
949    fn event_type_single() {
950        let items = vec![EventType::Comment];
951        let mut buf = Vec::new();
952        EventTypeCodec::encode(&items, &mut buf).unwrap();
953        let (decoded, _) = EventTypeCodec::decode(&buf, 1).unwrap();
954        assert_eq!(decoded, items);
955    }
956
957    #[test]
958    fn event_type_compresses_homogeneous_stream() {
959        // 1000 create events should RLE down to a handful of bytes
960        let items: Vec<EventType> = vec![EventType::Create; 1000];
961        let mut buf = Vec::new();
962        EventTypeCodec::encode(&items, &mut buf).unwrap();
963        // 500 packed bytes RLE into ~5 runs, each 2 bytes = ~10 bytes + headers
964        assert!(
965            buf.len() < 50,
966            "expected compact encoding: {} bytes",
967            buf.len()
968        );
969        let (decoded, _) = EventTypeCodec::decode(&buf, 1000).unwrap();
970        assert_eq!(decoded, items);
971    }
972
973    // === ItemIdCodec ======================================================
974
975    #[test]
976    fn item_id_empty() {
977        let mut buf = Vec::new();
978        ItemIdCodec::encode(&[], &mut buf).unwrap();
979        let (decoded, _) = ItemIdCodec::decode(&buf, 0).unwrap();
980        assert!(decoded.is_empty());
981    }
982
983    #[test]
984    fn item_id_roundtrip() {
985        let items: Vec<String> = vec![
986            "bn-a7x".to_string(),
987            "bn-b8y".to_string(),
988            "bn-a7x".to_string(),
989            "bn-c9z".to_string(),
990            "bn-a7x".to_string(),
991        ];
992        let mut buf = Vec::new();
993        ItemIdCodec::encode(&items, &mut buf).unwrap();
994        let (decoded, consumed) = ItemIdCodec::decode(&buf, items.len()).unwrap();
995        assert_eq!(decoded, items);
996        assert_eq!(consumed, buf.len());
997    }
998
999    // === RawBytesCodec ====================================================
1000
1001    #[test]
1002    fn raw_bytes_empty() {
1003        let mut buf = Vec::new();
1004        RawBytesCodec::encode(&[], &mut buf).unwrap();
1005        let (decoded, _) = RawBytesCodec::decode(&buf, 0).unwrap();
1006        assert!(decoded.is_empty());
1007    }
1008
1009    #[test]
1010    fn raw_bytes_roundtrip() {
1011        let items: Vec<String> = vec![
1012            "itc:AQ".to_string(),
1013            "itc:AQ.1".to_string(),
1014            "itc:Bg".to_string(),
1015        ];
1016        let mut buf = Vec::new();
1017        RawBytesCodec::encode(&items, &mut buf).unwrap();
1018        let (decoded, consumed) = RawBytesCodec::decode(&buf, items.len()).unwrap();
1019        assert_eq!(decoded, items);
1020        assert_eq!(consumed, buf.len());
1021    }
1022
1023    // === ValueCodec =======================================================
1024
1025    #[test]
1026    fn value_codec_empty() {
1027        let mut buf = Vec::new();
1028        ValueCodec::encode(&[], &mut buf).unwrap();
1029        let (decoded, _) = ValueCodec::decode(&buf, 0).unwrap();
1030        assert!(decoded.is_empty());
1031    }
1032
1033    #[test]
1034    fn value_codec_roundtrip() {
1035        let items: Vec<String> = vec![
1036            r#"{"title":"Fix auth retry","kind":"task"}"#.to_string(),
1037            r#"{"field":"title","value":"New title"}"#.to_string(),
1038            r#"{"state":"doing"}"#.to_string(),
1039        ];
1040        let mut buf = Vec::new();
1041        ValueCodec::encode(&items, &mut buf).unwrap();
1042        let (decoded, consumed) = ValueCodec::decode(&buf, items.len()).unwrap();
1043        assert_eq!(decoded, items);
1044        assert_eq!(consumed, buf.len());
1045    }
1046
1047    #[test]
1048    fn value_codec_large_payload() {
1049        // Simulate a snapshot event with a large JSON payload
1050        let big = "x".repeat(100_000);
1051        let items = vec![big.clone()];
1052        let mut buf = Vec::new();
1053        ValueCodec::encode(&items, &mut buf).unwrap();
1054        let (decoded, _) = ValueCodec::decode(&buf, 1).unwrap();
1055        assert_eq!(decoded[0], big);
1056    }
1057
1058    // === RLE helpers ======================================================
1059
1060    #[test]
1061    fn rle_u8_empty() {
1062        let runs = rle_encode_u8(&[]);
1063        assert!(runs.is_empty());
1064    }
1065
1066    #[test]
1067    fn rle_u8_single_run() {
1068        let runs = rle_encode_u8(&[42, 42, 42]);
1069        assert_eq!(runs, vec![(3, 42)]);
1070    }
1071
1072    #[test]
1073    fn rle_u8_mixed_runs() {
1074        let runs = rle_encode_u8(&[1, 1, 2, 3, 3, 3]);
1075        assert_eq!(runs, vec![(2, 1), (1, 2), (3, 3)]);
1076    }
1077
1078    #[test]
1079    fn rle_u16_roundtrip() {
1080        let items = vec![0u16, 0, 1, 1, 1, 2, 2];
1081        let runs = rle_encode_u16(&items);
1082        // Decode manually
1083        let mut decoded = Vec::new();
1084        for (count, val) in &runs {
1085            for _ in 0..*count {
1086                decoded.push(*val);
1087            }
1088        }
1089        assert_eq!(decoded, items);
1090    }
1091}