Skip to main content

mcp_memory/
store.rs

1use std::fs::{File, OpenOptions};
2use std::io::{BufReader, BufWriter, Read, Write};
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use arc_swap::ArcSwap;
7
/// File header of the legacy V1 log format, whose records carry no CRC32 footer.
const MAGIC: &[u8; 8] = b"MCPMEMV1";
/// File header of the V2 log format, whose records end with a CRC32 footer.
const MAGIC_CRC: &[u8; 8] = b"MCPMEMV2";
/// Largest value (1 MiB) accepted for a record's length field, on write and on replay.
const MAX_RECORD_BYTES: u32 = 1024 * 1024;
11
/// Tag byte stored in every log record, identifying the operation it carries.
///
/// The discriminants are the on-disk encoding: the writer stores `kind as u8`
/// and the reader maps the byte back through [`RecordKind::from_u8`], so the
/// existing values must stay stable.
#[repr(u8)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RecordKind {
    /// An entity was created.
    CreateEntity = 0,
    /// A relation between two entities was created.
    CreateRelation = 1,
    /// Observations were added to an entity.
    AddObservations = 2,
    /// An entity was deleted.
    DeleteEntity = 3,
    /// Observations were removed from an entity.
    DeleteObservations = 4,
    /// A relation between two entities was deleted.
    DeleteRelation = 5,
    /// Start of a transaction. Records after this marker are buffered on replay
    /// and applied only when the matching [`RecordKind::TxnCommit`] shows up; a
    /// transaction still open at EOF is thrown away. That is what keeps
    /// multi-record operations such as `merge_entities` crash-atomic.
    TxnBegin = 6,
    /// End of the transaction started by [`RecordKind::TxnBegin`].
    TxnCommit = 7,
}
29
30impl RecordKind {
31    #[inline]
32    pub const fn from_u8(v: u8) -> Option<RecordKind> {
33        Some(match v {
34            0 => RecordKind::CreateEntity,
35            1 => RecordKind::CreateRelation,
36            2 => RecordKind::AddObservations,
37            3 => RecordKind::DeleteEntity,
38            4 => RecordKind::DeleteObservations,
39            5 => RecordKind::DeleteRelation,
40            6 => RecordKind::TxnBegin,
41            7 => RecordKind::TxnCommit,
42            _ => return None,
43        })
44    }
45}
46
47pub struct BinaryStore {
48    writer: BufWriter<File>,
49    path: PathBuf,
50    /// Whether this store writes CRC32 footers on each record.
51    /// `true` for new files (magic `MCPMEMV2`); `false` for legacy V1 files.
52    has_crc: bool,
53    /// Shared cell holding the *current* file handle for the background sync
54    /// thread to `fsync`, without holding any lock. It is updated every time the
55    /// underlying file is (re)opened — notably by `compact`/`reopen_truncated`,
56    /// which swap in a fresh inode — so the sync thread never keeps fsyncing a
57    /// stale fd that points at a renamed-away/unlinked inode (D1).
58    pub(crate) sync_slot: Arc<ArcSwap<File>>,
59}
60
61impl BinaryStore {
62    pub const fn path(&self) -> &PathBuf {
63        &self.path
64    }
65
66    pub fn new(path: &Path) -> std::io::Result<Self> {
67        Self::new_with_slot(path, None)
68    }
69
70    /// Open (or create) the log. When `slot` is `Some`, the freshly opened file
71    /// handle is published into that existing shared cell instead of a new one —
72    /// this is how `compact` keeps the background sync thread pointed at the
73    /// post-compaction file rather than the renamed-away original (D1).
74    pub fn new_with_slot(
75        path: &Path,
76        slot: Option<Arc<ArcSwap<File>>>,
77    ) -> std::io::Result<Self> {
78        let exists = path.exists();
79        let file = OpenOptions::new()
80            .create(true)
81            .append(true)
82            .read(true)
83            .open(path)?;
84
85        let handle = Arc::new(file.try_clone()?);
86        let sync_slot = match slot {
87            Some(s) => {
88                s.store(handle);
89                s
90            }
91            None => Arc::new(ArcSwap::new(handle)),
92        };
93
94        // Determine CRC support and open the write handle.
95        let (has_crc, file) = if !exists {
96            let f = OpenOptions::new()
97                .create(true)
98                .append(true)
99                .read(false)
100                .open(path)?;
101            let mut w = BufWriter::with_capacity(65536, f);
102            w.write_all(MAGIC_CRC)?;
103            w.flush()?;
104            (true, w.into_inner().map_err(|e| e.into_error())?)
105        } else {
106            // Probe the existing file's magic to determine CRC support.
107            let probe_file = OpenOptions::new().read(true).open(path)?;
108            let mut probe = [0u8; 8];
109            let has_crc = match std::io::BufReader::new(&probe_file).read_exact(&mut probe) {
110                Ok(()) => &probe == MAGIC_CRC,
111                _ => false,
112            };
113            drop(probe_file);
114            let f = OpenOptions::new()
115                .create(true)
116                .append(true)
117                .read(false)
118                .open(path)?;
119            (has_crc, f)
120        };
121
122        let writer = BufWriter::with_capacity(65536, file);
123
124        Ok(Self {
125            writer,
126            path: path.to_path_buf(),
127            has_crc,
128            sync_slot,
129        })
130    }
131
132    pub fn write_record(&mut self, kind: RecordKind, payload: &[u8]) -> std::io::Result<()> {
133        let crc_len: usize = if self.has_crc { 4 } else { 0 };
134        let total_len = 4 + 1 + payload.len() + crc_len;
135        if total_len as u32 > MAX_RECORD_BYTES {
136            return Err(std::io::Error::new(
137                std::io::ErrorKind::InvalidInput,
138                "Record too large",
139            ));
140        }
141        self.writer.write_all(&(total_len as u32).to_le_bytes())?;
142        self.writer.write_all(&[kind as u8])?;
143        self.writer.write_all(payload)?;
144        if self.has_crc {
145            let crc = crc32fast::hash(payload);
146            self.writer.write_all(&crc.to_le_bytes())?;
147        }
148        Ok(())
149    }
150
151    /// Flush the `BufWriter` to the kernel buffer (no `fsync`).
152    pub fn flush(&mut self) -> std::io::Result<()> {
153        self.writer.flush()
154    }
155
156    /// `fsync` the underlying file (kernel buffer → disk).
157    pub fn sync(&mut self) -> std::io::Result<()> {
158        self.writer.get_ref().sync_data()
159    }
160
161    pub fn flush_and_sync(&mut self) -> std::io::Result<()> {
162        self.flush()?;
163        self.sync()
164    }
165
166    pub fn replay<F>(&self, mut callback: F) -> std::io::Result<()>
167    where
168        F: FnMut(RecordKind, &[u8]),
169    {
170        let file = match OpenOptions::new().read(true).open(&self.path) {
171            Ok(f) => f,
172            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
173            Err(e) => return Err(e),
174        };
175
176        let meta = file.metadata()?;
177        if meta.len() == 0 {
178            return Ok(());
179        }
180
181        let mut reader = BufReader::with_capacity(65536, file);
182        let mut magic = [0u8; 8];
183
184        match reader.read_exact(&mut magic) {
185            Ok(()) => {}
186            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(()),
187            Err(e) => return Err(e),
188        }
189
190        let has_crc = if &magic == MAGIC_CRC {
191            true
192        } else if &magic == MAGIC {
193            false
194        } else {
195            return Ok(());
196        };
197
198        let mut payload_buf = Vec::with_capacity(4096);
199
200        loop {
201            let mut len_buf = [0u8; 4];
202            match reader.read_exact(&mut len_buf) {
203                Ok(()) => {}
204                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(()),
205                Err(e) => return Err(e),
206            }
207            let total_len = u32::from_le_bytes(len_buf) as usize;
208            if total_len < 5 || total_len > MAX_RECORD_BYTES as usize {
209                return Err(std::io::Error::new(
210                    std::io::ErrorKind::InvalidData,
211                    format!("Invalid record length: {total_len}"),
212                ));
213            }
214            let payload_len = if has_crc {
215                total_len.checked_sub(5 + 4).ok_or_else(|| {
216                    std::io::Error::new(std::io::ErrorKind::InvalidData, "Record too short for CRC")
217                })?
218            } else {
219                total_len - 5
220            };
221
222            // A crash can leave a record's length prefix written but its body
223            // only partially flushed. Treat a short read on the kind/payload as
224            // a torn tail (stop cleanly) rather than a hard error — otherwise a
225            // single interrupted write would make the whole log unopenable.
226            let mut kind_buf = [0u8; 1];
227            match reader.read_exact(&mut kind_buf) {
228                Ok(()) => {}
229                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(()),
230                Err(e) => return Err(e),
231            }
232            let kind_val = kind_buf[0];
233
234            payload_buf.clear();
235            payload_buf.resize(payload_len, 0);
236            if payload_len > 0 {
237                match reader.read_exact(&mut payload_buf) {
238                    Ok(()) => {}
239                    Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(()),
240                    Err(e) => return Err(e),
241                }
242            }
243
244            // Verify CRC32 for V2 records. A mismatch is treated as a torn
245            // tail (stop cleanly) rather than a hard corruption error.
246            if has_crc {
247                let mut crc_buf = [0u8; 4];
248                match reader.read_exact(&mut crc_buf) {
249                    Ok(()) => {}
250                    Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(()),
251                    Err(e) => return Err(e),
252                }
253                let expected = u32::from_le_bytes(crc_buf);
254                if crc32fast::hash(&payload_buf) != expected {
255                    tracing::warn!("CRC mismatch at offset — torn tail detected, stopping replay");
256                    return Ok(());
257                }
258            }
259
260            if let Some(kind) = RecordKind::from_u8(kind_val) {
261                callback(kind, &payload_buf);
262            } else {
263                tracing::warn!("Unknown record kind byte {kind_val}, skipping");
264            }
265        }
266    }
267
268    pub fn close(&mut self) -> std::io::Result<()> {
269        self.flush_and_sync()
270    }
271
272    /// Reopen the file with truncation — discards all existing records.
273    /// Used by `compact` to rewrite a fresh log from in-memory state.
274    pub fn reopen_truncated(&mut self) -> std::io::Result<()> {
275        self.writer.flush()?;
276        let file = OpenOptions::new()
277            .create(true)
278            .write(true)
279            .truncate(true)
280            .open(&self.path)?;
281        // Publish the new handle so the background sync thread fsyncs this file,
282        // not the truncated-away one (D1).
283        self.sync_slot.store(Arc::new(file.try_clone()?));
284        let mut writer = BufWriter::with_capacity(65536, file);
285        writer.write_all(MAGIC_CRC)?;
286        writer.flush()?;
287        self.writer = writer;
288        self.has_crc = true;
289        Ok(())
290    }
291}
292
293// --- Binary encoding helpers ---
294
/// Appends `s` to `buf` as a little-endian `u16` byte length followed by the
/// UTF-8 bytes.
///
/// # Errors
/// Returns `InvalidInput` and leaves `buf` untouched when `s` is longer than
/// `u16::MAX` bytes.
fn encode_str(buf: &mut Vec<u8>, s: &str) -> std::io::Result<()> {
    let len = s.len();
    if len <= u16::MAX as usize {
        // Lossless: `len` fits in 16 bits on this branch.
        let prefix = (len as u16).to_le_bytes();
        buf.extend_from_slice(&prefix);
        buf.extend_from_slice(s.as_bytes());
        return Ok(());
    }
    let message = format!("string too long (max {} bytes, got {len})", u16::MAX);
    Err(std::io::Error::new(
        std::io::ErrorKind::InvalidInput,
        message,
    ))
}
308
/// Reads one length-prefixed string (little-endian `u16` byte length, then
/// UTF-8 bytes) from `data` starting at `*offset`.
///
/// On success `*offset` is advanced past the string. Returns `None` when the
/// prefix or body runs past the end of `data`, or when the body is not valid
/// UTF-8; in the latter two cases `*offset` has already moved past the prefix.
fn decode_str<'a>(data: &'a [u8], offset: &mut usize) -> Option<&'a str> {
    let body_start = *offset + 2;
    if body_start > data.len() {
        return None;
    }
    let len = usize::from(u16::from_le_bytes([data[*offset], data[*offset + 1]]));
    *offset = body_start;

    let body_end = body_start + len;
    let bytes = data.get(body_start..body_end)?;
    let text = std::str::from_utf8(bytes).ok()?;
    *offset = body_end;
    Some(text)
}
322
/// Reads a little-endian `u32` element count from `data` at `*offset` and
/// advances `*offset` by four bytes.
///
/// Returns `None`, leaving `*offset` unchanged, when fewer than four bytes
/// remain.
fn decode_count(data: &[u8], offset: &mut usize) -> Option<usize> {
    let start = *offset;
    let end = start + 4;
    let raw = data.get(start..end)?;

    let mut word = [0u8; 4];
    word.copy_from_slice(raw);
    *offset = end;
    Some(u32::from_le_bytes(word) as usize)
}
336
337pub fn encode_create_entity(buf: &mut Vec<u8>, name: &str, entity_type: &str, observations: &[String]) -> std::io::Result<()> {
338    encode_str(buf, name)?;
339    encode_str(buf, entity_type)?;
340    buf.extend_from_slice(&(observations.len() as u32).to_le_bytes());
341    for obs in observations {
342        encode_str(buf, obs)?;
343    }
344    Ok(())
345}
346
347pub fn decode_create_entity(data: &[u8]) -> Option<(&str, &str, Vec<&str>)> {
348    let mut offset = 0;
349    let name = decode_str(data, &mut offset)?;
350    let entity_type = decode_str(data, &mut offset)?;
351    let count = decode_count(data, &mut offset)?;
352    let mut observations = Vec::with_capacity(count);
353    for _ in 0..count {
354        observations.push(decode_str(data, &mut offset)?);
355    }
356    Some((name, entity_type, observations))
357}
358
359pub fn encode_create_relation(buf: &mut Vec<u8>, from: &str, to: &str, relation_type: &str) -> std::io::Result<()> {
360    encode_str(buf, from)?;
361    encode_str(buf, to)?;
362    encode_str(buf, relation_type)
363}
364
365pub fn decode_create_relation(data: &[u8]) -> Option<(&str, &str, &str)> {
366    let mut offset = 0;
367    let from = decode_str(data, &mut offset)?;
368    let to = decode_str(data, &mut offset)?;
369    let relation_type = decode_str(data, &mut offset)?;
370    Some((from, to, relation_type))
371}
372
373pub fn encode_add_observations(buf: &mut Vec<u8>, name: &str, observations: &[String]) -> std::io::Result<()> {
374    encode_str(buf, name)?;
375    buf.extend_from_slice(&(observations.len() as u32).to_le_bytes());
376    for obs in observations {
377        encode_str(buf, obs)?;
378    }
379    Ok(())
380}
381
382pub fn decode_add_observations(data: &[u8]) -> Option<(&str, Vec<&str>)> {
383    let mut offset = 0;
384    let name = decode_str(data, &mut offset)?;
385    let count = decode_count(data, &mut offset)?;
386    let mut observations = Vec::with_capacity(count);
387    for _ in 0..count {
388        observations.push(decode_str(data, &mut offset)?);
389    }
390    Some((name, observations))
391}
392
393pub fn encode_delete_entity(buf: &mut Vec<u8>, name: &str) -> std::io::Result<()> {
394    encode_str(buf, name)
395}
396
397pub fn decode_delete_entity(data: &[u8]) -> Option<&str> {
398    let mut offset = 0;
399    decode_str(data, &mut offset)
400}
401
402pub fn encode_delete_observations(buf: &mut Vec<u8>, name: &str, observations: &[String]) -> std::io::Result<()> {
403    encode_str(buf, name)?;
404    buf.extend_from_slice(&(observations.len() as u32).to_le_bytes());
405    for obs in observations {
406        encode_str(buf, obs)?;
407    }
408    Ok(())
409}
410
411pub fn decode_delete_observations(data: &[u8]) -> Option<(&str, Vec<&str>)> {
412    decode_add_observations(data)
413}
414
415pub fn encode_delete_relation(buf: &mut Vec<u8>, from: &str, to: &str, relation_type: &str) -> std::io::Result<()> {
416    encode_str(buf, from)?;
417    encode_str(buf, to)?;
418    encode_str(buf, relation_type)
419}
420
421pub fn decode_delete_relation(data: &[u8]) -> Option<(&str, &str, &str)> {
422    decode_create_relation(data)
423}
424
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};

    static COUNTER: AtomicU64 = AtomicU64::new(0);

    /// Unique scratch log path that is removed when the guard is dropped, so a
    /// failing assertion no longer leaves files behind in the temp directory.
    ///
    /// Declare it first in a test: locals drop in reverse order, so every
    /// store opened on the path is closed before the file is deleted.
    struct TempLog(PathBuf);

    impl TempLog {
        fn new() -> Self {
            let pid = std::process::id();
            let seq = COUNTER.fetch_add(1, Ordering::SeqCst);
            Self(std::env::temp_dir().join(format!("mcp_store_test_{pid}_{seq}.bin")))
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TempLog {
        fn drop(&mut self) {
            // Best effort: the file may legitimately not exist.
            let _ = std::fs::remove_file(&self.0);
        }
    }

    /// Encodes a create-entity payload from string literals.
    fn entity_payload(name: &str, entity_type: &str, observations: &[&str]) -> Vec<u8> {
        let owned: Vec<String> = observations.iter().map(|o| o.to_string()).collect();
        let mut buf = Vec::new();
        encode_create_entity(&mut buf, name, entity_type, &owned).unwrap();
        buf
    }

    /// Writes one `CreateEntity` record and returns the payload that was written.
    fn append_entity(
        store: &mut BinaryStore,
        name: &str,
        entity_type: &str,
        observations: &[&str],
    ) -> Vec<u8> {
        let payload = entity_payload(name, entity_type, observations);
        store.write_record(RecordKind::CreateEntity, &payload).unwrap();
        payload
    }

    /// Opens the log at `path` and collects every replayed record.
    fn replay_all(path: &Path) -> std::io::Result<Vec<(RecordKind, Vec<u8>)>> {
        let store = BinaryStore::new(path)?;
        let mut records = Vec::new();
        store.replay(|kind, data| records.push((kind, data.to_vec())))?;
        Ok(records)
    }

    /// Replays the log and returns the entity name of every record whose
    /// payload decodes as a create-entity payload.
    fn replay_entity_names(path: &Path) -> std::io::Result<Vec<String>> {
        Ok(replay_all(path)?
            .iter()
            .filter_map(|(_, data)| decode_create_entity(data))
            .map(|(name, _, _)| name.to_string())
            .collect())
    }

    #[test]
    fn test_write_and_replay() {
        let tmp = TempLog::new();
        let mut store = BinaryStore::new(tmp.path()).unwrap();
        append_entity(&mut store, "Alice", "person", &["likes coffee"]);
        append_entity(&mut store, "Bob", "person", &[]);
        drop(store);

        let replayed = replay_all(tmp.path()).unwrap();
        assert_eq!(replayed.len(), 2);
        assert_eq!(replayed[0].0, RecordKind::CreateEntity);
        assert_eq!(decode_create_entity(&replayed[0].1).unwrap().0, "Alice");
    }

    #[test]
    fn test_encode_decode_roundtrip() {
        let buf = entity_payload("TestEntity", "test_type", &["obs1", "obs2"]);
        let (name, etype, obs) = decode_create_entity(&buf).unwrap();
        assert_eq!(name, "TestEntity");
        assert_eq!(etype, "test_type");
        assert_eq!(obs, vec!["obs1", "obs2"]);
    }

    #[test]
    fn test_empty_file() {
        let tmp = TempLog::new();
        drop(BinaryStore::new(tmp.path()).unwrap());

        assert_eq!(replay_all(tmp.path()).unwrap().len(), 0);
    }

    #[test]
    fn test_write_all_record_kinds() {
        let tmp = TempLog::new();
        let mut store = BinaryStore::new(tmp.path()).unwrap();
        let mut buf = Vec::new();

        // Write one of each record kind.
        encode_create_entity(&mut buf, "E1", "t1", &["o1".into()]).unwrap();
        store.write_record(RecordKind::CreateEntity, &buf).unwrap();

        buf.clear();
        encode_create_relation(&mut buf, "E1", "E2", "knows").unwrap();
        store.write_record(RecordKind::CreateRelation, &buf).unwrap();

        buf.clear();
        encode_add_observations(&mut buf, "E1", &["o2".into()]).unwrap();
        store.write_record(RecordKind::AddObservations, &buf).unwrap();

        buf.clear();
        encode_delete_entity(&mut buf, "E1").unwrap();
        store.write_record(RecordKind::DeleteEntity, &buf).unwrap();

        buf.clear();
        encode_delete_observations(&mut buf, "E1", &["o1".into()]).unwrap();
        store.write_record(RecordKind::DeleteObservations, &buf).unwrap();

        buf.clear();
        encode_delete_relation(&mut buf, "E1", "E2", "knows").unwrap();
        store.write_record(RecordKind::DeleteRelation, &buf).unwrap();

        drop(store);

        let kinds: Vec<RecordKind> = replay_all(tmp.path())
            .unwrap()
            .into_iter()
            .map(|(kind, _)| kind)
            .collect();
        assert_eq!(
            kinds,
            vec![
                RecordKind::CreateEntity,
                RecordKind::CreateRelation,
                RecordKind::AddObservations,
                RecordKind::DeleteEntity,
                RecordKind::DeleteObservations,
                RecordKind::DeleteRelation,
            ]
        );
    }

    #[test]
    fn test_reopen_truncated() {
        let tmp = TempLog::new();
        let mut store = BinaryStore::new(tmp.path()).unwrap();
        append_entity(&mut store, "E1", "t1", &[]);
        drop(store);

        // Reopen with truncation.
        let mut store2 = BinaryStore::new(tmp.path()).unwrap();
        store2.reopen_truncated().unwrap();
        append_entity(&mut store2, "E2", "t2", &[]);
        drop(store2);

        // Only E2 should remain — E1 was truncated away.
        assert_eq!(replay_entity_names(tmp.path()).unwrap(), vec!["E2"]);
    }

    #[test]
    fn test_encode_decode_add_observations() {
        let mut buf = Vec::new();
        encode_add_observations(&mut buf, "Alice", &["obs1".into(), "obs2".into()]).unwrap();
        let (name, obs) = decode_add_observations(&buf).unwrap();
        assert_eq!(name, "Alice");
        assert_eq!(obs, vec!["obs1", "obs2"]);
    }

    #[test]
    fn test_encode_decode_delete_entity() {
        let mut buf = Vec::new();
        encode_delete_entity(&mut buf, "ToDelete").unwrap();
        let name = decode_delete_entity(&buf).unwrap();
        assert_eq!(name, "ToDelete");
    }

    #[test]
    fn test_encode_decode_delete_observations() {
        let mut buf = Vec::new();
        encode_delete_observations(&mut buf, "Alice", &["o1".into()]).unwrap();
        let (name, obs) = decode_delete_observations(&buf).unwrap();
        assert_eq!(name, "Alice");
        assert_eq!(obs, vec!["o1"]);
    }

    #[test]
    fn test_encode_decode_delete_relation() {
        let mut buf = Vec::new();
        encode_delete_relation(&mut buf, "A", "B", "knows").unwrap();
        let (from, to, rtype) = decode_delete_relation(&buf).unwrap();
        assert_eq!(from, "A");
        assert_eq!(to, "B");
        assert_eq!(rtype, "knows");
    }

    #[test]
    fn test_sync_slot_follows_reopen_truncated() {
        // The background sync thread fsyncs through the shared slot; after a
        // reopen it must observe the *new* file handle, not the old one (D1).
        let tmp = TempLog::new();
        let mut store = BinaryStore::new(tmp.path()).unwrap();
        let slot = Arc::clone(&store.sync_slot);
        let before = Arc::as_ptr(&slot.load_full());
        store.reopen_truncated().unwrap();
        let after = Arc::as_ptr(&slot.load_full());
        assert_ne!(before, after, "reopen must publish the new handle into the slot");
        assert!(Arc::ptr_eq(&slot, &store.sync_slot), "slot identity must be stable");
    }

    #[test]
    fn test_new_with_slot_reuses_shared_cell() {
        // compact() reopens via new_with_slot(.., Some(existing_slot)) so the
        // sync thread keeps tracking the same cell across the swap (D1).
        let tmp = TempLog::new();
        let store1 = BinaryStore::new(tmp.path()).unwrap();
        let slot = Arc::clone(&store1.sync_slot);
        let before = Arc::as_ptr(&slot.load_full());
        drop(store1);

        let store2 = BinaryStore::new_with_slot(tmp.path(), Some(Arc::clone(&slot))).unwrap();
        assert!(Arc::ptr_eq(&slot, &store2.sync_slot), "must reuse the passed slot");
        let after = Arc::as_ptr(&slot.load_full());
        assert_ne!(before, after, "reopened handle must be published into the slot");
    }

    #[test]
    fn test_record_too_large() {
        let tmp = TempLog::new();
        let mut store = BinaryStore::new(tmp.path()).unwrap();
        let huge = vec![0u8; (1 << 20) + 1];
        let result = store.write_record(RecordKind::CreateEntity, &huge);
        assert!(result.is_err());
    }

    #[test]
    fn test_multiple_writes_and_replay() {
        let tmp = TempLog::new();
        let mut store = BinaryStore::new(tmp.path()).unwrap();
        for i in 0..100 {
            append_entity(&mut store, &format!("E{i}"), "type", &[]);
        }
        drop(store);

        let records = replay_all(tmp.path()).unwrap();
        for (kind, _) in &records {
            assert_eq!(*kind, RecordKind::CreateEntity);
        }
        assert_eq!(records.len(), 100);
    }

    #[test]
    fn test_truncated_log_handling() {
        let tmp = TempLog::new();
        let mut store = BinaryStore::new(tmp.path()).unwrap();
        append_entity(&mut store, "Alice", "person", &[]);
        drop(store);

        // Truncate the file manually (simulate crash during write): keep the
        // 8-byte magic plus only 2 of the 4 length-prefix bytes.
        let file = OpenOptions::new().write(true).open(tmp.path()).unwrap();
        file.set_len(10).unwrap();
        drop(file);

        // Replay should handle gracefully.
        assert_eq!(replay_all(tmp.path()).unwrap().len(), 0);
    }

    #[test]
    fn test_v1_format_backward_compat() {
        // Open an existing V1 file (MCPMEMV1 magic, no CRC), append a new record,
        // and verify all records survive replay — the CRC footer must NOT be
        // written for V1 files, and the payload_len calculation must not absorb
        // non-existent CRC bytes into the payload (D2 regression guard).
        let tmp = TempLog::new();

        // Manually craft a V1 file: MAGIC + V1-format records (no CRC).
        let mut raw = b"MCPMEMV1".to_vec();
        for name in ["Alice", "Bob"] {
            let payload = entity_payload(name, "person", &[]);
            let len: u32 = 4 + 1 + payload.len() as u32;
            raw.extend_from_slice(&len.to_le_bytes());
            raw.push(RecordKind::CreateEntity as u8);
            raw.extend_from_slice(&payload);
        }
        std::fs::write(tmp.path(), &raw).unwrap();

        // Open with BinaryStore — must detect V1 magic, set has_crc=false.
        let mut store = BinaryStore::new(tmp.path()).unwrap();

        // Append a third record — must NOT add CRC footer.
        let p3 = append_entity(&mut store, "Charlie", "person", &[]);
        store.flush().unwrap();
        drop(store);

        // File size: V1 records have no CRC, so each takes 5 + payload bytes.
        let expected_size = raw.len() as u64 + (5 + p3.len()) as u64;
        assert_eq!(
            std::fs::metadata(tmp.path()).unwrap().len(),
            expected_size,
            "V1 file must not grow by CRC bytes after write"
        );

        // Replay must decode all three records correctly.
        assert_eq!(
            replay_entity_names(tmp.path()).unwrap(),
            vec!["Alice", "Bob", "Charlie"]
        );
    }

    #[test]
    fn test_crc_detects_corrupted_payload() {
        // Corrupt a byte in a V2 file's payload and verify replay detects the
        // CRC mismatch and stops — the corrupted record must NOT be returned
        // to the callback (D2).
        let tmp = TempLog::new();
        let mut store = BinaryStore::new(tmp.path()).unwrap();
        append_entity(&mut store, "Alice", "person", &["likes coffee"]);
        store.flush_and_sync().unwrap();
        drop(store);

        // Read the raw file, corrupt one byte inside the payload.
        let mut data = std::fs::read(tmp.path()).unwrap();
        // Layout: MAGIC(8) + Len(4) + Kind(1) + payload(N) + CRC(4)
        // Invert the byte at the payload midpoint.
        let payload_start = 8 + 4 + 1;
        let corrupt_pos = payload_start + (data.len() - payload_start - 4) / 2;
        data[corrupt_pos] ^= 0xFF;
        std::fs::write(tmp.path(), &data).unwrap();

        // Replay must detect the CRC mismatch and stop before the callback.
        let records = replay_all(tmp.path())
            .expect("CRC mismatch must return Ok (torn-tail semantics)");
        assert_eq!(records.len(), 0, "corrupted record must not reach callback");
    }

    #[test]
    fn test_crc_detects_corrupted_middle_record() {
        // With two valid records, corrupting the second record's payload must
        // preserve the first record and stop cleanly before the second (D2).
        let tmp = TempLog::new();
        let mut store = BinaryStore::new(tmp.path()).unwrap();
        let buf1 = append_entity(&mut store, "Alice", "person", &[]);
        append_entity(&mut store, "Bob", "person", &[]);
        store.flush_and_sync().unwrap();
        drop(store);

        // Corrupt a byte in the second record's payload.
        let mut data = std::fs::read(tmp.path()).unwrap();
        // First record ends at: 8 + 4 + 1 + payload1.len() + 4
        let rec1_end = 8 + 4 + 1 + buf1.len() + 4;
        // Second record payload starts at: rec1_end + 4 (len) + 1 (kind)
        let rec2_payload_start = rec1_end + 4 + 1;
        data[rec2_payload_start + 2] ^= 0xFF; // corrupt 3rd byte of payload
        std::fs::write(tmp.path(), &data).unwrap();

        let names = replay_entity_names(tmp.path())
            .expect("CRC mismatch of middle record must not hard-error");
        // Alice survives; Bob is discarded.
        assert_eq!(names, vec!["Alice"]);
    }

    #[test]
    fn test_torn_record_mid_stream_recovers_prefix() {
        // A crash that writes a record's length prefix but only part of its
        // body must not brick the log: replay should return the records written
        // before the torn one and stop cleanly (D2).
        let tmp = TempLog::new();
        let mut store = BinaryStore::new(tmp.path()).unwrap();
        append_entity(&mut store, "Alice", "person", &["likes coffee"]);
        store.flush_and_sync().unwrap();
        let good_len = std::fs::metadata(tmp.path()).unwrap().len();

        // Append a second record, then chop it in half to simulate a torn write
        // (length prefix present, payload incomplete).
        append_entity(&mut store, "Bob", "person", &["drinks tea"]);
        store.flush_and_sync().unwrap();
        drop(store);

        let full_len = std::fs::metadata(tmp.path()).unwrap().len();
        // Cut somewhere inside the second record's body.
        let torn_len = good_len + (full_len - good_len) / 2;
        let file = OpenOptions::new().write(true).open(tmp.path()).unwrap();
        file.set_len(torn_len).unwrap();
        drop(file);

        let names =
            replay_entity_names(tmp.path()).expect("torn tail must not be a hard error");
        // Only the fully-written first record survives.
        assert_eq!(names, vec!["Alice"]);
    }
}