1pub mod codec;
32pub mod columns;
33pub mod manager;
34pub mod reader;
35pub mod writer;
36
37pub use codec::{
38 ColumnCodec, EventTypeCodec, InternedStringCodec, ItemIdCodec, RawBytesCodec, TimestampCodec,
39 ValueCodec,
40};
41pub use columns::{COLUMN_COUNT, CacheColumns, ColumnRow};
42pub use manager::{CacheManager, LoadResult, LoadSource};
43pub use reader::{CacheReader, CacheReaderError};
44pub use writer::{CacheStats, CacheWriter, rebuild_cache};
45
46use crate::event::Event;
47use columns::{
48 COL_AGENTS, COL_EVENT_TYPES, COL_ITC, COL_ITEM_IDS, COL_PARENTS, COL_TIMESTAMPS, COL_VALUES,
49};
50
/// Magic bytes identifying a cache file ("BNCH").
pub const CACHE_MAGIC: [u8; 4] = *b"BNCH";

/// Current cache format version; decoders reject any file whose version
/// is greater than this.
pub const CACHE_VERSION: u8 = 1;

/// Size in bytes of the fixed file header:
/// magic (4) + version (1) + column count (1) + reserved (2)
/// + row count (8) + created_at (8) + data CRC (8) = 32.
pub const HEADER_SIZE: usize = 32;
74
/// Errors that can occur while encoding or decoding the cache format.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum CacheError {
    /// The first four bytes did not match [`CACHE_MAGIC`].
    #[error("invalid magic bytes: expected BNCH, got {0:?}")]
    InvalidMagic([u8; 4]),

    /// The file's version byte is newer than [`CACHE_VERSION`].
    #[error("unsupported cache format version {0}: maximum supported is {CACHE_VERSION}")]
    UnsupportedVersion(u8),

    /// Checksum mismatch or otherwise inconsistent payload bytes.
    #[error("cache data is corrupted: {0}")]
    DataCorrupted(String),

    /// The buffer ended before a complete header/offset/column was read.
    #[error("unexpected end of cache data")]
    UnexpectedEof,

    /// A column codec failed to encode or decode event payload data.
    #[error("event data encode/decode error: {0}")]
    EventDataError(String),

    /// The header's column count disagrees with what this build expects.
    #[error("column count mismatch: header says {expected}, file has {actual}")]
    ColumnCountMismatch { expected: usize, actual: usize }, }
