Skip to main content

mcp_memory/
kg.rs

1use std::collections::VecDeque;
2use std::path::Path;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::{Arc, Condvar, Mutex as StdMutex};
5
6use ahash::{AHashMap, AHashSet};
7use arc_swap::ArcSwap;
8
9use serde::ser::{Serialize, SerializeSeq, SerializeStruct, Serializer};
10
11use crate::errors::{MCSError, Result};
12use crate::intern::{StrId, StringInterner};
13use crate::types::{Entity, Relation, KnowledgeGraphOut};
14use crate::search::SearchIndex;
15use crate::store::{self as store_enc, BinaryStore, RecordKind};
16
/// Value of `StoredEntity::state` that marks a slot as holding a live entity
/// (checked by `StoredEntity::is_live`).
const ENTITY_SLOT_LIVE: u8 = 1;
/// Number of independent shards in the entity name table. Shard selection
/// masks with `NAME_TABLE_SHARDS - 1`, so this must be a power of two.
const NAME_TABLE_SHARDS: usize = 4;
19
// ---------------------------------------------------------------------------
// Prefetch helper – issues a non-binding software prefetch hint asking the CPU
// to pull the cache line holding `addr` into cache while the caller finishes
// probing the current entry. Compiles to nothing on non-x86_64 targets.
// ---------------------------------------------------------------------------

/// Hint the CPU to prefetch the cache line containing `addr` (PREFETCHT0,
/// i.e. into all cache levels).
///
/// # Safety
/// A prefetch is only a hint: it never faults and has no architectural
/// effect, so any address value is acceptable. The function is `unsafe`
/// because the underlying intrinsic is. Callers should still pass addresses
/// inside a live allocation for the hint to be useful.
#[cfg(target_arch = "x86_64")]
#[inline(always)]
unsafe fn prefetch_addr(addr: *const u8) {
    use std::arch::x86_64::{_mm_prefetch, _MM_HINT_T0};
    // `_mm_prefetch` is declared over `*const i8`; `cast` changes only the
    // pointee type, not the address.
    // SAFETY: PREFETCHT0 does not fault on any address, and SSE (which
    // provides it) is part of the x86_64 baseline.
    unsafe { _mm_prefetch::<_MM_HINT_T0>(addr.cast::<i8>()) };
}

/// No-op fallback for targets without a prefetch intrinsic wired up here.
#[cfg(not(target_arch = "x86_64"))]
#[inline(always)]
const unsafe fn prefetch_addr(_addr: *const u8) {}
34
/// fsync the directory containing `path` so a rename/create inside it is durable.
///
/// A bare file name (empty parent) syncs the current directory. A missing
/// directory, or a filesystem that rejects fsync on a directory handle
/// (`InvalidInput`), is tolerated and reported as success. On non-Unix targets
/// a directory cannot be opened with `File::open` (Windows reports
/// `PermissionDenied`), so the call is a no-op there.
///
/// # Errors
/// Any other I/O error from opening or syncing the directory.
fn sync_parent_dir(path: &Path) -> std::io::Result<()> {
    if cfg!(not(unix)) {
        return Ok(());
    }

    let dir = path
        .parent()
        .filter(|p| !p.as_os_str().is_empty())
        .unwrap_or_else(|| Path::new("."));

    let handle = match std::fs::File::open(dir) {
        Ok(f) => f,
        // Nothing to make durable if the directory is already gone.
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
        Err(e) => return Err(e),
    };

    match handle.sync_all() {
        // Some filesystems disallow fsync on a directory handle; tolerate it.
        Err(e) if e.kind() == std::io::ErrorKind::InvalidInput => Ok(()),
        other => other,
    }
}
54
// ---------------------------------------------------------------------------
// StoredEntity / StoredRelation – internal representations using StrId.
// ---------------------------------------------------------------------------
// Default layout (assuming 4-byte `StrId`s): 40 B / align 8. Rust's field
// reordering places the 1-byte `state` in the struct's tail padding after the
// Vec and the two StrIds (24 + 4 + 4 + 1 = 33, rounded up to 40). The `Option`
// wrapper in `entity_slots` adds nothing because it uses the Vec pointer's
// niche. With `cache_align`, the slot is rounded to a full 64-byte line, so a
// point lookup/mutation (name_table -> slot index) touches exactly one cache
// line instead of occasionally straddling two. This costs +60% memory and a
// wider stride on bulk scans — measure before enabling.
/// One entity as stored in a slot: interned name, type and observations plus a
/// slot-state byte.
#[derive(Clone)]
#[cfg_attr(feature = "cache_align", repr(align(64)))]
pub(crate) struct StoredEntity {
    /// Slot state; equal to `ENTITY_SLOT_LIVE` when the entity is live.
    state: u8,
    /// Interned entity name (the key used in the name table).
    pub(crate) name: StrId,
    /// Interned entity type.
    pub(crate) entity_type: StrId,
    /// Interned observation strings, in stored order.
    pub(crate) observations: Vec<StrId>,
}
71
72impl StoredEntity {
73    pub(crate) const fn is_live(&self) -> bool {
74        self.state == ENTITY_SLOT_LIVE
75    }
76}
77
// Default layout (assuming 4-byte `StrId`s): 12 B / align 4. Consecutive 12-byte
// records cycle through all 16 four-byte offsets within a 64-byte line, and 2
// of those offsets (56 and 60) run past the line end, so ~1 in 8 records
// straddles two lines.
// With `cache_align`, align(16) rounds the size to 16 B, so 4 records fill a
// line exactly and none straddles, for +33% memory.
/// One directed, typed edge `from -> to`, stored as interned ids.
#[derive(Clone)]
#[cfg_attr(feature = "cache_align", repr(align(16)))]
pub(crate) struct StoredRelation {
    /// Interned name of the source entity.
    pub(crate) from: StrId,
    /// Interned name of the target entity.
    pub(crate) to: StrId,
    /// Interned relation type.
    pub(crate) relation_type: StrId,
}
88
89// ---------------------------------------------------------------------------
90// Borrowing serialization views (M6).
91//
92// Read tools used to build owned `Entity`/`Relation` vecs (a fresh `String`
93// per name/type/observation) and *then* serialize them — roughly 2-3x the
94// graph resident at once. These views instead hold references to the selected
95// stored records and emit their interned `&str` directly during
96// serialization, with no intermediate owned strings. The emitted JSON is
97// byte-for-byte identical to serializing `KnowledgeGraphOut`.
98// ---------------------------------------------------------------------------
99
/// A borrowing view over a selected slice of the graph. Serializes to
/// `{"entities":[...],"relations":[...]}`, resolving interned strings during
/// serialization instead of allocating owned copies first.
pub struct GraphView<'a> {
    /// Graph whose interner resolves the `StrId`s of the records below.
    kg: &'a KnowledgeGraph,
    /// Selected entities, serialized in this order.
    entities: Vec<&'a StoredEntity>,
    /// Selected relations, serialized in this order.
    relations: Vec<&'a StoredRelation>,
}
107
108impl GraphView<'_> {
109    /// Materialize into the owned [`KnowledgeGraphOut`]. Used by the direct
110    /// (non-serializing) callers and tests; the server's read handlers
111    /// serialize the view directly instead.
112    pub fn to_owned_out(&self) -> KnowledgeGraphOut {
113        KnowledgeGraphOut {
114            entities: self.entities.iter().map(|e| self.kg.entity_to_output(e)).collect(),
115            relations: self.relations.iter().map(|r| self.kg.relation_to_output(r)).collect(),
116        }
117    }
118}
119
120impl Serialize for GraphView<'_> {
121    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
122        let mut st = s.serialize_struct("KnowledgeGraphOut", 2)?;
123        st.serialize_field("entities", &EntityListRef { kg: self.kg, items: &self.entities })?;
124        st.serialize_field("relations", &RelationListRef { kg: self.kg, items: &self.relations })?;
125        st.end()
126    }
127}
128
129struct EntityListRef<'a> { kg: &'a KnowledgeGraph, items: &'a [&'a StoredEntity] }
130impl Serialize for EntityListRef<'_> {
131    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
132        let mut seq = s.serialize_seq(Some(self.items.len()))?;
133        for &e in self.items {
134            seq.serialize_element(&EntityRef { kg: self.kg, e })?;
135        }
136        seq.end()
137    }
138}
139
140struct RelationListRef<'a> { kg: &'a KnowledgeGraph, items: &'a [&'a StoredRelation] }
141impl Serialize for RelationListRef<'_> {
142    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
143        let mut seq = s.serialize_seq(Some(self.items.len()))?;
144        for &r in self.items {
145            seq.serialize_element(&RelationRef { kg: self.kg, r })?;
146        }
147        seq.end()
148    }
149}
150
151struct EntityRef<'a> { kg: &'a KnowledgeGraph, e: &'a StoredEntity }
152impl Serialize for EntityRef<'_> {
153    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
154        let mut st = s.serialize_struct("Entity", 3)?;
155        st.serialize_field("name", self.kg.interner.lookup(self.e.name))?;
156        st.serialize_field("entityType", self.kg.interner.lookup(self.e.entity_type))?;
157        st.serialize_field("observations", &ObsRef { kg: self.kg, obs: &self.e.observations })?;
158        st.end()
159    }
160}
161
162struct ObsRef<'a> { kg: &'a KnowledgeGraph, obs: &'a [StrId] }
163impl Serialize for ObsRef<'_> {
164    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
165        let mut seq = s.serialize_seq(Some(self.obs.len()))?;
166        for &o in self.obs {
167            seq.serialize_element(self.kg.interner.lookup(o))?;
168        }
169        seq.end()
170    }
171}
172
173struct RelationRef<'a> { kg: &'a KnowledgeGraph, r: &'a StoredRelation }
174impl Serialize for RelationRef<'_> {
175    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
176        let mut st = s.serialize_struct("Relation", 3)?;
177        st.serialize_field("from", self.kg.interner.lookup(self.r.from))?;
178        st.serialize_field("to", self.kg.interner.lookup(self.r.to))?;
179        st.serialize_field("relationType", self.kg.interner.lookup(self.r.relation_type))?;
180        st.end()
181    }
182}
183
/// Which way to follow edges when expanding a neighborhood.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum Direction {
    /// Follow `from -> to` (outgoing edges).
    Out,
    /// Follow `to -> from` (incoming edges).
    In,
    /// Follow edges regardless of orientation.
    Both,
}

impl Direction {
    /// Parse a direction string. Only the exact, case-sensitive strings
    /// `"out"` and `"in"` are recognized; anything else, including `None`,
    /// yields `Both`.
    pub fn parse(s: Option<&str>) -> Self {
        match s.unwrap_or_default() {
            "out" => Self::Out,
            "in" => Self::In,
            _ => Self::Both,
        }
    }
}
205
/// Make a string safe to embed inside a Mermaid/DOT double-quoted label.
///
/// The substitution is deliberately lossy but safe for both formats:
/// * `"` becomes `'`, since it would otherwise terminate the label;
/// * `\n` and `\r` become a space, so the label stays on one line;
/// * `\` becomes `/`. In a DOT quoted string a backslash escapes the next
///   character, so a trailing `\` swallows the closing quote and corrupts the
///   output. Inside DOT labels, sequences such as `\n` or `\N` are also
///   reinterpreted (line break, node name).
///
/// Every other character passes through unchanged.
fn sanitize_label(s: &str) -> String {
    s.chars()
        .map(|c| match c {
            '"' => '\'',
            '\n' | '\r' => ' ',
            '\\' => '/',
            other => other,
        })
        .collect()
}
218
219// ---------------------------------------------------------------------------
220// ShardedNameTable – open-addressing hash map split into N independent shards.
221//
222// Each shard uses **ctrl-byte bucket** approach: a 1-byte metadata array
223// stores the 7-bit hash stamp (h2) for each slot, with `0xFF` = EMPTY.
224// On probe, the first memory access is a single byte (ctrl). The full key
225// (StrId) is only compared when the stamp matches — ~127/128 of probe steps
226// touch nothing but the ctrl byte.  See also SwissTable / hashbrown.
227// ---------------------------------------------------------------------------
/// Control byte marking a vacant slot. Occupied slots hold an `h2` stamp,
/// which never has bit 7 set, so `ctrl & 0x80 != 0` means "empty".
const EMPTY_SLOT: u8 = 0xFF;

/// Secondary hash: the low 7 bits of `hash`, stored in the slot's ctrl byte.
#[inline(always)]
const fn h2(hash: u64) -> u8 {
    (hash as u8) & 0x7F
}

/// Primary hash: the home bucket, taken from the bits just above the stamp
/// and reduced with `mask` (capacity - 1).
#[inline(always)]
const fn h1(hash: u64, mask: usize) -> usize {
    mask & ((hash >> 7) as usize)
}
239
/// One open-addressing (linear probing) shard of the entity name table.
///
/// Three parallel arrays of the same power-of-two length: `ctrl` holds a
/// one-byte stamp per slot, `names` the interned key, and `slots` the value (an
/// index into the entity slots). Hashes are not stored; `grow` and `remove`
/// recompute them with `StringInterner::get_hash`.
#[derive(Clone)]
struct NameTableShard {
    ctrl: Vec<u8>,      // 0xFF = empty; 0x00-0x7F = h2 stamp (bit 7 always clear)
    names: Vec<StrId>,  // key per occupied slot (StrId::EMPTY when vacant)
    slots: Vec<u32>,    // value per occupied slot (u32::MAX when vacant)
    mask: usize,        // capacity - 1; capacity is always a power of two
    count: usize,       // number of occupied slots
}

impl NameTableShard {
    /// Create an empty shard whose capacity is `capacity` rounded up to a power
    /// of two, and at least 16.
    fn new(capacity: usize) -> Self {
        let cap = capacity.next_power_of_two().max(16);
        Self {
            ctrl: vec![EMPTY_SLOT; cap],
            names: vec![StrId::EMPTY; cap],
            slots: vec![u32::MAX; cap],
            mask: cap - 1,
            count: 0,
        }
    }

    /// Return the value stored for `name`, where `hash` is the interner's hash
    /// of `name`.
    ///
    /// Probes linearly from the home bucket `h1(hash)`. The first empty ctrl
    /// byte ends the search; the capacity-bounded loop is a backstop.
    #[inline(always)]
    fn lookup(&self, hash: u64, name: StrId) -> Option<u32> {
        let stamp = h2(hash);
        let mask = self.mask;
        let mut idx = h1(hash, mask);
        let ctrl = self.ctrl.as_ptr();
        let names = self.names.as_ptr();
        let slots = self.slots.as_ptr();
        let len = self.ctrl.len();

        for _ in 0..len {
            // Prefetch the ctrl byte 4 slots ahead — overlaps memory latency.
            // NOTE(review): ctrl entries are 1 byte each, so idx+4 is usually on
            // the same 64-byte line as idx, which is read right below; measure
            // whether this hint pays for itself.
            let prefetch_idx = idx.wrapping_add(4) & mask;
            // SAFETY: prefetch_idx is masked, so the address stays inside `ctrl`.
            unsafe { prefetch_addr(ctrl.add(prefetch_idx)) };

            // SAFETY: idx always < len because of &mask on each iteration, and
            // `names`/`slots` are always allocated with the same length as `ctrl`
            // (see `new` and `grow`).
            unsafe {
                let c = *ctrl.add(idx);
                // Bit 7 set → EMPTY → key not present.
                if c & 0x80 != 0 {
                    return None;
                }
                // Stamp match → compare the full key. A non-matching occupied
                // slot passes the stamp test only when its 7 stamp bits collide
                // (~1/128 if all stamp bits vary across the shard's keys).
                if c == stamp && *names.add(idx) == name {
                    return Some(*slots.add(idx));
                }
            }
            idx = (idx + 1) & mask;
        }
        None
    }

