1use std::fs::{File, OpenOptions};
2use std::io::{BufReader, BufWriter, Read, Write};
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use arc_swap::ArcSwap;
7
/// File header of the original (V1) log format, whose records carry no checksum.
const MAGIC: &[u8; 8] = b"MCPMEMV1";
/// File header of the V2 log format, in which every record ends with a CRC-32
/// of its payload.
const MAGIC_CRC: &[u8; 8] = b"MCPMEMV2";
/// Upper bound (1 MiB) on a record's total encoded size: length field, kind
/// byte, payload and, in V2 files, the trailing CRC.
const MAX_RECORD_BYTES: u32 = 1024 * 1024;
11
/// Tag byte written in front of every record payload.
///
/// The discriminant is the on-disk value, so existing variants must never be
/// renumbered. NOTE(review): `TxnBegin`/`TxnCommit` have no payload codec in
/// this file; presumably they are transaction markers — confirm with callers.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecordKind {
    CreateEntity = 0x00,
    CreateRelation = 0x01,
    AddObservations = 0x02,
    DeleteEntity = 0x03,
    DeleteObservations = 0x04,
    DeleteRelation = 0x05,
    TxnBegin = 0x06,
    TxnCommit = 0x07,
}

impl RecordKind {
    /// Every variant, stored at the index equal to its discriminant.
    const BY_TAG: [RecordKind; 8] = [
        RecordKind::CreateEntity,
        RecordKind::CreateRelation,
        RecordKind::AddObservations,
        RecordKind::DeleteEntity,
        RecordKind::DeleteObservations,
        RecordKind::DeleteRelation,
        RecordKind::TxnBegin,
        RecordKind::TxnCommit,
    ];

