1pub mod codec;
32pub mod columns;
33pub mod manager;
34pub mod reader;
35pub mod writer;
36
37use std::collections::BTreeMap;
38use std::fs;
39use std::path::Path;
40
41use anyhow::{Context, Result};
42
43pub use codec::{
44 ColumnCodec, EventTypeCodec, InternedStringCodec, ItemIdCodec, RawBytesCodec, TimestampCodec,
45 ValueCodec,
46};
47pub use columns::{COLUMN_COUNT, CacheColumns, ColumnRow};
48pub use manager::{CacheManager, LoadResult, LoadSource};
49pub use reader::{CacheReader, CacheReaderError};
50pub use writer::{CacheStats, CacheWriter, rebuild_cache};
51
52use crate::event::Event;
53use columns::{
54 COL_AGENTS, COL_EVENT_TYPES, COL_ITC, COL_ITEM_IDS, COL_PARENTS, COL_TIMESTAMPS, COL_VALUES,
55};
56
/// Magic bytes written at offset 0 of every encoded cache (`b"BNCH"`).
///
/// `CacheHeader::decode` rejects input whose first four bytes differ.
pub const CACHE_MAGIC: [u8; 4] = *b"BNCH";

/// Format version stamped into headers created by `CacheHeader::new`.
///
/// `CacheHeader::decode` rejects any file whose version byte is greater than this.
pub const CACHE_VERSION: u8 = 1;