    /// Insert `name -> slot`, where `hash` is the interner's hash of `name`.
    ///
    /// There is no check for an existing entry: inserting a name twice stores
    /// two entries, so callers must insert only absent names. The shard first
    /// doubles (via `grow`) once the load exceeds 3/4, which guarantees the
    /// probe loop below reaches an empty slot.
    fn insert(&mut self, interner: &StringInterner, hash: u64, name: StrId, slot: u32) {
        if self.count * 4 > self.ctrl.len() * 3 {
            self.grow(interner);
        }
        let stamp = h2(hash);
        let mask = self.mask;
        let mut idx = h1(hash, mask);
        loop {
            // SAFETY: idx & mask always < len for power-of-two capacity.
            unsafe {
                if *self.ctrl.get_unchecked(idx) & 0x80 != 0 {
                    *self.ctrl.get_unchecked_mut(idx) = stamp;
                    *self.names.get_unchecked_mut(idx) = name;
                    *self.slots.get_unchecked_mut(idx) = slot;
                    self.count += 1;
                    return;
                }
            }
            idx = (idx + 1) & mask;
        }
    }

    /// Remove `name` if present; a no-op otherwise.
    ///
    /// Tombstones are not used. After clearing the slot, every following entry
    /// in the same probe cluster (up to the next empty slot) is taken out and
    /// re-placed from its home bucket. Later lookups therefore never stop early
    /// at the new hole.
    fn remove(&mut self, interner: &StringInterner, hash: u64, name: StrId) {
        let stamp = h2(hash);
        let mask = self.mask;
        let mut idx = h1(hash, mask);
        let len = self.ctrl.len();
        for _ in 0..len {
            if self.ctrl[idx] & 0x80 != 0 {
                return;
            }
            if self.ctrl[idx] == stamp && self.names[idx] == name {
                // Found — clear the slot, then re-place the rest of the cluster
                // so no probe chain is broken by the hole.
                self.ctrl[idx] = EMPTY_SLOT;
                self.names[idx] = StrId::EMPTY;
                self.slots[idx] = u32::MAX;
                self.count -= 1;

                let mut next = (idx + 1) & mask;
                while self.ctrl[next] & 0x80 == 0 {
                    let nn = self.names[next];
                    let ns = self.slots[next];
                    // Hash is no longer stored (M4) — recompute it from the
                    // interned name to find the entry's ideal bucket.
                    let nh = interner.get_hash(nn);
                    self.ctrl[next] = EMPTY_SLOT;
                    self.names[next] = StrId::EMPTY;
                    self.slots[next] = u32::MAX;
                    self.count -= 1;

                    // Re-insert at the first empty slot from its ideal bucket.
                    // That slot is at or before `next`, which was just emptied,
                    // so entries beyond `next` are untouched until visited.
                    let nstamp = h2(nh);
                    let mut re_idx = h1(nh, mask);
                    while self.ctrl[re_idx] & 0x80 == 0 {
                        re_idx = (re_idx + 1) & mask;
                    }
                    self.ctrl[re_idx] = nstamp;
                    self.names[re_idx] = nn;
                    self.slots[re_idx] = ns;
                    self.count += 1;

                    next = (next + 1) & mask;
                }
                return;
            }
            idx = (idx + 1) & mask;
        }
    }