    /// Decodes an on-disk tag byte, yielding `None` for values no variant uses.
    #[inline]
    pub const fn from_u8(v: u8) -> Option<RecordKind> {
        let idx = v as usize;
        if idx < Self::BY_TAG.len() {
            Some(Self::BY_TAG[idx])
        } else {
            None
        }
    }
}
46
47pub struct BinaryStore {
48 writer: BufWriter<File>,
49 path: PathBuf,
50 has_crc: bool,
53 pub(crate) sync_slot: Arc<ArcSwap<File>>,
59}
60
61impl BinaryStore {
62 pub const fn path(&self) -> &PathBuf {
63 &self.path
64 }
65
66 pub fn new(path: &Path) -> std::io::Result<Self> {
67 Self::new_with_slot(path, None)
68 }
69
70 pub fn new_with_slot(
75 path: &Path,
76 slot: Option<Arc<ArcSwap<File>>>,
77 ) -> std::io::Result<Self> {
78 let exists = path.exists();
79 let file = OpenOptions::new()
80 .create(true)
81 .append(true)
82 .read(true)
83 .open(path)?;
84
85 let handle = Arc::new(file.try_clone()?);
86 let sync_slot = match slot {
87 Some(s) => {
88 s.store(handle);
89 s
90 }
91 None => Arc::new(ArcSwap::new(handle)),
92 };
93
94 let (has_crc, file) = if !exists {
96 let f = OpenOptions::new()
97 .create(true)
98 .append(true)
99 .read(false)
100 .open(path)?;
101 let mut w = BufWriter::with_capacity(65536, f);
102 w.write_all(MAGIC_CRC)?;
103 w.flush()?;
104 (true, w.into_inner().map_err(|e| e.into_error())?)
105 } else {
106 let probe_file = OpenOptions::new().read(true).open(path)?;
108 let mut probe = [0u8; 8];
109 let has_crc = match std::io::BufReader::new(&probe_file).read_exact(&mut probe) {
110 Ok(()) => &probe == MAGIC_CRC,
111 _ => false,
112 };
113 drop(probe_file);
114 let f = OpenOptions::new()
115 .create(true)
116 .append(true)
117 .read(false)
118 .open(path)?;
119 (has_crc, f)
120 };
121
122 let writer = BufWriter::with_capacity(65536, file);
123
124 Ok(Self {
125 writer,
126 path: path.to_path_buf(),
127 has_crc,
128 sync_slot,
129 })
130 }
131
132 pub fn write_record(&mut self, kind: RecordKind, payload: &[u8]) -> std::io::Result<()> {
133 let crc_len: usize = if self.has_crc { 4 } else { 0 };
134 let total_len = 4 + 1 + payload.len() + crc_len;
135 if total_len as u32 > MAX_RECORD_BYTES {
136 return Err(std::io::Error::new(
137 std::io::ErrorKind::InvalidInput,
138 "Record too large",
139 ));
140 }
141 self.writer.write_all(&(total_len as u32).to_le_bytes())?;
142 self.writer.write_all(&[kind as u8])?;
143 self.writer.write_all(payload)?;
144 if self.has_crc {
145 let crc = crc32fast::hash(payload);
146 self.writer.write_all(&crc.to_le_bytes())?;
147 }
148 Ok(())
149 }
150
151 pub fn flush(&mut self) -> std::io::Result<()> {
153 self.writer.flush()
154 }
155
156 pub fn sync(&mut self) -> std::io::Result<()> {
158 self.writer.get_ref().sync_data()
159 }
160
161 pub fn flush_and_sync(&mut self) -> std::io::Result<()> {
162 self.flush()?;
163 self.sync()
164 }
165
166 pub fn replay<F>(&self, mut callback: F) -> std::io::Result<()>
167 where
168 F: FnMut(RecordKind, &[u8]),
169 {
170 let file = match OpenOptions::new().read(true).open(&self.path) {
171 Ok(f) => f,
172 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
173 Err(e) => return Err(e),
174 };
175
176 let meta = file.metadata()?;
177 if meta.len() == 0 {
178 return Ok(());
179 }
180
181 let mut reader = BufReader::with_capacity(65536, file);
182 let mut magic = [0u8; 8];
183
184 match reader.read_exact(&mut magic) {
185 Ok(()) => {}
186 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(()),
187 Err(e) => return Err(e),
188 }
189
190 let has_crc = if &magic == MAGIC_CRC {
191 true
192 } else if &magic == MAGIC {
193 false
194 } else {
195 return Ok(());
196 };
197
198 let mut payload_buf = Vec::with_capacity(4096);
199
200 loop {
201 let mut len_buf = [0u8; 4];
202 match reader.read_exact(&mut len_buf) {
203 Ok(()) => {}
204 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(()),
205 Err(e) => return Err(e),
206 }
207 let total_len = u32::from_le_bytes(len_buf) as usize;
208 if total_len < 5 || total_len > MAX_RECORD_BYTES as usize {
209 return Err(std::io::Error::new(
210 std::io::ErrorKind::InvalidData,
211 format!("Invalid record length: {total_len}"),
212 ));
213 }
214 let payload_len = if has_crc {
215 total_len.checked_sub(5 + 4).ok_or_else(|| {
216 std::io::Error::new(std::io::ErrorKind::InvalidData, "Record too short for CRC")
217 })?
218 } else {
219 total_len - 5
220 };
221
222 let mut kind_buf = [0u8; 1];
227 match reader.read_exact(&mut kind_buf) {
228 Ok(()) => {}
229 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(()),
230 Err(e) => return Err(e),
231 }
232 let kind_val = kind_buf[0];
233
234 payload_buf.clear();
235 payload_buf.resize(payload_len, 0);
236 if payload_len > 0 {
237 match reader.read_exact(&mut payload_buf) {
238 Ok(()) => {}
239 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(()),
240 Err(e) => return Err(e),
241 }
242 }
243
244 if has_crc {
247 let mut crc_buf = [0u8; 4];
248 match reader.read_exact(&mut crc_buf) {
249 Ok(()) => {}
250 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(()),
251 Err(e) => return Err(e),
252 }
253 let expected = u32::from_le_bytes(crc_buf);
254 if crc32fast::hash(&payload_buf) != expected {
255 tracing::warn!("CRC mismatch at offset — torn tail detected, stopping replay");
256 return Ok(());
257 }
258 }
259
260 if let Some(kind) = RecordKind::from_u8(kind_val) {
261 callback(kind, &payload_buf);
262 } else {
263 tracing::warn!("Unknown record kind byte {kind_val}, skipping");
264 }
265 }
266 }
267
268 pub fn close(&mut self) -> std::io::Result<()> {
269 self.flush_and_sync()
270 }
271
272 pub fn reopen_truncated(&mut self) -> std::io::Result<()> {
275 self.writer.flush()?;
276 let file = OpenOptions::new()
277 .create(true)
278 .write(true)
279 .truncate(true)
280 .open(&self.path)?;
281 self.sync_slot.store(Arc::new(file.try_clone()?));
284 let mut writer = BufWriter::with_capacity(65536, file);
285 writer.write_all(MAGIC_CRC)?;
286 writer.flush()?;
287 self.writer = writer;
288 self.has_crc = true;
289 Ok(())
290 }
291}
292
/// Appends `s` as a little-endian `u16` byte length followed by its UTF-8 bytes.
///
/// # Errors
/// Returns `InvalidInput`, leaving `buf` untouched, when `s` is longer than
/// `u16::MAX` bytes.
fn encode_str(buf: &mut Vec<u8>, s: &str) -> std::io::Result<()> {
    let len = s.len();
    if len <= usize::from(u16::MAX) {
        buf.extend_from_slice(&(len as u16).to_le_bytes());
        buf.extend_from_slice(s.as_bytes());
        Ok(())
    } else {
        Err(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            format!("string too long (max {} bytes, got {len})", u16::MAX),
        ))
    }
}
308
/// Reads a `u16`-LE-length-prefixed UTF-8 string from `data` at `*offset`.
///
/// On success `*offset` is advanced past the string. Returns `None` if the
/// prefix or body runs past the end of `data` or the body is not valid UTF-8.
/// If the 2-byte prefix was readable, `*offset` has already moved past it even
/// on failure.
fn decode_str<'a>(data: &'a [u8], offset: &mut usize) -> Option<&'a str> {
    let prefix = data.get(*offset..*offset + 2)?;
    let len = usize::from(u16::from_le_bytes([prefix[0], prefix[1]]));
    *offset += 2;
    let body = data.get(*offset..*offset + len)?;
    let text = std::str::from_utf8(body).ok()?;
    *offset += len;
    Some(text)
}
322
/// Reads a little-endian `u32` count from `data` at `*offset`.
///
/// Advances `*offset` by 4 on success. Returns `None` without moving the
/// offset when fewer than 4 bytes remain.
fn decode_count(data: &[u8], offset: &mut usize) -> Option<usize> {
    let start = *offset;
    let end = start + 4;
    let raw = data.get(start..end)?;
    let mut le = [0u8; 4];
    le.copy_from_slice(raw);
    *offset = end;
    Some(u32::from_le_bytes(le) as usize)
}
336
337pub fn encode_create_entity(buf: &mut Vec<u8>, name: &str, entity_type: &str, observations: &[String]) -> std::io::Result<()> {
338 encode_str(buf, name)?;
339 encode_str(buf, entity_type)?;
340 buf.extend_from_slice(&(observations.len() as u32).to_le_bytes());
341 for obs in observations {
342 encode_str(buf, obs)?;
343 }
344 Ok(())
345}
346
347pub fn decode_create_entity(data: &[u8]) -> Option<(&str, &str, Vec<&str>)> {
348 let mut offset = 0;
349 let name = decode_str(data, &mut offset)?;
350 let entity_type = decode_str(data, &mut offset)?;
351 let count = decode_count(data, &mut offset)?;
352 let mut observations = Vec::with_capacity(count);
353 for _ in 0..count {
354 observations.push(decode_str(data, &mut offset)?);
355 }
356 Some((name, entity_type, observations))
357}
358
359pub fn encode_create_relation(buf: &mut Vec<u8>, from: &str, to: &str, relation_type: &str) -> std::io::Result<()> {
360 encode_str(buf, from)?;
361 encode_str(buf, to)?;
362 encode_str(buf, relation_type)
363}
364
365pub fn decode_create_relation(data: &[u8]) -> Option<(&str, &str, &str)> {
366 let mut offset = 0;
367 let from = decode_str(data, &mut offset)?;
368 let to = decode_str(data, &mut offset)?;
369 let relation_type = decode_str(data, &mut offset)?;
370 Some((from, to, relation_type))
371}
372
373pub fn encode_add_observations(buf: &mut Vec<u8>, name: &str, observations: &[String]) -> std::io::Result<()> {
374 encode_str(buf, name)?;
375 buf.extend_from_slice(&(observations.len() as u32).to_le_bytes());
376 for obs in observations {
377 encode_str(buf, obs)?;
378 }
379 Ok(())
380}
381
382pub fn decode_add_observations(data: &[u8]) -> Option<(&str, Vec<&str>)> {
383 let mut offset = 0;
384 let name = decode_str(data, &mut offset)?;
385 let count = decode_count(data, &mut offset)?;
386 let mut observations = Vec::with_capacity(count);
387 for _ in 0..count {
388 observations.push(decode_str(data, &mut offset)?);
389 }
390 Some((name, observations))
391}
392
393pub fn encode_delete_entity(buf: &mut Vec<u8>, name: &str) -> std::io::Result<()> {
394 encode_str(buf, name)
395}
396
397pub fn decode_delete_entity(data: &[u8]) -> Option<&str> {
398 let mut offset = 0;
399 decode_str(data, &mut offset)
400}
401
402pub fn encode_delete_observations(buf: &mut Vec<u8>, name: &str, observations: &[String]) -> std::io::Result<()> {
403 encode_str(buf, name)?;
404 buf.extend_from_slice(&(observations.len() as u32).to_le_bytes());
405 for obs in observations {
406 encode_str(buf, obs)?;
407 }
408 Ok(())
409}
410
411pub fn decode_delete_observations(data: &[u8]) -> Option<(&str, Vec<&str>)> {
412 decode_add_observations(data)
413}
414
415pub fn encode_delete_relation(buf: &mut Vec<u8>, from: &str, to: &str, relation_type: &str) -> std::io::Result<()> {
416 encode_str(buf, from)?;
417 encode_str(buf, to)?;
418 encode_str(buf, relation_type)
419}
420
421pub fn decode_delete_relation(data: &[u8]) -> Option<(&str, &str, &str)> {
422 decode_create_relation(data)
423}
424
// Unit tests for the record codecs and for BinaryStore's write / replay /
// truncate behavior. Each test works on its own temp file and removes it at
// the end. That cleanup is skipped if an assertion panics first.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};

    // Per-process sequence number so tests running in parallel get distinct files.
    static COUNTER: AtomicU64 = AtomicU64::new(0);

    /// Returns a unique path (pid + sequence number) in the system temp dir.
    /// The file itself is not created here.
    fn tmp_path() -> PathBuf {
        let pid = std::process::id();
        let seq = COUNTER.fetch_add(1, Ordering::SeqCst);
        std::env::temp_dir().join(format!("mcp_store_test_{pid}_{seq}.bin"))
    }

    /// Records written through one store are replayed in order by a store
    /// reopened on the same file.
    #[test]
    fn test_write_and_replay() {
        let path = tmp_path();
        let mut store = BinaryStore::new(&path).unwrap();

        let mut buf = Vec::new();
        encode_create_entity(&mut buf, "Alice", "person", &["likes coffee".into()]).unwrap();
        store.write_record(RecordKind::CreateEntity, &buf).unwrap();

        buf.clear();
        encode_create_entity(&mut buf, "Bob", "person", &[]).unwrap();
        store.write_record(RecordKind::CreateEntity, &buf).unwrap();

        // Dropping the store drops its BufWriter, which flushes buffered records.
        drop(store);

        let mut replayed: Vec<(RecordKind, Vec<u8>)> = Vec::new();
        let replay_store = BinaryStore::new(&path).unwrap();
        replay_store
            .replay(|kind, data| {
                replayed.push((kind, data.to_vec()));
            })
            .unwrap();

        assert_eq!(replayed.len(), 2);
        assert_eq!(replayed[0].0, RecordKind::CreateEntity);
        assert_eq!(
            decode_create_entity(&replayed[0].1).unwrap().0,
            "Alice"
        );

        let _ = std::fs::remove_file(&path);
    }

    /// A CreateEntity payload decodes back to the exact fields it was built from.
    #[test]
    fn test_encode_decode_roundtrip() {
        let mut buf = Vec::new();
        encode_create_entity(
            &mut buf,
            "TestEntity",
            "test_type",
            &["obs1".into(), "obs2".into()],
        )
        .unwrap();
        let (name, etype, obs) = decode_create_entity(&buf).unwrap();
        assert_eq!(name, "TestEntity");
        assert_eq!(etype, "test_type");
        assert_eq!(obs, vec!["obs1", "obs2"]);
    }

    /// A store that never wrote a record replays nothing.
    #[test]
    fn test_empty_file() {
        let path = tmp_path();
        let store = BinaryStore::new(&path).unwrap();
        drop(store);

        let mut count = 0;
        let replay_store = BinaryStore::new(&path).unwrap();
        replay_store.replay(|_, _| count += 1).unwrap();
        assert_eq!(count, 0);
        let _ = std::fs::remove_file(&path);
    }

    /// Each payload-carrying record kind survives a write/replay cycle with its
    /// kind byte intact and in order.
    #[test]
    fn test_write_all_record_kinds() {
        let path = tmp_path();
        let mut store = BinaryStore::new(&path).unwrap();
        let mut buf = Vec::new();

        encode_create_entity(&mut buf, "E1", "t1", &["o1".into()]).unwrap();
        store.write_record(RecordKind::CreateEntity, &buf).unwrap();

        buf.clear();
        encode_create_relation(&mut buf, "E1", "E2", "knows").unwrap();
        store.write_record(RecordKind::CreateRelation, &buf).unwrap();

        buf.clear();
        encode_add_observations(&mut buf, "E1", &["o2".into()]).unwrap();
        store.write_record(RecordKind::AddObservations, &buf).unwrap();

        buf.clear();
        encode_delete_entity(&mut buf, "E1").unwrap();
        store.write_record(RecordKind::DeleteEntity, &buf).unwrap();

        buf.clear();
        encode_delete_observations(&mut buf, "E1", &["o1".into()]).unwrap();
        store.write_record(RecordKind::DeleteObservations, &buf).unwrap();

        buf.clear();
        encode_delete_relation(&mut buf, "E1", "E2", "knows").unwrap();
        store.write_record(RecordKind::DeleteRelation, &buf).unwrap();

        drop(store);

        let mut kinds = Vec::new();
        let replay_store = BinaryStore::new(&path).unwrap();
        replay_store
            .replay(|kind, _| {
                kinds.push(kind);
            })
            .unwrap();

        assert_eq!(kinds.len(), 6);
        assert_eq!(kinds[0], RecordKind::CreateEntity);
        assert_eq!(kinds[1], RecordKind::CreateRelation);
        assert_eq!(kinds[2], RecordKind::AddObservations);
        assert_eq!(kinds[3], RecordKind::DeleteEntity);
        assert_eq!(kinds[4], RecordKind::DeleteObservations);
        assert_eq!(kinds[5], RecordKind::DeleteRelation);
        let _ = std::fs::remove_file(&path);
    }

    /// After `reopen_truncated`, only records written after the truncation are
    /// replayed.
    #[test]
    fn test_reopen_truncated() {
        let path = tmp_path();
        let mut store = BinaryStore::new(&path).unwrap();
        let mut buf = Vec::new();
        encode_create_entity(&mut buf, "E1", "t1", &[]).unwrap();
        store.write_record(RecordKind::CreateEntity, &buf).unwrap();
        drop(store);

        let mut store2 = BinaryStore::new(&path).unwrap();
        store2.reopen_truncated().unwrap();

        let mut buf2 = Vec::new();
        encode_create_entity(&mut buf2, "E2", "t2", &[]).unwrap();
        store2.write_record(RecordKind::CreateEntity, &buf2).unwrap();
        drop(store2);

        let mut names = Vec::new();
        let replay_store = BinaryStore::new(&path).unwrap();
        replay_store
            .replay(|_, data| {
                if let Some((name, _, _)) = decode_create_entity(data) {
                    names.push(name.to_string());
                }
            })
            .unwrap();

        assert_eq!(names, vec!["E2"]);
        let _ = std::fs::remove_file(&path);
    }

    /// AddObservations payload round-trip.
    #[test]
    fn test_encode_decode_add_observations() {
        let mut buf = Vec::new();
        encode_add_observations(&mut buf, "Alice", &["obs1".into(), "obs2".into()]).unwrap();
        let (name, obs) = decode_add_observations(&buf).unwrap();
        assert_eq!(name, "Alice");
        assert_eq!(obs, vec!["obs1", "obs2"]);
    }

    /// DeleteEntity payload round-trip.
    #[test]
    fn test_encode_decode_delete_entity() {
        let mut buf = Vec::new();
        encode_delete_entity(&mut buf, "ToDelete").unwrap();
        let name = decode_delete_entity(&buf).unwrap();
        assert_eq!(name, "ToDelete");
    }

    /// DeleteObservations payload round-trip.
    #[test]
    fn test_encode_decode_delete_observations() {
        let mut buf = Vec::new();
        encode_delete_observations(&mut buf, "Alice", &["o1".into()]).unwrap();
        let (name, obs) = decode_delete_observations(&buf).unwrap();
        assert_eq!(name, "Alice");
        assert_eq!(obs, vec!["o1"]);
    }

    /// DeleteRelation payload round-trip.
    #[test]
    fn test_encode_decode_delete_relation() {
        let mut buf = Vec::new();
        encode_delete_relation(&mut buf, "A", "B", "knows").unwrap();
        let (from, to, rtype) = decode_delete_relation(&buf).unwrap();
        assert_eq!(from, "A");
        assert_eq!(to, "B");
        assert_eq!(rtype, "knows");
    }

    /// `reopen_truncated` stores a new handle into the same slot `Arc` instead of
    /// replacing the slot.
    #[test]
    fn test_sync_slot_follows_reopen_truncated() {
        let path = tmp_path();
        let mut store = BinaryStore::new(&path).unwrap();
        let slot = Arc::clone(&store.sync_slot);
        let before = Arc::as_ptr(&slot.load_full());
        store.reopen_truncated().unwrap();
        let after = Arc::as_ptr(&slot.load_full());
        assert_ne!(before, after, "reopen must publish the new handle into the slot");
        assert!(Arc::ptr_eq(&slot, &store.sync_slot), "slot identity must be stable");
        let _ = std::fs::remove_file(&path);
    }

    /// `new_with_slot` keeps the caller's slot `Arc` and publishes its freshly
    /// opened handle into it.
    #[test]
    fn test_new_with_slot_reuses_shared_cell() {
        let path = tmp_path();
        let store1 = BinaryStore::new(&path).unwrap();
        let slot = Arc::clone(&store1.sync_slot);
        let before = Arc::as_ptr(&slot.load_full());
        drop(store1);

        let store2 = BinaryStore::new_with_slot(&path, Some(Arc::clone(&slot))).unwrap();
        assert!(Arc::ptr_eq(&slot, &store2.sync_slot), "must reuse the passed slot");
        let after = Arc::as_ptr(&slot.load_full());
        assert_ne!(before, after, "reopened handle must be published into the slot");
        let _ = std::fs::remove_file(&path);
    }

    /// A payload of 1 MiB + 1 bytes exceeds MAX_RECORD_BYTES on its own and is
    /// rejected.
    #[test]
    fn test_record_too_large() {
        let path = tmp_path();
        let mut store = BinaryStore::new(&path).unwrap();
        let huge = vec![0u8; (1 << 20) + 1];
        let result = store.write_record(RecordKind::CreateEntity, &huge);
        assert!(result.is_err());
        let _ = std::fs::remove_file(&path);
    }

    /// Many small records are all replayed, each with the kind it was written with.
    #[test]
    fn test_multiple_writes_and_replay() {
        let path = tmp_path();
        let mut store = BinaryStore::new(&path).unwrap();
        for i in 0..100 {
            let mut buf = Vec::new();
            encode_create_entity(&mut buf, &format!("E{i}"), "type", &[]).unwrap();
            store.write_record(RecordKind::CreateEntity, &buf).unwrap();
        }
        drop(store);

        let mut count = 0;
        let replay_store = BinaryStore::new(&path).unwrap();
        replay_store
            .replay(|kind, _| {
                assert_eq!(kind, RecordKind::CreateEntity);
                count += 1;
            })
            .unwrap();
        assert_eq!(count, 100);
        let _ = std::fs::remove_file(&path);
    }

    /// A file cut inside the first record's length field (10 bytes = 8-byte
    /// header + 2 of the 4 length bytes) replays as empty without an error.
    #[test]
    fn test_truncated_log_handling() {
        let path = tmp_path();
        let mut store = BinaryStore::new(&path).unwrap();
        let mut buf = Vec::new();
        encode_create_entity(&mut buf, "Alice", "person", &[]).unwrap();
        store.write_record(RecordKind::CreateEntity, &buf).unwrap();
        drop(store);

        let file = OpenOptions::new().write(true).open(&path).unwrap();
        file.set_len(10).unwrap();
        drop(file);

        let replay_store = BinaryStore::new(&path).unwrap();
        let mut count = 0;
        replay_store.replay(|_, _| count += 1).unwrap();
        assert_eq!(count, 0);
        let _ = std::fs::remove_file(&path);
    }

    /// A hand-built V1 file (`MCPMEMV1`, no CRCs) stays V1. Appended records get
    /// no CRC, and old and new records all replay.
    #[test]
    fn test_v1_format_backward_compat() {
        let path = tmp_path();

        let mut raw = Vec::new();
        raw.extend_from_slice(b"MCPMEMV1");

        // V1 record: [u32 LE total_len = 4 + 1 + payload][kind][payload].
        let mut p1 = Vec::new();
        encode_create_entity(&mut p1, "Alice", "person", &[]).unwrap();
        let len1: u32 = 4 + 1 + p1.len() as u32;
        raw.extend_from_slice(&len1.to_le_bytes());
        raw.extend_from_slice(&[RecordKind::CreateEntity as u8]);
        raw.extend_from_slice(&p1);

        let mut p2 = Vec::new();
        encode_create_entity(&mut p2, "Bob", "person", &[]).unwrap();
        let len2: u32 = 4 + 1 + p2.len() as u32;
        raw.extend_from_slice(&len2.to_le_bytes());
        raw.extend_from_slice(&[RecordKind::CreateEntity as u8]);
        raw.extend_from_slice(&p2);

        std::fs::write(&path, &raw).unwrap();

        let mut store = BinaryStore::new(&path).unwrap();

        let mut p3 = Vec::new();
        encode_create_entity(&mut p3, "Charlie", "person", &[]).unwrap();
        store.write_record(RecordKind::CreateEntity, &p3).unwrap();
        store.flush().unwrap();
        drop(store);

        // Original bytes plus one V1 record: 4-byte length + kind byte + payload.
        let expected_size = raw.len() as u64 + (5 + p3.len()) as u64;
        assert_eq!(
            std::fs::metadata(&path).unwrap().len(),
            expected_size,
            "V1 file must not grow by CRC bytes after write"
        );

        let replay_store = BinaryStore::new(&path).unwrap();
        let mut names = Vec::new();
        replay_store
            .replay(|_, data| {
                if let Some((name, _, _)) = decode_create_entity(data) {
                    names.push(name.to_string());
                }
            })
            .unwrap();
        assert_eq!(names, vec!["Alice", "Bob", "Charlie"]);

        let _ = std::fs::remove_file(&path);
    }

    /// Flipping a payload byte of the only record makes its CRC fail. Replay
    /// returns Ok and delivers nothing.
    #[test]
    fn test_crc_detects_corrupted_payload() {
        let path = tmp_path();
        let mut store = BinaryStore::new(&path).unwrap();

        let mut buf = Vec::new();
        encode_create_entity(&mut buf, "Alice", "person", &["likes coffee".into()]).unwrap();
        store.write_record(RecordKind::CreateEntity, &buf).unwrap();
        store.flush_and_sync().unwrap();
        drop(store);

        let mut data = std::fs::read(&path).unwrap();
        // 8-byte header + 4-byte length field + 1 kind byte.
        let payload_start = 8 + 4 + 1;
        // Middle of the payload, excluding the trailing 4-byte CRC.
        let corrupt_pos = payload_start + (data.len() - payload_start - 4) / 2;
        data[corrupt_pos] ^= 0xFF;
        std::fs::write(&path, &data).unwrap();

        let replay_store = BinaryStore::new(&path).unwrap();
        let mut count = 0;
        replay_store
            .replay(|_, _| count += 1)
            .expect("CRC mismatch must return Ok (torn-tail semantics)");
        assert_eq!(count, 0, "corrupted record must not reach callback");

        let _ = std::fs::remove_file(&path);
    }

    /// Corrupting the second record stops replay there. The intact first record
    /// is still delivered.
    #[test]
    fn test_crc_detects_corrupted_middle_record() {
        let path = tmp_path();
        let mut store = BinaryStore::new(&path).unwrap();

        let mut buf1 = Vec::new();
        encode_create_entity(&mut buf1, "Alice", "person", &[]).unwrap();
        store.write_record(RecordKind::CreateEntity, &buf1).unwrap();

        let mut buf2 = Vec::new();
        encode_create_entity(&mut buf2, "Bob", "person", &[]).unwrap();
        store.write_record(RecordKind::CreateEntity, &buf2).unwrap();

        store.flush_and_sync().unwrap();
        drop(store);

        let mut data = std::fs::read(&path).unwrap();
        // Header + first record (length field, kind, payload, CRC).
        let rec1_end = 8 + 4 + 1 + buf1.len() + 4;
        let rec2_payload_start = rec1_end + 4 + 1;
        // +2 skips the u16 length prefix of "Bob", so this flips its first byte.
        data[rec2_payload_start + 2] ^= 0xFF;
        std::fs::write(&path, &data).unwrap();

        let replay_store = BinaryStore::new(&path).unwrap();
        let mut names = Vec::new();
        replay_store
            .replay(|_, data| {
                if let Some((name, _, _)) = decode_create_entity(data) {
                    names.push(name.to_string());
                }
            })
            .expect("CRC mismatch of middle record must not hard-error");
        assert_eq!(names, vec!["Alice"]);

        let _ = std::fs::remove_file(&path);
    }

    /// Cutting the file halfway through the second record (a torn write)
    /// still replays the complete first record.
    #[test]
    fn test_torn_record_mid_stream_recovers_prefix() {
        let path = tmp_path();
        let mut store = BinaryStore::new(&path).unwrap();
        let mut buf = Vec::new();
        encode_create_entity(&mut buf, "Alice", "person", &["likes coffee".into()]).unwrap();
        store.write_record(RecordKind::CreateEntity, &buf).unwrap();
        store.flush_and_sync().unwrap();
        // File length with exactly one complete record on disk.
        let good_len = std::fs::metadata(&path).unwrap().len();

        buf.clear();
        encode_create_entity(&mut buf, "Bob", "person", &["drinks tea".into()]).unwrap();
        store.write_record(RecordKind::CreateEntity, &buf).unwrap();
        store.flush_and_sync().unwrap();
        drop(store);

        let full_len = std::fs::metadata(&path).unwrap().len();
        let torn_len = good_len + (full_len - good_len) / 2;
        let file = OpenOptions::new().write(true).open(&path).unwrap();
        file.set_len(torn_len).unwrap();
        drop(file);

        let replay_store = BinaryStore::new(&path).unwrap();
        let mut names = Vec::new();
        replay_store
            .replay(|_, data| {
                if let Some((name, _, _)) = decode_create_entity(data) {
                    names.push(name.to_string());
                }
            })
            .expect("torn tail must not be a hard error");
        assert_eq!(names, vec!["Alice"]);
        let _ = std::fs::remove_file(&path);
    }
}