// bones_core/cache/mod.rs
1//! Binary columnar event cache.
2//!
3//! This module implements the binary cache format described in
4//! `docs/binary-cache-format.md`. The cache is a derived, read-optimised
5//! representation of the TSJSON event log. It is not authoritative — the
6//! `.events` files are the source of truth — but enables sub-millisecond
7//! replay on large repositories.
8//!
9//! # Module layout
10//!
11//! - [`codec`] — [`ColumnCodec`] trait and per-column codec implementations.
12//! - [`columns`] — [`CacheColumns`] intermediate representation.
13//! - [`CacheHeader`] — file header struct (this module).
14//! - [`CacheError`] — error type (this module).
15//!
16//! # Usage sketch
17//!
18//! ```rust,no_run
19//! use bones_core::cache::{CacheHeader, CacheColumns};
20//!
21//! // Build from events and encode to bytes:
22//! // let cols = CacheColumns::from_events(&events)?;
23//! // let header = CacheHeader::new(events.len() as u64);
24//! // let bytes = header.encode(&cols)?;
25//!
26//! // Decode from bytes:
27//! // let (header, cols) = CacheHeader::decode(&bytes)?;
28//! // let events = cols.into_events()?;
29//! ```
30
31pub mod codec;
32pub mod columns;
33pub mod manager;
34pub mod reader;
35pub mod writer;
36
37pub use codec::{
38    ColumnCodec, EventTypeCodec, InternedStringCodec, ItemIdCodec, RawBytesCodec, TimestampCodec,
39    ValueCodec,
40};
41pub use columns::{COLUMN_COUNT, CacheColumns, ColumnRow};
42pub use manager::{CacheManager, LoadResult, LoadSource};
43pub use reader::{CacheReader, CacheReaderError};
44pub use writer::{CacheStats, CacheWriter, rebuild_cache};
45
46use crate::event::Event;
47use columns::{
48    COL_AGENTS, COL_EVENT_TYPES, COL_ITC, COL_ITEM_IDS, COL_PARENTS, COL_TIMESTAMPS, COL_VALUES,
49};
50
51// ---------------------------------------------------------------------------
52// Magic bytes and format constants
53// ---------------------------------------------------------------------------
54
/// Magic bytes identifying a binary cache file (`BNCH`).
pub const CACHE_MAGIC: [u8; 4] = *b"BNCH";

/// Format version stamped into newly written cache files.
pub const CACHE_VERSION: u8 = 1;