    /// Double the capacity and re-place every occupied entry, recomputing each
    /// hash via `interner`. `count` is unchanged.
    fn grow(&mut self, interner: &StringInterner) {
        let new_cap = self.ctrl.len() * 2;
        let new_mask = new_cap - 1;
        let mut new_ctrl = vec![EMPTY_SLOT; new_cap];
        let mut new_names = vec![StrId::EMPTY; new_cap];
        let mut new_slots = vec![u32::MAX; new_cap];

        for i in 0..self.ctrl.len() {
            if self.ctrl[i] & 0x80 == 0 {
                // Recompute the hash from the interned name (M4: not stored).
                let name = self.names[i];
                let hash = interner.get_hash(name);
                let stamp = h2(hash);
                let mut idx = h1(hash, new_mask);
                while new_ctrl[idx] & 0x80 == 0 {
                    idx = (idx + 1) & new_mask;
                }
                new_ctrl[idx] = stamp;
                new_names[idx] = name;
                new_slots[idx] = self.slots[i];
            }
        }

        self.ctrl = new_ctrl;
        self.names = new_names;
        self.slots = new_slots;
        self.mask = new_mask;
    }
}
391
392#[derive(Clone)]
393struct ShardedNameTable {
394    shards: [NameTableShard; NAME_TABLE_SHARDS],
395}
396
397impl ShardedNameTable {
398    fn new(capacity_per_shard: usize) -> Self {
399        Self {
400            shards: [
401                NameTableShard::new(capacity_per_shard),
402                NameTableShard::new(capacity_per_shard),
403                NameTableShard::new(capacity_per_shard),
404                NameTableShard::new(capacity_per_shard),
405            ],
406        }
407    }
408
409    #[inline(always)]
410    const fn shard(hash: u64) -> usize {
411        (hash as usize) & (NAME_TABLE_SHARDS - 1)
412    }
413
414    #[inline(always)]
415    fn lookup(&self, hash: u64, name: StrId) -> Option<u32> {
416        self.shards[Self::shard(hash)].lookup(hash, name)
417    }
418
419    #[inline(always)]
420    fn insert(&mut self, interner: &StringInterner, hash: u64, name: StrId, slot: u32) {
421        self.shards[Self::shard(hash)].insert(interner, hash, name, slot);
422    }
423
424    #[inline(always)]
425    fn remove(&mut self, interner: &StringInterner, hash: u64, name: StrId) {
426        self.shards[Self::shard(hash)].remove(interner, hash, name);
427    }
428}
429
// ---------------------------------------------------------------------------
// KnowledgeGraph – the central type.
// ---------------------------------------------------------------------------
/// The writable knowledge graph: interned strings, entity slots, relations,
/// and the lookup/search indexes maintained alongside them.
pub struct KnowledgeGraph {
    /// Resolves every `StrId` (names, types, observations) back to its string.
    interner: StringInterner,
    /// Entity storage addressed by slot index. Readers treat `None` and
    /// non-live entries as absent.
    entity_slots: Vec<Option<StoredEntity>>,
    /// Tombstoned slot indices available for reuse on the next create (M2),
    /// so create/delete churn doesn't grow `entity_slots` without bound.
    free_slots: Vec<u32>,
    /// Interned entity name → index into `entity_slots`.
    name_table: ShardedNameTable,
    /// All relations, in storage order.
    relations: Vec<StoredRelation>,
    /// Incremental adjacency index: StrId → outgoing (to, type) pairs.
    /// Updated on every create_relations/delete_relations/delete_entities.
    /// Traversals use this instead of rebuilding from scratch (item 3 in plan).
    adjacency: AHashMap<StrId, Vec<(StrId, StrId)>>,
    /// Search index over the graph (`crate::search::SearchIndex`).
    search: SearchIndex,
    /// Persistence backend (`crate::store::BinaryStore`).
    store: BinaryStore,
}
448
// ---------------------------------------------------------------------------
// ReadSnapshot – wait-free, lock-free frozen view of the graph for readers.
// Created by KnowledgeGraph::snapshot() after each write transaction.
// ---------------------------------------------------------------------------
/// Read-only view of the graph served to readers.
///
/// Cloning is cheap for `entity_slots` and `relations`, which are shared via
/// `Arc`. `free_slots`, `name_table` and `adjacency` are owned collections and
/// are copied on clone.
#[derive(Clone)]
pub struct ReadSnapshot {
    /// Resolves every `StrId` in this snapshot back to its string.
    pub(crate) interner: StringInterner,
    /// Entity slots; readers skip `None` and non-live entries.
    pub(crate) entity_slots: Arc<[Option<StoredEntity>]>,
    // Never read (hence the `allow`).
    // NOTE(review): if nothing needs it, dropping it saves a Vec copy per clone.
    #[allow(dead_code)]
    free_slots: Vec<u32>,
    /// Interned entity name → index into `entity_slots`.
    name_table: ShardedNameTable,
    /// Relations in storage order.
    pub(crate) relations: Arc<[StoredRelation]>,
    /// Adjacency index keyed by entity `StrId`. Presumably a copy of the
    /// writer's index taken at snapshot time — confirm in `snapshot()`.
    adjacency: AHashMap<StrId, Vec<(StrId, StrId)>>,
    /// Search index for this snapshot.
    search: SearchIndex,
}
464
/// Append `s` to `buf` as a quoted JSON string literal.
///
/// Runs of characters that need no escaping are copied as whole slices rather
/// than pushed one `char` at a time. `"`, `\`, `\n`, `\r` and `\t` get their
/// short escapes. Every other Unicode control character (category Cc, which
/// includes U+007F–U+009F) is written as `\u00XX` with lowercase hex.
pub(crate) fn push_json_str(buf: &mut String, s: &str) {
    use std::fmt::Write;

    buf.reserve(s.len() + 2);
    buf.push('"');
    // Byte offset where the not-yet-copied clean run starts.
    let mut pending = 0;
    for (at, ch) in s.char_indices() {
        let short: Option<&str> = match ch {
            '"' => Some("\\\""),
            '\\' => Some("\\\\"),
            '\n' => Some("\\n"),
            '\r' => Some("\\r"),
            '\t' => Some("\\t"),
            c if c.is_control() => None,
            _ => continue,
        };
        buf.push_str(&s[pending..at]);
        match short {
            Some(esc) => buf.push_str(esc),
            None => write!(buf, "\\u{:04x}", ch as u32).unwrap(),
        }
        pending = at + ch.len_utf8();
    }
    buf.push_str(&s[pending..]);
    buf.push('"');
}
485
/// A borrowing view over a selection of entities/relations from a ReadSnapshot.
/// Serializes without intermediate owned String allocations.
pub struct ReadGraphView<'a> {
    /// Snapshot whose interner resolves the `StrId`s of the records below.
    snap: &'a ReadSnapshot,
    /// Selected entities, serialized in this order.
    entities: Vec<&'a StoredEntity>,
    /// Selected relations, serialized in this order.
    relations: Vec<&'a StoredRelation>,
}
493
494impl Serialize for ReadGraphView<'_> {
495    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
496        let mut st = s.serialize_struct("KnowledgeGraphOut", 2)?;
497        st.serialize_field("entities", &ReadEntityListRef { snap: self.snap, items: &self.entities })?;
498        st.serialize_field("relations", &ReadRelationListRef { snap: self.snap, items: &self.relations })?;
499        st.end()
500    }
501}
502
503struct ReadEntityListRef<'a> { snap: &'a ReadSnapshot, items: &'a [&'a StoredEntity] }
504impl Serialize for ReadEntityListRef<'_> {
505    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
506        let mut seq = s.serialize_seq(Some(self.items.len()))?;
507        for &e in self.items {
508            seq.serialize_element(&ReadEntityRef { snap: self.snap, e })?;
509        }
510        seq.end()
511    }
512}
513
514struct ReadRelationListRef<'a> { snap: &'a ReadSnapshot, items: &'a [&'a StoredRelation] }
515impl Serialize for ReadRelationListRef<'_> {
516    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
517        let mut seq = s.serialize_seq(Some(self.items.len()))?;
518        for &r in self.items {
519            seq.serialize_element(&ReadRelationRef { snap: self.snap, r })?;
520        }
521        seq.end()
522    }
523}
524
525struct ReadEntityRef<'a> { snap: &'a ReadSnapshot, e: &'a StoredEntity }
526impl Serialize for ReadEntityRef<'_> {
527    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
528        let mut st = s.serialize_struct("Entity", 3)?;
529        st.serialize_field("name", self.snap.interner.lookup(self.e.name))?;
530        st.serialize_field("entityType", self.snap.interner.lookup(self.e.entity_type))?;
531        st.serialize_field("observations", &ReadObsRef { snap: self.snap, obs: &self.e.observations })?;
532        st.end()
533    }
534}
535
536struct ReadObsRef<'a> { snap: &'a ReadSnapshot, obs: &'a [StrId] }
537impl Serialize for ReadObsRef<'_> {
538    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
539        let mut seq = s.serialize_seq(Some(self.obs.len()))?;
540        for &o in self.obs {
541            seq.serialize_element(self.snap.interner.lookup(o))?;
542        }
543        seq.end()
544    }
545}
546
547struct ReadRelationRef<'a> { snap: &'a ReadSnapshot, r: &'a StoredRelation }
548impl Serialize for ReadRelationRef<'_> {
549    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
550        let mut st = s.serialize_struct("Relation", 3)?;
551        st.serialize_field("from", self.snap.interner.lookup(self.r.from))?;
552        st.serialize_field("to", self.snap.interner.lookup(self.r.to))?;
553        st.serialize_field("relationType", self.snap.interner.lookup(self.r.relation_type))?;
554        st.end()
555    }
556}
557
558// --- ReadSnapshot helpers (mirrors KnowledgeGraph helpers) ---
559impl ReadSnapshot {
560
561    /// Serialize the full graph directly to a JSON string, avoiding intermediate
562    /// owned `Entity`/`Relation` allocations. This is the fast path for handlers.
563    pub fn read_graph_json(&self) -> String {
564        // Rough capacity: 40K entities × ~60 B + 120K relations × ~55 B = ~9 MB
565        let cap = self.entity_slots.len() * 64 + self.relations.len() * 60 + 128;
566        let mut buf = String::with_capacity(cap);
567
568        // entities array
569        buf.push_str(r#"{"entities":["#);
570        let mut first = true;
571        for slot in self.entity_slots.iter() {
572            let Some(e) = slot.as_ref().filter(|e| e.is_live()) else { continue };
573            if first { first = false } else { buf.push(',') }
574            buf.push('{');
575            // name
576            buf.push_str(r#""name":"#);
577            push_json_str(&mut buf, self.interner.lookup(e.name));
578            buf.push(',');
579            // entityType
580            buf.push_str(r#""entityType":"#);
581            push_json_str(&mut buf, self.interner.lookup(e.entity_type));
582            buf.push(',');
583            // observations
584            buf.push_str(r#""observations":["#);
585            for (oi, o) in e.observations.iter().enumerate() {
586                if oi > 0 { buf.push(',') }
587                push_json_str(&mut buf, self.interner.lookup(*o));
588            }
589            buf.push_str("]}");
590        }
591
592        // relations array
593        buf.push_str(r#"],"relations":["#);
594        first = true;
595        for r in self.relations.iter() {
596            if first { first = false } else { buf.push(',') }
597            buf.push('{');
598            buf.push_str(r#""from":"#);
599            push_json_str(&mut buf, self.interner.lookup(r.from));
600            buf.push(',');
601            buf.push_str(r#""to":"#);
602            push_json_str(&mut buf, self.interner.lookup(r.to));
603            buf.push(',');
604            buf.push_str(r#""relationType":"#);
605            push_json_str(&mut buf, self.interner.lookup(r.relation_type));
606            buf.push('}');
607        }
608        buf.push_str("]}");
609
610        buf
611    }
612
613    /// Borrowing view of the full graph, suitable for serde-serialization without
614    /// intermediate owned String allocations.
615    pub fn read_graph_view(&self) -> ReadGraphView<'_> {
616        let entities: Vec<&StoredEntity> = self
617            .entity_slots
618            .iter()
619            .filter_map(|s| s.as_ref().filter(|e| e.is_live()))
620            .collect();
621        let relations: Vec<&StoredRelation> = self.relations.iter().collect();
622        ReadGraphView { snap: self, entities, relations }
623    }
624
625    fn lookup_live_slot(&self, name: &str) -> Option<u32> {
626        let name_id = self.interner.get_optional(name)?;
627        let hash = self.interner.get_hash(name_id);
628        let slot = self.name_table.lookup(hash, name_id)?;
629        self.entity_slots
630            .get(slot as usize)
631            .and_then(|s| s.as_ref())
632            .filter(|e| e.is_live())?;
633        Some(slot)
634    }
635
636    fn entity_by_name_id(&self, name_id: StrId) -> Option<Entity> {
637        let hash = self.interner.get_hash(name_id);
638        let slot = self.name_table.lookup(hash, name_id)?;
639        let e = self.entity_slots.get(slot as usize)?.as_ref()?;
640        Some(self.entity_to_output(e))
641    }
642
643    pub(crate) fn entity_to_output(&self, e: &StoredEntity) -> Entity {
644        Entity {
645            name: self.interner.lookup(e.name).to_string(),
646            entity_type: self.interner.lookup(e.entity_type).to_string(),
647            observations: e
648                .observations
649                .iter()
650                .map(|o| self.interner.lookup(*o).to_string())
651                .collect(),
652        }
653    }
654
655    pub(crate) fn relation_to_output(&self, r: &StoredRelation) -> Relation {
656        Relation {
657            from: self.interner.lookup(r.from).to_string(),
658            to: self.interner.lookup(r.to).to_string(),
659            relation_type: self.interner.lookup(r.relation_type).to_string(),
660        }
661    }
662
663    /// Open named entities + incident relations.
664    pub fn open_nodes(&self, names: &[String]) -> KnowledgeGraphOut {
665        let name_ids: std::collections::HashSet<StrId> = names
666            .iter()
667            .filter_map(|n| self.interner.get_optional(n))
668            .collect();
669        let entities: Vec<Entity> = self
670            .entity_slots
671            .iter()
672            .filter_map(|s| {
673                let e = s.as_ref()?;
674                if e.is_live() && name_ids.contains(&e.name) {
675                    Some(self.entity_to_output(e))
676                } else {
677                    None
678                }
679            })
680            .collect();
681        let matched: std::collections::HashSet<StrId> = entities.iter()
682            .filter_map(|e| self.interner.get_optional(&e.name))
683            .collect();
684        let relations: Vec<Relation> = self
685            .relations
686            .iter()
687            .filter(|r| matched.contains(&r.from) || matched.contains(&r.to))
688            .map(|r| self.relation_to_output(r))
689            .collect();
690        KnowledgeGraphOut { entities, relations }
691    }
692
693    /// Full graph read. Returns all entities and relations.
694    pub fn read_graph(&self) -> KnowledgeGraphOut {
695        let entities: Vec<Entity> = self
696            .entity_slots
697            .iter()
698            .filter_map(|s| s.as_ref().filter(|e| e.is_live()))
699            .map(|e| self.entity_to_output(e))
700            .collect();
701        let relations: Vec<Relation> = self
702            .relations
703            .iter()
704            .map(|r| self.relation_to_output(r))
705            .collect();
706        KnowledgeGraphOut { entities, relations }
707    }
708
709    /// Get a single entity by name.
710    pub fn get_entity(&self, name: &str) -> Option<Entity> {
711        self.lookup_live_slot(name)?;
712        let name_id = self.interner.get_optional(name)?;
713        self.entity_by_name_id(name_id)
714    }
715
716    /// Neighborhood expansion (same logic as KnowledgeGraph::neighbors but read-only).
717    pub fn neighbors(
718        &self,
719        name: &str,
720        direction: Direction,
721        rtype: Option<&str>,
722        depth: u32,
723    ) -> Result<KnowledgeGraphOut> {
724        self.lookup_live_slot(name)
725            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
726        let start = self.interner.get_optional(name).unwrap();
727
728        let rtype_id = match rtype {
729            Some(r) => match self.interner.get_optional(r) {
730                Some(id) => Some(id),
731                None => {
732                    let entities = self.entity_by_name_id(start).into_iter().collect();
733                    return Ok(KnowledgeGraphOut { entities, relations: Vec::new() });
734                }
735            },
736            None => None,
737        };
738
739        let mut visited: AHashSet<StrId> = AHashSet::new();
740        visited.insert(start);
741
742        let type_ok = |r: &StoredRelation, rt: Option<StrId>| rt.is_none_or(|rt_id| r.relation_type == rt_id);
743
744        if depth == 1 {
745            for r in self.relations.iter().filter(|r| type_ok(r, rtype_id)) {
746                match direction {
747                    Direction::Out => {
748                        if r.from == start { visited.insert(r.to); }
749                    }
750                    Direction::In => {
751                        if r.to == start { visited.insert(r.from); }
752                    }
753                    Direction::Both => {
754                        if r.from == start { visited.insert(r.to); }
755                        else if r.to == start { visited.insert(r.from); }
756                    }
757                }
758            }
759        } else if depth >= 2 {
760            let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
761            match direction {
762                Direction::Both => {
763                    for (&node, edges) in &self.adjacency {
764                        for &(nb, rt) in edges {
765                            if rtype_id.is_none_or(|rt_id| rt == rt_id) {
766                                adj.entry(node).or_default().push(nb);
767                            }
768                        }
769                    }
770                }
771                Direction::Out | Direction::In => {
772                    for r in self.relations.iter().filter(|r| type_ok(r, rtype_id)) {
773                        match direction {
774                            Direction::Out => adj.entry(r.from).or_default().push(r.to),
775                            Direction::In => adj.entry(r.to).or_default().push(r.from),
776                            _ => unreachable!(),
777                        }
778                    }
779                }
780            }
781            let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
782            queue.push_back((start, 0));
783            while let Some((node, d)) = queue.pop_front() {
784                if d >= depth { continue; }
785                if let Some(nbrs) = adj.get(&node) {
786                    for &nb in nbrs {
787                        if visited.insert(nb) {
788                            queue.push_back((nb, d + 1));
789                        }
790                    }
791                }
792            }
793        }
794
795        let mut entities = Vec::with_capacity(visited.len());
796        for &nid in &visited {
797            if let Some(e) = self.entity_by_name_id(nid) {
798                entities.push(e);
799            }
800        }
801        let relations: Vec<Relation> = self
802            .relations
803            .iter()
804            .filter(|r| type_ok(r, rtype_id) && visited.contains(&r.from) && visited.contains(&r.to))
805            .map(|r| self.relation_to_output(r))
806            .collect();
807        Ok(KnowledgeGraphOut { entities, relations })
808    }
809
810    /// Describe an entity: entity details, incident relations, neighbors, degree.
811    pub fn describe_entity(&self, name: &str) -> Result<serde_json::Value> {
812        let name_id = self
813            .interner
814            .get_optional(name)
815            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
816        let entity = self
817            .entity_by_name_id(name_id)
818            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
819
820        let mut incident: Vec<Relation> = Vec::new();
821        let mut neighbor_seen: AHashSet<StrId> = AHashSet::new();
822        let mut neighbors: Vec<&str> = Vec::new();
823        for r in self.relations.iter() {
824            if r.from == name_id || r.to == name_id {
825                incident.push(self.relation_to_output(r));
826                let other = if r.from == name_id { r.to } else { r.from };
827                if other != name_id && neighbor_seen.insert(other) {
828                    neighbors.push(self.interner.lookup(other));
829                }
830            }
831        }
832
833        Ok(serde_json::json!({
834            "entity": entity,
835            "relations": incident,
836            "neighbors": neighbors,
837            "degree": incident.len(),
838        }))
839    }
840
841    /// Find the shortest path between two entities.
842    pub fn find_path(&self, from: &str, to: &str) -> Result<Vec<String>> {
843        let from_id = self
844            .interner
845            .get_optional(from)
846            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
847        let to_id = self
848            .interner
849            .get_optional(to)
850            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
851        if self.lookup_live_slot(from).is_none() {
852            return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
853        }
854        if self.lookup_live_slot(to).is_none() {
855            return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
856        }
857
858        // BFS over incremental adjacency index.
859        let mut visited: AHashSet<StrId> = AHashSet::new();
860        let mut parent: AHashMap<StrId, StrId> = AHashMap::new();
861        let mut queue: VecDeque<StrId> = VecDeque::new();
862
863        visited.insert(from_id);
864        queue.push_back(from_id);
865
866        while let Some(current) = queue.pop_front() {
867            if current == to_id { break; }
868            if let Some(neighbors) = self.adjacency.get(&current) {
869                for &(neighbor, _) in neighbors {
870                    if visited.insert(neighbor) {
871                        parent.insert(neighbor, current);
872                        queue.push_back(neighbor);
873                    }
874                }
875            }
876        }
877
878        if !visited.contains(&to_id) {
879            return Err(MCSError::MemoryError(format!(
880                "No path found between '{from}' and '{to}'"
881            )));
882        }
883
884        let mut path = Vec::new();
885        let mut cur = to_id;
886        path.push(self.interner.lookup(cur).to_string());
887        while let Some(&p) = parent.get(&cur) {
888            path.push(self.interner.lookup(p).to_string());
889            cur = p;
890        }
891        path.reverse();
892        Ok(path)
893    }
894
895    /// Extract a subgraph around the given entities.
896    pub fn extract_subgraph(&self, names: &[String], depth: u32) -> Result<KnowledgeGraphOut> {
897        if names.is_empty() {
898            return Ok(KnowledgeGraphOut { entities: Vec::new(), relations: Vec::new() });
899        }
900        let mut visited: AHashSet<StrId> = AHashSet::new();
901        let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
902        for name in names {
903            if let Some(id) = self.interner.get_optional(name)
904                && visited.insert(id)
905            {
906                queue.push_back((id, 0));
907            }
908        }
909        let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
910        for (&node, edges) in &self.adjacency {
911            let nbrs: Vec<StrId> = edges.iter().map(|(to, _)| *to).collect();
912            adj.insert(node, nbrs);
913        }
914        while let Some((node, d)) = queue.pop_front() {
915            if d >= depth { continue; }
916            if let Some(nbrs) = adj.get(&node) {
917                for &nb in nbrs {
918                    if visited.insert(nb) {
919                        queue.push_back((nb, d + 1));
920                    }
921                }
922            }
923        }
924        let mut entities: Vec<Entity> = Vec::with_capacity(visited.len());
925        for &nid in &visited {
926            if let Some(e) = self.entity_by_name_id(nid) {
927                entities.push(e);
928            }
929        }
930        let relations: Vec<Relation> = self
931            .relations
932            .iter()
933            .filter(|r| visited.contains(&r.from) && visited.contains(&r.to))
934            .map(|r| self.relation_to_output(r))
935            .collect();
936        Ok(KnowledgeGraphOut { entities, relations })
937    }
938
939    /// Batch get entities.
940    pub fn batch_get_entities(&self, names: &[String]) -> Vec<Option<Entity>> {
941        names.iter().map(|n| self.get_entity(n)).collect()
942    }
943
944    /// Graph statistics.
945    pub fn graph_stats(&self) -> serde_json::Value {
946        let entity_count = self
947            .entity_slots
948            .iter()
949            .filter(|s| s.as_ref().is_some_and(|e| e.is_live()))
950            .count();
951        let relation_count = self.relations.len();
952        let type_counts = self.entity_type_counts();
953        let relation_type_counts = self.relation_type_counts();
954        serde_json::json!({
955            "entities": entity_count,
956            "relations": relation_count,
957            "entityTypes": type_counts,
958            "relationTypes": relation_type_counts,
959        })
960    }
961
962    /// Search relations by from/to/type filters.
963    pub fn search_relations(&self, from: Option<&str>, to: Option<&str>, rtype: Option<&str>) -> Vec<Relation> {
964        let from_id = from.and_then(|n| self.interner.get_optional(n));
965        let to_id = to.and_then(|n| self.interner.get_optional(n));
966        let rtype_id = rtype.and_then(|n| self.interner.get_optional(n));
967        self.relations
968            .iter()
969            .filter(|r| {
970                from_id.is_none_or(|id| r.from == id)
971                    && to_id.is_none_or(|id| r.to == id)
972                    && rtype_id.is_none_or(|id| r.relation_type == id)
973            })
974            .map(|r| self.relation_to_output(r))
975            .collect()
976    }
977
978    /// Count entities per type.
979    pub fn entity_type_counts(&self) -> Vec<(String, usize)> {
980        let mut counts: AHashMap<StrId, usize> = AHashMap::new();
981        for slot in self.entity_slots.iter() {
982            if let Some(e) = slot.as_ref().filter(|e| e.is_live()) {
983                *counts.entry(e.entity_type).or_default() += 1;
984            }
985        }
986        let mut result: Vec<(String, usize)> = counts
987            .into_iter()
988            .map(|(id, c)| (self.interner.lookup(id).to_string(), c))
989            .collect();
990        result.sort_by(|a, b| a.0.cmp(&b.0));
991        result
992    }
993
994    /// Count relations per type.
995    pub fn relation_type_counts(&self) -> Vec<(String, usize)> {
996        let mut counts: AHashMap<StrId, usize> = AHashMap::new();
997        for r in self.relations.iter() {
998            *counts.entry(r.relation_type).or_default() += 1;
999        }
1000        let mut result: Vec<(String, usize)> = counts
1001            .into_iter()
1002            .map(|(id, c)| (self.interner.lookup(id).to_string(), c))
1003            .collect();
1004        result.sort_by(|a, b| a.0.cmp(&b.0));
1005        result
1006    }
1007
1008    /// Export the graph in one of: json, mermaid, dot.
1009    pub fn export(&self, format: &str) -> Result<String> {
1010        match format {
1011            "json" => serde_json::to_string(&self.read_graph()).map_err(MCSError::JsonError),
1012            "mermaid" => Ok(self.export_mermaid()),
1013            "dot" => Ok(self.export_dot()),
1014            other => Err(MCSError::InvalidParams(format!(
1015                "Unknown export format '{other}' (expected json|mermaid|dot)"
1016            ))),
1017        }
1018    }
1019
1020    fn export_mermaid(&self) -> String {
1021        let mut out = String::with_capacity(4096);
1022        out.push_str("graph LR\n");
1023        for r in self.relations.iter() {
1024            let from = sanitize_label(self.interner.lookup(r.from));
1025            let to = sanitize_label(self.interner.lookup(r.to));
1026            let rt = sanitize_label(self.interner.lookup(r.relation_type));
1027            out.push_str(&format!("    {} -- \"{}\" --> {}\n", from, rt, to));
1028        }
1029        out
1030    }
1031
1032    fn export_dot(&self) -> String {
1033        let mut out = String::with_capacity(4096);
1034        out.push_str("digraph KG {\n");
1035        out.push_str("    rankdir=LR;\n");
1036        for slot in self.entity_slots.iter() {
1037            if let Some(e) = slot.as_ref().filter(|e| e.is_live()) {
1038                let name = sanitize_label(self.interner.lookup(e.name));
1039                let etype = sanitize_label(self.interner.lookup(e.entity_type));
1040                out.push_str(&format!("    \"{}\" [label=\"{}\n({})\"];\n", name, name, etype));
1041            }
1042        }
1043        for r in self.relations.iter() {
1044            let from = sanitize_label(self.interner.lookup(r.from));
1045            let to = sanitize_label(self.interner.lookup(r.to));
1046            let rt = sanitize_label(self.interner.lookup(r.relation_type));
1047            out.push_str(&format!("    \"{}\" -> \"{}\" [label=\"{}\"];\n", from, to, rt));
1048        }
1049        out.push_str("}\n");
1050        out
1051    }
1052
1053    /// Find all paths between two entities.
1054    pub fn find_all_paths(
1055        &self,
1056        from: &str,
1057        to: &str,
1058        max_depth: usize,
1059        max_paths: usize,
1060    ) -> Result<Vec<Vec<String>>> {
1061        let from_id = self
1062            .interner
1063            .get_optional(from)
1064            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
1065        let to_id = self
1066            .interner
1067            .get_optional(to)
1068            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
1069        if self.lookup_live_slot(from).is_none() {
1070            return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
1071        }
1072        if self.lookup_live_slot(to).is_none() {
1073            return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
1074        }
1075        if from_id == to_id {
1076            return Ok(vec![vec![from.to_string()]]);
1077        }
1078        let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::with_capacity(self.adjacency.len());
1079        for (&node, edges) in &self.adjacency {
1080            let nbrs: Vec<StrId> = edges.iter().map(|(to, _)| *to).collect();
1081            adj.insert(node, nbrs);
1082        }
1083        let mut all_paths: Vec<Vec<StrId>> = Vec::new();
1084        let mut current_path = Vec::new();
1085        let mut visited: AHashSet<StrId> = AHashSet::new();
1086        visited.insert(from_id);
1087        current_path.push(from_id);
1088        Self::dfs_all_paths(
1089            &adj,
1090            from_id,
1091            to_id,
1092            max_depth,
1093            max_paths,
1094            &mut visited,
1095            &mut current_path,
1096            &mut all_paths,
1097        );
1098        if all_paths.is_empty() {
1099            return Err(MCSError::MemoryError(format!(
1100                "No path found between '{from}' and '{to}'"
1101            )));
1102        }
1103        let result: Vec<Vec<String>> = all_paths
1104            .into_iter()
1105            .map(|path| {
1106                path.into_iter()
1107                    .map(|id| self.interner.lookup(id).to_string())
1108                    .collect()
1109            })
1110            .collect();
1111        Ok(result)
1112    }
1113
1114    fn dfs_all_paths(
1115        adj: &AHashMap<StrId, Vec<StrId>>,
1116        current: StrId,
1117        target: StrId,
1118        max_depth: usize,
1119        max_paths: usize,
1120        visited: &mut AHashSet<StrId>,
1121        current_path: &mut Vec<StrId>,
1122        all_paths: &mut Vec<Vec<StrId>>,
1123    ) {
1124        if all_paths.len() >= max_paths { return; }
1125        if current == target && current_path.len() > 1 {
1126            all_paths.push(current_path.clone());
1127            return;
1128        }
1129        if current_path.len() > max_depth { return; }
1130        if let Some(neighbors) = adj.get(&current) {
1131            for &nb in neighbors {
1132                if !visited.contains(&nb) {
1133                    visited.insert(nb);
1134                    current_path.push(nb);
1135                    Self::dfs_all_paths(adj, nb, target, max_depth, max_paths, visited, current_path, all_paths);
1136                    current_path.pop();
1137                    visited.remove(&nb);
1138                }
1139            }
1140        }
1141    }
1142
1143    /// Search for entities whose name/type/observations contain `query`.
1144    pub fn search_entities(&self, query: &str) -> Result<Vec<Entity>> {
1145        let token = query.to_lowercase();
1146        let matching = self.search.search(&token, &self.interner);
1147        Ok(matching
1148            .iter()
1149            .filter_map(|idx| {
1150                self.entity_slots
1151                    .get(*idx as usize)?
1152                    .as_ref()
1153                    .filter(|e| e.is_live())
1154                    .map(|e| self.entity_to_output(e))
1155            })
1156            .collect())
1157    }
1158}
1159
1160impl KnowledgeGraph {
1161    pub fn new(path: &Path) -> std::io::Result<Self> {
1162        Self::open(path, None)
1163    }
1164
    /// Open the graph, optionally reusing an existing background-sync slot so the
    /// fsync thread tracks the current file across a `compact` (D1).
    ///
    /// Opening proceeds in four steps:
    /// 1. Open the log through `BinaryStore::new_with_slot`.
    /// 2. Replay every record into fresh in-memory collections.
    /// 3. Derive the free-slot list from the replayed state.
    /// 4. Derive the undirected adjacency index from the replayed state.
    ///
    /// # Errors
    /// Returns any I/O error from opening the store or from `store.replay`.
    fn open(
        path: &Path,
        sync_slot: Option<Arc<arc_swap::ArcSwap<std::fs::File>>>,
    ) -> std::io::Result<Self> {
        let store = BinaryStore::new_with_slot(path, sync_slot)?;

        // Replay into local collections, then assign into self — no raw pointers needed (X3).
        let mut interner = StringInterner::with_capacity(65536, 1024);
        let mut entity_slots: Vec<Option<StoredEntity>> = Vec::with_capacity(256);
        let mut name_table = ShardedNameTable::new(64);
        let mut relations: Vec<StoredRelation> = Vec::with_capacity(64);
        let mut search = SearchIndex::new();

        // Transaction buffer: while `Some`, records are accumulated and only
        // applied on `TxnCommit`. An unclosed transaction at EOF is discarded,
        // which is what makes multi-record operations (e.g. `merge_entities`)
        // crash-atomic.
        let mut pending: Option<Vec<(RecordKind, Vec<u8>)>> = None;
        store.replay(|kind, data| {
            match kind {
                // A `TxnBegin` seen while a transaction is already open replaces
                // the buffer. The earlier, never-committed records are dropped.
                RecordKind::TxnBegin => pending = Some(Vec::new()),
                RecordKind::TxnCommit => {
                    // A `TxnCommit` with no open transaction does nothing.
                    if let Some(buffered) = pending.take() {
                        for (k, d) in &buffered {
                            Self::apply_record(
                                *k, d, &mut interner, &mut entity_slots, &mut search,
                                &mut name_table, &mut relations,
                            );
                        }
                    }
                }
                // Data records: buffer an owned copy of the payload while a
                // transaction is open; otherwise apply the record immediately.
                other => match pending.as_mut() {
                    Some(buffered) => buffered.push((other, data.to_vec())),
                    None => Self::apply_record(
                        other, data, &mut interner, &mut entity_slots, &mut search,
                        &mut name_table, &mut relations,
                    ),
                },
            }
        })?;

        // Slots tombstoned by deletes during replay are available for reuse (M2).
        let free_slots: Vec<u32> = entity_slots
            .iter()
            .enumerate()
            .filter(|(_, s)| s.is_none())
            .map(|(i, _)| i as u32)
            .collect();

        // Each relation is recorded under both endpoints as (neighbor, relation
        // type), so traversals over this index ignore direction.
        let mut adjacency: AHashMap<StrId, Vec<(StrId, StrId)>> = AHashMap::new();
        for rel in &relations {
            adjacency.entry(rel.from).or_default().push((rel.to, rel.relation_type));
            adjacency.entry(rel.to).or_default().push((rel.from, rel.relation_type));
        }

        Ok(Self {
            interner,
            entity_slots,
            free_slots,
            name_table,
            relations,
            adjacency,
            search,
            store,
        })
    }
1233
1234    // -----------------------------------------------------------------------
1235    // Replay helpers (static to avoid borrow issues in the closure)
1236    // -----------------------------------------------------------------------
1237
1238    /// Apply one already-decoded log record to the in-memory collections.
1239    /// Shared by direct replay and by transaction commit. `TxnBegin`/`TxnCommit`
1240    /// are handled by the caller and are no-ops here.
1241    #[allow(clippy::too_many_arguments)]
1242    fn apply_record(
1243        kind: RecordKind,
1244        data: &[u8],
1245        interner: &mut StringInterner,
1246        entity_slots: &mut Vec<Option<StoredEntity>>,
1247        search: &mut SearchIndex,
1248        name_table: &mut ShardedNameTable,
1249        relations: &mut Vec<StoredRelation>,
1250    ) {
1251        match kind {
1252            RecordKind::CreateEntity => {
1253                if let Some((name, etype, obs)) = store_enc::decode_create_entity(data) {
1254                    Self::replay_create_entity(
1255                        interner, entity_slots, search, name_table, name, etype, &obs,
1256                    );
1257                }
1258            }
1259            RecordKind::CreateRelation => {
1260                if let Some((from, to, rtype)) = store_enc::decode_create_relation(data) {
1261                    let from_id = interner.intern(from);
1262                    let to_id = interner.intern(to);
1263                    let type_id = interner.intern(rtype);
1264                    relations.push(StoredRelation {
1265                        from: from_id,
1266                        to: to_id,
1267                        relation_type: type_id,
1268                    });
1269                }
1270            }
1271            RecordKind::AddObservations => {
1272                if let Some((name, obs)) = store_enc::decode_add_observations(data) {
1273                    Self::replay_add_observations(
1274                        interner, entity_slots, search, name_table, name, &obs,
1275                    );
1276                }
1277            }
1278            RecordKind::DeleteEntity => {
1279                if let Some(name) = store_enc::decode_delete_entity(data) {
1280                    Self::replay_delete_entity(
1281                        interner, entity_slots, relations, search, name_table, name,
1282                    );
1283                }
1284            }
1285            RecordKind::DeleteObservations => {
1286                if let Some((name, obs)) = store_enc::decode_delete_observations(data) {
1287                    Self::replay_delete_observations(
1288                        interner, entity_slots, search, name_table, name, &obs,
1289                    );
1290                }
1291            }
1292            RecordKind::DeleteRelation => {
1293                if let Some((from, to, rtype)) = store_enc::decode_delete_relation(data) {
1294                    let from_id = interner.intern(from);
1295                    let to_id = interner.intern(to);
1296                    let type_id = interner.intern(rtype);
1297                    relations.retain(|r| {
1298                        !(r.from == from_id && r.to == to_id && r.relation_type == type_id)
1299                    });
1300                }
1301            }
1302            RecordKind::TxnBegin | RecordKind::TxnCommit => {}
1303        }
1304    }
1305
1306    #[allow(clippy::ptr_arg)]
1307    fn replay_create_entity(
1308        interner: &mut StringInterner,
1309        entities: &mut Vec<Option<StoredEntity>>,
1310        search: &mut SearchIndex,
1311        name_table: &mut ShardedNameTable,
1312        name: &str,
1313        etype: &str,
1314        observations: &[&str],
1315    ) {
1316        let name_id = interner.intern(name);
1317        let type_id = interner.intern(etype);
1318        let obs_ids: Vec<StrId> = observations.iter().map(|o| interner.intern(o)).collect();
1319        let slot = entities.len() as u32;
1320        entities.push(Some(StoredEntity {
1321            state: ENTITY_SLOT_LIVE,
1322            name: name_id,
1323            entity_type: type_id,
1324            observations: obs_ids.clone(),
1325        }));
1326        let hash = interner.get_hash(name_id);
1327        name_table.insert(&*interner, hash, name_id, slot);
1328        search.index_entity(interner, slot, name_id, type_id, &obs_ids);
1329    }
1330
1331    fn replay_add_observations(
1332        interner: &mut StringInterner,
1333        entities: &mut [Option<StoredEntity>],
1334        search: &mut SearchIndex,
1335        name_table: &mut ShardedNameTable,
1336        name: &str,
1337        observations: &[&str],
1338    ) {
1339        let name_id = interner.intern(name);
1340        let hash = interner.get_hash(name_id);
1341        if let Some(slot) = name_table.lookup(hash, name_id)
1342            && let Some(Some(entity)) = entities.get_mut(slot as usize)
1343        {
1344            for &o in observations {
1345                let oid = interner.intern(o);
1346                if !entity.observations.contains(&oid) {
1347                    entity.observations.push(oid);
1348                }
1349            }
1350            search.remove_entity(slot);
1351            search.index_entity(
1352                interner,
1353                slot,
1354                entity.name,
1355                entity.entity_type,
1356                &entity.observations,
1357            );
1358        }
1359    }
1360
1361    fn replay_delete_entity(
1362        interner: &mut StringInterner,
1363        entities: &mut [Option<StoredEntity>],
1364        rels: &mut Vec<StoredRelation>,
1365        search: &mut SearchIndex,
1366        name_table: &mut ShardedNameTable,
1367        name: &str,
1368    ) {
1369        let name_id = interner.intern(name);
1370        let hash = interner.get_hash(name_id);
1371        if let Some(slot) = name_table.lookup(hash, name_id)
1372            && let Some(Some(_)) = entities.get(slot as usize)
1373        {
1374            entities[slot as usize] = None;
1375            search.remove_entity(slot);
1376            name_table.remove(&*interner, hash, name_id);
1377        }
1378        rels.retain(|r| r.from != name_id && r.to != name_id);
1379    }
1380
1381    fn replay_delete_observations(
1382        interner: &mut StringInterner,
1383        entities: &mut [Option<StoredEntity>],
1384        search: &mut SearchIndex,
1385        name_table: &mut ShardedNameTable,
1386        name: &str,
1387        observations: &[&str],
1388    ) {
1389        let name_id = interner.intern(name);
1390        let hash = interner.get_hash(name_id);
1391        if let Some(slot) = name_table.lookup(hash, name_id)
1392            && let Some(Some(entity)) = entities.get_mut(slot as usize)
1393        {
1394            let remove_ids: Vec<StrId> = observations.iter().map(|o| interner.intern(o)).collect();
1395            entity.observations.retain(|o| !remove_ids.contains(o));
1396            search.remove_entity(slot);
1397            search.index_entity(
1398                interner,
1399                slot,
1400                entity.name,
1401                entity.entity_type,
1402                &entity.observations,
1403            );
1404        }
1405    }
1406
1407    // -----------------------------------------------------------------------
1408    // Public API
1409    // -----------------------------------------------------------------------
1410
1411    pub const fn interner(&self) -> &StringInterner {
1412        &self.interner
1413    }
1414
1415    /// Return a single entity by exact name match.
1416    pub fn get_entity(&self, name: &str) -> Option<Entity> {
1417        let name_id = self.interner.get_optional(name)?;
1418        let hash = self.interner.get_hash(name_id);
1419        let slot = self.name_table.lookup(hash, name_id)?;
1420        let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
1421        if !stored.is_live() {
1422            return None;
1423        }
1424        Some(self.entity_to_output(stored))
1425    }
1426
1427    /// Return aggregate statistics about the graph.
1428    pub fn graph_stats(&self) -> serde_json::Value {
1429        let live_entities = self
1430            .entity_slots
1431            .iter()
1432            .filter(|s| s.as_ref().is_some_and(|e| e.is_live()))
1433            .count();
1434        let total_relations = self.relations.len();
1435        let index_entries = self.search.len();
1436        let total_obs: usize = self
1437            .entity_slots
1438            .iter()
1439            .filter_map(|s| s.as_ref())
1440            .filter(|e| e.is_live())
1441            .map(|e| e.observations.len())
1442            .sum();
1443
1444        serde_json::json!({
1445            "entities": live_entities,
1446            "relations": total_relations,
1447            "totalObservations": total_obs,
1448            "searchIndexEntries": index_entries,
1449            "internedStrings": self.interner.len(),
1450            "internedBytes": self.interner.total_bytes(),
1451        })
1452    }
1453
1454    /// Search relations by optional filters: `from`, `to`, `relationType`.
1455    /// Any filter that is absent matches everything. A filter value that does
1456    /// not exist in the graph returns empty results.
1457    pub fn search_relations(&self, from: Option<&str>, to: Option<&str>, rtype: Option<&str>) -> Vec<Relation> {
1458        let from_id = match from {
1459            Some(f) => match self.interner.get_optional(f) {
1460                Some(id) => Some(id),
1461                None => return Vec::new(),
1462            },
1463            None => None,
1464        };
1465        let to_id = match to {
1466            Some(t) => match self.interner.get_optional(t) {
1467                Some(id) => Some(id),
1468                None => return Vec::new(),
1469            },
1470            None => None,
1471        };
1472        let rtype_id = match rtype {
1473            Some(r) => match self.interner.get_optional(r) {
1474                Some(id) => Some(id),
1475                None => return Vec::new(),
1476            },
1477            None => None,
1478        };
1479
1480        self.relations
1481            .iter()
1482            .filter(|r| {
1483                from_id.is_none_or(|f| r.from == f)
1484                    && to_id.is_none_or(|t| r.to == t)
1485                    && rtype_id.is_none_or(|rt| r.relation_type == rt)
1486            })
1487            .map(|r| Relation {
1488                from: self.interner.lookup(r.from).to_string(),
1489                to: self.interner.lookup(r.to).to_string(),
1490                relation_type: self.interner.lookup(r.relation_type).to_string(),
1491            })
1492            .collect()
1493    }
1494
1495    /// BFS shortest-path between two entity names. Returns the sequence of
1496    /// entity names along the path (inclusive of both endpoints).
1497    pub fn find_path(&self, from: &str, to: &str) -> Result<Vec<String>> {
1498        let from_id = self.interner.get_optional(from)
1499            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
1500        let to_id = self.interner.get_optional(to)
1501            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
1502        let hash_from = self.interner.get_hash(from_id);
1503        let hash_to = self.interner.get_hash(to_id);
1504
1505        if self.name_table.lookup(hash_from, from_id).is_none() {
1506            return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
1507        }
1508        if self.name_table.lookup(hash_to, to_id).is_none() {
1509            return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
1510        }
1511        if from_id == to_id {
1512            return Ok(vec![from.to_string()]);
1513        }
1514
1515        // Use incremental adjacency index — O(degree) per hop, no rebuild.
1516        let mut visited: AHashSet<StrId> = AHashSet::new();
1517        let mut parent: AHashMap<StrId, StrId> = AHashMap::new();
1518        let mut queue: VecDeque<StrId> = VecDeque::new();
1519
1520        visited.insert(from_id);
1521        queue.push_back(from_id);
1522
1523        while let Some(current) = queue.pop_front() {
1524            if current == to_id {
1525                break;
1526            }
1527
1528            if let Some(neighbors) = self.adjacency.get(&current) {
1529                for &(neighbor, _) in neighbors {
1530                    if visited.insert(neighbor) {
1531                        parent.insert(neighbor, current);
1532                        queue.push_back(neighbor);
1533                    }
1534                }
1535            }
1536        }
1537
1538        if !parent.contains_key(&to_id) && from_id != to_id {
1539            return Err(MCSError::MemoryError(format!(
1540                "No path found between '{from}' and '{to}'"
1541            )));
1542        }
1543
1544        // Reconstruct path
1545        let mut path: Vec<String> = Vec::new();
1546        let mut cur = to_id;
1547        loop {
1548            path.push(self.interner.lookup(cur).to_string());
1549            if cur == from_id {
1550                break;
1551            }
1552            cur = *parent.get(&cur).ok_or_else(|| {
1553                MCSError::MemoryError("Path reconstruction failed".into())
1554            })?;
1555        }
1556        path.reverse();
1557        Ok(path)
1558    }
1559
    /// Rewrite the binary log from the current in-memory state.
    /// After compaction the log contains only the minimal set of records
    /// needed to reconstruct the graph (all creates, no deletes).
    /// Crash-safe: writes to a temp file, then atomically renames (C3).
    ///
    /// On success `self` is replaced by a graph freshly reopened from the
    /// compacted log, which also reclaims interner/slot/search garbage.
    ///
    /// # Errors
    /// Returns `MCSError::IoError` if removing a stale temp file, writing or
    /// syncing the temp log, renaming it over the real log, syncing the parent
    /// directory, or reopening the compacted log fails.
    ///
    /// NOTE(review): if the reopen in step 4 fails, the rename has already
    /// happened and `self` still holds the pre-compaction state and store
    /// handle — confirm whether later writes through that handle reach the
    /// new file or the renamed-away original.
    pub fn compact(&mut self) -> Result<()> {
        // 1. Collect current state as create-records
        let mut create_entities: Vec<Entity> = Vec::new();
        let mut create_relations: Vec<Relation> = Vec::new();

        // Only live slots are emitted; tombstones and their history are dropped.
        for slot in &self.entity_slots {
            if let Some(stored) = slot.as_ref().filter(|e| e.is_live()) {
                create_entities.push(self.entity_to_output(stored));
            }
        }
        for rel in &self.relations {
            create_relations.push(Relation {
                from: self.interner.lookup(rel.from).to_string(),
                to: self.interner.lookup(rel.to).to_string(),
                relation_type: self.interner.lookup(rel.relation_type).to_string(),
            });
        }

        // 2. Write to a temp file first.
        //    Remove any stale temp left by a previously-interrupted compact:
        //    `BinaryStore::new` opens in append mode and only writes the MAGIC
        //    header when the file does not already exist, so appending to a
        //    leftover temp would produce a duplicated, header-corrupted log
        //    once renamed over the real one (C1).
        let tmp_path = self.store.path().with_extension("tmp");
        if let Err(e) = std::fs::remove_file(&tmp_path)
            && e.kind() != std::io::ErrorKind::NotFound
        {
            return Err(MCSError::IoError(e));
        }
        let mut tmp_store = BinaryStore::new(&tmp_path).map_err(MCSError::IoError)?;
        // Entity records are written before relation records.
        for entity in &create_entities {
            let mut buf = Vec::new();
            store_enc::encode_create_entity(&mut buf, &entity.name, &entity.entity_type, &entity.observations)
                .map_err(MCSError::IoError)?;
            tmp_store.write_record(RecordKind::CreateEntity, &buf).map_err(MCSError::IoError)?;
        }
        for relation in &create_relations {
            let mut buf = Vec::new();
            store_enc::encode_create_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
                .map_err(MCSError::IoError)?;
            tmp_store.write_record(RecordKind::CreateRelation, &buf).map_err(MCSError::IoError)?;
        }
        // The temp file's contents must be durable before it can replace the log.
        tmp_store.flush_and_sync().map_err(MCSError::IoError)?;
        drop(tmp_store);

        // 3. Atomically rename over the original (atomic on POSIX), then fsync
        //    the containing directory so the rename itself is durable across a
        //    crash — content swap is atomic, but the directory entry update is
        //    not durable until the dir is synced (C2).
        std::fs::rename(&tmp_path, self.store.path()).map_err(MCSError::IoError)?;
        sync_parent_dir(self.store.path()).map_err(MCSError::IoError)?;

        // 4. Rebuild the entire in-memory graph from the compacted log (M1/M2).
        //    Replaying into fresh structures reclaims the interner arena (stale
        //    strings from deleted/edited entities), tombstoned entity slots, and
        //    stale search-index entries — none of which the old reopen-only path
        //    reclaimed.
        let path = self.store.path().clone();
        // Reuse the existing sync slot so the background fsync thread follows the
        // compacted file instead of the renamed-away original (D1).
        let sync_slot = Arc::clone(&self.store.sync_slot);
        *self = KnowledgeGraph::open(&path, Some(sync_slot)).map_err(MCSError::IoError)?;

        Ok(())
    }
1630
1631    // ---- Public API with write-ahead log (C1) and error propagation ----
1632
1633    pub fn create_entities(&mut self, entities: &[Entity]) -> Result<Vec<Entity>> {
1634        // Validate up front so an invalid entity never produces partial writes.
1635        for entity in entities {
1636            if entity.name.is_empty() {
1637                return Err(MCSError::InvalidParams(
1638                    "Entity name must not be empty".into(),
1639                ));
1640            }
1641        }
1642        let mut created = Vec::new();
1643        for entity in entities {
1644            // Check dedup before writing (using non-interning lookup)
1645            let existing = self.interner.get_optional(&entity.name)
1646                .and_then(|id| {
1647                    let hash = self.interner.get_hash(id);
1648                    self.name_table.lookup(hash, id)
1649                });
1650            if existing.is_some() {
1651                continue;
1652            }
1653            // Write-ahead: encode and log before mutating state
1654            let mut buf = Vec::new();
1655            store_enc::encode_create_entity(&mut buf, &entity.name, &entity.entity_type, &entity.observations)
1656                .map_err(MCSError::IoError)?;
1657            self.store.write_record(RecordKind::CreateEntity, &buf)
1658                .map_err(MCSError::IoError)?;
1659
1660            let name_id = self.interner.intern(&entity.name);
1661            let hash = self.interner.get_hash(name_id);
1662            let type_id = self.interner.intern(&entity.entity_type);
1663            let obs_ids: Vec<StrId> = entity
1664                .observations
1665                .iter()
1666                .map(|o| self.interner.intern(o))
1667                .collect();
1668            // Reuse a tombstoned slot if one is free (M2); its old search-index
1669            // entries were cleared on delete, so the slot starts clean.
1670            let reused = self.free_slots.pop();
1671            let slot = reused.unwrap_or(self.entity_slots.len() as u32);
1672            self.search
1673                .index_entity(&mut self.interner, slot, name_id, type_id, &obs_ids);
1674            let stored = Some(StoredEntity {
1675                state: ENTITY_SLOT_LIVE,
1676                name: name_id,
1677                entity_type: type_id,
1678                observations: obs_ids,
1679            });
1680            match reused {
1681                Some(s) => self.entity_slots[s as usize] = stored,
1682                None => self.entity_slots.push(stored),
1683            }
1684            self.name_table.insert(&self.interner, hash, name_id, slot);
1685            created.push(Entity {
1686                name: entity.name.clone(),
1687                entity_type: entity.entity_type.clone(),
1688                observations: entity.observations.clone(),
1689            });
1690        }
1691        Ok(created)
1692    }
1693
1694    pub fn create_relations(&mut self, relations: &[Relation]) -> Result<Vec<Relation>> {
1695        // Validate up front so an invalid relation never produces partial writes.
1696        for relation in relations {
1697            if relation.from.is_empty() || relation.to.is_empty() {
1698                return Err(MCSError::InvalidParams(
1699                    "Relation endpoints must not be empty".into(),
1700                ));
1701            }
1702        }
1703        let mut created = Vec::new();
1704        // Build a dedup set for O(1) duplicate checks (P5)
1705        let mut rel_set: AHashSet<(StrId, StrId, StrId)> = AHashSet::new();
1706        for rel in &self.relations {
1707            rel_set.insert((rel.from, rel.to, rel.relation_type));
1708        }
1709        for relation in relations {
1710            let from_id = self.interner.intern(&relation.from);
1711            let to_id = self.interner.intern(&relation.to);
1712            let type_id = self.interner.intern(&relation.relation_type);
1713            if !rel_set.insert((from_id, to_id, type_id)) {
1714                continue;
1715            }
1716            // Write-ahead: log before mutation
1717            let mut buf = Vec::new();
1718            store_enc::encode_create_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
1719                .map_err(MCSError::IoError)?;
1720            self.store.write_record(RecordKind::CreateRelation, &buf)
1721                .map_err(MCSError::IoError)?;
1722
1723            self.relations.push(StoredRelation {
1724                from: from_id,
1725                to: to_id,
1726                relation_type: type_id,
1727            });
1728            self.adjacency.entry(from_id).or_default().push((to_id, type_id));
1729            self.adjacency.entry(to_id).or_default().push((from_id, type_id));
1730            created.push(Relation {
1731                from: relation.from.clone(),
1732                to: relation.to.clone(),
1733                relation_type: relation.relation_type.clone(),
1734            });
1735        }
1736        Ok(created)
1737    }
1738
    /// Append `contents` to the observations of the entity `entity_name`,
    /// skipping strings it already has and repeats within this batch.
    ///
    /// Returns the observations actually added, in input order. An empty
    /// result means no record was logged and the entity was not changed.
    ///
    /// # Errors
    /// `InvalidParams` if the entity is not found; `IoError` if encoding or
    /// writing the log record fails. In that case the entity's observations
    /// and search index are untouched, although the strings may already have
    /// been interned.
    pub fn add_observations(&mut self, entity_name: &str, contents: &[String]) -> Result<Vec<String>> {
        let name_id = self.interner.get_optional(entity_name)
            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
        let hash = self.interner.get_hash(name_id);
        let slot = self
            .name_table
            .lookup(hash, name_id)
            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
        // Snapshot the current observations so we can compute the deduplicated
        // additions *without* mutating in-memory state yet.
        let existing: AHashSet<StrId> = self
            .entity_slots
            .get(slot as usize)
            .and_then(|e| e.as_ref())
            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?
            .observations
            .iter()
            .copied()
            .collect();

        // Deduplicate against existing observations *and* within this batch, so
        // the live result matches what replay (which dedups one-by-one) rebuilds.
        let mut added = Vec::new();
        let mut interned_added = Vec::new();
        let mut seen: AHashSet<StrId> = AHashSet::new();
        for content in contents {
            let cid = self.interner.intern(content);
            if existing.contains(&cid) || !seen.insert(cid) {
                continue;
            }
            interned_added.push(cid);
            added.push(content.clone());
        }
        // Nothing new: skip logging entirely.
        if added.is_empty() {
            return Ok(added);
        }

        // Write-ahead: the record must hit the log *before* any in-memory
        // mutation, so a failed write leaves memory and disk in agreement (C3).
        let mut buf = Vec::new();
        store_enc::encode_add_observations(&mut buf, entity_name, &added)
            .map_err(MCSError::IoError)?;
        self.store.write_record(RecordKind::AddObservations, &buf)
            .map_err(MCSError::IoError)?;

        // Logged — now apply to in-memory state.
        let stored = self
            .entity_slots
            .get_mut(slot as usize)
            .and_then(|e| e.as_mut())
            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
        stored.observations.extend_from_slice(&interned_added);

        // Incrementally index only the new observation tokens (P3) — no
        // full remove + re-index of the whole entity.
        self.search
            .index_additional(&mut self.interner, slot, &interned_added);
        Ok(added)
    }
1798
1799    pub fn delete_entities(&mut self, entity_names: &[String]) -> Result<()> {
1800        let mut deleted_names = Vec::new();
1801        for name in entity_names {
1802            let name_id_opt = self.interner.get_optional(name);
1803            if let Some(name_id) = name_id_opt {
1804                let hash = self.interner.get_hash(name_id);
1805                if let Some(slot) = self.name_table.lookup(hash, name_id)
1806                    && let Some(Some(_)) = self.entity_slots.get(slot as usize)
1807                {
1808                    // Write-ahead: log before mutation
1809                    let mut buf = Vec::new();
1810                    store_enc::encode_delete_entity(&mut buf, name)
1811                        .map_err(MCSError::IoError)?;
1812                    self.store.write_record(RecordKind::DeleteEntity, &buf)
1813                        .map_err(MCSError::IoError)?;
1814
1815                    self.entity_slots[slot as usize] = None;
1816                    self.free_slots.push(slot);
1817                    self.search.remove_entity(slot);
1818                    self.name_table.remove(&self.interner, hash, name_id);
1819                    deleted_names.push(name.clone());
1820                }
1821            }
1822        }
1823        if !deleted_names.is_empty() {
1824            // Use a AHashSet for O(1) retain checks (P5)
1825            let deleted_ids: AHashSet<StrId> = deleted_names.iter()
1826                .map(|n| self.interner.intern(n))
1827                .collect();
1828            self.relations
1829                .retain(|r| !deleted_ids.contains(&r.from) && !deleted_ids.contains(&r.to));
1830            // Clean adjacency index
1831            for id in &deleted_ids {
1832                self.adjacency.remove(id);
1833                // Remove references from other entities' adjacency lists
1834                for list in self.adjacency.values_mut() {
1835                    list.retain(|(to, _)| !deleted_ids.contains(to));
1836                }
1837            }
1838        }
1839        Ok(())
1840    }
1841
1842    pub fn delete_observations(&mut self, entity_name: &str, observations: &[String]) -> Result<()> {
1843        let name_id = self.interner.get_optional(entity_name)
1844            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1845        let hash = self.interner.get_hash(name_id);
1846        let slot = self
1847            .name_table
1848            .lookup(hash, name_id)
1849            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1850        // Confirm the slot is live before logging.
1851        self.entity_slots
1852            .get(slot as usize)
1853            .and_then(|e| e.as_ref())
1854            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1855        let remove_ids: AHashSet<StrId> = observations.iter().map(|o| self.interner.intern(o)).collect();
1856
1857        // Write-ahead: log before touching in-memory state (C3).
1858        let mut buf = Vec::new();
1859        store_enc::encode_delete_observations(&mut buf, entity_name, observations)
1860            .map_err(MCSError::IoError)?;
1861        self.store.write_record(RecordKind::DeleteObservations, &buf)
1862            .map_err(MCSError::IoError)?;
1863
1864        // Logged — now apply.
1865        let stored = self
1866            .entity_slots
1867            .get_mut(slot as usize)
1868            .and_then(|e| e.as_mut())
1869            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1870        stored.observations.retain(|o| !remove_ids.contains(o));
1871        self.search.remove_entity(slot);
1872        self.search
1873            .index_entity(&mut self.interner, slot, stored.name, stored.entity_type, &stored.observations);
1874        Ok(())
1875    }
1876
1877    pub fn delete_relations(&mut self, relations: &[Relation]) -> Result<()> {
1878        // Collect targets into a AHashSet for O(1) retain checks (P5)
1879        let rels: AHashSet<(StrId, StrId, StrId)> = relations
1880            .iter()
1881            .map(|r| {
1882                (
1883                    self.interner.intern(&r.from),
1884                    self.interner.intern(&r.to),
1885                    self.interner.intern(&r.relation_type),
1886                )
1887            })
1888            .collect();
1889        // Write-ahead: log every deletion before mutating in-memory state (C3),
1890        // so a failed write can't leave memory ahead of the log.
1891        for relation in relations {
1892            let mut buf = Vec::new();
1893            store_enc::encode_delete_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
1894                .map_err(MCSError::IoError)?;
1895            self.store.write_record(RecordKind::DeleteRelation, &buf)
1896                .map_err(MCSError::IoError)?;
1897        }
1898        self.relations
1899            .retain(|r| !rels.contains(&(r.from, r.to, r.relation_type)));
1900        // Clean adjacency index
1901        for (f, t, rt) in &rels {
1902            if let Some(edges) = self.adjacency.get_mut(f) {
1903                edges.retain(|(to, rtype)| to != t || rtype != rt);
1904                if edges.is_empty() {
1905                    self.adjacency.remove(f);
1906                }
1907            }
1908            if let Some(edges) = self.adjacency.get_mut(t) {
1909                edges.retain(|(to, rtype)| to != f || rtype != rt);
1910                if edges.is_empty() {
1911                    self.adjacency.remove(t);
1912                }
1913            }
1914        }
1915        Ok(())
1916    }
1917
1918    pub fn read_graph(&self) -> KnowledgeGraphOut {
1919        self.read_graph_view().to_owned_out()
1920    }
1921
1922    /// Borrowing, allocation-light view of the full graph (M6). Serializing it
1923    /// streams interned `&str` directly instead of materializing a `String`
1924    /// per name/type/observation.
1925    pub fn read_graph_view(&self) -> GraphView<'_> {
1926        let entities: Vec<&StoredEntity> = self
1927            .entity_slots
1928            .iter()
1929            .filter_map(|s| s.as_ref().filter(|e| e.is_live()))
1930            .collect();
1931        let relations: Vec<&StoredRelation> = self.relations.iter().collect();
1932        GraphView { kg: self, entities, relations }
1933    }
1934
1935    /// Relevance-ranked substring search returning all matches (no pagination).
1936    /// Equivalent to `search_nodes_filtered(query, None, 0, usize::MAX)`.
1937    pub fn search_nodes(&self, query: &str) -> KnowledgeGraphOut {
1938        self.search_nodes_filtered(query, None, 0, usize::MAX)
1939    }
1940
1941    pub fn open_nodes(&self, names: &[String]) -> KnowledgeGraphOut {
1942        self.open_nodes_view(names).to_owned_out()
1943    }
1944
1945    /// Borrowing view variant of [`open_nodes`] (M6).
1946    pub fn open_nodes_view(&self, names: &[String]) -> GraphView<'_> {
1947        let name_ids: AHashSet<StrId> = names.iter()
1948            .filter_map(|n| self.interner.get_optional(n))
1949            .collect();
1950        let entities: Vec<&StoredEntity> = self
1951            .entity_slots
1952            .iter()
1953            .filter_map(|s| {
1954                s.as_ref()
1955                    .filter(|stored| stored.is_live() && name_ids.contains(&stored.name))
1956            })
1957            .collect();
1958        let matched_names: AHashSet<StrId> = entities.iter().map(|e| e.name).collect();
1959        let relations: Vec<&StoredRelation> = self
1960            .relations
1961            .iter()
1962            .filter(|r| matched_names.contains(&r.from) || matched_names.contains(&r.to))
1963            .collect();
1964        GraphView { kg: self, entities, relations }
1965    }
1966
1967    // -----------------------------------------------------------------------
1968    // Internal helpers
1969    // -----------------------------------------------------------------------
1970
1971    fn entity_to_output(&self, stored: &StoredEntity) -> Entity {
1972        Entity {
1973            name: self.interner.lookup(stored.name).to_string(),
1974            entity_type: self.interner.lookup(stored.entity_type).to_string(),
1975            observations: stored
1976                .observations
1977                .iter()
1978                .map(|o| self.interner.lookup(*o).to_string())
1979                .collect(),
1980        }
1981    }
1982
1983    #[inline]
1984    fn relation_to_output(&self, r: &StoredRelation) -> Relation {
1985        Relation {
1986            from: self.interner.lookup(r.from).to_string(),
1987            to: self.interner.lookup(r.to).to_string(),
1988            relation_type: self.interner.lookup(r.relation_type).to_string(),
1989        }
1990    }
1991
1992    /// Resolve a name to its live entity slot, or `None` if absent/deleted.
1993    fn lookup_live_slot(&self, name: &str) -> Option<u32> {
1994        let name_id = self.interner.get_optional(name)?;
1995        let hash = self.interner.get_hash(name_id);
1996        let slot = self.name_table.lookup(hash, name_id)?;
1997        let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
1998        stored.is_live().then_some(slot)
1999    }
2000
2001    /// Materialize a live entity from its interned name id.
2002    fn entity_by_name_id(&self, name_id: StrId) -> Option<Entity> {
2003        let hash = self.interner.get_hash(name_id);
2004        let slot = self.name_table.lookup(hash, name_id)?;
2005        let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
2006        stored.is_live().then(|| self.entity_to_output(stored))
2007    }
2008
2009    /// Tally distinct entity types and their live-entity counts, ranked by
2010    /// count descending (ties broken by name). One linear pass over the dense
2011    /// slot vec; only the final names are allocated.
2012    pub fn entity_type_counts(&self) -> Vec<(String, usize)> {
2013        let mut counts: AHashMap<StrId, usize> = AHashMap::new();
2014        for st in self
2015            .entity_slots
2016            .iter()
2017            .filter_map(|s| s.as_ref())
2018            .filter(|e| e.is_live())
2019        {
2020            *counts.entry(st.entity_type).or_insert(0) += 1;
2021        }
2022        self.rank_counts(counts)
2023    }
2024
2025    /// Tally distinct relation types and their counts, ranked by count desc.
2026    pub fn relation_type_counts(&self) -> Vec<(String, usize)> {
2027        let mut counts: AHashMap<StrId, usize> = AHashMap::new();
2028        for r in &self.relations {
2029            *counts.entry(r.relation_type).or_insert(0) += 1;
2030        }
2031        self.rank_counts(counts)
2032    }
2033
2034    fn rank_counts(&self, counts: AHashMap<StrId, usize>) -> Vec<(String, usize)> {
2035        let mut out: Vec<(String, usize)> = counts
2036            .into_iter()
2037            .map(|(id, c)| (self.interner.lookup(id).to_string(), c))
2038            .collect();
2039        out.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
2040        out
2041    }
2042
2043    /// Relevance-ranked, optionally type-filtered, paginated node search.
2044    /// Entities come back best-match-first (see [`SearchIndex::search_ranked`]).
2045    /// Relations touching any returned entity (either endpoint) are included.
2046    pub fn search_nodes_filtered(
2047        &self,
2048        query: &str,
2049        entity_type: Option<&str>,
2050        offset: usize,
2051        limit: usize,
2052    ) -> KnowledgeGraphOut {
2053        self.search_nodes_view(query, entity_type, offset, limit).to_owned_out()
2054    }
2055
2056    /// Borrowing view variant of [`search_nodes_filtered`] (M6).
2057    pub fn search_nodes_view(
2058        &self,
2059        query: &str,
2060        entity_type: Option<&str>,
2061        offset: usize,
2062        limit: usize,
2063    ) -> GraphView<'_> {
2064        let type_id = match entity_type {
2065            Some(t) => match self.interner.get_optional(t) {
2066                Some(id) => Some(id),
2067                None => return GraphView { kg: self, entities: Vec::new(), relations: Vec::new() },
2068            },
2069            None => None,
2070        };
2071
2072        let ranked = self.search.search_ranked(query, &self.interner);
2073        let mut selected: AHashSet<StrId> = AHashSet::new();
2074        let mut entities: Vec<&StoredEntity> = Vec::new();
2075        let mut skipped = 0usize;
2076        for (slot, _score) in ranked {
2077            let Some(st) = self
2078                .entity_slots
2079                .get(slot as usize)
2080                .and_then(|s| s.as_ref())
2081                .filter(|e| e.is_live())
2082            else {
2083                continue;
2084            };
2085            if type_id.is_some_and(|tid| st.entity_type != tid) {
2086                continue;
2087            }
2088            if skipped < offset {
2089                skipped += 1;
2090                continue;
2091            }
2092            if entities.len() >= limit {
2093                break;
2094            }
2095            selected.insert(st.name);
2096            entities.push(st);
2097        }
2098
2099        let relations: Vec<&StoredRelation> = self
2100            .relations
2101            .iter()
2102            .filter(|r| selected.contains(&r.from) || selected.contains(&r.to))
2103            .collect();
2104        GraphView { kg: self, entities, relations }
2105    }
2106
2107    /// Type-filtered, paginated view of the whole graph. Unlike [`read_graph`],
2108    /// relations are restricted to those whose **both** endpoints fall in the
2109    /// returned entity page, so the slice is internally consistent.
2110    pub fn read_graph_filtered(
2111        &self,
2112        entity_type: Option<&str>,
2113        offset: usize,
2114        limit: usize,
2115    ) -> KnowledgeGraphOut {
2116        self.read_graph_filtered_view(entity_type, offset, limit).to_owned_out()
2117    }
2118
2119    /// Borrowing view variant of [`read_graph_filtered`] (M6).
2120    pub fn read_graph_filtered_view(
2121        &self,
2122        entity_type: Option<&str>,
2123        offset: usize,
2124        limit: usize,
2125    ) -> GraphView<'_> {
2126        let type_id = match entity_type {
2127            Some(t) => match self.interner.get_optional(t) {
2128                Some(id) => Some(id),
2129                None => return GraphView { kg: self, entities: Vec::new(), relations: Vec::new() },
2130            },
2131            None => None,
2132        };
2133
2134        let mut selected: AHashSet<StrId> = AHashSet::new();
2135        let mut entities: Vec<&StoredEntity> = Vec::new();
2136        let mut skipped = 0usize;
2137        for st in self
2138            .entity_slots
2139            .iter()
2140            .filter_map(|s| s.as_ref())
2141            .filter(|e| e.is_live())
2142        {
2143            if type_id.is_some_and(|tid| st.entity_type != tid) {
2144                continue;
2145            }
2146            if skipped < offset {
2147                skipped += 1;
2148                continue;
2149            }
2150            if entities.len() >= limit {
2151                break;
2152            }
2153            selected.insert(st.name);
2154            entities.push(st);
2155        }
2156
2157        let relations: Vec<&StoredRelation> = self
2158            .relations
2159            .iter()
2160            .filter(|r| selected.contains(&r.from) && selected.contains(&r.to))
2161            .collect();
2162        GraphView { kg: self, entities, relations }
2163    }
2164
2165    /// Neighborhood expansion around `name` out to `depth` hops, following
2166    /// edges in the requested [`Direction`] and (optionally) of one relation
2167    /// type. Returns the origin plus reached entities, and every relation
2168    /// (passing the type filter) whose endpoints are both inside that set.
2169    ///
2170    /// `depth == 1` (the common case) is a single linear pass over the flat
2171    /// relation vec; deeper queries build an adjacency map once (O(E)) and BFS.
2172    pub fn neighbors(
2173        &self,
2174        name: &str,
2175        direction: Direction,
2176        rtype: Option<&str>,
2177        depth: u32,
2178    ) -> Result<KnowledgeGraphOut> {
2179        self.lookup_live_slot(name)
2180            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
2181        // Safe: lookup_live_slot succeeded, so the name is interned.
2182        let start = self.interner.get_optional(name).unwrap();
2183
2184        // An unknown relation-type filter can match nothing: return just origin.
2185        let rtype_id = match rtype {
2186            Some(r) => match self.interner.get_optional(r) {
2187                Some(id) => Some(id),
2188                None => {
2189                    let entities = self.entity_by_name_id(start).into_iter().collect();
2190                    return Ok(KnowledgeGraphOut { entities, relations: Vec::new() });
2191                }
2192            },
2193            None => None,
2194        };
2195
2196        let mut visited: AHashSet<StrId> = AHashSet::new();
2197        visited.insert(start);
2198
2199        let type_ok = |r: &StoredRelation| rtype_id.is_none_or(|rt| r.relation_type == rt);
2200
2201        if depth == 1 {
2202            for r in self.relations.iter().filter(|r| type_ok(r)) {
2203                match direction {
2204                    Direction::Out => {
2205                        if r.from == start {
2206                            visited.insert(r.to);
2207                        }
2208                    }
2209                    Direction::In => {
2210                        if r.to == start {
2211                            visited.insert(r.from);
2212                        }
2213                    }
2214                    Direction::Both => {
2215                        if r.from == start {
2216                            visited.insert(r.to);
2217                        } else if r.to == start {
2218                            visited.insert(r.from);
2219                        }
2220                    }
2221                }
2222            }
2223        } else if depth >= 2 {
2224            // Build a direction-aware adjacency map once, then BFS.
2225            // For Direction::Both we use the incremental adjacency index;
2226            // for Direction::Out/In we filter relations directly.
2227            let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
2228            match direction {
2229                Direction::Both => {
2230                    for (&node, edges) in &self.adjacency {
2231                        for &(nb, rt) in edges {
2232                            if rtype_id.is_none_or(|rt_id| rt == rt_id) {
2233                                adj.entry(node).or_default().push(nb);
2234                            }
2235                        }
2236                    }
2237                }
2238                Direction::Out | Direction::In => {
2239                    for r in self.relations.iter().filter(|r| type_ok(r)) {
2240                        match direction {
2241                            Direction::Out => adj.entry(r.from).or_default().push(r.to),
2242                            Direction::In => adj.entry(r.to).or_default().push(r.from),
2243                            _ => unreachable!(),
2244                        }
2245                    }
2246                }
2247            }
2248            let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
2249            queue.push_back((start, 0));
2250            while let Some((node, d)) = queue.pop_front() {
2251                if d >= depth {
2252                    continue;
2253                }
2254                if let Some(nbrs) = adj.get(&node) {
2255                    for &nb in nbrs {
2256                        if visited.insert(nb) {
2257                            queue.push_back((nb, d + 1));
2258                        }
2259                    }
2260                }
2261            }
2262        }
2263
2264        let mut entities = Vec::with_capacity(visited.len());
2265        for &nid in &visited {
2266            if let Some(e) = self.entity_by_name_id(nid) {
2267                entities.push(e);
2268            }
2269        }
2270        let relations = self
2271            .relations
2272            .iter()
2273            .filter(|r| type_ok(r) && visited.contains(&r.from) && visited.contains(&r.to))
2274            .map(|r| self.relation_to_output(r))
2275            .collect();
2276        Ok(KnowledgeGraphOut { entities, relations })
2277    }
2278
2279    /// One-shot context bundle for a single entity: the entity itself, every
2280    /// incident relation, its distinct neighbor names, and its degree. Saves an
2281    /// agent the get_entity + two search_relations round-trips.
2282    pub fn describe_entity(&self, name: &str) -> Result<serde_json::Value> {
2283        let name_id = self
2284            .interner
2285            .get_optional(name)
2286            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
2287        let entity = self
2288            .entity_by_name_id(name_id)
2289            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
2290
2291        let mut incident: Vec<Relation> = Vec::new();
2292        let mut neighbor_seen: AHashSet<StrId> = AHashSet::new();
2293        let mut neighbors: Vec<&str> = Vec::new();
2294        for r in &self.relations {
2295            if r.from == name_id || r.to == name_id {
2296                incident.push(self.relation_to_output(r));
2297                let other = if r.from == name_id { r.to } else { r.from };
2298                if other != name_id && neighbor_seen.insert(other) {
2299                    neighbors.push(self.interner.lookup(other));
2300                }
2301            }
2302        }
2303
2304        Ok(serde_json::json!({
2305            "entity": entity,
2306            "relations": incident,
2307            "neighbors": neighbors,
2308            "degree": incident.len(),
2309        }))
2310    }
2311
2312    /// Create-or-merge a batch of entities idempotently. Missing entities are
2313    /// created; existing ones keep their type and gain any new observations
2314    /// (deduplicated). Returns a per-entity outcome. The caller is responsible
2315    /// for flushing — every underlying op is already write-ahead logged.
2316    pub fn upsert_entities(&mut self, entities: &[Entity]) -> Result<Vec<serde_json::Value>> {
2317        for e in entities {
2318            if e.name.is_empty() {
2319                return Err(MCSError::InvalidParams(
2320                    "Entity name must not be empty".into(),
2321                ));
2322            }
2323        }
2324        let mut out = Vec::with_capacity(entities.len());
2325        for e in entities {
2326            if self.lookup_live_slot(&e.name).is_some() {
2327                let added = self.add_observations(&e.name, &e.observations)?;
2328                out.push(serde_json::json!({
2329                    "name": e.name,
2330                    "created": false,
2331                    "addedObservations": added,
2332                }));
2333            } else {
2334                let created = self.create_entities(std::slice::from_ref(e))?;
2335                out.push(serde_json::json!({
2336                    "name": e.name,
2337                    "created": !created.is_empty(),
2338                    "addedObservations": e.observations,
2339                }));
2340            }
2341        }
2342        Ok(out)
2343    }
2344
2345    /// Serialize the graph in one of: `json` (read_graph), `mermaid`, `dot`.
2346    pub fn export(&self, format: &str) -> Result<String> {
2347        match format {
2348            "json" => serde_json::to_string(&self.read_graph()).map_err(MCSError::JsonError),
2349            "mermaid" => Ok(self.export_mermaid()),
2350            "dot" => Ok(self.export_dot()),
2351            other => Err(MCSError::InvalidParams(format!(
2352                "Unknown export format '{other}' (expected json|mermaid|dot)"
2353            ))),
2354        }
2355    }
2356
2357    /// Assign each live entity a stable `n{k}` node id for diagram output.
2358    fn diagram_node_ids(&self) -> (AHashMap<StrId, usize>, Vec<(usize, StrId)>) {
2359        let mut ids: AHashMap<StrId, usize> = AHashMap::new();
2360        let mut order: Vec<(usize, StrId)> = Vec::new();
2361        for st in self
2362            .entity_slots
2363            .iter()
2364            .filter_map(|s| s.as_ref())
2365            .filter(|e| e.is_live())
2366        {
2367            let n = ids.len();
2368            ids.insert(st.name, n);
2369            order.push((n, st.name));
2370        }
2371        (ids, order)
2372    }
2373
2374    fn export_mermaid(&self) -> String {
2375        let (ids, order) = self.diagram_node_ids();
2376        let mut s = String::with_capacity(64 + order.len() * 32 + self.relations.len() * 32);
2377        s.push_str("graph LR\n");
2378        for (n, name_id) in &order {
2379            let label = sanitize_label(self.interner.lookup(*name_id));
2380            s.push_str(&format!("  n{n}[\"{label}\"]\n"));
2381        }
2382        for r in &self.relations {
2383            if let (Some(&a), Some(&b)) = (ids.get(&r.from), ids.get(&r.to)) {
2384                let rel = sanitize_label(self.interner.lookup(r.relation_type));
2385                s.push_str(&format!("  n{a} -->|{rel}| n{b}\n"));
2386            }
2387        }
2388        s
2389    }
2390
2391    fn export_dot(&self) -> String {
2392        let (ids, order) = self.diagram_node_ids();
2393        let mut s = String::with_capacity(64 + order.len() * 32 + self.relations.len() * 32);
2394        s.push_str("digraph G {\n");
2395        for (n, name_id) in &order {
2396            let label = sanitize_label(self.interner.lookup(*name_id));
2397            s.push_str(&format!("  n{n} [label=\"{label}\"];\n"));
2398        }
2399        for r in &self.relations {
2400            if let (Some(&a), Some(&b)) = (ids.get(&r.from), ids.get(&r.to)) {
2401                let rel = sanitize_label(self.interner.lookup(r.relation_type));
2402                s.push_str(&format!("  n{a} -> n{b} [label=\"{rel}\"];\n"));
2403            }
2404        }
2405        s.push_str("}\n");
2406        s
2407    }
2408
2409    // ------ High-level productivity tools ------
2410
2411    /// Merge `source` entity into `target` entity. All observations from
2412    /// source are moved to target (deduplicated), all relations involving
2413    /// source are redirected to target (deduplicated), and source is then
2414    /// deleted.
2415    ///
2416    /// The whole merge is **atomic**: every sub-record is written to the log
2417    /// inside a single `TxnBegin`…`TxnCommit` transaction *before* any in-memory
2418    /// mutation. A failed or torn write therefore leaves both memory and the
2419    /// log untouched (an uncommitted transaction is discarded on replay), so the
2420    /// graph can never observe a half-applied merge. Caller flushes.
2421    pub fn merge_entities(&mut self, source: &str, target: &str) -> Result<serde_json::Value> {
2422        if source == target {
2423            return Err(MCSError::InvalidParams(
2424                "Source and target must be different entities".into(),
2425            ));
2426        }
2427        self.lookup_live_slot(source).ok_or_else(|| {
2428            MCSError::InvalidParams(format!("Source entity '{source}' not found"))
2429        })?;
2430        let target_slot = self.lookup_live_slot(target).ok_or_else(|| {
2431            MCSError::InvalidParams(format!("Target entity '{target}' not found"))
2432        })?;
2433
2434        let source_entity = self.get_entity(source).unwrap();
2435        let moved_obs_count = source_entity.observations.len();
2436        let source_id = self.interner.get_optional(source).unwrap();
2437        let target_id = self.interner.get_optional(target).unwrap();
2438
2439        // Observations to move: dedup against target's existing set and within
2440        // the batch (matching what `add_observations` would have done).
2441        let target_existing: AHashSet<StrId> = self.entity_slots[target_slot as usize]
2442            .as_ref()
2443            .unwrap()
2444            .observations
2445            .iter()
2446            .copied()
2447            .collect();
2448        let mut obs_seen: AHashSet<StrId> = AHashSet::new();
2449        let mut obs_to_add: Vec<String> = Vec::new();
2450        for o in &source_entity.observations {
2451            if let Some(oid) = self.interner.get_optional(o)
2452                && !target_existing.contains(&oid)
2453                && obs_seen.insert(oid)
2454            {
2455                obs_to_add.push(o.clone());
2456            }
2457        }
2458
2459        // Relations to redirect: replace source with target, drop self-loops,
2460        // and dedup against existing relations and within the batch.
2461        let existing_rels: AHashSet<(StrId, StrId, StrId)> =
2462            self.relations.iter().map(|r| (r.from, r.to, r.relation_type)).collect();
2463        let mut rel_seen: AHashSet<(StrId, StrId, StrId)> = AHashSet::new();
2464        let mut redirect: Vec<Relation> = Vec::new();
2465        for r in &self.relations {
2466            if r.from != source_id && r.to != source_id {
2467                continue;
2468            }
2469            let new_from = if r.from == source_id { target_id } else { r.from };
2470            let new_to = if r.to == source_id { target_id } else { r.to };
2471            if new_from == new_to {
2472                continue; // self-loop after redirect
2473            }
2474            let key = (new_from, new_to, r.relation_type);
2475            if existing_rels.contains(&key) || !rel_seen.insert(key) {
2476                continue;
2477            }
2478            redirect.push(Relation {
2479                from: self.interner.lookup(new_from).to_string(),
2480                to: self.interner.lookup(new_to).to_string(),
2481                relation_type: self.interner.lookup(r.relation_type).to_string(),
2482            });
2483        }
2484
2485        let added_count = obs_to_add.len();
2486        let redirected = redirect.len() as u32;
2487
2488        // Build every record up front so writing is the only fallible step.
2489        let mut records: Vec<(RecordKind, Vec<u8>)> = Vec::new();
2490        if !obs_to_add.is_empty() {
2491            let mut buf = Vec::new();
2492            store_enc::encode_add_observations(&mut buf, target, &obs_to_add)
2493                .map_err(MCSError::IoError)?;
2494            records.push((RecordKind::AddObservations, buf));
2495        }
2496        for r in &redirect {
2497            let mut buf = Vec::new();
2498            store_enc::encode_create_relation(&mut buf, &r.from, &r.to, &r.relation_type)
2499                .map_err(MCSError::IoError)?;
2500            records.push((RecordKind::CreateRelation, buf));
2501        }
2502        let mut del_buf = Vec::new();
2503        store_enc::encode_delete_entity(&mut del_buf, source).map_err(MCSError::IoError)?;
2504        records.push((RecordKind::DeleteEntity, del_buf));
2505
2506        // Write-ahead, transactionally: begin, all records, commit.
2507        self.store.write_record(RecordKind::TxnBegin, &[]).map_err(MCSError::IoError)?;
2508        for (kind, data) in &records {
2509            self.store.write_record(*kind, data).map_err(MCSError::IoError)?;
2510        }
2511        self.store.write_record(RecordKind::TxnCommit, &[]).map_err(MCSError::IoError)?;
2512
2513        // Logged and committed — now apply to in-memory state (no more logging).
2514        for (kind, data) in &records {
2515            Self::apply_record(
2516                *kind, data, &mut self.interner, &mut self.entity_slots, &mut self.search,
2517                &mut self.name_table, &mut self.relations,
2518            );
2519        }
2520
2521        // `apply_record` maintains `relations` but NOT the incremental
2522        // `adjacency` index (it is built from scratch on load, where this never
2523        // mattered). For a *live* merge we must keep adjacency in sync, or every
2524        // adjacency consumer — find_path, neighbors(depth>=2), extract_subgraph,
2525        // find_all_paths — would return stale results until the next restart
2526        // (M2). The merge removed `source` and added the `redirect` edges, so
2527        // mirror exactly those changes.
2528        self.adjacency.remove(&source_id);
2529        for list in self.adjacency.values_mut() {
2530            list.retain(|(to, _)| *to != source_id);
2531        }
2532        for r in &redirect {
2533            // All endpoints were interned above, so these lookups are infallible.
2534            let from_id = self.interner.get_optional(&r.from).unwrap();
2535            let to_id = self.interner.get_optional(&r.to).unwrap();
2536            let type_id = self.interner.get_optional(&r.relation_type).unwrap();
2537            self.adjacency.entry(from_id).or_default().push((to_id, type_id));
2538            self.adjacency.entry(to_id).or_default().push((from_id, type_id));
2539        }
2540
2541        Ok(serde_json::json!({
2542            "source": source,
2543            "target": target,
2544            "movedObservations": moved_obs_count,
2545            "addedObservations": added_count,
2546            "redirectedRelations": redirected,
2547        }))
2548    }
2549
2550    /// Extract a connected subgraph around one or more seed entity names,
2551    /// expanding out to `depth` hops along all relations (undirected). Returns
2552    /// the set of reached entities and the relations among them.
2553    pub fn extract_subgraph(&self, names: &[String], depth: u32) -> Result<KnowledgeGraphOut> {
2554        if names.is_empty() {
2555            return Ok(KnowledgeGraphOut {
2556                entities: Vec::new(),
2557                relations: Vec::new(),
2558            });
2559        }
2560        // Seed the BFS queue from any names that exist.
2561        let mut visited: AHashSet<StrId> = AHashSet::new();
2562        let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
2563        for name in names {
2564            if let Some(id) = self.interner.get_optional(name)
2565                && visited.insert(id)
2566            {
2567                queue.push_back((id, 0));
2568            }
2569        }
2570        // Build an undirected adjacency map from the incremental index.
2571        let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
2572        for (&node, edges) in &self.adjacency {
2573            let nb: Vec<StrId> = edges.iter().map(|(to, _)| *to).collect();
2574            adj.insert(node, nb);
2575        }
2576        while let Some((node, d)) = queue.pop_front() {
2577            if d >= depth {
2578                continue;
2579            }
2580            if let Some(nbrs) = adj.get(&node) {
2581                for &nb in nbrs {
2582                    if visited.insert(nb) {
2583                        queue.push_back((nb, d + 1));
2584                    }
2585                }
2586            }
2587        }
2588        let mut entities: Vec<Entity> = Vec::with_capacity(visited.len());
2589        for &nid in &visited {
2590            if let Some(e) = self.entity_by_name_id(nid) {
2591                entities.push(e);
2592            }
2593        }
2594        let relations: Vec<Relation> = self
2595            .relations
2596            .iter()
2597            .filter(|r| visited.contains(&r.from) && visited.contains(&r.to))
2598            .map(|r| self.relation_to_output(r))
2599            .collect();
2600        Ok(KnowledgeGraphOut { entities, relations })
2601    }
2602
2603    /// Return full entities for a list of names. Missing names yield `None`.
2604    pub fn batch_get_entities(&self, names: &[String]) -> Vec<Option<Entity>> {
2605        names.iter().map(|n| self.get_entity(n)).collect()
2606    }
2607
2608    /// Recursive DFS helper — collects every simple path from `current` to
2609    /// `target` up to `max_depth` hops, capped at `max_paths` results.
2610    #[allow(clippy::too_many_arguments)]
2611    fn dfs_all_paths(
2612        adj: &AHashMap<StrId, Vec<StrId>>,
2613        current: StrId,
2614        target: StrId,
2615        max_depth: usize,
2616        max_paths: usize,
2617        visited: &mut AHashSet<StrId>,
2618        current_path: &mut Vec<StrId>,
2619        all_paths: &mut Vec<Vec<StrId>>,
2620    ) {
2621        if all_paths.len() >= max_paths {
2622            return;
2623        }
2624        if current == target && current_path.len() > 1 {
2625            all_paths.push(current_path.clone());
2626            return;
2627        }
2628        if current_path.len() > max_depth {
2629            return;
2630        }
2631        if let Some(neighbors) = adj.get(&current) {
2632            for &nb in neighbors {
2633                if visited.insert(nb) {
2634                    current_path.push(nb);
2635                    Self::dfs_all_paths(
2636                        adj, nb, target, max_depth, max_paths, visited, current_path, all_paths,
2637                    );
2638                    current_path.pop();
2639                    visited.remove(&nb);
2640                }
2641            }
2642        }
2643    }
2644
2645    /// Find all simple paths between `from` and `to` up to `max_depth` hops,
2646    /// returning at most `max_paths` results. Paths are found via DFS with
2647    /// backtracking and include both endpoints.
2648    pub fn find_all_paths(
2649        &self,
2650        from: &str,
2651        to: &str,
2652        max_depth: usize,
2653        max_paths: usize,
2654    ) -> Result<Vec<Vec<String>>> {
2655        let from_id = self
2656            .interner
2657            .get_optional(from)
2658            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
2659        let to_id = self
2660            .interner
2661            .get_optional(to)
2662            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
2663        // Verify both are live.
2664        if self.lookup_live_slot(from).is_none() {
2665            return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
2666        }
2667        if self.lookup_live_slot(to).is_none() {
2668            return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
2669        }
2670        if from_id == to_id {
2671            return Ok(vec![vec![from.to_string()]]);
2672        }
2673        // Build undirected adjacency from the incremental index.
2674        let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::with_capacity(self.adjacency.len());
2675        for (&node, edges) in &self.adjacency {
2676            let nbrs: Vec<StrId> = edges.iter().map(|(to, _)| *to).collect();
2677            adj.insert(node, nbrs);
2678        }
2679        let mut all_paths: Vec<Vec<StrId>> = Vec::new();
2680        let mut current_path = Vec::new();
2681        let mut visited: AHashSet<StrId> = AHashSet::new();
2682        visited.insert(from_id);
2683        current_path.push(from_id);
2684        Self::dfs_all_paths(
2685            &adj,
2686            from_id,
2687            to_id,
2688            max_depth,
2689            max_paths,
2690            &mut visited,
2691            &mut current_path,
2692            &mut all_paths,
2693        );
2694        if all_paths.is_empty() {
2695            return Err(MCSError::MemoryError(format!(
2696                "No path found between '{from}' and '{to}'"
2697            )));
2698        }
2699        let result: Vec<Vec<String>> = all_paths
2700            .into_iter()
2701            .map(|path| {
2702                path.into_iter()
2703                    .map(|id| self.interner.lookup(id).to_string())
2704                    .collect()
2705            })
2706            .collect();
2707        Ok(result)
2708    }
2709
2710    // --- Snapshot ---
2711
2712    /// Create a wait-free read snapshot (item 2 in plan).
2713    /// Freezes entity_slots and relations into `Arc<[_]>` and clones the rest.
2714    pub fn snapshot(&self) -> ReadSnapshot {
2715        ReadSnapshot {
2716            interner: self.interner.clone(),
2717            entity_slots: Arc::from_iter(self.entity_slots.iter().cloned()),
2718            free_slots: self.free_slots.clone(),
2719            name_table: self.name_table.clone(),
2720            relations: Arc::from_iter(self.relations.iter().cloned()),
2721            adjacency: self.adjacency.clone(),
2722            search: self.search.clone(),
2723        }
2724    }
2725
2726    // --- Flush & sync ---
2727
2728    /// Flush the `BufWriter` to the kernel buffer (process-crash safe).
2729    pub fn flush(&mut self) -> Result<()> {
2730        self.store.flush().map_err(MCSError::IoError)
2731    }
2732
2733    /// `fsync` the log to disk (OS-crash safe). Called by the background sync
2734    /// thread in [`GraphHandle`]; most callers should use `flush()` instead.
2735    pub fn sync(&mut self) -> Result<()> {
2736        self.store.sync().map_err(MCSError::IoError)
2737    }
2738
2739    /// Flush + fsync (legacy; prefer [`flush`](Self::flush) for production use).
2740    pub fn flush_and_sync(&mut self) -> Result<()> {
2741        self.store.flush_and_sync().map_err(MCSError::IoError)
2742    }
2743}
2744
2745
2746
2747// ---------------------------------------------------------------------------
2748// GraphHandle – wait-free read / serialized-write handle.
2749// ---------------------------------------------------------------------------
2750
/// Wait-free read / serialized-write handle to the graph.
///
/// Readers load a frozen [`ReadSnapshot`] via [`read`](GraphHandle::read)
/// (lock-free via `ArcSwap`). Writers take the `parking_lot::Mutex` lock, mutate
/// the underlying [`KnowledgeGraph`], and publish a fresh snapshot on unlock
/// via the [`WriteGuard`] drop glue.
///
/// A background thread `fsync`s the WAL so that write handlers never block on
/// disk I/O. A [`Condvar`] wakes it after every publish, and it also wakes on a
/// 1-second timeout. It calls `fsync` only when the pending flag was set, i.e.
/// when a write was published since the last sync. It does not sync
/// unconditionally every second.
///
/// The sync thread never touches the graph mutex. It fsyncs the file held in the
/// store's shared `sync_slot` and reloads that slot on every cycle, so a file
/// swapped in by compaction is picked up.
///
/// On `Drop` the thread is *signalled* to stop and performs one final `fsync`
/// before exiting. No `JoinHandle` is kept, so the thread is not joined, and that
/// final sync may still be in flight when `drop` returns.
pub struct GraphHandle {
    /// The mutable graph; held locked for the lifetime of a [`WriteGuard`].
    inner: Arc<parking_lot::Mutex<KnowledgeGraph>>,
    /// Most recently published read snapshot; replaced by each publish.
    snapshot: ArcSwap<ReadSnapshot>,
    /// Cached JSON of the full `read_graph` output. Invalidated on every write.
    read_cache: ArcSwap<Option<Arc<str>>>,
    /// Notifies the background sync thread when a write has flushed data to the
    /// kernel buffer. The thread also wakes on a 1-second timeout as a fallback.
    /// The `bool` is `true` when there is pending data to sync.
    sync_notify: Arc<(StdMutex<bool>, Condvar)>,
    /// Signal the background sync thread to stop. Set on `Drop`.
    stop_sync: Arc<AtomicBool>,
}
2777
2778/// RAII guard that publishes a fresh [`ReadSnapshot`] on drop.
2779pub struct WriteGuard<'a> {
2780    guard: parking_lot::MutexGuard<'a, KnowledgeGraph>,
2781    snapshot: &'a ArcSwap<ReadSnapshot>,
2782    read_cache: &'a ArcSwap<Option<Arc<str>>>,
2783    sync_notify: &'a (StdMutex<bool>, Condvar),
2784    did_publish: bool,
2785}
2786
2787impl WriteGuard<'_> {
2788    /// Publish a snapshot now (eager, before drop). Also called by Drop.
2789    /// Invalidates the serialized read cache. Flushes the WAL to kernel buffer
2790    /// (the background sync thread in [`GraphHandle`] handles the actual `fsync`).
2791    pub fn publish(&mut self) {
2792        if let Err(e) = self.guard.flush() {
2793            tracing::error!("WAL flush failed: {e}");
2794        }
2795        let snap = Arc::new(self.guard.snapshot());
2796        self.snapshot.store(snap);
2797        self.read_cache.store(Arc::new(None));
2798        self.did_publish = true;
2799        // Wake the sync thread — data is in the kernel buffer waiting for fsync.
2800        let (lock, cvar) = self.sync_notify;
2801        let mut pending = lock.lock().unwrap_or_else(|e| e.into_inner());
2802        *pending = true;
2803        cvar.notify_one();
2804    }
2805
2806    /// Access the underlying `KnowledgeGraph` for mutation.
2807    pub fn graph(&mut self) -> &mut KnowledgeGraph {
2808        &mut self.guard
2809    }
2810}
2811
2812impl std::ops::Deref for WriteGuard<'_> {
2813    type Target = KnowledgeGraph;
2814    fn deref(&self) -> &KnowledgeGraph {
2815        &self.guard
2816    }
2817}
2818
2819impl std::ops::DerefMut for WriteGuard<'_> {
2820    fn deref_mut(&mut self) -> &mut KnowledgeGraph {
2821        &mut self.guard
2822    }
2823}
2824
2825impl Drop for WriteGuard<'_> {
2826    fn drop(&mut self) {
2827        if !self.did_publish {
2828            self.publish();
2829        }
2830    }
2831}
2832
2833impl Drop for GraphHandle {
2834    fn drop(&mut self) {
2835        self.stop_sync.store(true, Ordering::Relaxed);
2836        // Wake the sync thread by setting pending=true so the
2837        // wait_timeout_while(|p| !*p) condition breaks immediately.
2838        let (lock, cvar) = &*self.sync_notify;
2839        let mut pending = lock.lock().unwrap_or_else(|e| e.into_inner());
2840        *pending = true;
2841        cvar.notify_one();
2842    }
2843}
2844
2845impl GraphHandle {
2846    /// Open or create the graph at `path`, seeding the initial snapshot.
2847    /// Spawns a background thread that `fsync`s the WAL so that write handlers
2848    /// never block on disk I/O.
2849    pub fn new(path: &Path) -> std::io::Result<Self> {
2850        let kg = KnowledgeGraph::new(path)?;
2851        let snapshot = Arc::new(kg.snapshot());
2852        // Clone the shared sync slot before moving `kg` into the Mutex. The slot
2853        // is updated in place by `compact`/`reopen_truncated`, so the thread
2854        // always fsyncs the current file (D1).
2855        let sync_slot = Arc::clone(&kg.store.sync_slot);
2856        let inner = Arc::new(parking_lot::Mutex::new(kg));
2857
2858        let sync_notify: Arc<(StdMutex<bool>, Condvar)> =
2859            Arc::new((StdMutex::new(false), Condvar::new()));
2860        let notify = Arc::clone(&sync_notify);
2861        let stop_sync = Arc::new(AtomicBool::new(false));
2862
2863        // Background sync thread — calls fsync on a dedicated `Arc<File>`
2864        // handle, never touching the graph mutex. Woken by Condvar after every
2865        // write; falls back to a 1-second timeout.
2866        let sync_stop = Arc::clone(&stop_sync);
2867        std::thread::Builder::new()
2868            .name("mcp-memory-sync".into())
2869            .spawn(move || {
2870                let (lock, cvar) = &*notify;
2871                loop {
2872                    // Wait while there is nothing to sync. Woken by publish()
2873                    // or by the 1-second timeout.
2874                    let mut guard = cvar
2875                        .wait_timeout_while(
2876                            lock.lock().unwrap_or_else(|e| e.into_inner()),
2877                            std::time::Duration::from_secs(1),
2878                            |p| !*p,
2879                        )
2880                        .unwrap_or_else(|e| e.into_inner())
2881                        .0;
2882
2883                    let should_sync = *guard;
2884                    *guard = false;
2885                    // Release the StdMutex before fsync so publish() is not
2886                    // blocked on setting the next pending flag.
2887                    drop(guard);
2888
2889                    if should_sync {
2890                        // Load the current handle each cycle so a compaction
2891                        // that swapped the underlying file is picked up (D1).
2892                        if let Err(e) = sync_slot.load().sync_data() {
2893                            tracing::error!("WAL fsync failed: {e}");
2894                        }
2895                    }
2896
2897                    if sync_stop.load(Ordering::Relaxed) {
2898                        // One final fsync before exiting.
2899                        if let Err(e) = sync_slot.load().sync_data() {
2900                            tracing::error!("WAL final fsync failed: {e}");
2901                        }
2902                        break;
2903                    }
2904                }
2905            })
2906            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
2907
2908        Ok(Self {
2909            inner,
2910            snapshot: ArcSwap::new(snapshot),
2911            read_cache: ArcSwap::new(Arc::new(None)),
2912            sync_notify,
2913            stop_sync,
2914        })
2915    }
2916
2917    /// Return the cached full-graph JSON, or build and cache it on first call.
2918    /// Invalidated by any write (see [`WriteGuard::publish`]).
2919    pub fn read_graph_cached(&self) -> Arc<str> {
2920        if let Some(cached) = self.read_cache.load().as_ref() {
2921            return cached.clone();
2922        }
2923        let graph = self.read();
2924        let json: Arc<str> = Arc::from(graph.read_graph_json().into_boxed_str());
2925        self.read_cache.store(Arc::new(Some(json.clone())));
2926        json
2927    }
2928
2929    /// Lock-free read snapshot. Holds an `Arc` reference to the frozen graph data.
2930    pub fn read(&self) -> ReadSnapshot {
2931        (**self.snapshot.load()).clone()
2932    }
2933
2934    /// Serialised write access. Returns a guard that publishes a fresh snapshot
2935    /// when dropped (or when [`WriteGuard::publish`] is called eagerly).
2936    pub fn write(&self) -> WriteGuard<'_> {
2937        WriteGuard {
2938            guard: self.inner.lock(),
2939            snapshot: &self.snapshot,
2940            read_cache: &self.read_cache,
2941            sync_notify: &self.sync_notify,
2942            did_publish: false,
2943        }
2944    }
2945}
2946
2947