106
107impl From<serde_json::Error> for CacheError {
108 fn from(e: serde_json::Error) -> Self {
109 Self::EventDataError(e.to_string())
110 }
111}
112
/// Bit-at-a-time CRC-64 over `data`.
///
/// NOTE(review): the polynomial is the reflected form of ECMA-182
/// (0x42F0E1EBA9EA3693) but is applied with MSB-first shifts, so this is a
/// nonstandard CRC variant. It is self-consistent (encode and decode use the
/// same function), which is all the integrity check requires.
fn checksum(data: &[u8]) -> u64 {
    const POLY: u64 = 0xC96C_5795_D787_0F42;
    let folded = data.iter().fold(u64::MAX, |mut acc, &byte| {
        acc ^= u64::from(byte) << 56;
        for _ in 0..8 {
            acc = if acc & (1 << 63) == 0 {
                acc << 1
            } else {
                (acc << 1) ^ POLY
            };
        }
        acc
    });
    // Final complement, matching the all-ones initial value.
    !folded
}
139
/// Fixed-size metadata describing one serialized cache file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CacheHeader {
    /// Format version the file was written with (see [`CACHE_VERSION`]).
    pub version: u8,
    /// Number of columns stored in the file (normally [`COLUMN_COUNT`]).
    pub column_count: u8,
    /// Number of event rows in every column.
    pub row_count: u64,
    /// Creation timestamp in microseconds — presumably since the Unix
    /// epoch; the epoch base is not established in this file (verify).
    pub created_at_us: u64,
    /// Checksum of the concatenated column payloads (not the header or
    /// offset table).
    pub data_crc64: u64,
}
158
159impl CacheHeader {
160 #[must_use]
163 pub const fn new(row_count: u64, created_at_us: u64) -> Self {
164 Self {
165 version: CACHE_VERSION,
166 column_count: {
167 const { assert!(COLUMN_COUNT <= u8::MAX as usize) };
168 #[allow(clippy::cast_possible_truncation)]
169 {
170 COLUMN_COUNT as u8
171 }
172 },
173 row_count,
174 created_at_us,
175 data_crc64: 0,
176 }
177 }
178
179 pub fn encode(&mut self, cols: &CacheColumns) -> Result<Vec<u8>, CacheError> {
188 let mut col_bufs: Vec<Vec<u8>> = vec![Vec::new(); COLUMN_COUNT];
190
191 TimestampCodec::encode(&cols.timestamps, &mut col_bufs[COL_TIMESTAMPS])?;
192 InternedStringCodec::encode(&cols.agents, &mut col_bufs[COL_AGENTS])?;
193 EventTypeCodec::encode(&cols.event_types, &mut col_bufs[COL_EVENT_TYPES])?;
194 ItemIdCodec::encode(&cols.item_ids, &mut col_bufs[COL_ITEM_IDS])?;
195 InternedStringCodec::encode(&cols.parents, &mut col_bufs[COL_PARENTS])?;
196 RawBytesCodec::encode(&cols.itc, &mut col_bufs[COL_ITC])?;
197 ValueCodec::encode(&cols.values, &mut col_bufs[COL_VALUES])?;
198
199 let offsets_section_size = COLUMN_COUNT * 8; let header_and_offsets = HEADER_SIZE + offsets_section_size;
202 let mut offsets: Vec<u64> = Vec::with_capacity(COLUMN_COUNT);
203 let mut cur = header_and_offsets as u64;
204 for buf in &col_bufs {
205 offsets.push(cur);
206 cur += buf.len() as u64;
207 }
208
209 let mut all_col_bytes: Vec<u8> = Vec::new();
211 for buf in &col_bufs {
212 all_col_bytes.extend_from_slice(buf);
213 }
214 self.data_crc64 = checksum(&all_col_bytes);
215
216 let total = header_and_offsets + all_col_bytes.len();
218 let mut out = Vec::with_capacity(total);
219
220 out.extend_from_slice(&CACHE_MAGIC);
222 out.push(self.version);
223 out.push(self.column_count);
224 out.extend_from_slice(&0u16.to_le_bytes()); out.extend_from_slice(&self.row_count.to_le_bytes());
226 out.extend_from_slice(&self.created_at_us.to_le_bytes());
227 out.extend_from_slice(&self.data_crc64.to_le_bytes());
228 debug_assert_eq!(out.len(), HEADER_SIZE);
229
230 for offset in &offsets {
232 out.extend_from_slice(&offset.to_le_bytes());
233 }
234
235 out.extend_from_slice(&all_col_bytes);
237
238 Ok(out)
239 }
240
241 pub fn decode(data: &[u8]) -> Result<(Self, CacheColumns), CacheError> {
256 if data.len() < HEADER_SIZE {
257 return Err(CacheError::UnexpectedEof);
258 }
259
260 let magic: [u8; 4] = data[0..4].try_into().expect("slice is 4 bytes");
262 if magic != CACHE_MAGIC {
263 return Err(CacheError::InvalidMagic(magic));
264 }
265
266 let version = data[4];
267 if version > CACHE_VERSION {
268 return Err(CacheError::UnsupportedVersion(version));
269 }
270
271 let column_count = data[5] as usize;
272 let row_count = u64::from_le_bytes(data[8..16].try_into().expect("slice is 8 bytes"));
274 let created_at_us = u64::from_le_bytes(data[16..24].try_into().expect("slice is 8 bytes"));
275 let stored_crc = u64::from_le_bytes(data[24..32].try_into().expect("slice is 8 bytes"));
276
277 let offsets_start = HEADER_SIZE;
279 let offsets_end = offsets_start + column_count * 8;
280 if data.len() < offsets_end {
281 return Err(CacheError::UnexpectedEof);
282 }
283
284 let mut offsets: Vec<u64> = Vec::with_capacity(column_count);
285 for i in 0..column_count {
286 let start = offsets_start + i * 8;
287 let offset =
288 u64::from_le_bytes(data[start..start + 8].try_into().expect("slice is 8 bytes"));
289 offsets.push(offset);
290 }
291
292 let col_data_start = offsets_end;
294 if data.len() < col_data_start {
295 return Err(CacheError::UnexpectedEof);
296 }
297 let col_data = &data[col_data_start..];
298 let actual_crc = checksum(col_data);
299 if actual_crc != stored_crc {
300 return Err(CacheError::DataCorrupted(format!(
301 "CRC mismatch: expected {stored_crc:#018x}, got {actual_crc:#018x}"
302 )));
303 }
304
305 if column_count < COLUMN_COUNT {
307 return Err(CacheError::ColumnCountMismatch {
308 expected: COLUMN_COUNT,
309 actual: column_count,
310 });
311 }
312
313 let count = usize::try_from(row_count).map_err(|_| {
314 CacheError::DataCorrupted(format!("row_count {row_count} exceeds platform usize"))
315 })?;
316
317 let col_slice = |col_idx: usize| -> Result<&[u8], CacheError> {
319 let start = usize::try_from(offsets[col_idx]).map_err(|_| CacheError::UnexpectedEof)?;
320 if start > data.len() {
321 return Err(CacheError::UnexpectedEof);
322 }
323 let end = if col_idx + 1 < column_count {
325 usize::try_from(offsets[col_idx + 1]).map_err(|_| CacheError::UnexpectedEof)?
326 } else {
327 data.len()
328 };
329 if end > data.len() {
330 return Err(CacheError::UnexpectedEof);
331 }
332 Ok(&data[start..end])
333 };
334
335 let (timestamps, _) = TimestampCodec::decode(col_slice(COL_TIMESTAMPS)?, count)?;
337 let (agents, _) = InternedStringCodec::decode(col_slice(COL_AGENTS)?, count)?;
338 let (event_types, _) = EventTypeCodec::decode(col_slice(COL_EVENT_TYPES)?, count)?;
339 let (item_ids, _) = ItemIdCodec::decode(col_slice(COL_ITEM_IDS)?, count)?;
340 let (parents, _) = InternedStringCodec::decode(col_slice(COL_PARENTS)?, count)?;
341 let (itc, _) = RawBytesCodec::decode(col_slice(COL_ITC)?, count)?;
342 let (values, _) = ValueCodec::decode(col_slice(COL_VALUES)?, count)?;
343
344 let cols = CacheColumns {
345 timestamps,
346 agents,
347 event_types,
348 item_ids,
349 parents,
350 itc,
351 values,
352 };
353
354 let header = Self {
355 version,
356 column_count: u8::try_from(column_count).map_err(|_| {
357 CacheError::DataCorrupted(format!("column_count {column_count} exceeds u8"))
358 })?,
359 row_count,
360 created_at_us,
361 data_crc64: stored_crc,
362 };
363
364 Ok((header, cols))
365 }
366}
367
368pub fn encode_events(events: &[Event], created_at_us: u64) -> Result<Vec<u8>, CacheError> {
381 let cols = CacheColumns::from_events(events)?;
382 let mut header = CacheHeader::new(events.len() as u64, created_at_us);
383 header.encode(&cols)
384}
385
386pub fn decode_events(data: &[u8]) -> Result<(CacheHeader, Vec<Event>), CacheError> {
399 let (header, cols) = CacheHeader::decode(data)?;
400 let events = cols.into_events().map_err(CacheError::EventDataError)?;
401 Ok((header, events))
402}
403
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::data::CreateData;
    use crate::event::data::MoveData;
    use crate::event::{Event, EventData, EventType};
    use crate::model::item::{Kind, State, Urgency};
    use crate::model::item_id::ItemId;
    use std::collections::BTreeMap;

    /// Builds a minimal `Event` of type `et` with placeholder payload data,
    /// so round-trip tests can exercise every `EventType` variant.
    fn make_event(ts: i64, agent: &str, et: EventType, item: &str) -> Event {
        use crate::event::data::{
            AssignAction, AssignData, CommentData, CompactData, DeleteData, LinkData, RedactData,
            SnapshotData, UnlinkData, UpdateData,
        };
        // One representative payload per event type; the match is exhaustive
        // so adding a variant forces this helper to be updated.
        let data = match et {
            EventType::Create => EventData::Create(CreateData {
                title: format!("Item {item}"),
                kind: Kind::Task,
                size: None,
                urgency: Urgency::Default,
                labels: vec![],
                parent: None,
                causation: None,
                description: None,
                extra: BTreeMap::new(),
            }),
            EventType::Update => EventData::Update(UpdateData {
                field: "title".to_string(),
                value: serde_json::json!("new title"),
                extra: BTreeMap::new(),
            }),
            EventType::Move => EventData::Move(MoveData {
                state: State::Doing,
                reason: None,
                extra: BTreeMap::new(),
            }),
            EventType::Assign => EventData::Assign(AssignData {
                agent: "assignee".to_string(),
                action: AssignAction::Assign,
                extra: BTreeMap::new(),
            }),
            EventType::Comment => EventData::Comment(CommentData {
                body: "A comment".to_string(),
                extra: BTreeMap::new(),
            }),
            EventType::Link => EventData::Link(LinkData {
                target: "bn-other".to_string(),
                link_type: "blocks".to_string(),
                extra: BTreeMap::new(),
            }),
            EventType::Unlink => EventData::Unlink(UnlinkData {
                target: "bn-other".to_string(),
                link_type: None,
                extra: BTreeMap::new(),
            }),
            EventType::Delete => EventData::Delete(DeleteData {
                reason: None,
                extra: BTreeMap::new(),
            }),
            EventType::Compact => EventData::Compact(CompactData {
                summary: "TL;DR".to_string(),
                extra: BTreeMap::new(),
            }),
            EventType::Snapshot => EventData::Snapshot(SnapshotData {
                state: serde_json::json!({"id": item}),
                extra: BTreeMap::new(),
            }),
            EventType::Redact => EventData::Redact(RedactData {
                target_hash: "blake3:abc".to_string(),
                reason: "oops".to_string(),
                extra: BTreeMap::new(),
            }),
        };
        Event {
            wall_ts_us: ts,
            agent: agent.to_string(),
            itc: "itc:AQ".to_string(),
            parents: vec![],
            event_type: et,
            item_id: ItemId::new_unchecked(item),
            data,
            // Fake but unique-per-timestamp hash; decoding does not verify it.
            event_hash: format!("blake3:{ts:016x}"),
        }
    }

    // --- format constants ---

    #[test]
    fn magic_bytes_are_bnch() {
        assert_eq!(&CACHE_MAGIC, b"BNCH");
    }

    #[test]
    fn header_size_is_32() {
        assert_eq!(HEADER_SIZE, 32);
    }

    #[test]
    fn new_header_defaults() {
        let h = CacheHeader::new(42, 1_700_000_000_000);
        assert_eq!(h.version, CACHE_VERSION);
        assert_eq!(h.column_count, COLUMN_COUNT as u8);
        assert_eq!(h.row_count, 42);
        assert_eq!(h.created_at_us, 1_700_000_000_000);
        // CRC is only computed during encode(); a fresh header carries 0.
        assert_eq!(h.data_crc64, 0); }

    // --- checksum properties ---

    #[test]
    fn checksum_empty() {
        // Determinism on the empty input.
        let c = checksum(&[]);
        assert_eq!(c, checksum(&[]));
    }

    #[test]
    fn checksum_different_data() {
        assert_ne!(checksum(b"hello"), checksum(b"world"));
    }

    #[test]
    fn checksum_single_bit_flip() {
        // A single flipped bit must change the checksum.
        let data = b"hello world";
        let mut flipped = data.to_vec();
        flipped[5] ^= 0x01;
        assert_ne!(checksum(data), checksum(&flipped));
    }

    // --- round-trip encoding/decoding ---

    #[test]
    fn encode_decode_empty() {
        let bytes = encode_events(&[], 0).unwrap();
        let (header, events) = decode_events(&bytes).unwrap();
        assert_eq!(header.row_count, 0);
        assert!(events.is_empty());
    }

    #[test]
    fn encode_decode_single_event() {
        let event = make_event(1_700_000_000_000, "claude", EventType::Create, "bn-a7x");
        let bytes = encode_events(std::slice::from_ref(&event), 9999).unwrap();
        let (header, events) = decode_events(&bytes).unwrap();

        assert_eq!(header.row_count, 1);
        assert_eq!(header.created_at_us, 9999);
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].wall_ts_us, 1_700_000_000_000);
        assert_eq!(events[0].agent, "claude");
        assert_eq!(events[0].event_type, EventType::Create);
        assert_eq!(events[0].item_id.as_str(), "bn-a7x");
    }

    #[test]
    fn encode_decode_multiple_events() {
        let events = vec![
            make_event(1_000, "alice", EventType::Create, "bn-a7x"),
            make_event(2_000, "bob", EventType::Move, "bn-a7x"),
            make_event(3_000, "alice", EventType::Create, "bn-b8y"),
            make_event(4_000, "carol", EventType::Move, "bn-b8y"),
        ];
        let bytes = encode_events(&events, 0).unwrap();
        let (header, decoded) = decode_events(&bytes).unwrap();

        // Spot-check order preservation across all four rows.
        assert_eq!(header.row_count, 4);
        assert_eq!(decoded.len(), 4);
        assert_eq!(decoded[0].wall_ts_us, 1_000);
        assert_eq!(decoded[1].agent, "bob");
        assert_eq!(decoded[2].item_id.as_str(), "bn-b8y");
        assert_eq!(decoded[3].event_type, EventType::Move);
    }

    #[test]
    fn encode_decode_all_event_types() {
        // Every EventType variant must survive a round trip.
        let all_types = EventType::ALL;
        let events: Vec<Event> = all_types
            .iter()
            .enumerate()
            .map(|(i, &et)| make_event((i as i64 + 1) * 1000, "agent", et, "bn-a7x"))
            .collect();

        let bytes = encode_events(&events, 0).unwrap();
        let (_, decoded) = decode_events(&bytes).unwrap();

        assert_eq!(decoded.len(), all_types.len());
        for (i, et) in all_types.iter().enumerate() {
            assert_eq!(decoded[i].event_type, *et, "mismatch at index {i}");
        }
    }

    // --- corruption handling ---

    #[test]
    fn decode_bad_magic() {
        let mut bytes = encode_events(&[], 0).unwrap();
        // Clobber the first magic byte.
        bytes[0] = 0xFF; let err = decode_events(&bytes).unwrap_err();
        assert!(matches!(err, CacheError::InvalidMagic(_)));
    }

    #[test]
    fn decode_unsupported_version() {
        let mut bytes = encode_events(&[], 0).unwrap();
        // Byte 4 is the version field.
        bytes[4] = 99; let err = decode_events(&bytes).unwrap_err();
        assert!(matches!(err, CacheError::UnsupportedVersion(99)));
    }

    #[test]
    fn decode_corrupted_crc() {
        let mut bytes =
            encode_events(&[make_event(1_000, "a", EventType::Create, "bn-a7x")], 0).unwrap();
        // Flip a byte in the first column payload (just past the offset
        // table) so the stored CRC no longer matches.
        let col_start = HEADER_SIZE + COLUMN_COUNT * 8;
        if col_start < bytes.len() {
            bytes[col_start] ^= 0xFF;
        }
        let err = decode_events(&bytes).unwrap_err();
        assert!(
            matches!(err, CacheError::DataCorrupted(_)),
            "expected DataCorrupted, got {err:?}"
        );
    }

    #[test]
    fn decode_truncated_data() {
        let bytes =
            encode_events(&[make_event(1_000, "a", EventType::Create, "bn-a7x")], 0).unwrap();
        // Cutting the buffer in half may surface either as EOF (missing
        // bytes) or as a CRC mismatch, depending on where the cut lands.
        let truncated = &bytes[..bytes.len() / 2];
        let err = decode_events(truncated).unwrap_err();
        assert!(
            matches!(
                err,
                CacheError::UnexpectedEof | CacheError::DataCorrupted(_)
            ),
            "expected truncation error, got {err:?}"
        );
    }

    // --- scale ---

    #[test]
    fn encode_decode_large_batch() {
        // 500 events across 3 agents, 2 event types, and 50 item ids to
        // exercise the interned-string and value codecs with repetition.
        let n = 500;
        let events: Vec<Event> = (0..n)
            .map(|i| {
                make_event(
                    i as i64 * 1000,
                    if i % 3 == 0 {
                        "alice"
                    } else if i % 3 == 1 {
                        "bob"
                    } else {
                        "carol"
                    },
                    if i % 2 == 0 {
                        EventType::Create
                    } else {
                        EventType::Move
                    },
                    &format!("bn-{:03}", i % 50),
                )
            })
            .collect();

        let bytes = encode_events(&events, 42).unwrap();
        let (header, decoded) = decode_events(&bytes).unwrap();

        assert_eq!(header.row_count, n as u64);
        assert_eq!(decoded.len(), n);
        for (i, (orig, dec)) in events.iter().zip(decoded.iter()).enumerate() {
            assert_eq!(orig.wall_ts_us, dec.wall_ts_us, "ts mismatch at {i}");
            assert_eq!(orig.agent, dec.agent, "agent mismatch at {i}");
            assert_eq!(orig.event_type, dec.event_type, "type mismatch at {i}");
            assert_eq!(orig.item_id, dec.item_id, "item mismatch at {i}");
        }
    }
}