/// Size of the fixed file header, in bytes.
///
/// The 32-byte header is laid out as:
///
/// | bytes | field           |
/// |-------|-----------------|
/// | 4     | magic           |
/// | 1     | version         |
/// | 1     | `column_count`  |
/// | 2     | reserved (zero) |
/// | 8     | `row_count`     |
/// | 8     | `created_at_us` |
/// | 8     | `data_crc64`    |
pub const HEADER_SIZE: usize = 32;
74
75// ---------------------------------------------------------------------------
76// Error type
77// ---------------------------------------------------------------------------
78
/// Errors returned by cache encoding and decoding.
///
/// All variants are cheap to clone and carry enough context for a caller
/// to decide between "rebuild the cache from the event log" and "report a
/// bug".
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum CacheError {
    /// The file does not start with `BNCH`.
    #[error("invalid magic bytes: expected BNCH, got {0:?}")]
    InvalidMagic([u8; 4]),

    /// The format version is newer than this library supports.
    /// Older versions are still accepted by `decode`.
    #[error("unsupported cache format version {0}: maximum supported is {CACHE_VERSION}")]
    UnsupportedVersion(u8),

    /// CRC-64 mismatch — data is corrupted or truncated.
    #[error("cache data is corrupted: {0}")]
    DataCorrupted(String),

    /// Unexpected end of data while reading a column or header.
    #[error("unexpected end of cache data")]
    UnexpectedEof,

    /// JSON serialisation / deserialisation of event data failed.
    /// Also produced by the `From<serde_json::Error>` impl below.
    #[error("event data encode/decode error: {0}")]
    EventDataError(String),

    /// Column count mismatch between header and data.
    #[error("column count mismatch: header says {expected}, file has {actual}")]
    ColumnCountMismatch { expected: usize, actual: usize }, // thiserror interpolates named fields
}
106
107impl From<serde_json::Error> for CacheError {
108    fn from(e: serde_json::Error) -> Self {
109        Self::EventDataError(e.to_string())
110    }
111}
112
113// ---------------------------------------------------------------------------
// CRC-64 helper (table-less bitwise implementation)
115// ---------------------------------------------------------------------------
116
/// Compute the CRC-64/XZ checksum of `data`.
///
/// This is the standard *reflected* CRC-64 used by the `.xz` format:
/// ECMA-182 polynomial `0x42F0_E1EB_A9EA_3693` (reversed form
/// `0xC96C_5795_D787_0F42`), initial value and final XOR of all ones.
/// Check value: `checksum(b"123456789") == 0x995D_C9BB_DF19_39FA`.
///
/// Intentionally a simple table-less bitwise implementation; production
/// code can swap in a table-driven crate like `crc64` if this shows up in
/// profiles.
fn checksum(data: &[u8]) -> u64 {
    // Reversed (reflected) form of the ECMA-182 polynomial. The previous
    // revision shifted left (MSB-first) with this reversed constant, which
    // matched neither CRC-64/XZ nor CRC-64/ECMA; a reflected CRC must
    // process the least-significant bit first with right shifts.
    const POLY: u64 = 0xC96C_5795_D787_0F42;
    let mut crc: u64 = u64::MAX;
    for &byte in data {
        crc ^= u64::from(byte);
        for _ in 0..8 {
            crc = if crc & 1 != 0 { (crc >> 1) ^ POLY } else { crc >> 1 };
        }
    }
    !crc
}
139
140// ---------------------------------------------------------------------------
141// CacheHeader
142// ---------------------------------------------------------------------------
143
/// File header for the binary event cache.
///
/// Serialised as the first [`HEADER_SIZE`] bytes of the file, followed by
/// one little-endian `u64` offset per column and then the column data
/// itself (see [`CacheHeader::encode`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CacheHeader {
    /// Format version (currently 1).
    pub version: u8,
    /// Number of columns present.
    pub column_count: u8,
    /// Number of events (rows) in the file.
    pub row_count: u64,
    /// Wall-clock timestamp at cache creation (us since Unix epoch).
    pub created_at_us: u64,
    /// CRC-64 over all column data bytes. Zero until
    /// [`CacheHeader::encode`] fills it in.
    pub data_crc64: u64,
}
158
159impl CacheHeader {
160    /// Create a new header for a file containing `row_count` events,
161    /// created at `created_at_us` with a placeholder CRC.
162    #[must_use]
163    pub const fn new(row_count: u64, created_at_us: u64) -> Self {
164        Self {
165            version: CACHE_VERSION,
166            column_count: {
167                const { assert!(COLUMN_COUNT <= u8::MAX as usize) };
168                #[allow(clippy::cast_possible_truncation)]
169                {
170                    COLUMN_COUNT as u8
171                }
172            },
173            row_count,
174            created_at_us,
175            data_crc64: 0,
176        }
177    }
178
179    /// Encode the header and all column data into a byte buffer.
180    ///
181    /// The column offsets array is written immediately after the 32-byte
182    /// header. Each column is then written at its recorded offset.
183    ///
184    /// # Errors
185    ///
186    /// Returns [`CacheError`] if any column fails to encode.
187    pub fn encode(&mut self, cols: &CacheColumns) -> Result<Vec<u8>, CacheError> {
188        // Encode each column into its own buffer
189        let mut col_bufs: Vec<Vec<u8>> = vec![Vec::new(); COLUMN_COUNT];
190
191        TimestampCodec::encode(&cols.timestamps, &mut col_bufs[COL_TIMESTAMPS])?;
192        InternedStringCodec::encode(&cols.agents, &mut col_bufs[COL_AGENTS])?;
193        EventTypeCodec::encode(&cols.event_types, &mut col_bufs[COL_EVENT_TYPES])?;
194        ItemIdCodec::encode(&cols.item_ids, &mut col_bufs[COL_ITEM_IDS])?;
195        InternedStringCodec::encode(&cols.parents, &mut col_bufs[COL_PARENTS])?;
196        RawBytesCodec::encode(&cols.itc, &mut col_bufs[COL_ITC])?;
197        ValueCodec::encode(&cols.values, &mut col_bufs[COL_VALUES])?;
198
199        // Compute column offsets
200        let offsets_section_size = COLUMN_COUNT * 8; // 8 bytes per u64 offset
201        let header_and_offsets = HEADER_SIZE + offsets_section_size;
202        let mut offsets: Vec<u64> = Vec::with_capacity(COLUMN_COUNT);
203        let mut cur = header_and_offsets as u64;
204        for buf in &col_bufs {
205            offsets.push(cur);
206            cur += buf.len() as u64;
207        }
208
209        // Compute CRC over all column data
210        let mut all_col_bytes: Vec<u8> = Vec::new();
211        for buf in &col_bufs {
212            all_col_bytes.extend_from_slice(buf);
213        }
214        self.data_crc64 = checksum(&all_col_bytes);
215
216        // Assemble final buffer
217        let total = header_and_offsets + all_col_bytes.len();
218        let mut out = Vec::with_capacity(total);
219
220        // Header (32 bytes)
221        out.extend_from_slice(&CACHE_MAGIC);
222        out.push(self.version);
223        out.push(self.column_count);
224        out.extend_from_slice(&0u16.to_le_bytes()); // reserved
225        out.extend_from_slice(&self.row_count.to_le_bytes());
226        out.extend_from_slice(&self.created_at_us.to_le_bytes());
227        out.extend_from_slice(&self.data_crc64.to_le_bytes());
228        debug_assert_eq!(out.len(), HEADER_SIZE);
229
230        // Column offsets
231        for offset in &offsets {
232            out.extend_from_slice(&offset.to_le_bytes());
233        }
234
235        // Column data
236        out.extend_from_slice(&all_col_bytes);
237
238        Ok(out)
239    }
240
241    /// Decode a cache file from bytes, returning the header and column data.
242    ///
243    /// # Errors
244    ///
245    /// Returns [`CacheError`] if:
246    /// - The magic bytes are wrong.
247    /// - The version is unsupported.
248    /// - The CRC does not match.
249    /// - Any column data is truncated or malformed.
250    ///
251    /// # Panics
252    ///
253    /// Panics if fixed-size slice conversions fail, which cannot happen when
254    /// the data length has already been validated.
255    pub fn decode(data: &[u8]) -> Result<(Self, CacheColumns), CacheError> {
256        if data.len() < HEADER_SIZE {
257            return Err(CacheError::UnexpectedEof);
258        }
259
260        // Check magic
261        let magic: [u8; 4] = data[0..4].try_into().expect("slice is 4 bytes");
262        if magic != CACHE_MAGIC {
263            return Err(CacheError::InvalidMagic(magic));
264        }
265
266        let version = data[4];
267        if version > CACHE_VERSION {
268            return Err(CacheError::UnsupportedVersion(version));
269        }
270
271        let column_count = data[5] as usize;
272        // bytes 6-7 are reserved
273        let row_count = u64::from_le_bytes(data[8..16].try_into().expect("slice is 8 bytes"));
274        let created_at_us = u64::from_le_bytes(data[16..24].try_into().expect("slice is 8 bytes"));
275        let stored_crc = u64::from_le_bytes(data[24..32].try_into().expect("slice is 8 bytes"));
276
277        // Read column offsets
278        let offsets_start = HEADER_SIZE;
279        let offsets_end = offsets_start + column_count * 8;
280        if data.len() < offsets_end {
281            return Err(CacheError::UnexpectedEof);
282        }
283
284        let mut offsets: Vec<u64> = Vec::with_capacity(column_count);
285        for i in 0..column_count {
286            let start = offsets_start + i * 8;
287            let offset =
288                u64::from_le_bytes(data[start..start + 8].try_into().expect("slice is 8 bytes"));
289            offsets.push(offset);
290        }
291
292        // Verify CRC over all column data (from first column offset to end)
293        let col_data_start = offsets_end;
294        if data.len() < col_data_start {
295            return Err(CacheError::UnexpectedEof);
296        }
297        let col_data = &data[col_data_start..];
298        let actual_crc = checksum(col_data);
299        if actual_crc != stored_crc {
300            return Err(CacheError::DataCorrupted(format!(
301                "CRC mismatch: expected {stored_crc:#018x}, got {actual_crc:#018x}"
302            )));
303        }
304
305        // Check column count
306        if column_count < COLUMN_COUNT {
307            return Err(CacheError::ColumnCountMismatch {
308                expected: COLUMN_COUNT,
309                actual: column_count,
310            });
311        }
312
313        let count = usize::try_from(row_count).map_err(|_| {
314            CacheError::DataCorrupted(format!("row_count {row_count} exceeds platform usize"))
315        })?;
316
317        // Helper: get column data slice given offset index
318        let col_slice = |col_idx: usize| -> Result<&[u8], CacheError> {
319            let start = usize::try_from(offsets[col_idx]).map_err(|_| CacheError::UnexpectedEof)?;
320            if start > data.len() {
321                return Err(CacheError::UnexpectedEof);
322            }
323            // End is either the next column's offset or end of file
324            let end = if col_idx + 1 < column_count {
325                usize::try_from(offsets[col_idx + 1]).map_err(|_| CacheError::UnexpectedEof)?
326            } else {
327                data.len()
328            };
329            if end > data.len() {
330                return Err(CacheError::UnexpectedEof);
331            }
332            Ok(&data[start..end])
333        };
334
335        // Decode each column
336        let (timestamps, _) = TimestampCodec::decode(col_slice(COL_TIMESTAMPS)?, count)?;
337        let (agents, _) = InternedStringCodec::decode(col_slice(COL_AGENTS)?, count)?;
338        let (event_types, _) = EventTypeCodec::decode(col_slice(COL_EVENT_TYPES)?, count)?;
339        let (item_ids, _) = ItemIdCodec::decode(col_slice(COL_ITEM_IDS)?, count)?;
340        let (parents, _) = InternedStringCodec::decode(col_slice(COL_PARENTS)?, count)?;
341        let (itc, _) = RawBytesCodec::decode(col_slice(COL_ITC)?, count)?;
342        let (values, _) = ValueCodec::decode(col_slice(COL_VALUES)?, count)?;
343
344        let cols = CacheColumns {
345            timestamps,
346            agents,
347            event_types,
348            item_ids,
349            parents,
350            itc,
351            values,
352        };
353
354        let header = Self {
355            version,
356            column_count: u8::try_from(column_count).map_err(|_| {
357                CacheError::DataCorrupted(format!("column_count {column_count} exceeds u8"))
358            })?,
359            row_count,
360            created_at_us,
361            data_crc64: stored_crc,
362        };
363
364        Ok((header, cols))
365    }
366}
367
368// ---------------------------------------------------------------------------
369// High-level encode / decode helpers
370// ---------------------------------------------------------------------------
371
372/// Encode a slice of events to binary cache bytes.
373///
374/// Convenience wrapper around [`CacheColumns::from_events`] +
375/// [`CacheHeader::encode`].
376///
377/// # Errors
378///
379/// Returns [`CacheError`] if column encoding or JSON serialisation fails.
380pub fn encode_events(events: &[Event], created_at_us: u64) -> Result<Vec<u8>, CacheError> {
381    let cols = CacheColumns::from_events(events)?;
382    let mut header = CacheHeader::new(events.len() as u64, created_at_us);
383    header.encode(&cols)
384}
385
386/// Decode binary cache bytes back to a vector of events.
387///
388/// Convenience wrapper around [`CacheHeader::decode`] +
389/// [`CacheColumns::into_events`].
390///
391/// **Note**: The `event_hash` field of each reconstructed event will be empty.
392/// Callers that need hashes must recompute them from the TSJSON writer.
393///
394/// # Errors
395///
396/// Returns [`CacheError`] if header validation, CRC check, or column decode
397/// fails.
398pub fn decode_events(data: &[u8]) -> Result<(CacheHeader, Vec<Event>), CacheError> {
399    let (header, cols) = CacheHeader::decode(data)?;
400    let events = cols.into_events().map_err(CacheError::EventDataError)?;
401    Ok((header, events))
402}
403
404// ---------------------------------------------------------------------------
405// Tests
406// ---------------------------------------------------------------------------
407
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::data::CreateData;
    use crate::event::data::MoveData;
    use crate::event::{Event, EventData, EventType};
    use crate::model::item::{Kind, State, Urgency};
    use crate::model::item_id::ItemId;
    use std::collections::BTreeMap;

    /// Build a synthetic event of the requested type for round-trip tests.
    ///
    /// Each [`EventType`] gets a minimal but fully-populated payload so the
    /// per-type codecs are all exercised.
    fn make_event(ts: i64, agent: &str, et: EventType, item: &str) -> Event {
        use crate::event::data::{
            AssignAction, AssignData, CommentData, CompactData, DeleteData, LinkData, RedactData,
            SnapshotData, UnlinkData, UpdateData,
        };
        let data = match et {
            EventType::Create => EventData::Create(CreateData {
                title: format!("Item {item}"),
                kind: Kind::Task,
                size: None,
                urgency: Urgency::Default,
                labels: vec![],
                parent: None,
                causation: None,
                description: None,
                extra: BTreeMap::new(),
            }),
            EventType::Update => EventData::Update(UpdateData {
                field: "title".to_string(),
                value: serde_json::json!("new title"),
                extra: BTreeMap::new(),
            }),
            EventType::Move => EventData::Move(MoveData {
                state: State::Doing,
                reason: None,
                extra: BTreeMap::new(),
            }),
            EventType::Assign => EventData::Assign(AssignData {
                agent: "assignee".to_string(),
                action: AssignAction::Assign,
                extra: BTreeMap::new(),
            }),
            EventType::Comment => EventData::Comment(CommentData {
                body: "A comment".to_string(),
                extra: BTreeMap::new(),
            }),
            EventType::Link => EventData::Link(LinkData {
                target: "bn-other".to_string(),
                link_type: "blocks".to_string(),
                extra: BTreeMap::new(),
            }),
            EventType::Unlink => EventData::Unlink(UnlinkData {
                target: "bn-other".to_string(),
                link_type: None,
                extra: BTreeMap::new(),
            }),
            EventType::Delete => EventData::Delete(DeleteData {
                reason: None,
                extra: BTreeMap::new(),
            }),
            EventType::Compact => EventData::Compact(CompactData {
                summary: "TL;DR".to_string(),
                extra: BTreeMap::new(),
            }),
            EventType::Snapshot => EventData::Snapshot(SnapshotData {
                state: serde_json::json!({"id": item}),
                extra: BTreeMap::new(),
            }),
            EventType::Redact => EventData::Redact(RedactData {
                target_hash: "blake3:abc".to_string(),
                reason: "oops".to_string(),
                extra: BTreeMap::new(),
            }),
        };
        Event {
            wall_ts_us: ts,
            agent: agent.to_string(),
            itc: "itc:AQ".to_string(),
            parents: vec![],
            event_type: et,
            item_id: ItemId::new_unchecked(item),
            data,
            // Hash is synthetic: the cache drops event hashes on round-trip,
            // so nothing asserts on this value.
            event_hash: format!("blake3:{ts:016x}"),
        }
    }

    // === Constants ========================================================

    #[test]
    fn magic_bytes_are_bnch() {
        assert_eq!(&CACHE_MAGIC, b"BNCH");
    }

    #[test]
    fn header_size_is_32() {
        assert_eq!(HEADER_SIZE, 32);
    }

    // === CacheHeader::new =================================================

    #[test]
    fn new_header_defaults() {
        let h = CacheHeader::new(42, 1_700_000_000_000);
        assert_eq!(h.version, CACHE_VERSION);
        assert_eq!(h.column_count, COLUMN_COUNT as u8);
        assert_eq!(h.row_count, 42);
        assert_eq!(h.created_at_us, 1_700_000_000_000);
        assert_eq!(h.data_crc64, 0); // placeholder until encode
    }

    // === Checksum =========================================================

    #[test]
    fn checksum_empty() {
        let c = checksum(&[]);
        // Just verify it produces a consistent value
        assert_eq!(c, checksum(&[]));
    }

    #[test]
    fn checksum_different_data() {
        assert_ne!(checksum(b"hello"), checksum(b"world"));
    }

    #[test]
    fn checksum_single_bit_flip() {
        // A single flipped bit must change the checksum (basic error
        // detection property of any CRC).
        let data = b"hello world";
        let mut flipped = data.to_vec();
        flipped[5] ^= 0x01;
        assert_ne!(checksum(data), checksum(&flipped));
    }

    // === encode_events / decode_events ====================================

    #[test]
    fn encode_decode_empty() {
        let bytes = encode_events(&[], 0).unwrap();
        let (header, events) = decode_events(&bytes).unwrap();
        assert_eq!(header.row_count, 0);
        assert!(events.is_empty());
    }

    #[test]
    fn encode_decode_single_event() {
        let event = make_event(1_700_000_000_000, "claude", EventType::Create, "bn-a7x");
        let bytes = encode_events(std::slice::from_ref(&event), 9999).unwrap();
        let (header, events) = decode_events(&bytes).unwrap();

        assert_eq!(header.row_count, 1);
        assert_eq!(header.created_at_us, 9999);
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].wall_ts_us, 1_700_000_000_000);
        assert_eq!(events[0].agent, "claude");
        assert_eq!(events[0].event_type, EventType::Create);
        assert_eq!(events[0].item_id.as_str(), "bn-a7x");
    }

    #[test]
    fn encode_decode_multiple_events() {
        // Mixed agents and items exercise the string-interning codec.
        let events = vec![
            make_event(1_000, "alice", EventType::Create, "bn-a7x"),
            make_event(2_000, "bob", EventType::Move, "bn-a7x"),
            make_event(3_000, "alice", EventType::Create, "bn-b8y"),
            make_event(4_000, "carol", EventType::Move, "bn-b8y"),
        ];
        let bytes = encode_events(&events, 0).unwrap();
        let (header, decoded) = decode_events(&bytes).unwrap();

        assert_eq!(header.row_count, 4);
        assert_eq!(decoded.len(), 4);
        assert_eq!(decoded[0].wall_ts_us, 1_000);
        assert_eq!(decoded[1].agent, "bob");
        assert_eq!(decoded[2].item_id.as_str(), "bn-b8y");
        assert_eq!(decoded[3].event_type, EventType::Move);
    }

    #[test]
    fn encode_decode_all_event_types() {
        // One event per variant: every payload codec must round-trip.
        let all_types = EventType::ALL;
        let events: Vec<Event> = all_types
            .iter()
            .enumerate()
            .map(|(i, &et)| make_event((i as i64 + 1) * 1000, "agent", et, "bn-a7x"))
            .collect();

        let bytes = encode_events(&events, 0).unwrap();
        let (_, decoded) = decode_events(&bytes).unwrap();

        assert_eq!(decoded.len(), all_types.len());
        for (i, et) in all_types.iter().enumerate() {
            assert_eq!(decoded[i].event_type, *et, "mismatch at index {i}");
        }
    }

    // === Header validation ================================================

    #[test]
    fn decode_bad_magic() {
        let mut bytes = encode_events(&[], 0).unwrap();
        bytes[0] = 0xFF; // corrupt magic
        let err = decode_events(&bytes).unwrap_err();
        assert!(matches!(err, CacheError::InvalidMagic(_)));
    }

    #[test]
    fn decode_unsupported_version() {
        let mut bytes = encode_events(&[], 0).unwrap();
        bytes[4] = 99; // future version
        let err = decode_events(&bytes).unwrap_err();
        assert!(matches!(err, CacheError::UnsupportedVersion(99)));
    }

    #[test]
    fn decode_corrupted_crc() {
        let mut bytes =
            encode_events(&[make_event(1_000, "a", EventType::Create, "bn-a7x")], 0).unwrap();
        // Flip a byte in the column data (after header + offsets)
        let col_start = HEADER_SIZE + COLUMN_COUNT * 8;
        if col_start < bytes.len() {
            bytes[col_start] ^= 0xFF;
        }
        let err = decode_events(&bytes).unwrap_err();
        // Corruption in column data must surface as DataCorrupted.
        assert!(
            matches!(err, CacheError::DataCorrupted(_)),
            "expected DataCorrupted, got {err:?}"
        );
    }

    #[test]
    fn decode_truncated_data() {
        let bytes =
            encode_events(&[make_event(1_000, "a", EventType::Create, "bn-a7x")], 0).unwrap();
        let truncated = &bytes[..bytes.len() / 2];
        let err = decode_events(truncated).unwrap_err();
        // Truncation may surface either as EOF (missing bytes) or as a CRC
        // mismatch (column data shortened), depending on where the cut lands.
        assert!(
            matches!(
                err,
                CacheError::UnexpectedEof | CacheError::DataCorrupted(_)
            ),
            "expected truncation error, got {err:?}"
        );
    }

    // === Large batch ======================================================

    #[test]
    fn encode_decode_large_batch() {
        // 500 events cycling agents, types, and 50 item ids — exercises
        // interning and repeated values at a modest scale.
        let n = 500;
        let events: Vec<Event> = (0..n)
            .map(|i| {
                make_event(
                    i as i64 * 1000,
                    if i % 3 == 0 {
                        "alice"
                    } else if i % 3 == 1 {
                        "bob"
                    } else {
                        "carol"
                    },
                    if i % 2 == 0 {
                        EventType::Create
                    } else {
                        EventType::Move
                    },
                    &format!("bn-{:03}", i % 50),
                )
            })
            .collect();

        let bytes = encode_events(&events, 42).unwrap();
        let (header, decoded) = decode_events(&bytes).unwrap();

        assert_eq!(header.row_count, n as u64);
        assert_eq!(decoded.len(), n);
        for (i, (orig, dec)) in events.iter().zip(decoded.iter()).enumerate() {
            assert_eq!(orig.wall_ts_us, dec.wall_ts_us, "ts mismatch at {i}");
            assert_eq!(orig.agent, dec.agent, "agent mismatch at {i}");
            assert_eq!(orig.event_type, dec.event_type, "type mismatch at {i}");
            assert_eq!(orig.item_id, dec.item_id, "item mismatch at {i}");
        }
    }
}