/// Size in bytes of the fixed header that precedes the column offset table.
pub const HEADER_SIZE: usize = 32;
80
/// Errors produced while encoding or decoding the binary cache format.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum CacheError {
    /// The first four bytes of the input were not [`CACHE_MAGIC`]; carries the bytes found.
    #[error("invalid magic bytes: expected BNCH, got {0:?}")]
    InvalidMagic([u8; 4]),

    /// The header's version byte is greater than [`CACHE_VERSION`]; carries that byte.
    #[error("unsupported cache format version {0}: maximum supported is {CACHE_VERSION}")]
    UnsupportedVersion(u8),

    /// The input is structurally present but inconsistent (for example a checksum
    /// mismatch, or a row count that does not fit in `usize`).
    #[error("cache data is corrupted: {0}")]
    DataCorrupted(String),

    /// The input ended before a required section (header, offset table, column) was complete.
    #[error("unexpected end of cache data")]
    UnexpectedEof,

    /// Converting between events and their cached representation failed;
    /// also the target of the `From<serde_json::Error>` conversion.
    #[error("event data encode/decode error: {0}")]
    EventDataError(String),

    /// The header declares fewer columns than this build requires.
    ///
    /// NOTE(review): `CacheHeader::decode` fills `expected` with `COLUMN_COUNT` and `actual`
    /// with the header's value, which does not match the "header says … file has …" wording
    /// of the message — confirm which reading is intended.
    #[error("column count mismatch: header says {expected}, file has {actual}")]
    ColumnCountMismatch { expected: usize, actual: usize },
}
112
113impl From<serde_json::Error> for CacheError {
114 fn from(e: serde_json::Error) -> Self {
115 Self::EventDataError(e.to_string())
116 }
117}
118
/// Computes the 64-bit CRC stored in the cache header over `data`.
///
/// The register starts at all ones, each input byte is XORed into the top eight bits,
/// the register is shifted left one bit at a time (XORing in `POLY` whenever the bit
/// shifted out was set), and the final value is bitwise inverted. Empty input yields `0`.
///
/// NOTE(review): `POLY` looks like the *reflected* form of the ECMA-182 polynomial while
/// the loop processes bits MSB-first, so this is presumably not a standard CRC-64 variant.
/// The result is part of the on-disk format, so it must not be "corrected" without a
/// format version bump.
fn checksum(data: &[u8]) -> u64 {
    const POLY: u64 = 0xC96C_5795_D787_0F42;
    const TOP_BIT: u64 = 1 << 63;

    let register = data.iter().fold(u64::MAX, |acc, &byte| {
        // Fold the byte into the high end, then clock the register eight times.
        (0..8).fold(acc ^ (u64::from(byte) << 56), |reg, _| {
            let shifted = reg << 1;
            if reg & TOP_BIT == 0 {
                shifted
            } else {
                shifted ^ POLY
            }
        })
    });

    !register
}
145
146pub(crate) fn fingerprint_dir(dir: &Path) -> Result<u64> {
151 if !dir.exists() {
152 return Ok(0);
153 }
154
155 let mut entries: BTreeMap<String, (u64, u64)> = BTreeMap::new();
156
157 let read_dir = fs::read_dir(dir).with_context(|| format!("read dir {}", dir.display()))?;
158
159 for entry in read_dir {
160 let entry = entry?;
161 let name = entry.file_name().to_string_lossy().to_string();
162
163 if !name.ends_with(".events") {
164 continue;
165 }
166
167 let meta = entry.metadata()?;
168 let size = meta.len();
169 let mtime_ns = meta
170 .modified()
171 .ok()
172 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
173 .map_or(0, |d| u64::try_from(d.as_nanos()).unwrap_or(u64::MAX));
174
175 entries.insert(name, (size, mtime_ns));
176 }
177
178 let mut hash: u64 = 0xcbf2_9ce4_8422_2325;
179 for (name, (size, mtime)) in &entries {
180 for byte in name.bytes() {
181 hash ^= u64::from(byte);
182 hash = hash.wrapping_mul(0x0100_0000_01b3);
183 }
184 for byte in size.to_le_bytes() {
185 hash ^= u64::from(byte);
186 hash = hash.wrapping_mul(0x0100_0000_01b3);
187 }
188 for byte in mtime.to_le_bytes() {
189 hash ^= u64::from(byte);
190 hash = hash.wrapping_mul(0x0100_0000_01b3);
191 }
192 }
193
194 Ok(hash)
195}
196
/// Decoded form of the fixed-size header at the start of an encoded cache.
///
/// On disk the header occupies [`HEADER_SIZE`] bytes: the magic, the version byte, the
/// column-count byte, two zero bytes, then `row_count`, `created_at_us`, and `data_crc64`
/// as little-endian `u64`s.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CacheHeader {
    /// Format version byte; new headers use [`CACHE_VERSION`].
    pub version: u8,
    /// Number of entries in the column offset table that follows the header.
    pub column_count: u8,
    /// Number of rows; passed to every column codec when decoding.
    pub row_count: u64,
    /// Creation timestamp supplied by the caller (presumably microseconds since the
    /// Unix epoch, going by the `_us` suffix — confirm against callers).
    pub created_at_us: u64,
    /// Checksum of the column bytes; `0` until `encode` computes it.
    pub data_crc64: u64,
}
215
216impl CacheHeader {
217 #[must_use]
220 pub const fn new(row_count: u64, created_at_us: u64) -> Self {
221 Self {
222 version: CACHE_VERSION,
223 column_count: {
224 const { assert!(COLUMN_COUNT <= u8::MAX as usize) };
225 #[allow(clippy::cast_possible_truncation)]
226 {
227 COLUMN_COUNT as u8
228 }
229 },
230 row_count,
231 created_at_us,
232 data_crc64: 0,
233 }
234 }
235
236 pub fn encode(&mut self, cols: &CacheColumns) -> Result<Vec<u8>, CacheError> {
245 let mut col_bufs: Vec<Vec<u8>> = vec![Vec::new(); COLUMN_COUNT];
247
248 TimestampCodec::encode(&cols.timestamps, &mut col_bufs[COL_TIMESTAMPS])?;
249 InternedStringCodec::encode(&cols.agents, &mut col_bufs[COL_AGENTS])?;
250 EventTypeCodec::encode(&cols.event_types, &mut col_bufs[COL_EVENT_TYPES])?;
251 ItemIdCodec::encode(&cols.item_ids, &mut col_bufs[COL_ITEM_IDS])?;
252 InternedStringCodec::encode(&cols.parents, &mut col_bufs[COL_PARENTS])?;
253 RawBytesCodec::encode(&cols.itc, &mut col_bufs[COL_ITC])?;
254 ValueCodec::encode(&cols.values, &mut col_bufs[COL_VALUES])?;
255
256 let offsets_section_size = COLUMN_COUNT * 8; let header_and_offsets = HEADER_SIZE + offsets_section_size;
259 let mut offsets: Vec<u64> = Vec::with_capacity(COLUMN_COUNT);
260 let mut cur = header_and_offsets as u64;
261 for buf in &col_bufs {
262 offsets.push(cur);
263 cur += buf.len() as u64;
264 }
265
266 let mut all_col_bytes: Vec<u8> = Vec::new();
268 for buf in &col_bufs {
269 all_col_bytes.extend_from_slice(buf);
270 }
271 self.data_crc64 = checksum(&all_col_bytes);
272
273 let total = header_and_offsets + all_col_bytes.len();
275 let mut out = Vec::with_capacity(total);
276
277 out.extend_from_slice(&CACHE_MAGIC);
279 out.push(self.version);
280 out.push(self.column_count);
281 out.extend_from_slice(&0u16.to_le_bytes()); out.extend_from_slice(&self.row_count.to_le_bytes());
283 out.extend_from_slice(&self.created_at_us.to_le_bytes());
284 out.extend_from_slice(&self.data_crc64.to_le_bytes());
285 debug_assert_eq!(out.len(), HEADER_SIZE);
286
287 for offset in &offsets {
289 out.extend_from_slice(&offset.to_le_bytes());
290 }
291
292 out.extend_from_slice(&all_col_bytes);
294
295 Ok(out)
296 }
297
298 pub fn decode(data: &[u8]) -> Result<(Self, CacheColumns), CacheError> {
313 if data.len() < HEADER_SIZE {
314 return Err(CacheError::UnexpectedEof);
315 }
316
317 let magic: [u8; 4] = data[0..4].try_into().expect("slice is 4 bytes");
319 if magic != CACHE_MAGIC {
320 return Err(CacheError::InvalidMagic(magic));
321 }
322
323 let version = data[4];
324 if version > CACHE_VERSION {
325 return Err(CacheError::UnsupportedVersion(version));
326 }
327
328 let column_count = data[5] as usize;
329 let row_count = u64::from_le_bytes(data[8..16].try_into().expect("slice is 8 bytes"));
331 let created_at_us = u64::from_le_bytes(data[16..24].try_into().expect("slice is 8 bytes"));
332 let stored_crc = u64::from_le_bytes(data[24..32].try_into().expect("slice is 8 bytes"));
333
334 let offsets_start = HEADER_SIZE;
336 let offsets_end = offsets_start + column_count * 8;
337 if data.len() < offsets_end {
338 return Err(CacheError::UnexpectedEof);
339 }
340
341 let mut offsets: Vec<u64> = Vec::with_capacity(column_count);
342 for i in 0..column_count {
343 let start = offsets_start + i * 8;
344 let offset =
345 u64::from_le_bytes(data[start..start + 8].try_into().expect("slice is 8 bytes"));
346 offsets.push(offset);
347 }
348
349 let col_data_start = offsets_end;
351 if data.len() < col_data_start {
352 return Err(CacheError::UnexpectedEof);
353 }
354 let col_data = &data[col_data_start..];
355 let actual_crc = checksum(col_data);
356 if actual_crc != stored_crc {
357 return Err(CacheError::DataCorrupted(format!(
358 "CRC mismatch: expected {stored_crc:#018x}, got {actual_crc:#018x}"
359 )));
360 }
361
362 if column_count < COLUMN_COUNT {
364 return Err(CacheError::ColumnCountMismatch {
365 expected: COLUMN_COUNT,
366 actual: column_count,
367 });
368 }
369
370 let count = usize::try_from(row_count).map_err(|_| {
371 CacheError::DataCorrupted(format!("row_count {row_count} exceeds platform usize"))
372 })?;
373
374 let col_slice = |col_idx: usize| -> Result<&[u8], CacheError> {
376 let start = usize::try_from(offsets[col_idx]).map_err(|_| CacheError::UnexpectedEof)?;
377 if start > data.len() {
378 return Err(CacheError::UnexpectedEof);
379 }
380 let end = if col_idx + 1 < column_count {
382 usize::try_from(offsets[col_idx + 1]).map_err(|_| CacheError::UnexpectedEof)?
383 } else {
384 data.len()
385 };
386 if end > data.len() {
387 return Err(CacheError::UnexpectedEof);
388 }
389 Ok(&data[start..end])
390 };
391
392 let (timestamps, _) = TimestampCodec::decode(col_slice(COL_TIMESTAMPS)?, count)?;
394 let (agents, _) = InternedStringCodec::decode(col_slice(COL_AGENTS)?, count)?;
395 let (event_types, _) = EventTypeCodec::decode(col_slice(COL_EVENT_TYPES)?, count)?;
396 let (item_ids, _) = ItemIdCodec::decode(col_slice(COL_ITEM_IDS)?, count)?;
397 let (parents, _) = InternedStringCodec::decode(col_slice(COL_PARENTS)?, count)?;
398 let (itc, _) = RawBytesCodec::decode(col_slice(COL_ITC)?, count)?;
399 let (values, _) = ValueCodec::decode(col_slice(COL_VALUES)?, count)?;
400
401 let cols = CacheColumns {
402 timestamps,
403 agents,
404 event_types,
405 item_ids,
406 parents,
407 itc,
408 values,
409 };
410
411 let header = Self {
412 version,
413 column_count: u8::try_from(column_count).map_err(|_| {
414 CacheError::DataCorrupted(format!("column_count {column_count} exceeds u8"))
415 })?,
416 row_count,
417 created_at_us,
418 data_crc64: stored_crc,
419 };
420
421 Ok((header, cols))
422 }
423}
424
425pub fn encode_events(events: &[Event], created_at_us: u64) -> Result<Vec<u8>, CacheError> {
438 let cols = CacheColumns::from_events(events)?;
439 let mut header = CacheHeader::new(events.len() as u64, created_at_us);
440 header.encode(&cols)
441}
442
443pub fn decode_events(data: &[u8]) -> Result<(CacheHeader, Vec<Event>), CacheError> {
456 let (header, cols) = CacheHeader::decode(data)?;
457 let events = cols.into_events().map_err(CacheError::EventDataError)?;
458 Ok((header, events))
459}
460
/// Unit tests for the cache header constants, the checksum, and encode/decode round trips.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::data::CreateData;
    use crate::event::data::MoveData;
    use crate::event::{Event, EventData, EventType};
    use crate::model::item::{Kind, State, Urgency};
    use crate::model::item_id::ItemId;
    use std::collections::BTreeMap;

    /// Builds an event of type `et` with a minimal, fixed payload for that type.
    ///
    /// The hash is derived from `ts`, so events with distinct timestamps get distinct hashes.
    fn make_event(ts: i64, agent: &str, et: EventType, item: &str) -> Event {
        use crate::event::data::{
            AssignAction, AssignData, CommentData, CompactData, DeleteData, LinkData, RedactData,
            SnapshotData, UnlinkData, UpdateData,
        };
        let data = match et {
            EventType::Create => EventData::Create(CreateData {
                title: format!("Item {item}"),
                kind: Kind::Task,
                size: None,
                urgency: Urgency::Default,
                labels: vec![],
                parent: None,
                causation: None,
                description: None,
                extra: BTreeMap::new(),
            }),
            EventType::Update => EventData::Update(UpdateData {
                field: "title".to_string(),
                value: serde_json::json!("new title"),
                extra: BTreeMap::new(),
            }),
            EventType::Move => EventData::Move(MoveData {
                state: State::Doing,
                reason: None,
                extra: BTreeMap::new(),
            }),
            EventType::Assign => EventData::Assign(AssignData {
                agent: "assignee".to_string(),
                action: AssignAction::Assign,
                extra: BTreeMap::new(),
            }),
            EventType::Comment => EventData::Comment(CommentData {
                body: "A comment".to_string(),
                extra: BTreeMap::new(),
            }),
            EventType::Link => EventData::Link(LinkData {
                target: "bn-other".to_string(),
                link_type: "blocks".to_string(),
                extra: BTreeMap::new(),
            }),
            EventType::Unlink => EventData::Unlink(UnlinkData {
                target: "bn-other".to_string(),
                link_type: None,
                extra: BTreeMap::new(),
            }),
            EventType::Delete => EventData::Delete(DeleteData {
                reason: None,
                extra: BTreeMap::new(),
            }),
            EventType::Compact => EventData::Compact(CompactData {
                summary: "TL;DR".to_string(),
                extra: BTreeMap::new(),
            }),
            EventType::Snapshot => EventData::Snapshot(SnapshotData {
                state: serde_json::json!({"id": item}),
                extra: BTreeMap::new(),
            }),
            EventType::Redact => EventData::Redact(RedactData {
                target_hash: "blake3:abc".to_string(),
                reason: "oops".to_string(),
                extra: BTreeMap::new(),
            }),
        };
        Event {
            wall_ts_us: ts,
            agent: agent.to_string(),
            itc: "itc:AQ".to_string(),
            parents: vec![],
            event_type: et,
            item_id: ItemId::new_unchecked(item),
            data,
            event_hash: format!("blake3:{ts:016x}"),
        }
    }

    // --- Constants ---

    #[test]
    fn magic_bytes_are_bnch() {
        assert_eq!(&CACHE_MAGIC, b"BNCH");
    }

    #[test]
    fn header_size_is_32() {
        assert_eq!(HEADER_SIZE, 32);
    }

    // --- Header construction ---

    #[test]
    fn new_header_defaults() {
        let h = CacheHeader::new(42, 1_700_000_000_000);
        assert_eq!(h.version, CACHE_VERSION);
        assert_eq!(h.column_count, COLUMN_COUNT as u8);
        assert_eq!(h.row_count, 42);
        assert_eq!(h.created_at_us, 1_700_000_000_000);
        // The checksum is only filled in by `encode`.
        assert_eq!(h.data_crc64, 0);
    }

    // --- Checksum ---

    #[test]
    fn checksum_empty() {
        // With no input the register stays at its all-ones seed, so the final
        // inversion yields zero. Pinning the value guards the on-disk format.
        assert_eq!(checksum(&[]), 0);
    }

    #[test]
    fn checksum_different_data() {
        assert_ne!(checksum(b"hello"), checksum(b"world"));
    }

    #[test]
    fn checksum_single_bit_flip() {
        let data = b"hello world";
        let mut flipped = data.to_vec();
        flipped[5] ^= 0x01;
        assert_ne!(checksum(data), checksum(&flipped));
    }

    // --- Round trips ---

    #[test]
    fn encode_decode_empty() {
        let bytes = encode_events(&[], 0).unwrap();
        let (header, events) = decode_events(&bytes).unwrap();
        assert_eq!(header.row_count, 0);
        assert!(events.is_empty());
    }

    #[test]
    fn encode_decode_single_event() {
        let event = make_event(1_700_000_000_000, "claude", EventType::Create, "bn-a7x");
        let bytes = encode_events(std::slice::from_ref(&event), 9999).unwrap();
        let (header, events) = decode_events(&bytes).unwrap();

        assert_eq!(header.row_count, 1);
        assert_eq!(header.created_at_us, 9999);
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].wall_ts_us, 1_700_000_000_000);
        assert_eq!(events[0].agent, "claude");
        assert_eq!(events[0].event_type, EventType::Create);
        assert_eq!(events[0].item_id.as_str(), "bn-a7x");
    }

    #[test]
    fn encode_decode_multiple_events() {
        let events = vec![
            make_event(1_000, "alice", EventType::Create, "bn-a7x"),
            make_event(2_000, "bob", EventType::Move, "bn-a7x"),
            make_event(3_000, "alice", EventType::Create, "bn-b8y"),
            make_event(4_000, "carol", EventType::Move, "bn-b8y"),
        ];
        let bytes = encode_events(&events, 0).unwrap();
        let (header, decoded) = decode_events(&bytes).unwrap();

        assert_eq!(header.row_count, 4);
        assert_eq!(decoded.len(), 4);
        assert_eq!(decoded[0].wall_ts_us, 1_000);
        assert_eq!(decoded[1].agent, "bob");
        assert_eq!(decoded[2].item_id.as_str(), "bn-b8y");
        assert_eq!(decoded[3].event_type, EventType::Move);
    }

    #[test]
    fn encode_decode_all_event_types() {
        let all_types = EventType::ALL;
        let events: Vec<Event> = all_types
            .iter()
            .enumerate()
            .map(|(i, &et)| make_event((i as i64 + 1) * 1000, "agent", et, "bn-a7x"))
            .collect();

        let bytes = encode_events(&events, 0).unwrap();
        let (_, decoded) = decode_events(&bytes).unwrap();

        assert_eq!(decoded.len(), all_types.len());
        for (i, et) in all_types.iter().enumerate() {
            assert_eq!(decoded[i].event_type, *et, "mismatch at index {i}");
        }
    }

    // --- Error paths ---

    #[test]
    fn decode_bad_magic() {
        let mut bytes = encode_events(&[], 0).unwrap();
        // Corrupt the first magic byte.
        bytes[0] = 0xFF;
        let err = decode_events(&bytes).unwrap_err();
        assert!(matches!(err, CacheError::InvalidMagic(_)));
    }

    #[test]
    fn decode_unsupported_version() {
        let mut bytes = encode_events(&[], 0).unwrap();
        // Byte 4 is the version field.
        bytes[4] = 99;
        let err = decode_events(&bytes).unwrap_err();
        assert!(matches!(err, CacheError::UnsupportedVersion(99)));
    }

    #[test]
    fn decode_corrupted_crc() {
        let mut bytes =
            encode_events(&[make_event(1_000, "a", EventType::Create, "bn-a7x")], 0).unwrap();
        // First byte of the column payload, just past the header and offset table.
        let col_start = HEADER_SIZE + COLUMN_COUNT * 8;
        assert!(
            col_start < bytes.len(),
            "encoded cache has no column payload to corrupt"
        );
        bytes[col_start] ^= 0xFF;
        let err = decode_events(&bytes).unwrap_err();
        assert!(
            matches!(err, CacheError::DataCorrupted(_)),
            "expected DataCorrupted, got {err:?}"
        );
    }

    #[test]
    fn decode_truncated_data() {
        let bytes =
            encode_events(&[make_event(1_000, "a", EventType::Create, "bn-a7x")], 0).unwrap();
        let truncated = &bytes[..bytes.len() / 2];
        let err = decode_events(truncated).unwrap_err();
        assert!(
            matches!(
                err,
                CacheError::UnexpectedEof | CacheError::DataCorrupted(_)
            ),
            "expected truncation error, got {err:?}"
        );
    }

    // --- Larger batch ---

    #[test]
    fn encode_decode_large_batch() {
        let n = 500;
        let events: Vec<Event> = (0..n)
            .map(|i| {
                make_event(
                    i as i64 * 1000,
                    if i % 3 == 0 {
                        "alice"
                    } else if i % 3 == 1 {
                        "bob"
                    } else {
                        "carol"
                    },
                    if i % 2 == 0 {
                        EventType::Create
                    } else {
                        EventType::Move
                    },
                    &format!("bn-{:03}", i % 50),
                )
            })
            .collect();

        let bytes = encode_events(&events, 42).unwrap();
        let (header, decoded) = decode_events(&bytes).unwrap();

        assert_eq!(header.row_count, n as u64);
        assert_eq!(decoded.len(), n);
        for (i, (orig, dec)) in events.iter().zip(decoded.iter()).enumerate() {
            assert_eq!(orig.wall_ts_us, dec.wall_ts_us, "ts mismatch at {i}");
            assert_eq!(orig.agent, dec.agent, "agent mismatch at {i}");
            assert_eq!(orig.event_type, dec.event_type, "type mismatch at {i}");
            assert_eq!(orig.item_id, dec.item_id, "item mismatch at {i}");
        }
    }
}