Skip to main content

mcp_memory/
kg.rs

1use std::collections::VecDeque;
2use std::path::Path;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::{Arc, Condvar, Mutex as StdMutex};
5
6use ahash::{AHashMap, AHashSet};
7use arc_swap::ArcSwap;
8
9use serde::ser::{Serialize, SerializeSeq, SerializeStruct, Serializer};
10
11use crate::errors::{MCSError, Result};
12use crate::intern::{StrId, StringInterner};
13use crate::types::{Entity, Relation, KnowledgeGraphOut};
14use crate::search::SearchIndex;
15use crate::store::{self as store_enc, BinaryStore, RecordKind};
16
/// `StoredEntity::state` value that marks a slot as live (see `StoredEntity::is_live`).
const ENTITY_SLOT_LIVE: u8 = 1;
/// Number of shards in `ShardedNameTable`. Must be a power of two: the shard
/// index is taken as `hash & (NAME_TABLE_SHARDS - 1)`.
const NAME_TABLE_SHARDS: usize = 4;
19
20// ---------------------------------------------------------------------------
21// Prefetch helper – issues a non-binding software prefetch hint to pull a
22// cache-line into L1/L2 while we finish probing the current entry.
23// ---------------------------------------------------------------------------
/// Issue a T0 software prefetch for the cache line containing `addr`.
///
/// # Safety
/// Only a cache hint; the value at `addr` is never read by this function.
/// NOTE(review): x86 PREFETCH is documented not to fault, but keep callers
/// passing in-bounds pointers (the visible caller masks its index first).
#[cfg(target_arch = "x86_64")]
#[inline(always)]
unsafe fn prefetch_addr(addr: *const u8) {
    // _MM_HINT_T0 = 3  (temporal prefetch to all cache levels)
    std::arch::x86_64::_mm_prefetch::<3>(addr);
}

/// Non-x86_64 fallback: no prefetch is issued.
#[cfg(not(target_arch = "x86_64"))]
#[inline(always)]
const unsafe fn prefetch_addr(_addr: *const u8) {}
34
/// fsync the directory containing `path` so a rename/create inside it is durable.
///
/// A `path` without a parent component (e.g. `"file.bin"`) syncs the current
/// directory `"."`.
///
/// This is a no-op in these cases:
/// - On non-Unix platforms. Directory fsync is a Unix notion, and on Windows
///   `File::open` on a directory fails with `PermissionDenied`.
/// - When the directory does not exist.
/// - When the filesystem rejects fsync on a directory handle (`InvalidInput`).
///
/// # Errors
/// Any other I/O error from opening or syncing the directory.
fn sync_parent_dir(path: &Path) -> std::io::Result<()> {
    if !cfg!(unix) {
        return Ok(());
    }
    let dir = path
        .parent()
        .filter(|p| !p.as_os_str().is_empty())
        .unwrap_or_else(|| Path::new("."));
    let handle = match std::fs::File::open(dir) {
        Ok(f) => f,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
        Err(e) => return Err(e),
    };
    match handle.sync_all() {
        // Some filesystems disallow fsync on a directory handle; tolerate it.
        Err(e) if e.kind() == std::io::ErrorKind::InvalidInput => Ok(()),
        other => other,
    }
}
54
55// ---------------------------------------------------------------------------
56// StoredEntity / StoredRelation – internal representations using StrId.
57// ---------------------------------------------------------------------------
// Default layout: 40 B / align 8 (assuming a 4-byte `StrId`). rustc may reorder
// the fields, so the 1-byte `state` lands in the struct's tail padding rather
// than costing a separate word. `Option<StoredEntity>` reuses the Vec's
// non-null pointer niche, so it is also 40 B. At a 40-byte stride, 4 of every
// 8 slots straddle a 64-byte cache line. With `cache_align`, each slot is
// rounded to a full 64-byte line, so a point lookup/mutation (name_table ->
// slot index) touches exactly one cache line. That costs +60% memory and a
// wider stride on bulk scans — measure before enabling.
#[derive(Clone)]
#[cfg_attr(feature = "cache_align", repr(align(64)))]
pub(crate) struct StoredEntity {
    /// Slot state; equals `ENTITY_SLOT_LIVE` for a live entity.
    state: u8,
    /// Interned entity name.
    pub(crate) name: StrId,
    /// Interned entity type.
    pub(crate) entity_type: StrId,
    /// Interned observation strings, in stored order.
    pub(crate) observations: Vec<StrId>,
}
71
72impl StoredEntity {
73    pub(crate) const fn is_live(&self) -> bool {
74        self.state == ENTITY_SLOT_LIVE
75    }
76}
77
// Default layout: 12 B / align 4 (three 4-byte ids). At a 12-byte stride, 2 of
// every 16 records (~1 in 8) straddle a 64-byte cache line, whatever the base
// offset. With `cache_align`, align(16) rounds the size to 16 B, so 4 records
// fill a line exactly (no straddle, AVX2-load-friendly), for +33% memory.
#[derive(Clone)]
#[cfg_attr(feature = "cache_align", repr(align(16)))]
pub(crate) struct StoredRelation {
    /// Interned name of the source entity.
    pub(crate) from: StrId,
    /// Interned name of the target entity.
    pub(crate) to: StrId,
    /// Interned relation type.
    pub(crate) relation_type: StrId,
}
88
89// ---------------------------------------------------------------------------
90// Borrowing serialization views (M6).
91//
92// Read tools used to build owned `Entity`/`Relation` vecs (a fresh `String`
93// per name/type/observation) and *then* serialize them — roughly 2-3x the
94// graph resident at once. These views instead hold references to the selected
95// stored records and emit their interned `&str` directly during
96// serialization, with no intermediate owned strings. The emitted JSON is
97// byte-for-byte identical to serializing `KnowledgeGraphOut`.
98// ---------------------------------------------------------------------------
99
/// A borrowing view over a selected slice of the graph. Serializes to
/// `{"entities":[...],"relations":[...]}`.
///
/// Holds references to stored records only; ids are resolved to strings
/// through `kg`'s interner at serialization/materialization time.
pub struct GraphView<'a> {
    /// Graph whose interner resolves the ids in the selected records.
    kg: &'a KnowledgeGraph,
    /// Selected entities, emitted in this order.
    entities: Vec<&'a StoredEntity>,
    /// Selected relations, emitted in this order.
    relations: Vec<&'a StoredRelation>,
}
107
108impl GraphView<'_> {
109    /// Materialize into the owned [`KnowledgeGraphOut`]. Used by the direct
110    /// (non-serializing) callers and tests; the server's read handlers
111    /// serialize the view directly instead.
112    pub fn to_owned_out(&self) -> KnowledgeGraphOut {
113        KnowledgeGraphOut {
114            entities: self.entities.iter().map(|e| self.kg.entity_to_output(e)).collect(),
115            relations: self.relations.iter().map(|r| self.kg.relation_to_output(r)).collect(),
116        }
117    }
118}
119
120impl Serialize for GraphView<'_> {
121    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
122        let mut st = s.serialize_struct("KnowledgeGraphOut", 2)?;
123        st.serialize_field("entities", &EntityListRef { kg: self.kg, items: &self.entities })?;
124        st.serialize_field("relations", &RelationListRef { kg: self.kg, items: &self.relations })?;
125        st.end()
126    }
127}
128
129struct EntityListRef<'a> { kg: &'a KnowledgeGraph, items: &'a [&'a StoredEntity] }
130impl Serialize for EntityListRef<'_> {
131    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
132        let mut seq = s.serialize_seq(Some(self.items.len()))?;
133        for &e in self.items {
134            seq.serialize_element(&EntityRef { kg: self.kg, e })?;
135        }
136        seq.end()
137    }
138}
139
140struct RelationListRef<'a> { kg: &'a KnowledgeGraph, items: &'a [&'a StoredRelation] }
141impl Serialize for RelationListRef<'_> {
142    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
143        let mut seq = s.serialize_seq(Some(self.items.len()))?;
144        for &r in self.items {
145            seq.serialize_element(&RelationRef { kg: self.kg, r })?;
146        }
147        seq.end()
148    }
149}
150
151struct EntityRef<'a> { kg: &'a KnowledgeGraph, e: &'a StoredEntity }
152impl Serialize for EntityRef<'_> {
153    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
154        let mut st = s.serialize_struct("Entity", 3)?;
155        st.serialize_field("name", self.kg.interner.lookup(self.e.name))?;
156        st.serialize_field("entityType", self.kg.interner.lookup(self.e.entity_type))?;
157        st.serialize_field("observations", &ObsRef { kg: self.kg, obs: &self.e.observations })?;
158        st.end()
159    }
160}
161
162struct ObsRef<'a> { kg: &'a KnowledgeGraph, obs: &'a [StrId] }
163impl Serialize for ObsRef<'_> {
164    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
165        let mut seq = s.serialize_seq(Some(self.obs.len()))?;
166        for &o in self.obs {
167            seq.serialize_element(self.kg.interner.lookup(o))?;
168        }
169        seq.end()
170    }
171}
172
173struct RelationRef<'a> { kg: &'a KnowledgeGraph, r: &'a StoredRelation }
174impl Serialize for RelationRef<'_> {
175    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
176        let mut st = s.serialize_struct("Relation", 3)?;
177        st.serialize_field("from", self.kg.interner.lookup(self.r.from))?;
178        st.serialize_field("to", self.kg.interner.lookup(self.r.to))?;
179        st.serialize_field("relationType", self.kg.interner.lookup(self.r.relation_type))?;
180        st.end()
181    }
182}
183
/// Which way edges are followed during neighborhood queries.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum Direction {
    /// Follow `from -> to` (outgoing edges).
    Out,
    /// Follow `to -> from` (incoming edges).
    In,
    /// Follow edges regardless of orientation.
    Both,
}

impl Direction {
    /// Parse an optional direction string.
    ///
    /// Exactly `"out"` or `"in"` (case-sensitive) select [`Direction::Out`] or
    /// [`Direction::In`]. Any other value, including `None`, yields
    /// [`Direction::Both`].
    pub fn parse(s: Option<&str>) -> Self {
        if s == Some("out") {
            Self::Out
        } else if s == Some("in") {
            Self::In
        } else {
            Self::Both
        }
    }
}
205
/// Make `s` safe to place inside a double-quoted Mermaid/DOT label.
///
/// Double quotes become single quotes and CR/LF become spaces. Every other
/// character is copied unchanged.
fn sanitize_label(s: &str) -> String {
    s.chars()
        .map(|ch| match ch {
            '"' => '\'',
            '\r' | '\n' => ' ',
            other => other,
        })
        .collect()
}
218
219// ---------------------------------------------------------------------------
220// ShardedNameTable – open-addressing hash map split into N independent shards.
221//
// Each shard uses a **ctrl-byte bucket** approach: a 1-byte metadata array
// stores a 7-bit hash stamp (h2) for each slot, with `0xFF` = EMPTY.
// On probe, the first memory access is a single byte (ctrl). The full key
// (StrId) is only compared when the stamp matches, so for a well-mixed hash
// almost every non-matching probe step touches nothing but the ctrl byte.
// See also SwissTable / hashbrown.
227// ---------------------------------------------------------------------------
/// Control-byte value marking an unoccupied slot (bit 7 set).
const EMPTY_SLOT: u8 = 0xFF;

/// 7-bit stamp stored in an occupied slot's control byte. Bit 7 is always
/// clear, so a stamp can never be mistaken for `EMPTY_SLOT`.
///
/// The low 7 bits alone make a weak stamp. `ShardedNameTable` picks the shard
/// from the lowest hash bits, so within one shard those bits are constant and
/// `hash & 0x7F` carries only 5 useful bits: a 1-in-32 false stamp match
/// instead of 1-in-128.
///
/// XOR-ing in the top 7 bits restores 7 varying bits for a well-mixed 64-bit
/// hash; `h1` never uses those bits for any realistic table size. If the top
/// bits happen to be zero, this reduces exactly to `hash & 0x7F`.
#[inline(always)]
const fn h2(hash: u64) -> u8 {
    ((hash ^ (hash >> 57)) & 0x7F) as u8
}

/// Home bucket for `hash` in a table of `mask + 1` slots. Skips the low 7
/// bits so the bucket index does not reuse the bits that feed the stamp.
#[inline(always)]
const fn h1(hash: u64, mask: usize) -> usize {
    ((hash >> 7) as usize) & mask
}
239
/// One open-addressing shard stored as parallel arrays indexed by probe
/// position. All three arrays have length `mask + 1` (a power of two).
#[derive(Clone)]
struct NameTableShard {
    ctrl: Vec<u8>,      // 0xFF = empty; 0x00-0x7F = h2 stamp (bit 7 always clear)
    names: Vec<StrId>,  // key of each occupied slot (`StrId::EMPTY` when empty)
    slots: Vec<u32>,    // entity slot index for the key (`u32::MAX` when empty)
    mask: usize,        // capacity - 1; used to wrap probe indices
    count: usize,       // number of occupied slots
}
248
249impl NameTableShard {
250    fn new(capacity: usize) -> Self {
251        let cap = capacity.next_power_of_two().max(16);
252        Self {
253            ctrl: vec![EMPTY_SLOT; cap],
254            names: vec![StrId::EMPTY; cap],
255            slots: vec![u32::MAX; cap],
256            mask: cap - 1,
257            count: 0,
258        }
259    }
260
261    #[inline(always)]
262    fn lookup(&self, hash: u64, name: StrId) -> Option<u32> {
263        let stamp = h2(hash);
264        let mask = self.mask;
265        let mut idx = h1(hash, mask);
266        let ctrl = self.ctrl.as_ptr();
267        let names = self.names.as_ptr();
268        let slots = self.slots.as_ptr();
269        let len = self.ctrl.len();
270
271        for _ in 0..len {
272            // Prefetch the ctrl byte 4 slots ahead — overlaps memory latency.
273            let prefetch_idx = idx.wrapping_add(4) & mask;
274            unsafe { prefetch_addr(ctrl.add(prefetch_idx)) };
275
276            // SAFETY: idx always < len because of &mask on each iteration.
277            unsafe {
278                let c = *ctrl.add(idx);
279                // Bit 7 set → EMPTY → key not present.
280                if c & 0x80 != 0 {
281                    return None;
282                }
283                // Stamp match → compare full key (rare: ~1/128 probes).
284                if c == stamp && *names.add(idx) == name {
285                    return Some(*slots.add(idx));
286                }
287            }
288            idx = (idx + 1) & mask;
289        }
290        None
291    }
292
293    fn insert(&mut self, interner: &StringInterner, hash: u64, name: StrId, slot: u32) {
294        if self.count * 4 > self.ctrl.len() * 3 {
295            self.grow(interner);
296        }
297        let stamp = h2(hash);
298        let mask = self.mask;
299        let mut idx = h1(hash, mask);
300        loop {
301            // SAFETY: idx & mask always < len for power-of-two capacity.
302            unsafe {
303                if *self.ctrl.get_unchecked(idx) & 0x80 != 0 {
304                    *self.ctrl.get_unchecked_mut(idx) = stamp;
305                    *self.names.get_unchecked_mut(idx) = name;
306                    *self.slots.get_unchecked_mut(idx) = slot;
307                    self.count += 1;
308                    return;
309                }
310            }
311            idx = (idx + 1) & mask;
312        }
313    }
314
315    fn remove(&mut self, interner: &StringInterner, hash: u64, name: StrId) {
316        let stamp = h2(hash);
317        let mask = self.mask;
318        let mut idx = h1(hash, mask);
319        let len = self.ctrl.len();
320        for _ in 0..len {
321            if self.ctrl[idx] & 0x80 != 0 {
322                return;
323            }
324            if self.ctrl[idx] == stamp && self.names[idx] == name {
325                // Found — remove with shift-back to preserve probe chains.
326                self.ctrl[idx] = EMPTY_SLOT;
327                self.names[idx] = StrId::EMPTY;
328                self.slots[idx] = u32::MAX;
329                self.count -= 1;
330
331                let mut next = (idx + 1) & mask;
332                while self.ctrl[next] & 0x80 == 0 {
333                    let nn = self.names[next];
334                    let ns = self.slots[next];
335                    // Hash is no longer stored (M4) — recompute it from the
336                    // interned name to find the entry's ideal bucket.
337                    let nh = interner.get_hash(nn);
338                    self.ctrl[next] = EMPTY_SLOT;
339                    self.names[next] = StrId::EMPTY;
340                    self.slots[next] = u32::MAX;
341                    self.count -= 1;
342
343                    // Re-insert at its ideal bucket.
344                    let nstamp = h2(nh);
345                    let mut re_idx = h1(nh, mask);
346                    while self.ctrl[re_idx] & 0x80 == 0 {
347                        re_idx = (re_idx + 1) & mask;
348                    }
349                    self.ctrl[re_idx] = nstamp;
350                    self.names[re_idx] = nn;
351                    self.slots[re_idx] = ns;
352                    self.count += 1;
353
354                    next = (next + 1) & mask;
355                }
356                return;
357            }
358            idx = (idx + 1) & mask;
359        }
360    }
361
362    fn grow(&mut self, interner: &StringInterner) {
363        let new_cap = self.ctrl.len() * 2;
364        let new_mask = new_cap - 1;
365        let mut new_ctrl = vec![EMPTY_SLOT; new_cap];
366        let mut new_names = vec![StrId::EMPTY; new_cap];
367        let mut new_slots = vec![u32::MAX; new_cap];
368
369        for i in 0..self.ctrl.len() {
370            if self.ctrl[i] & 0x80 == 0 {
371                // Recompute the hash from the interned name (M4: not stored).
372                let name = self.names[i];
373                let hash = interner.get_hash(name);
374                let stamp = h2(hash);
375                let mut idx = h1(hash, new_mask);
376                while new_ctrl[idx] & 0x80 == 0 {
377                    idx = (idx + 1) & new_mask;
378                }
379                new_ctrl[idx] = stamp;
380                new_names[idx] = name;
381                new_slots[idx] = self.slots[i];
382            }
383        }
384
385        self.ctrl = new_ctrl;
386        self.names = new_names;
387        self.slots = new_slots;
388        self.mask = new_mask;
389    }
390}
391
/// Name → entity-slot index, split into `NAME_TABLE_SHARDS` independent
/// open-addressing shards. The shard is selected by the low bits of the name's
/// hash.
#[derive(Clone)]
struct ShardedNameTable {
    shards: [NameTableShard; NAME_TABLE_SHARDS],
}
396
397impl ShardedNameTable {
398    fn new(capacity_per_shard: usize) -> Self {
399        Self {
400            shards: [
401                NameTableShard::new(capacity_per_shard),
402                NameTableShard::new(capacity_per_shard),
403                NameTableShard::new(capacity_per_shard),
404                NameTableShard::new(capacity_per_shard),
405            ],
406        }
407    }
408
409    #[inline(always)]
410    const fn shard(hash: u64) -> usize {
411        (hash as usize) & (NAME_TABLE_SHARDS - 1)
412    }
413
414    #[inline(always)]
415    fn lookup(&self, hash: u64, name: StrId) -> Option<u32> {
416        self.shards[Self::shard(hash)].lookup(hash, name)
417    }
418
419    #[inline(always)]
420    fn insert(&mut self, interner: &StringInterner, hash: u64, name: StrId, slot: u32) {
421        self.shards[Self::shard(hash)].insert(interner, hash, name, slot);
422    }
423
424    #[inline(always)]
425    fn remove(&mut self, interner: &StringInterner, hash: u64, name: StrId) {
426        self.shards[Self::shard(hash)].remove(interner, hash, name);
427    }
428}
429
430// ---------------------------------------------------------------------------
431// KnowledgeGraph – the central type.
432// ---------------------------------------------------------------------------
/// The mutable (write-side) knowledge graph.
///
/// Entities live in `entity_slots` and are found by name through
/// `name_table`. Names, types and observations are all held as `StrId`s and
/// resolved through `interner`.
pub struct KnowledgeGraph {
    /// Owns the strings behind every `StrId` held by the fields below.
    interner: StringInterner,
    /// Entity records indexed by the slot numbers stored in `name_table`.
    /// A `None` or non-live slot holds no entity.
    entity_slots: Vec<Option<StoredEntity>>,
    /// Tombstoned slot indices available for reuse on the next create (M2),
    /// so create/delete churn doesn't grow `entity_slots` without bound.
    free_slots: Vec<u32>,
    /// Name → slot index into `entity_slots`.
    name_table: ShardedNameTable,
    /// Relation records.
    relations: Vec<StoredRelation>,
    /// Incremental adjacency index: StrId → outgoing (to, type) pairs.
    /// Updated on every create_relations/delete_relations/delete_entities.
    /// Traversals use this instead of rebuilding from scratch (item 3 in plan).
    adjacency: AHashMap<StrId, Vec<(StrId, StrId)>>,
    /// Search index (see `crate::search::SearchIndex`).
    search: SearchIndex,
    /// Persistence backend (see `crate::store::BinaryStore`).
    store: BinaryStore,
}
448
449// ---------------------------------------------------------------------------
450// ReadSnapshot – wait-free, lock-free frozen view of the graph for readers.
451// Created by KnowledgeGraph::snapshot() after each write transaction.
452// ---------------------------------------------------------------------------
/// Frozen read-side copy of the graph used by readers.
///
/// `entity_slots` and `relations` are `Arc<[_]>` slices, so cloning a snapshot
/// only bumps their reference counts. The remaining fields are cloned through
/// their own `Clone` impls.
#[derive(Clone)]
pub struct ReadSnapshot {
    /// Resolves every `StrId` held by this snapshot.
    pub(crate) interner: StringInterner,
    /// Entity records indexed by the slot numbers stored in `name_table`.
    pub(crate) entity_slots: Arc<[Option<StoredEntity>]>,
    // Not read by any method visible here, hence the allow.
    #[allow(dead_code)]
    free_slots: Vec<u32>,
    /// Name → slot index into `entity_slots`.
    name_table: ShardedNameTable,
    /// Relation records in this snapshot.
    pub(crate) relations: Arc<[StoredRelation]>,
    /// NOTE(review): presumably a copy of `KnowledgeGraph::adjacency` taken at
    /// snapshot time — confirm against the snapshot constructor.
    adjacency: AHashMap<StrId, Vec<(StrId, StrId)>>,
    /// Search index for this snapshot.
    search: SearchIndex,
}
464
/// Append `s` to `buf` as a double-quoted JSON string literal.
///
/// Escapes `"`, `\`, `\n`, `\r`, `\t` with short escapes and every other
/// control character (`char::is_control`) as `\uXXXX`. Writes straight into
/// `buf`, with no intermediate `String`.
pub(crate) fn push_json_str(buf: &mut String, s: &str) {
    use std::fmt::Write as _;
    buf.push('"');
    for ch in s.chars() {
        let short_escape = match ch {
            '"' => "\\\"",
            '\\' => "\\\\",
            '\n' => "\\n",
            '\r' => "\\r",
            '\t' => "\\t",
            c if c.is_control() => {
                // Writing into a `String` cannot fail.
                write!(buf, "\\u{:04x}", c as u32).unwrap();
                continue;
            }
            plain => {
                buf.push(plain);
                continue;
            }
        };
        buf.push_str(short_escape);
    }
    buf.push('"');
}
485
/// A borrowing view over a selection of entities/relations from a ReadSnapshot.
/// Serializes without intermediate owned String allocations.
pub struct ReadGraphView<'a> {
    /// Snapshot whose interner resolves the selected records' ids.
    snap: &'a ReadSnapshot,
    /// Selected entities, serialized in this order.
    entities: Vec<&'a StoredEntity>,
    /// Selected relations, serialized in this order.
    relations: Vec<&'a StoredRelation>,
}
493
494impl Serialize for ReadGraphView<'_> {
495    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
496        let mut st = s.serialize_struct("KnowledgeGraphOut", 2)?;
497        st.serialize_field("entities", &ReadEntityListRef { snap: self.snap, items: &self.entities })?;
498        st.serialize_field("relations", &ReadRelationListRef { snap: self.snap, items: &self.relations })?;
499        st.end()
500    }
501}
502
503struct ReadEntityListRef<'a> { snap: &'a ReadSnapshot, items: &'a [&'a StoredEntity] }
504impl Serialize for ReadEntityListRef<'_> {
505    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
506        let mut seq = s.serialize_seq(Some(self.items.len()))?;
507        for &e in self.items {
508            seq.serialize_element(&ReadEntityRef { snap: self.snap, e })?;
509        }
510        seq.end()
511    }
512}
513
514struct ReadRelationListRef<'a> { snap: &'a ReadSnapshot, items: &'a [&'a StoredRelation] }
515impl Serialize for ReadRelationListRef<'_> {
516    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
517        let mut seq = s.serialize_seq(Some(self.items.len()))?;
518        for &r in self.items {
519            seq.serialize_element(&ReadRelationRef { snap: self.snap, r })?;
520        }
521        seq.end()
522    }
523}
524
525struct ReadEntityRef<'a> { snap: &'a ReadSnapshot, e: &'a StoredEntity }
526impl Serialize for ReadEntityRef<'_> {
527    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
528        let mut st = s.serialize_struct("Entity", 3)?;
529        st.serialize_field("name", self.snap.interner.lookup(self.e.name))?;
530        st.serialize_field("entityType", self.snap.interner.lookup(self.e.entity_type))?;
531        st.serialize_field("observations", &ReadObsRef { snap: self.snap, obs: &self.e.observations })?;
532        st.end()
533    }
534}
535
536struct ReadObsRef<'a> { snap: &'a ReadSnapshot, obs: &'a [StrId] }
537impl Serialize for ReadObsRef<'_> {
538    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
539        let mut seq = s.serialize_seq(Some(self.obs.len()))?;
540        for &o in self.obs {
541            seq.serialize_element(self.snap.interner.lookup(o))?;
542        }
543        seq.end()
544    }
545}
546
547struct ReadRelationRef<'a> { snap: &'a ReadSnapshot, r: &'a StoredRelation }
548impl Serialize for ReadRelationRef<'_> {
549    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
550        let mut st = s.serialize_struct("Relation", 3)?;
551        st.serialize_field("from", self.snap.interner.lookup(self.r.from))?;
552        st.serialize_field("to", self.snap.interner.lookup(self.r.to))?;
553        st.serialize_field("relationType", self.snap.interner.lookup(self.r.relation_type))?;
554        st.end()
555    }
556}
557
558// --- ReadSnapshot helpers (mirrors KnowledgeGraph helpers) ---
559impl ReadSnapshot {
560
561    /// Serialize the full graph directly to a JSON string, avoiding intermediate
562    /// owned `Entity`/`Relation` allocations. This is the fast path for handlers.
563    pub fn read_graph_json(&self) -> String {
564        // Rough capacity: 40K entities × ~60 B + 120K relations × ~55 B = ~9 MB
565        let cap = self.entity_slots.len() * 64 + self.relations.len() * 60 + 128;
566        let mut buf = String::with_capacity(cap);
567
568        // entities array
569        buf.push_str(r#"{"entities":["#);
570        let mut first = true;
571        for slot in self.entity_slots.iter() {
572            let Some(e) = slot.as_ref().filter(|e| e.is_live()) else { continue };
573            if first { first = false } else { buf.push(',') }
574            buf.push('{');
575            // name
576            buf.push_str(r#""name":"#);
577            push_json_str(&mut buf, self.interner.lookup(e.name));
578            buf.push(',');
579            // entityType
580            buf.push_str(r#""entityType":"#);
581            push_json_str(&mut buf, self.interner.lookup(e.entity_type));
582            buf.push(',');
583            // observations
584            buf.push_str(r#""observations":["#);
585            for (oi, o) in e.observations.iter().enumerate() {
586                if oi > 0 { buf.push(',') }
587                push_json_str(&mut buf, self.interner.lookup(*o));
588            }
589            buf.push_str("]}");
590        }
591
592        // relations array
593        buf.push_str(r#"],"relations":["#);
594        first = true;
595        for r in self.relations.iter() {
596            if first { first = false } else { buf.push(',') }
597            buf.push('{');
598            buf.push_str(r#""from":"#);
599            push_json_str(&mut buf, self.interner.lookup(r.from));
600            buf.push(',');
601            buf.push_str(r#""to":"#);
602            push_json_str(&mut buf, self.interner.lookup(r.to));
603            buf.push(',');
604            buf.push_str(r#""relationType":"#);
605            push_json_str(&mut buf, self.interner.lookup(r.relation_type));
606            buf.push('}');
607        }
608        buf.push_str("]}");
609
610        buf
611    }
612
613    /// Borrowing view of the full graph, suitable for serde-serialization without
614    /// intermediate owned String allocations.
615    pub fn read_graph_view(&self) -> ReadGraphView<'_> {
616        let entities: Vec<&StoredEntity> = self
617            .entity_slots
618            .iter()
619            .filter_map(|s| s.as_ref().filter(|e| e.is_live()))
620            .collect();
621        let relations: Vec<&StoredRelation> = self.relations.iter().collect();
622        ReadGraphView { snap: self, entities, relations }
623    }
624
625    fn lookup_live_slot(&self, name: &str) -> Option<u32> {
626        let name_id = self.interner.get_optional(name)?;
627        let hash = self.interner.get_hash(name_id);
628        let slot = self.name_table.lookup(hash, name_id)?;
629        self.entity_slots
630            .get(slot as usize)
631            .and_then(|s| s.as_ref())
632            .filter(|e| e.is_live())?;
633        Some(slot)
634    }
635
636    fn entity_by_name_id(&self, name_id: StrId) -> Option<Entity> {
637        let hash = self.interner.get_hash(name_id);
638        let slot = self.name_table.lookup(hash, name_id)?;
639        let e = self.entity_slots.get(slot as usize)?.as_ref()?;
640        Some(self.entity_to_output(e))
641    }
642
643    pub(crate) fn entity_to_output(&self, e: &StoredEntity) -> Entity {
644        Entity {
645            name: self.interner.lookup(e.name).to_string(),
646            entity_type: self.interner.lookup(e.entity_type).to_string(),
647            observations: e
648                .observations
649                .iter()
650                .map(|o| self.interner.lookup(*o).to_string())
651                .collect(),
652        }
653    }
654
655    pub(crate) fn relation_to_output(&self, r: &StoredRelation) -> Relation {
656        Relation {
657            from: self.interner.lookup(r.from).to_string(),
658            to: self.interner.lookup(r.to).to_string(),
659            relation_type: self.interner.lookup(r.relation_type).to_string(),
660        }
661    }
662
663    /// Open named entities + incident relations.
664    pub fn open_nodes(&self, names: &[String]) -> KnowledgeGraphOut {
665        let name_ids: std::collections::HashSet<StrId> = names
666            .iter()
667            .filter_map(|n| self.interner.get_optional(n))
668            .collect();
669        let entities: Vec<Entity> = self
670            .entity_slots
671            .iter()
672            .filter_map(|s| {
673                let e = s.as_ref()?;
674                if e.is_live() && name_ids.contains(&e.name) {
675                    Some(self.entity_to_output(e))
676                } else {
677                    None
678                }
679            })
680            .collect();
681        let matched: std::collections::HashSet<StrId> = entities.iter()
682            .filter_map(|e| self.interner.get_optional(&e.name))
683            .collect();
684        let relations: Vec<Relation> = self
685            .relations
686            .iter()
687            .filter(|r| matched.contains(&r.from) || matched.contains(&r.to))
688            .map(|r| self.relation_to_output(r))
689            .collect();
690        KnowledgeGraphOut { entities, relations }
691    }
692
693    /// Full graph read. Returns all entities and relations.
694    pub fn read_graph(&self) -> KnowledgeGraphOut {
695        let entities: Vec<Entity> = self
696            .entity_slots
697            .iter()
698            .filter_map(|s| s.as_ref().filter(|e| e.is_live()))
699            .map(|e| self.entity_to_output(e))
700            .collect();
701        let relations: Vec<Relation> = self
702            .relations
703            .iter()
704            .map(|r| self.relation_to_output(r))
705            .collect();
706        KnowledgeGraphOut { entities, relations }
707    }
708
709    /// Get a single entity by name.
710    pub fn get_entity(&self, name: &str) -> Option<Entity> {
711        self.lookup_live_slot(name)?;
712        let name_id = self.interner.get_optional(name)?;
713        self.entity_by_name_id(name_id)
714    }
715
716    /// Neighborhood expansion (same logic as KnowledgeGraph::neighbors but read-only).
717    pub fn neighbors(
718        &self,
719        name: &str,
720        direction: Direction,
721        rtype: Option<&str>,
722        depth: u32,
723    ) -> Result<KnowledgeGraphOut> {
724        self.lookup_live_slot(name)
725            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
726        let start = self.interner.get_optional(name).unwrap();
727
728        let rtype_id = match rtype {
729            Some(r) => match self.interner.get_optional(r) {
730                Some(id) => Some(id),
731                None => {
732                    let entities = self.entity_by_name_id(start).into_iter().collect();
733                    return Ok(KnowledgeGraphOut { entities, relations: Vec::new() });
734                }
735            },
736            None => None,
737        };
738
739        let mut visited: AHashSet<StrId> = AHashSet::new();
740        visited.insert(start);
741
742        let type_ok = |r: &StoredRelation, rt: Option<StrId>| rt.is_none_or(|rt_id| r.relation_type == rt_id);
743
744        if depth == 1 {
745            for r in self.relations.iter().filter(|r| type_ok(r, rtype_id)) {
746                match direction {
747                    Direction::Out => {
748                        if r.from == start { visited.insert(r.to); }
749                    }
750                    Direction::In => {
751                        if r.to == start { visited.insert(r.from); }
752                    }
753                    Direction::Both => {
754                        if r.from == start { visited.insert(r.to); }
755                        else if r.to == start { visited.insert(r.from); }
756                    }
757                }
758            }
759        } else if depth >= 2 {
760            let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
761            match direction {
762                Direction::Both => {
763                    for (&node, edges) in &self.adjacency {
764                        for &(nb, rt) in edges {
765                            if rtype_id.is_none_or(|rt_id| rt == rt_id) {
766                                adj.entry(node).or_default().push(nb);
767                            }
768                        }
769                    }
770                }
771                Direction::Out | Direction::In => {
772                    for r in self.relations.iter().filter(|r| type_ok(r, rtype_id)) {
773                        match direction {
774                            Direction::Out => adj.entry(r.from).or_default().push(r.to),
775                            Direction::In => adj.entry(r.to).or_default().push(r.from),
776                            _ => unreachable!(),
777                        }
778                    }
779                }
780            }
781            let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
782            queue.push_back((start, 0));
783            while let Some((node, d)) = queue.pop_front() {
784                if d >= depth { continue; }
785                if let Some(nbrs) = adj.get(&node) {
786                    for &nb in nbrs {
787                        if visited.insert(nb) {
788                            queue.push_back((nb, d + 1));
789                        }
790                    }
791                }
792            }
793        }
794
795        let mut entities = Vec::with_capacity(visited.len());
796        for &nid in &visited {
797            if let Some(e) = self.entity_by_name_id(nid) {
798                entities.push(e);
799            }
800        }
801        let relations: Vec<Relation> = self
802            .relations
803            .iter()
804            .filter(|r| type_ok(r, rtype_id) && visited.contains(&r.from) && visited.contains(&r.to))
805            .map(|r| self.relation_to_output(r))
806            .collect();
807        Ok(KnowledgeGraphOut { entities, relations })
808    }
809
810    /// Describe an entity: entity details, incident relations, neighbors, degree.
811    pub fn describe_entity(&self, name: &str) -> Result<serde_json::Value> {
812        let name_id = self
813            .interner
814            .get_optional(name)
815            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
816        let entity = self
817            .entity_by_name_id(name_id)
818            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
819
820        let mut incident: Vec<Relation> = Vec::new();
821        let mut neighbor_seen: AHashSet<StrId> = AHashSet::new();
822        let mut neighbors: Vec<&str> = Vec::new();
823        for r in self.relations.iter() {
824            if r.from == name_id || r.to == name_id {
825                incident.push(self.relation_to_output(r));
826                let other = if r.from == name_id { r.to } else { r.from };
827                if other != name_id && neighbor_seen.insert(other) {
828                    neighbors.push(self.interner.lookup(other));
829                }
830            }
831        }
832
833        Ok(serde_json::json!({
834            "entity": entity,
835            "relations": incident,
836            "neighbors": neighbors,
837            "degree": incident.len(),
838        }))
839    }
840
841    /// Find the shortest path between two entities.
842    pub fn find_path(&self, from: &str, to: &str) -> Result<Vec<String>> {
843        let from_id = self
844            .interner
845            .get_optional(from)
846            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
847        let to_id = self
848            .interner
849            .get_optional(to)
850            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
851        if self.lookup_live_slot(from).is_none() {
852            return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
853        }
854        if self.lookup_live_slot(to).is_none() {
855            return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
856        }
857
858        // BFS over incremental adjacency index.
859        let mut visited: AHashSet<StrId> = AHashSet::new();
860        let mut parent: AHashMap<StrId, StrId> = AHashMap::new();
861        let mut queue: VecDeque<StrId> = VecDeque::new();
862
863        visited.insert(from_id);
864        queue.push_back(from_id);
865
866        while let Some(current) = queue.pop_front() {
867            if current == to_id { break; }
868            if let Some(neighbors) = self.adjacency.get(&current) {
869                for &(neighbor, _) in neighbors {
870                    if visited.insert(neighbor) {
871                        parent.insert(neighbor, current);
872                        queue.push_back(neighbor);
873                    }
874                }
875            }
876        }
877
878        if !visited.contains(&to_id) {
879            return Err(MCSError::MemoryError(format!(
880                "No path found between '{from}' and '{to}'"
881            )));
882        }
883
884        let mut path = Vec::new();
885        let mut cur = to_id;
886        path.push(self.interner.lookup(cur).to_string());
887        while let Some(&p) = parent.get(&cur) {
888            path.push(self.interner.lookup(p).to_string());
889            cur = p;
890        }
891        path.reverse();
892        Ok(path)
893    }
894
895    /// Extract a subgraph around the given entities.
896    pub fn extract_subgraph(&self, names: &[String], depth: u32) -> Result<KnowledgeGraphOut> {
897        if names.is_empty() {
898            return Ok(KnowledgeGraphOut { entities: Vec::new(), relations: Vec::new() });
899        }
900        let mut visited: AHashSet<StrId> = AHashSet::new();
901        let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
902        for name in names {
903            if let Some(id) = self.interner.get_optional(name)
904                && visited.insert(id)
905            {
906                queue.push_back((id, 0));
907            }
908        }
909        let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
910        for (&node, edges) in &self.adjacency {
911            let nbrs: Vec<StrId> = edges.iter().map(|(to, _)| *to).collect();
912            adj.insert(node, nbrs);
913        }
914        while let Some((node, d)) = queue.pop_front() {
915            if d >= depth { continue; }
916            if let Some(nbrs) = adj.get(&node) {
917                for &nb in nbrs {
918                    if visited.insert(nb) {
919                        queue.push_back((nb, d + 1));
920                    }
921                }
922            }
923        }
924        let mut entities: Vec<Entity> = Vec::with_capacity(visited.len());
925        for &nid in &visited {
926            if let Some(e) = self.entity_by_name_id(nid) {
927                entities.push(e);
928            }
929        }
930        let relations: Vec<Relation> = self
931            .relations
932            .iter()
933            .filter(|r| visited.contains(&r.from) && visited.contains(&r.to))
934            .map(|r| self.relation_to_output(r))
935            .collect();
936        Ok(KnowledgeGraphOut { entities, relations })
937    }
938
939    /// Batch get entities.
940    pub fn batch_get_entities(&self, names: &[String]) -> Vec<Option<Entity>> {
941        names.iter().map(|n| self.get_entity(n)).collect()
942    }
943
944    /// Graph statistics.
945    pub fn graph_stats(&self) -> serde_json::Value {
946        let entity_count = self
947            .entity_slots
948            .iter()
949            .filter(|s| s.as_ref().is_some_and(|e| e.is_live()))
950            .count();
951        let relation_count = self.relations.len();
952        let type_counts = self.entity_type_counts();
953        let relation_type_counts = self.relation_type_counts();
954        serde_json::json!({
955            "entities": entity_count,
956            "relations": relation_count,
957            "entityTypes": type_counts,
958            "relationTypes": relation_type_counts,
959        })
960    }
961
962    /// Search relations by from/to/type filters.
963    pub fn search_relations(&self, from: Option<&str>, to: Option<&str>, rtype: Option<&str>) -> Vec<Relation> {
964        let from_id = from.and_then(|n| self.interner.get_optional(n));
965        let to_id = to.and_then(|n| self.interner.get_optional(n));
966        let rtype_id = rtype.and_then(|n| self.interner.get_optional(n));
967        self.relations
968            .iter()
969            .filter(|r| {
970                from_id.is_none_or(|id| r.from == id)
971                    && to_id.is_none_or(|id| r.to == id)
972                    && rtype_id.is_none_or(|id| r.relation_type == id)
973            })
974            .map(|r| self.relation_to_output(r))
975            .collect()
976    }
977
978    /// Count entities per type.
979    pub fn entity_type_counts(&self) -> Vec<(String, usize)> {
980        let mut counts: AHashMap<StrId, usize> = AHashMap::new();
981        for slot in self.entity_slots.iter() {
982            if let Some(e) = slot.as_ref().filter(|e| e.is_live()) {
983                *counts.entry(e.entity_type).or_default() += 1;
984            }
985        }
986        let mut result: Vec<(String, usize)> = counts
987            .into_iter()
988            .map(|(id, c)| (self.interner.lookup(id).to_string(), c))
989            .collect();
990        result.sort_by(|a, b| a.0.cmp(&b.0));
991        result
992    }
993
994    /// Count relations per type.
995    pub fn relation_type_counts(&self) -> Vec<(String, usize)> {
996        let mut counts: AHashMap<StrId, usize> = AHashMap::new();
997        for r in self.relations.iter() {
998            *counts.entry(r.relation_type).or_default() += 1;
999        }
1000        let mut result: Vec<(String, usize)> = counts
1001            .into_iter()
1002            .map(|(id, c)| (self.interner.lookup(id).to_string(), c))
1003            .collect();
1004        result.sort_by(|a, b| a.0.cmp(&b.0));
1005        result
1006    }
1007
1008    /// Export the graph in one of: json, mermaid, dot.
1009    pub fn export(&self, format: &str) -> Result<String> {
1010        match format {
1011            "json" => serde_json::to_string(&self.read_graph()).map_err(MCSError::JsonError),
1012            "mermaid" => Ok(self.export_mermaid()),
1013            "dot" => Ok(self.export_dot()),
1014            other => Err(MCSError::InvalidParams(format!(
1015                "Unknown export format '{other}' (expected json|mermaid|dot)"
1016            ))),
1017        }
1018    }
1019
1020    fn export_mermaid(&self) -> String {
1021        let mut out = String::with_capacity(4096);
1022        out.push_str("graph LR\n");
1023        for r in self.relations.iter() {
1024            let from = sanitize_label(self.interner.lookup(r.from));
1025            let to = sanitize_label(self.interner.lookup(r.to));
1026            let rt = sanitize_label(self.interner.lookup(r.relation_type));
1027            out.push_str(&format!("    {} -- \"{}\" --> {}\n", from, rt, to));
1028        }
1029        out
1030    }
1031
1032    fn export_dot(&self) -> String {
1033        let mut out = String::with_capacity(4096);
1034        out.push_str("digraph KG {\n");
1035        out.push_str("    rankdir=LR;\n");
1036        for slot in self.entity_slots.iter() {
1037            if let Some(e) = slot.as_ref().filter(|e| e.is_live()) {
1038                let name = sanitize_label(self.interner.lookup(e.name));
1039                let etype = sanitize_label(self.interner.lookup(e.entity_type));
1040                out.push_str(&format!("    \"{}\" [label=\"{}\n({})\"];\n", name, name, etype));
1041            }
1042        }
1043        for r in self.relations.iter() {
1044            let from = sanitize_label(self.interner.lookup(r.from));
1045            let to = sanitize_label(self.interner.lookup(r.to));
1046            let rt = sanitize_label(self.interner.lookup(r.relation_type));
1047            out.push_str(&format!("    \"{}\" -> \"{}\" [label=\"{}\"];\n", from, to, rt));
1048        }
1049        out.push_str("}\n");
1050        out
1051    }
1052
1053    /// Find all paths between two entities.
1054    pub fn find_all_paths(
1055        &self,
1056        from: &str,
1057        to: &str,
1058        max_depth: usize,
1059        max_paths: usize,
1060    ) -> Result<Vec<Vec<String>>> {
1061        let from_id = self
1062            .interner
1063            .get_optional(from)
1064            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
1065        let to_id = self
1066            .interner
1067            .get_optional(to)
1068            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
1069        if self.lookup_live_slot(from).is_none() {
1070            return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
1071        }
1072        if self.lookup_live_slot(to).is_none() {
1073            return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
1074        }
1075        if from_id == to_id {
1076            return Ok(vec![vec![from.to_string()]]);
1077        }
1078        let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::with_capacity(self.adjacency.len());
1079        for (&node, edges) in &self.adjacency {
1080            let nbrs: Vec<StrId> = edges.iter().map(|(to, _)| *to).collect();
1081            adj.insert(node, nbrs);
1082        }
1083        let mut all_paths: Vec<Vec<StrId>> = Vec::new();
1084        let mut current_path = Vec::new();
1085        let mut visited: AHashSet<StrId> = AHashSet::new();
1086        visited.insert(from_id);
1087        current_path.push(from_id);
1088        Self::dfs_all_paths(
1089            &adj,
1090            from_id,
1091            to_id,
1092            max_depth,
1093            max_paths,
1094            &mut visited,
1095            &mut current_path,
1096            &mut all_paths,
1097        );
1098        if all_paths.is_empty() {
1099            return Err(MCSError::MemoryError(format!(
1100                "No path found between '{from}' and '{to}'"
1101            )));
1102        }
1103        let result: Vec<Vec<String>> = all_paths
1104            .into_iter()
1105            .map(|path| {
1106                path.into_iter()
1107                    .map(|id| self.interner.lookup(id).to_string())
1108                    .collect()
1109            })
1110            .collect();
1111        Ok(result)
1112    }
1113
1114    fn dfs_all_paths(
1115        adj: &AHashMap<StrId, Vec<StrId>>,
1116        current: StrId,
1117        target: StrId,
1118        max_depth: usize,
1119        max_paths: usize,
1120        visited: &mut AHashSet<StrId>,
1121        current_path: &mut Vec<StrId>,
1122        all_paths: &mut Vec<Vec<StrId>>,
1123    ) {
1124        if all_paths.len() >= max_paths { return; }
1125        if current == target && current_path.len() > 1 {
1126            all_paths.push(current_path.clone());
1127            return;
1128        }
1129        if current_path.len() > max_depth { return; }
1130        if let Some(neighbors) = adj.get(&current) {
1131            for &nb in neighbors {
1132                if !visited.contains(&nb) {
1133                    visited.insert(nb);
1134                    current_path.push(nb);
1135                    Self::dfs_all_paths(adj, nb, target, max_depth, max_paths, visited, current_path, all_paths);
1136                    current_path.pop();
1137                    visited.remove(&nb);
1138                }
1139            }
1140        }
1141    }
1142
1143    /// Search for entities whose name/type/observations contain `query`.
1144    pub fn search_entities(&self, query: &str) -> Result<Vec<Entity>> {
1145        let token = query.to_lowercase();
1146        let matching = self.search.search(&token, &self.interner);
1147        Ok(matching
1148            .iter()
1149            .filter_map(|idx| {
1150                self.entity_slots
1151                    .get(*idx as usize)?
1152                    .as_ref()
1153                    .filter(|e| e.is_live())
1154                    .map(|e| self.entity_to_output(e))
1155            })
1156            .collect())
1157    }
1158}
1159
1160impl KnowledgeGraph {
    /// Open the binary log at `path` and rebuild the in-memory graph by
    /// replaying every record in it.
    ///
    /// Transaction handling:
    /// - Records between `TxnBegin` and `TxnCommit` are buffered and applied
    ///   only when the commit marker is reached.
    /// - A transaction still open at the end of the log is dropped.
    ///
    /// # Errors
    /// Propagates I/O errors from `BinaryStore::new` and from the replay.
    pub fn new(path: &Path) -> std::io::Result<Self> {
        let store = BinaryStore::new(path)?;

        // Replay into local collections, then assign into self — no raw pointers needed (X3).
        let mut interner = StringInterner::with_capacity(65536, 1024);
        let mut entity_slots: Vec<Option<StoredEntity>> = Vec::with_capacity(256);
        let mut name_table = ShardedNameTable::new(64);
        let mut relations: Vec<StoredRelation> = Vec::with_capacity(64);
        let mut search = SearchIndex::new();

        // Transaction buffer: while `Some`, records are accumulated and only
        // applied on `TxnCommit`. An unclosed transaction at EOF is discarded,
        // which is what makes multi-record operations (e.g. `merge_entities`)
        // crash-atomic.
        // NOTE(review): after a `TxnBegin` with no matching `TxnCommit`,
        // `pending` stays `Some` for the rest of the replay. Every later
        // non-transaction record is then buffered and thrown away, and a
        // subsequent `TxnBegin` also resets the buffer. If the log can contain
        // records appended after an interrupted transaction (i.e. the torn
        // tail is not truncated on open), those records would be silently
        // lost. Confirm against `BinaryStore`.
        let mut pending: Option<Vec<(RecordKind, Vec<u8>)>> = None;
        store.replay(|kind, data| {
            match kind {
                RecordKind::TxnBegin => pending = Some(Vec::new()),
                RecordKind::TxnCommit => {
                    // A commit with no open transaction is ignored.
                    if let Some(buffered) = pending.take() {
                        for (k, d) in &buffered {
                            Self::apply_record(
                                *k, d, &mut interner, &mut entity_slots, &mut search,
                                &mut name_table, &mut relations,
                            );
                        }
                    }
                }
                // Any other record is either buffered (inside a transaction)
                // or applied immediately.
                other => match pending.as_mut() {
                    Some(buffered) => buffered.push((other, data.to_vec())),
                    None => Self::apply_record(
                        other, data, &mut interner, &mut entity_slots, &mut search,
                        &mut name_table, &mut relations,
                    ),
                },
            }
        })?;

        // Slots tombstoned by deletes during replay are available for reuse (M2).
        let free_slots: Vec<u32> = entity_slots
            .iter()
            .enumerate()
            .filter(|(_, s)| s.is_none())
            .map(|(i, _)| i as u32)
            .collect();

        // Build the adjacency index from the surviving relations. Each
        // relation is listed under both endpoints as
        // (other endpoint, relation type).
        let mut adjacency: AHashMap<StrId, Vec<(StrId, StrId)>> = AHashMap::new();
        for rel in &relations {
            adjacency.entry(rel.from).or_default().push((rel.to, rel.relation_type));
            adjacency.entry(rel.to).or_default().push((rel.from, rel.relation_type));
        }

        Ok(Self {
            interner,
            entity_slots,
            free_slots,
            name_table,
            relations,
            adjacency,
            search,
            store,
        })
    }
1224
1225    // -----------------------------------------------------------------------
1226    // Replay helpers (static to avoid borrow issues in the closure)
1227    // -----------------------------------------------------------------------
1228
1229    /// Apply one already-decoded log record to the in-memory collections.
1230    /// Shared by direct replay and by transaction commit. `TxnBegin`/`TxnCommit`
1231    /// are handled by the caller and are no-ops here.
1232    #[allow(clippy::too_many_arguments)]
1233    fn apply_record(
1234        kind: RecordKind,
1235        data: &[u8],
1236        interner: &mut StringInterner,
1237        entity_slots: &mut Vec<Option<StoredEntity>>,
1238        search: &mut SearchIndex,
1239        name_table: &mut ShardedNameTable,
1240        relations: &mut Vec<StoredRelation>,
1241    ) {
1242        match kind {
1243            RecordKind::CreateEntity => {
1244                if let Some((name, etype, obs)) = store_enc::decode_create_entity(data) {
1245                    Self::replay_create_entity(
1246                        interner, entity_slots, search, name_table, name, etype, &obs,
1247                    );
1248                }
1249            }
1250            RecordKind::CreateRelation => {
1251                if let Some((from, to, rtype)) = store_enc::decode_create_relation(data) {
1252                    let from_id = interner.intern(from);
1253                    let to_id = interner.intern(to);
1254                    let type_id = interner.intern(rtype);
1255                    relations.push(StoredRelation {
1256                        from: from_id,
1257                        to: to_id,
1258                        relation_type: type_id,
1259                    });
1260                }
1261            }
1262            RecordKind::AddObservations => {
1263                if let Some((name, obs)) = store_enc::decode_add_observations(data) {
1264                    Self::replay_add_observations(
1265                        interner, entity_slots, search, name_table, name, &obs,
1266                    );
1267                }
1268            }
1269            RecordKind::DeleteEntity => {
1270                if let Some(name) = store_enc::decode_delete_entity(data) {
1271                    Self::replay_delete_entity(
1272                        interner, entity_slots, relations, search, name_table, name,
1273                    );
1274                }
1275            }
1276            RecordKind::DeleteObservations => {
1277                if let Some((name, obs)) = store_enc::decode_delete_observations(data) {
1278                    Self::replay_delete_observations(
1279                        interner, entity_slots, search, name_table, name, &obs,
1280                    );
1281                }
1282            }
1283            RecordKind::DeleteRelation => {
1284                if let Some((from, to, rtype)) = store_enc::decode_delete_relation(data) {
1285                    let from_id = interner.intern(from);
1286                    let to_id = interner.intern(to);
1287                    let type_id = interner.intern(rtype);
1288                    relations.retain(|r| {
1289                        !(r.from == from_id && r.to == to_id && r.relation_type == type_id)
1290                    });
1291                }
1292            }
1293            RecordKind::TxnBegin | RecordKind::TxnCommit => {}
1294        }
1295    }
1296
1297    #[allow(clippy::ptr_arg)]
1298    fn replay_create_entity(
1299        interner: &mut StringInterner,
1300        entities: &mut Vec<Option<StoredEntity>>,
1301        search: &mut SearchIndex,
1302        name_table: &mut ShardedNameTable,
1303        name: &str,
1304        etype: &str,
1305        observations: &[&str],
1306    ) {
1307        let name_id = interner.intern(name);
1308        let type_id = interner.intern(etype);
1309        let obs_ids: Vec<StrId> = observations.iter().map(|o| interner.intern(o)).collect();
1310        let slot = entities.len() as u32;
1311        entities.push(Some(StoredEntity {
1312            state: ENTITY_SLOT_LIVE,
1313            name: name_id,
1314            entity_type: type_id,
1315            observations: obs_ids.clone(),
1316        }));
1317        let hash = interner.get_hash(name_id);
1318        name_table.insert(&*interner, hash, name_id, slot);
1319        search.index_entity(interner, slot, name_id, type_id, &obs_ids);
1320    }
1321
1322    fn replay_add_observations(
1323        interner: &mut StringInterner,
1324        entities: &mut [Option<StoredEntity>],
1325        search: &mut SearchIndex,
1326        name_table: &mut ShardedNameTable,
1327        name: &str,
1328        observations: &[&str],
1329    ) {
1330        let name_id = interner.intern(name);
1331        let hash = interner.get_hash(name_id);
1332        if let Some(slot) = name_table.lookup(hash, name_id)
1333            && let Some(Some(entity)) = entities.get_mut(slot as usize)
1334        {
1335            for &o in observations {
1336                let oid = interner.intern(o);
1337                if !entity.observations.contains(&oid) {
1338                    entity.observations.push(oid);
1339                }
1340            }
1341            search.remove_entity(slot);
1342            search.index_entity(
1343                interner,
1344                slot,
1345                entity.name,
1346                entity.entity_type,
1347                &entity.observations,
1348            );
1349        }
1350    }
1351
1352    fn replay_delete_entity(
1353        interner: &mut StringInterner,
1354        entities: &mut [Option<StoredEntity>],
1355        rels: &mut Vec<StoredRelation>,
1356        search: &mut SearchIndex,
1357        name_table: &mut ShardedNameTable,
1358        name: &str,
1359    ) {
1360        let name_id = interner.intern(name);
1361        let hash = interner.get_hash(name_id);
1362        if let Some(slot) = name_table.lookup(hash, name_id)
1363            && let Some(Some(_)) = entities.get(slot as usize)
1364        {
1365            entities[slot as usize] = None;
1366            search.remove_entity(slot);
1367            name_table.remove(&*interner, hash, name_id);
1368        }
1369        rels.retain(|r| r.from != name_id && r.to != name_id);
1370    }
1371
1372    fn replay_delete_observations(
1373        interner: &mut StringInterner,
1374        entities: &mut [Option<StoredEntity>],
1375        search: &mut SearchIndex,
1376        name_table: &mut ShardedNameTable,
1377        name: &str,
1378        observations: &[&str],
1379    ) {
1380        let name_id = interner.intern(name);
1381        let hash = interner.get_hash(name_id);
1382        if let Some(slot) = name_table.lookup(hash, name_id)
1383            && let Some(Some(entity)) = entities.get_mut(slot as usize)
1384        {
1385            let remove_ids: Vec<StrId> = observations.iter().map(|o| interner.intern(o)).collect();
1386            entity.observations.retain(|o| !remove_ids.contains(o));
1387            search.remove_entity(slot);
1388            search.index_entity(
1389                interner,
1390                slot,
1391                entity.name,
1392                entity.entity_type,
1393                &entity.observations,
1394            );
1395        }
1396    }
1397
1398    // -----------------------------------------------------------------------
1399    // Public API
1400    // -----------------------------------------------------------------------
1401
1402    pub const fn interner(&self) -> &StringInterner {
1403        &self.interner
1404    }
1405
1406    /// Return a single entity by exact name match.
1407    pub fn get_entity(&self, name: &str) -> Option<Entity> {
1408        let name_id = self.interner.get_optional(name)?;
1409        let hash = self.interner.get_hash(name_id);
1410        let slot = self.name_table.lookup(hash, name_id)?;
1411        let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
1412        if !stored.is_live() {
1413            return None;
1414        }
1415        Some(self.entity_to_output(stored))
1416    }
1417
1418    /// Return aggregate statistics about the graph.
1419    pub fn graph_stats(&self) -> serde_json::Value {
1420        let live_entities = self
1421            .entity_slots
1422            .iter()
1423            .filter(|s| s.as_ref().is_some_and(|e| e.is_live()))
1424            .count();
1425        let total_relations = self.relations.len();
1426        let index_entries = self.search.len();
1427        let total_obs: usize = self
1428            .entity_slots
1429            .iter()
1430            .filter_map(|s| s.as_ref())
1431            .filter(|e| e.is_live())
1432            .map(|e| e.observations.len())
1433            .sum();
1434
1435        serde_json::json!({
1436            "entities": live_entities,
1437            "relations": total_relations,
1438            "totalObservations": total_obs,
1439            "searchIndexEntries": index_entries,
1440            "internedStrings": self.interner.len(),
1441            "internedBytes": self.interner.total_bytes(),
1442        })
1443    }
1444
1445    /// Search relations by optional filters: `from`, `to`, `relationType`.
1446    /// Any filter that is absent matches everything. A filter value that does
1447    /// not exist in the graph returns empty results.
1448    pub fn search_relations(&self, from: Option<&str>, to: Option<&str>, rtype: Option<&str>) -> Vec<Relation> {
1449        let from_id = match from {
1450            Some(f) => match self.interner.get_optional(f) {
1451                Some(id) => Some(id),
1452                None => return Vec::new(),
1453            },
1454            None => None,
1455        };
1456        let to_id = match to {
1457            Some(t) => match self.interner.get_optional(t) {
1458                Some(id) => Some(id),
1459                None => return Vec::new(),
1460            },
1461            None => None,
1462        };
1463        let rtype_id = match rtype {
1464            Some(r) => match self.interner.get_optional(r) {
1465                Some(id) => Some(id),
1466                None => return Vec::new(),
1467            },
1468            None => None,
1469        };
1470
1471        self.relations
1472            .iter()
1473            .filter(|r| {
1474                from_id.is_none_or(|f| r.from == f)
1475                    && to_id.is_none_or(|t| r.to == t)
1476                    && rtype_id.is_none_or(|rt| r.relation_type == rt)
1477            })
1478            .map(|r| Relation {
1479                from: self.interner.lookup(r.from).to_string(),
1480                to: self.interner.lookup(r.to).to_string(),
1481                relation_type: self.interner.lookup(r.relation_type).to_string(),
1482            })
1483            .collect()
1484    }
1485
1486    /// BFS shortest-path between two entity names. Returns the sequence of
1487    /// entity names along the path (inclusive of both endpoints).
1488    pub fn find_path(&self, from: &str, to: &str) -> Result<Vec<String>> {
1489        let from_id = self.interner.get_optional(from)
1490            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
1491        let to_id = self.interner.get_optional(to)
1492            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
1493        let hash_from = self.interner.get_hash(from_id);
1494        let hash_to = self.interner.get_hash(to_id);
1495
1496        if self.name_table.lookup(hash_from, from_id).is_none() {
1497            return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
1498        }
1499        if self.name_table.lookup(hash_to, to_id).is_none() {
1500            return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
1501        }
1502        if from_id == to_id {
1503            return Ok(vec![from.to_string()]);
1504        }
1505
1506        // Use incremental adjacency index — O(degree) per hop, no rebuild.
1507        let mut visited: AHashSet<StrId> = AHashSet::new();
1508        let mut parent: AHashMap<StrId, StrId> = AHashMap::new();
1509        let mut queue: VecDeque<StrId> = VecDeque::new();
1510
1511        visited.insert(from_id);
1512        queue.push_back(from_id);
1513
1514        while let Some(current) = queue.pop_front() {
1515            if current == to_id {
1516                break;
1517            }
1518
1519            if let Some(neighbors) = self.adjacency.get(&current) {
1520                for &(neighbor, _) in neighbors {
1521                    if visited.insert(neighbor) {
1522                        parent.insert(neighbor, current);
1523                        queue.push_back(neighbor);
1524                    }
1525                }
1526            }
1527        }
1528
1529        if !parent.contains_key(&to_id) && from_id != to_id {
1530            return Err(MCSError::MemoryError(format!(
1531                "No path found between '{from}' and '{to}'"
1532            )));
1533        }
1534
1535        // Reconstruct path
1536        let mut path: Vec<String> = Vec::new();
1537        let mut cur = to_id;
1538        loop {
1539            path.push(self.interner.lookup(cur).to_string());
1540            if cur == from_id {
1541                break;
1542            }
1543            cur = *parent.get(&cur).ok_or_else(|| {
1544                MCSError::MemoryError("Path reconstruction failed".into())
1545            })?;
1546        }
1547        path.reverse();
1548        Ok(path)
1549    }
1550
    /// Rewrite the binary log from the current in-memory state.
    /// After compaction the log contains only the minimal set of records
    /// needed to reconstruct the graph (all creates, no deletes): one
    /// `CreateEntity` per live entity, followed by one `CreateRelation` per
    /// stored relation.
    /// Crash-safe: writes to a temp file, then atomically renames (C3).
    ///
    /// On success `self` is replaced by a graph freshly loaded from the
    /// compacted log, which also reclaims interner, slot and search-index
    /// garbage.
    ///
    /// # Errors
    /// Returns `MCSError::IoError` if any of these steps fails:
    /// - removing a stale temp file;
    /// - creating, writing or syncing the temp log;
    /// - the rename;
    /// - the directory fsync;
    /// - the reload.
    ///
    /// NOTE(review): the temp path is the log path with its extension replaced
    /// by `tmp`, so a log whose own extension is `.tmp` would collide with it.
    /// Also, if the reload in step 4 fails *after* the rename succeeded, `self`
    /// is left as-is. Its `store` presumably still refers to the replaced file,
    /// so later appends may not reach the on-disk log. Confirm `BinaryStore`
    /// handle semantics.
    pub fn compact(&mut self) -> Result<()> {
        // 1. Collect current state as create-records
        let mut create_entities: Vec<Entity> = Vec::new();
        let mut create_relations: Vec<Relation> = Vec::new();

        // Only live slots are emitted; tombstoned (`None`) slots are dropped.
        for slot in &self.entity_slots {
            if let Some(stored) = slot.as_ref().filter(|e| e.is_live()) {
                create_entities.push(self.entity_to_output(stored));
            }
        }
        for rel in &self.relations {
            create_relations.push(Relation {
                from: self.interner.lookup(rel.from).to_string(),
                to: self.interner.lookup(rel.to).to_string(),
                relation_type: self.interner.lookup(rel.relation_type).to_string(),
            });
        }

        // 2. Write to a temp file first.
        //    Remove any stale temp left by a previously-interrupted compact:
        //    `BinaryStore::new` opens in append mode and only writes the MAGIC
        //    header when the file does not already exist, so appending to a
        //    leftover temp would produce a duplicated, header-corrupted log
        //    once renamed over the real one (C1).
        let tmp_path = self.store.path().with_extension("tmp");
        if let Err(e) = std::fs::remove_file(&tmp_path)
            && e.kind() != std::io::ErrorKind::NotFound
        {
            return Err(MCSError::IoError(e));
        }
        let mut tmp_store = BinaryStore::new(&tmp_path).map_err(MCSError::IoError)?;
        // Entities are written before relations, so replay sees every entity
        // before any relation that refers to it.
        for entity in &create_entities {
            let mut buf = Vec::new();
            store_enc::encode_create_entity(&mut buf, &entity.name, &entity.entity_type, &entity.observations)
                .map_err(MCSError::IoError)?;
            tmp_store.write_record(RecordKind::CreateEntity, &buf).map_err(MCSError::IoError)?;
        }
        for relation in &create_relations {
            let mut buf = Vec::new();
            store_enc::encode_create_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
                .map_err(MCSError::IoError)?;
            tmp_store.write_record(RecordKind::CreateRelation, &buf).map_err(MCSError::IoError)?;
        }
        // Temp contents must be durable before the rename publishes them.
        tmp_store.flush_and_sync().map_err(MCSError::IoError)?;
        drop(tmp_store);

        // 3. Atomically rename over the original (atomic on POSIX), then fsync
        //    the containing directory so the rename itself is durable across a
        //    crash — content swap is atomic, but the directory entry update is
        //    not durable until the dir is synced (C2).
        std::fs::rename(&tmp_path, self.store.path()).map_err(MCSError::IoError)?;
        sync_parent_dir(self.store.path()).map_err(MCSError::IoError)?;

        // 4. Rebuild the entire in-memory graph from the compacted log (M1/M2).
        //    Replaying into fresh structures reclaims the interner arena (stale
        //    strings from deleted/edited entities), tombstoned entity slots, and
        //    stale search-index entries — none of which the old reopen-only path
        //    reclaimed.
        let path = self.store.path().clone();
        *self = KnowledgeGraph::new(&path).map_err(MCSError::IoError)?;

        Ok(())
    }
1618
1619    // ---- Public API with write-ahead log (C1) and error propagation ----
1620
1621    pub fn create_entities(&mut self, entities: &[Entity]) -> Result<Vec<Entity>> {
1622        // Validate up front so an invalid entity never produces partial writes.
1623        for entity in entities {
1624            if entity.name.is_empty() {
1625                return Err(MCSError::InvalidParams(
1626                    "Entity name must not be empty".into(),
1627                ));
1628            }
1629        }
1630        let mut created = Vec::new();
1631        for entity in entities {
1632            // Check dedup before writing (using non-interning lookup)
1633            let existing = self.interner.get_optional(&entity.name)
1634                .and_then(|id| {
1635                    let hash = self.interner.get_hash(id);
1636                    self.name_table.lookup(hash, id)
1637                });
1638            if existing.is_some() {
1639                continue;
1640            }
1641            // Write-ahead: encode and log before mutating state
1642            let mut buf = Vec::new();
1643            store_enc::encode_create_entity(&mut buf, &entity.name, &entity.entity_type, &entity.observations)
1644                .map_err(MCSError::IoError)?;
1645            self.store.write_record(RecordKind::CreateEntity, &buf)
1646                .map_err(MCSError::IoError)?;
1647
1648            let name_id = self.interner.intern(&entity.name);
1649            let hash = self.interner.get_hash(name_id);
1650            let type_id = self.interner.intern(&entity.entity_type);
1651            let obs_ids: Vec<StrId> = entity
1652                .observations
1653                .iter()
1654                .map(|o| self.interner.intern(o))
1655                .collect();
1656            // Reuse a tombstoned slot if one is free (M2); its old search-index
1657            // entries were cleared on delete, so the slot starts clean.
1658            let reused = self.free_slots.pop();
1659            let slot = reused.unwrap_or(self.entity_slots.len() as u32);
1660            self.search
1661                .index_entity(&mut self.interner, slot, name_id, type_id, &obs_ids);
1662            let stored = Some(StoredEntity {
1663                state: ENTITY_SLOT_LIVE,
1664                name: name_id,
1665                entity_type: type_id,
1666                observations: obs_ids,
1667            });
1668            match reused {
1669                Some(s) => self.entity_slots[s as usize] = stored,
1670                None => self.entity_slots.push(stored),
1671            }
1672            self.name_table.insert(&self.interner, hash, name_id, slot);
1673            created.push(Entity {
1674                name: entity.name.clone(),
1675                entity_type: entity.entity_type.clone(),
1676                observations: entity.observations.clone(),
1677            });
1678        }
1679        Ok(created)
1680    }
1681
1682    pub fn create_relations(&mut self, relations: &[Relation]) -> Result<Vec<Relation>> {
1683        // Validate up front so an invalid relation never produces partial writes.
1684        for relation in relations {
1685            if relation.from.is_empty() || relation.to.is_empty() {
1686                return Err(MCSError::InvalidParams(
1687                    "Relation endpoints must not be empty".into(),
1688                ));
1689            }
1690        }
1691        let mut created = Vec::new();
1692        // Build a dedup set for O(1) duplicate checks (P5)
1693        let mut rel_set: AHashSet<(StrId, StrId, StrId)> = AHashSet::new();
1694        for rel in &self.relations {
1695            rel_set.insert((rel.from, rel.to, rel.relation_type));
1696        }
1697        for relation in relations {
1698            let from_id = self.interner.intern(&relation.from);
1699            let to_id = self.interner.intern(&relation.to);
1700            let type_id = self.interner.intern(&relation.relation_type);
1701            if !rel_set.insert((from_id, to_id, type_id)) {
1702                continue;
1703            }
1704            // Write-ahead: log before mutation
1705            let mut buf = Vec::new();
1706            store_enc::encode_create_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
1707                .map_err(MCSError::IoError)?;
1708            self.store.write_record(RecordKind::CreateRelation, &buf)
1709                .map_err(MCSError::IoError)?;
1710
1711            self.relations.push(StoredRelation {
1712                from: from_id,
1713                to: to_id,
1714                relation_type: type_id,
1715            });
1716            self.adjacency.entry(from_id).or_default().push((to_id, type_id));
1717            self.adjacency.entry(to_id).or_default().push((from_id, type_id));
1718            created.push(Relation {
1719                from: relation.from.clone(),
1720                to: relation.to.clone(),
1721                relation_type: relation.relation_type.clone(),
1722            });
1723        }
1724        Ok(created)
1725    }
1726
1727    pub fn add_observations(&mut self, entity_name: &str, contents: &[String]) -> Result<Vec<String>> {
1728        let name_id = self.interner.get_optional(entity_name)
1729            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1730        let hash = self.interner.get_hash(name_id);
1731        let slot = self
1732            .name_table
1733            .lookup(hash, name_id)
1734            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1735        // Snapshot the current observations so we can compute the deduplicated
1736        // additions *without* mutating in-memory state yet.
1737        let existing: AHashSet<StrId> = self
1738            .entity_slots
1739            .get(slot as usize)
1740            .and_then(|e| e.as_ref())
1741            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?
1742            .observations
1743            .iter()
1744            .copied()
1745            .collect();
1746
1747        // Deduplicate against existing observations *and* within this batch, so
1748        // the live result matches what replay (which dedups one-by-one) rebuilds.
1749        let mut added = Vec::new();
1750        let mut interned_added = Vec::new();
1751        let mut seen: AHashSet<StrId> = AHashSet::new();
1752        for content in contents {
1753            let cid = self.interner.intern(content);
1754            if existing.contains(&cid) || !seen.insert(cid) {
1755                continue;
1756            }
1757            interned_added.push(cid);
1758            added.push(content.clone());
1759        }
1760        if added.is_empty() {
1761            return Ok(added);
1762        }
1763
1764        // Write-ahead: the record must hit the log *before* any in-memory
1765        // mutation, so a failed write leaves memory and disk in agreement (C3).
1766        let mut buf = Vec::new();
1767        store_enc::encode_add_observations(&mut buf, entity_name, &added)
1768            .map_err(MCSError::IoError)?;
1769        self.store.write_record(RecordKind::AddObservations, &buf)
1770            .map_err(MCSError::IoError)?;
1771
1772        // Logged — now apply to in-memory state.
1773        let stored = self
1774            .entity_slots
1775            .get_mut(slot as usize)
1776            .and_then(|e| e.as_mut())
1777            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1778        stored.observations.extend_from_slice(&interned_added);
1779
1780        // Incrementally index only the new observation tokens (P3) — no
1781        // full remove + re-index of the whole entity.
1782        self.search
1783            .index_additional(&mut self.interner, slot, &interned_added);
1784        Ok(added)
1785    }
1786
1787    pub fn delete_entities(&mut self, entity_names: &[String]) -> Result<()> {
1788        let mut deleted_names = Vec::new();
1789        for name in entity_names {
1790            let name_id_opt = self.interner.get_optional(name);
1791            if let Some(name_id) = name_id_opt {
1792                let hash = self.interner.get_hash(name_id);
1793                if let Some(slot) = self.name_table.lookup(hash, name_id)
1794                    && let Some(Some(_)) = self.entity_slots.get(slot as usize)
1795                {
1796                    // Write-ahead: log before mutation
1797                    let mut buf = Vec::new();
1798                    store_enc::encode_delete_entity(&mut buf, name)
1799                        .map_err(MCSError::IoError)?;
1800                    self.store.write_record(RecordKind::DeleteEntity, &buf)
1801                        .map_err(MCSError::IoError)?;
1802
1803                    self.entity_slots[slot as usize] = None;
1804                    self.free_slots.push(slot);
1805                    self.search.remove_entity(slot);
1806                    self.name_table.remove(&self.interner, hash, name_id);
1807                    deleted_names.push(name.clone());
1808                }
1809            }
1810        }
1811        if !deleted_names.is_empty() {
1812            // Use a AHashSet for O(1) retain checks (P5)
1813            let deleted_ids: AHashSet<StrId> = deleted_names.iter()
1814                .map(|n| self.interner.intern(n))
1815                .collect();
1816            self.relations
1817                .retain(|r| !deleted_ids.contains(&r.from) && !deleted_ids.contains(&r.to));
1818            // Clean adjacency index
1819            for id in &deleted_ids {
1820                self.adjacency.remove(id);
1821                // Remove references from other entities' adjacency lists
1822                for list in self.adjacency.values_mut() {
1823                    list.retain(|(to, _)| !deleted_ids.contains(to));
1824                }
1825            }
1826        }
1827        Ok(())
1828    }
1829
1830    pub fn delete_observations(&mut self, entity_name: &str, observations: &[String]) -> Result<()> {
1831        let name_id = self.interner.get_optional(entity_name)
1832            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1833        let hash = self.interner.get_hash(name_id);
1834        let slot = self
1835            .name_table
1836            .lookup(hash, name_id)
1837            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1838        // Confirm the slot is live before logging.
1839        self.entity_slots
1840            .get(slot as usize)
1841            .and_then(|e| e.as_ref())
1842            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1843        let remove_ids: AHashSet<StrId> = observations.iter().map(|o| self.interner.intern(o)).collect();
1844
1845        // Write-ahead: log before touching in-memory state (C3).
1846        let mut buf = Vec::new();
1847        store_enc::encode_delete_observations(&mut buf, entity_name, observations)
1848            .map_err(MCSError::IoError)?;
1849        self.store.write_record(RecordKind::DeleteObservations, &buf)
1850            .map_err(MCSError::IoError)?;
1851
1852        // Logged — now apply.
1853        let stored = self
1854            .entity_slots
1855            .get_mut(slot as usize)
1856            .and_then(|e| e.as_mut())
1857            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1858        stored.observations.retain(|o| !remove_ids.contains(o));
1859        self.search.remove_entity(slot);
1860        self.search
1861            .index_entity(&mut self.interner, slot, stored.name, stored.entity_type, &stored.observations);
1862        Ok(())
1863    }
1864
1865    pub fn delete_relations(&mut self, relations: &[Relation]) -> Result<()> {
1866        // Collect targets into a AHashSet for O(1) retain checks (P5)
1867        let rels: AHashSet<(StrId, StrId, StrId)> = relations
1868            .iter()
1869            .map(|r| {
1870                (
1871                    self.interner.intern(&r.from),
1872                    self.interner.intern(&r.to),
1873                    self.interner.intern(&r.relation_type),
1874                )
1875            })
1876            .collect();
1877        // Write-ahead: log every deletion before mutating in-memory state (C3),
1878        // so a failed write can't leave memory ahead of the log.
1879        for relation in relations {
1880            let mut buf = Vec::new();
1881            store_enc::encode_delete_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
1882                .map_err(MCSError::IoError)?;
1883            self.store.write_record(RecordKind::DeleteRelation, &buf)
1884                .map_err(MCSError::IoError)?;
1885        }
1886        self.relations
1887            .retain(|r| !rels.contains(&(r.from, r.to, r.relation_type)));
1888        // Clean adjacency index
1889        for (f, t, rt) in &rels {
1890            if let Some(edges) = self.adjacency.get_mut(f) {
1891                edges.retain(|(to, rtype)| to != t || rtype != rt);
1892                if edges.is_empty() {
1893                    self.adjacency.remove(f);
1894                }
1895            }
1896            if let Some(edges) = self.adjacency.get_mut(t) {
1897                edges.retain(|(to, rtype)| to != f || rtype != rt);
1898                if edges.is_empty() {
1899                    self.adjacency.remove(t);
1900                }
1901            }
1902        }
1903        Ok(())
1904    }
1905
1906    pub fn read_graph(&self) -> KnowledgeGraphOut {
1907        self.read_graph_view().to_owned_out()
1908    }
1909
1910    /// Borrowing, allocation-light view of the full graph (M6). Serializing it
1911    /// streams interned `&str` directly instead of materializing a `String`
1912    /// per name/type/observation.
1913    pub fn read_graph_view(&self) -> GraphView<'_> {
1914        let entities: Vec<&StoredEntity> = self
1915            .entity_slots
1916            .iter()
1917            .filter_map(|s| s.as_ref().filter(|e| e.is_live()))
1918            .collect();
1919        let relations: Vec<&StoredRelation> = self.relations.iter().collect();
1920        GraphView { kg: self, entities, relations }
1921    }
1922
1923    /// Relevance-ranked substring search returning all matches (no pagination).
1924    /// Equivalent to `search_nodes_filtered(query, None, 0, usize::MAX)`.
1925    pub fn search_nodes(&self, query: &str) -> KnowledgeGraphOut {
1926        self.search_nodes_filtered(query, None, 0, usize::MAX)
1927    }
1928
1929    pub fn open_nodes(&self, names: &[String]) -> KnowledgeGraphOut {
1930        self.open_nodes_view(names).to_owned_out()
1931    }
1932
1933    /// Borrowing view variant of [`open_nodes`] (M6).
1934    pub fn open_nodes_view(&self, names: &[String]) -> GraphView<'_> {
1935        let name_ids: AHashSet<StrId> = names.iter()
1936            .filter_map(|n| self.interner.get_optional(n))
1937            .collect();
1938        let entities: Vec<&StoredEntity> = self
1939            .entity_slots
1940            .iter()
1941            .filter_map(|s| {
1942                s.as_ref()
1943                    .filter(|stored| stored.is_live() && name_ids.contains(&stored.name))
1944            })
1945            .collect();
1946        let matched_names: AHashSet<StrId> = entities.iter().map(|e| e.name).collect();
1947        let relations: Vec<&StoredRelation> = self
1948            .relations
1949            .iter()
1950            .filter(|r| matched_names.contains(&r.from) || matched_names.contains(&r.to))
1951            .collect();
1952        GraphView { kg: self, entities, relations }
1953    }
1954
1955    // -----------------------------------------------------------------------
1956    // Internal helpers
1957    // -----------------------------------------------------------------------
1958
1959    fn entity_to_output(&self, stored: &StoredEntity) -> Entity {
1960        Entity {
1961            name: self.interner.lookup(stored.name).to_string(),
1962            entity_type: self.interner.lookup(stored.entity_type).to_string(),
1963            observations: stored
1964                .observations
1965                .iter()
1966                .map(|o| self.interner.lookup(*o).to_string())
1967                .collect(),
1968        }
1969    }
1970
1971    #[inline]
1972    fn relation_to_output(&self, r: &StoredRelation) -> Relation {
1973        Relation {
1974            from: self.interner.lookup(r.from).to_string(),
1975            to: self.interner.lookup(r.to).to_string(),
1976            relation_type: self.interner.lookup(r.relation_type).to_string(),
1977        }
1978    }
1979
1980    /// Resolve a name to its live entity slot, or `None` if absent/deleted.
1981    fn lookup_live_slot(&self, name: &str) -> Option<u32> {
1982        let name_id = self.interner.get_optional(name)?;
1983        let hash = self.interner.get_hash(name_id);
1984        let slot = self.name_table.lookup(hash, name_id)?;
1985        let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
1986        stored.is_live().then_some(slot)
1987    }
1988
1989    /// Materialize a live entity from its interned name id.
1990    fn entity_by_name_id(&self, name_id: StrId) -> Option<Entity> {
1991        let hash = self.interner.get_hash(name_id);
1992        let slot = self.name_table.lookup(hash, name_id)?;
1993        let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
1994        stored.is_live().then(|| self.entity_to_output(stored))
1995    }
1996
1997    /// Tally distinct entity types and their live-entity counts, ranked by
1998    /// count descending (ties broken by name). One linear pass over the dense
1999    /// slot vec; only the final names are allocated.
2000    pub fn entity_type_counts(&self) -> Vec<(String, usize)> {
2001        let mut counts: AHashMap<StrId, usize> = AHashMap::new();
2002        for st in self
2003            .entity_slots
2004            .iter()
2005            .filter_map(|s| s.as_ref())
2006            .filter(|e| e.is_live())
2007        {
2008            *counts.entry(st.entity_type).or_insert(0) += 1;
2009        }
2010        self.rank_counts(counts)
2011    }
2012
2013    /// Tally distinct relation types and their counts, ranked by count desc.
2014    pub fn relation_type_counts(&self) -> Vec<(String, usize)> {
2015        let mut counts: AHashMap<StrId, usize> = AHashMap::new();
2016        for r in &self.relations {
2017            *counts.entry(r.relation_type).or_insert(0) += 1;
2018        }
2019        self.rank_counts(counts)
2020    }
2021
2022    fn rank_counts(&self, counts: AHashMap<StrId, usize>) -> Vec<(String, usize)> {
2023        let mut out: Vec<(String, usize)> = counts
2024            .into_iter()
2025            .map(|(id, c)| (self.interner.lookup(id).to_string(), c))
2026            .collect();
2027        out.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
2028        out
2029    }
2030
2031    /// Relevance-ranked, optionally type-filtered, paginated node search.
2032    /// Entities come back best-match-first (see [`SearchIndex::search_ranked`]).
2033    /// Relations touching any returned entity (either endpoint) are included.
2034    pub fn search_nodes_filtered(
2035        &self,
2036        query: &str,
2037        entity_type: Option<&str>,
2038        offset: usize,
2039        limit: usize,
2040    ) -> KnowledgeGraphOut {
2041        self.search_nodes_view(query, entity_type, offset, limit).to_owned_out()
2042    }
2043
2044    /// Borrowing view variant of [`search_nodes_filtered`] (M6).
2045    pub fn search_nodes_view(
2046        &self,
2047        query: &str,
2048        entity_type: Option<&str>,
2049        offset: usize,
2050        limit: usize,
2051    ) -> GraphView<'_> {
2052        let type_id = match entity_type {
2053            Some(t) => match self.interner.get_optional(t) {
2054                Some(id) => Some(id),
2055                None => return GraphView { kg: self, entities: Vec::new(), relations: Vec::new() },
2056            },
2057            None => None,
2058        };
2059
2060        let ranked = self.search.search_ranked(query, &self.interner);
2061        let mut selected: AHashSet<StrId> = AHashSet::new();
2062        let mut entities: Vec<&StoredEntity> = Vec::new();
2063        let mut skipped = 0usize;
2064        for (slot, _score) in ranked {
2065            let Some(st) = self
2066                .entity_slots
2067                .get(slot as usize)
2068                .and_then(|s| s.as_ref())
2069                .filter(|e| e.is_live())
2070            else {
2071                continue;
2072            };
2073            if type_id.is_some_and(|tid| st.entity_type != tid) {
2074                continue;
2075            }
2076            if skipped < offset {
2077                skipped += 1;
2078                continue;
2079            }
2080            if entities.len() >= limit {
2081                break;
2082            }
2083            selected.insert(st.name);
2084            entities.push(st);
2085        }
2086
2087        let relations: Vec<&StoredRelation> = self
2088            .relations
2089            .iter()
2090            .filter(|r| selected.contains(&r.from) || selected.contains(&r.to))
2091            .collect();
2092        GraphView { kg: self, entities, relations }
2093    }
2094
2095    /// Type-filtered, paginated view of the whole graph. Unlike [`read_graph`],
2096    /// relations are restricted to those whose **both** endpoints fall in the
2097    /// returned entity page, so the slice is internally consistent.
2098    pub fn read_graph_filtered(
2099        &self,
2100        entity_type: Option<&str>,
2101        offset: usize,
2102        limit: usize,
2103    ) -> KnowledgeGraphOut {
2104        self.read_graph_filtered_view(entity_type, offset, limit).to_owned_out()
2105    }
2106
2107    /// Borrowing view variant of [`read_graph_filtered`] (M6).
2108    pub fn read_graph_filtered_view(
2109        &self,
2110        entity_type: Option<&str>,
2111        offset: usize,
2112        limit: usize,
2113    ) -> GraphView<'_> {
2114        let type_id = match entity_type {
2115            Some(t) => match self.interner.get_optional(t) {
2116                Some(id) => Some(id),
2117                None => return GraphView { kg: self, entities: Vec::new(), relations: Vec::new() },
2118            },
2119            None => None,
2120        };
2121
2122        let mut selected: AHashSet<StrId> = AHashSet::new();
2123        let mut entities: Vec<&StoredEntity> = Vec::new();
2124        let mut skipped = 0usize;
2125        for st in self
2126            .entity_slots
2127            .iter()
2128            .filter_map(|s| s.as_ref())
2129            .filter(|e| e.is_live())
2130        {
2131            if type_id.is_some_and(|tid| st.entity_type != tid) {
2132                continue;
2133            }
2134            if skipped < offset {
2135                skipped += 1;
2136                continue;
2137            }
2138            if entities.len() >= limit {
2139                break;
2140            }
2141            selected.insert(st.name);
2142            entities.push(st);
2143        }
2144
2145        let relations: Vec<&StoredRelation> = self
2146            .relations
2147            .iter()
2148            .filter(|r| selected.contains(&r.from) && selected.contains(&r.to))
2149            .collect();
2150        GraphView { kg: self, entities, relations }
2151    }
2152
2153    /// Neighborhood expansion around `name` out to `depth` hops, following
2154    /// edges in the requested [`Direction`] and (optionally) of one relation
2155    /// type. Returns the origin plus reached entities, and every relation
2156    /// (passing the type filter) whose endpoints are both inside that set.
2157    ///
2158    /// `depth == 1` (the common case) is a single linear pass over the flat
2159    /// relation vec; deeper queries build an adjacency map once (O(E)) and BFS.
2160    pub fn neighbors(
2161        &self,
2162        name: &str,
2163        direction: Direction,
2164        rtype: Option<&str>,
2165        depth: u32,
2166    ) -> Result<KnowledgeGraphOut> {
2167        self.lookup_live_slot(name)
2168            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
2169        // Safe: lookup_live_slot succeeded, so the name is interned.
2170        let start = self.interner.get_optional(name).unwrap();
2171
2172        // An unknown relation-type filter can match nothing: return just origin.
2173        let rtype_id = match rtype {
2174            Some(r) => match self.interner.get_optional(r) {
2175                Some(id) => Some(id),
2176                None => {
2177                    let entities = self.entity_by_name_id(start).into_iter().collect();
2178                    return Ok(KnowledgeGraphOut { entities, relations: Vec::new() });
2179                }
2180            },
2181            None => None,
2182        };
2183
2184        let mut visited: AHashSet<StrId> = AHashSet::new();
2185        visited.insert(start);
2186
2187        let type_ok = |r: &StoredRelation| rtype_id.is_none_or(|rt| r.relation_type == rt);
2188
2189        if depth == 1 {
2190            for r in self.relations.iter().filter(|r| type_ok(r)) {
2191                match direction {
2192                    Direction::Out => {
2193                        if r.from == start {
2194                            visited.insert(r.to);
2195                        }
2196                    }
2197                    Direction::In => {
2198                        if r.to == start {
2199                            visited.insert(r.from);
2200                        }
2201                    }
2202                    Direction::Both => {
2203                        if r.from == start {
2204                            visited.insert(r.to);
2205                        } else if r.to == start {
2206                            visited.insert(r.from);
2207                        }
2208                    }
2209                }
2210            }
2211        } else if depth >= 2 {
2212            // Build a direction-aware adjacency map once, then BFS.
2213            // For Direction::Both we use the incremental adjacency index;
2214            // for Direction::Out/In we filter relations directly.
2215            let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
2216            match direction {
2217                Direction::Both => {
2218                    for (&node, edges) in &self.adjacency {
2219                        for &(nb, rt) in edges {
2220                            if rtype_id.is_none_or(|rt_id| rt == rt_id) {
2221                                adj.entry(node).or_default().push(nb);
2222                            }
2223                        }
2224                    }
2225                }
2226                Direction::Out | Direction::In => {
2227                    for r in self.relations.iter().filter(|r| type_ok(r)) {
2228                        match direction {
2229                            Direction::Out => adj.entry(r.from).or_default().push(r.to),
2230                            Direction::In => adj.entry(r.to).or_default().push(r.from),
2231                            _ => unreachable!(),
2232                        }
2233                    }
2234                }
2235            }
2236            let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
2237            queue.push_back((start, 0));
2238            while let Some((node, d)) = queue.pop_front() {
2239                if d >= depth {
2240                    continue;
2241                }
2242                if let Some(nbrs) = adj.get(&node) {
2243                    for &nb in nbrs {
2244                        if visited.insert(nb) {
2245                            queue.push_back((nb, d + 1));
2246                        }
2247                    }
2248                }
2249            }
2250        }
2251
2252        let mut entities = Vec::with_capacity(visited.len());
2253        for &nid in &visited {
2254            if let Some(e) = self.entity_by_name_id(nid) {
2255                entities.push(e);
2256            }
2257        }
2258        let relations = self
2259            .relations
2260            .iter()
2261            .filter(|r| type_ok(r) && visited.contains(&r.from) && visited.contains(&r.to))
2262            .map(|r| self.relation_to_output(r))
2263            .collect();
2264        Ok(KnowledgeGraphOut { entities, relations })
2265    }
2266
2267    /// One-shot context bundle for a single entity: the entity itself, every
2268    /// incident relation, its distinct neighbor names, and its degree. Saves an
2269    /// agent the get_entity + two search_relations round-trips.
2270    pub fn describe_entity(&self, name: &str) -> Result<serde_json::Value> {
2271        let name_id = self
2272            .interner
2273            .get_optional(name)
2274            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
2275        let entity = self
2276            .entity_by_name_id(name_id)
2277            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
2278
2279        let mut incident: Vec<Relation> = Vec::new();
2280        let mut neighbor_seen: AHashSet<StrId> = AHashSet::new();
2281        let mut neighbors: Vec<&str> = Vec::new();
2282        for r in &self.relations {
2283            if r.from == name_id || r.to == name_id {
2284                incident.push(self.relation_to_output(r));
2285                let other = if r.from == name_id { r.to } else { r.from };
2286                if other != name_id && neighbor_seen.insert(other) {
2287                    neighbors.push(self.interner.lookup(other));
2288                }
2289            }
2290        }
2291
2292        Ok(serde_json::json!({
2293            "entity": entity,
2294            "relations": incident,
2295            "neighbors": neighbors,
2296            "degree": incident.len(),
2297        }))
2298    }
2299
2300    /// Create-or-merge a batch of entities idempotently. Missing entities are
2301    /// created; existing ones keep their type and gain any new observations
2302    /// (deduplicated). Returns a per-entity outcome. The caller is responsible
2303    /// for flushing — every underlying op is already write-ahead logged.
2304    pub fn upsert_entities(&mut self, entities: &[Entity]) -> Result<Vec<serde_json::Value>> {
2305        for e in entities {
2306            if e.name.is_empty() {
2307                return Err(MCSError::InvalidParams(
2308                    "Entity name must not be empty".into(),
2309                ));
2310            }
2311        }
2312        let mut out = Vec::with_capacity(entities.len());
2313        for e in entities {
2314            if self.lookup_live_slot(&e.name).is_some() {
2315                let added = self.add_observations(&e.name, &e.observations)?;
2316                out.push(serde_json::json!({
2317                    "name": e.name,
2318                    "created": false,
2319                    "addedObservations": added,
2320                }));
2321            } else {
2322                let created = self.create_entities(std::slice::from_ref(e))?;
2323                out.push(serde_json::json!({
2324                    "name": e.name,
2325                    "created": !created.is_empty(),
2326                    "addedObservations": e.observations,
2327                }));
2328            }
2329        }
2330        Ok(out)
2331    }
2332
2333    /// Serialize the graph in one of: `json` (read_graph), `mermaid`, `dot`.
2334    pub fn export(&self, format: &str) -> Result<String> {
2335        match format {
2336            "json" => serde_json::to_string(&self.read_graph()).map_err(MCSError::JsonError),
2337            "mermaid" => Ok(self.export_mermaid()),
2338            "dot" => Ok(self.export_dot()),
2339            other => Err(MCSError::InvalidParams(format!(
2340                "Unknown export format '{other}' (expected json|mermaid|dot)"
2341            ))),
2342        }
2343    }
2344
2345    /// Assign each live entity a stable `n{k}` node id for diagram output.
2346    fn diagram_node_ids(&self) -> (AHashMap<StrId, usize>, Vec<(usize, StrId)>) {
2347        let mut ids: AHashMap<StrId, usize> = AHashMap::new();
2348        let mut order: Vec<(usize, StrId)> = Vec::new();
2349        for st in self
2350            .entity_slots
2351            .iter()
2352            .filter_map(|s| s.as_ref())
2353            .filter(|e| e.is_live())
2354        {
2355            let n = ids.len();
2356            ids.insert(st.name, n);
2357            order.push((n, st.name));
2358        }
2359        (ids, order)
2360    }
2361
2362    fn export_mermaid(&self) -> String {
2363        let (ids, order) = self.diagram_node_ids();
2364        let mut s = String::with_capacity(64 + order.len() * 32 + self.relations.len() * 32);
2365        s.push_str("graph LR\n");
2366        for (n, name_id) in &order {
2367            let label = sanitize_label(self.interner.lookup(*name_id));
2368            s.push_str(&format!("  n{n}[\"{label}\"]\n"));
2369        }
2370        for r in &self.relations {
2371            if let (Some(&a), Some(&b)) = (ids.get(&r.from), ids.get(&r.to)) {
2372                let rel = sanitize_label(self.interner.lookup(r.relation_type));
2373                s.push_str(&format!("  n{a} -->|{rel}| n{b}\n"));
2374            }
2375        }
2376        s
2377    }
2378
2379    fn export_dot(&self) -> String {
2380        let (ids, order) = self.diagram_node_ids();
2381        let mut s = String::with_capacity(64 + order.len() * 32 + self.relations.len() * 32);
2382        s.push_str("digraph G {\n");
2383        for (n, name_id) in &order {
2384            let label = sanitize_label(self.interner.lookup(*name_id));
2385            s.push_str(&format!("  n{n} [label=\"{label}\"];\n"));
2386        }
2387        for r in &self.relations {
2388            if let (Some(&a), Some(&b)) = (ids.get(&r.from), ids.get(&r.to)) {
2389                let rel = sanitize_label(self.interner.lookup(r.relation_type));
2390                s.push_str(&format!("  n{a} -> n{b} [label=\"{rel}\"];\n"));
2391            }
2392        }
2393        s.push_str("}\n");
2394        s
2395    }
2396
2397    // ------ High-level productivity tools ------
2398
2399    /// Merge `source` entity into `target` entity. All observations from
2400    /// source are moved to target (deduplicated), all relations involving
2401    /// source are redirected to target (deduplicated), and source is then
2402    /// deleted.
2403    ///
2404    /// The whole merge is **atomic**: every sub-record is written to the log
2405    /// inside a single `TxnBegin`…`TxnCommit` transaction *before* any in-memory
2406    /// mutation. A failed or torn write therefore leaves both memory and the
2407    /// log untouched (an uncommitted transaction is discarded on replay), so the
2408    /// graph can never observe a half-applied merge. Caller flushes.
2409    pub fn merge_entities(&mut self, source: &str, target: &str) -> Result<serde_json::Value> {
2410        if source == target {
2411            return Err(MCSError::InvalidParams(
2412                "Source and target must be different entities".into(),
2413            ));
2414        }
2415        self.lookup_live_slot(source).ok_or_else(|| {
2416            MCSError::InvalidParams(format!("Source entity '{source}' not found"))
2417        })?;
2418        let target_slot = self.lookup_live_slot(target).ok_or_else(|| {
2419            MCSError::InvalidParams(format!("Target entity '{target}' not found"))
2420        })?;
2421
2422        let source_entity = self.get_entity(source).unwrap();
2423        let moved_obs_count = source_entity.observations.len();
2424        let source_id = self.interner.get_optional(source).unwrap();
2425        let target_id = self.interner.get_optional(target).unwrap();
2426
2427        // Observations to move: dedup against target's existing set and within
2428        // the batch (matching what `add_observations` would have done).
2429        let target_existing: AHashSet<StrId> = self.entity_slots[target_slot as usize]
2430            .as_ref()
2431            .unwrap()
2432            .observations
2433            .iter()
2434            .copied()
2435            .collect();
2436        let mut obs_seen: AHashSet<StrId> = AHashSet::new();
2437        let mut obs_to_add: Vec<String> = Vec::new();
2438        for o in &source_entity.observations {
2439            if let Some(oid) = self.interner.get_optional(o)
2440                && !target_existing.contains(&oid)
2441                && obs_seen.insert(oid)
2442            {
2443                obs_to_add.push(o.clone());
2444            }
2445        }
2446
2447        // Relations to redirect: replace source with target, drop self-loops,
2448        // and dedup against existing relations and within the batch.
2449        let existing_rels: AHashSet<(StrId, StrId, StrId)> =
2450            self.relations.iter().map(|r| (r.from, r.to, r.relation_type)).collect();
2451        let mut rel_seen: AHashSet<(StrId, StrId, StrId)> = AHashSet::new();
2452        let mut redirect: Vec<Relation> = Vec::new();
2453        for r in &self.relations {
2454            if r.from != source_id && r.to != source_id {
2455                continue;
2456            }
2457            let new_from = if r.from == source_id { target_id } else { r.from };
2458            let new_to = if r.to == source_id { target_id } else { r.to };
2459            if new_from == new_to {
2460                continue; // self-loop after redirect
2461            }
2462            let key = (new_from, new_to, r.relation_type);
2463            if existing_rels.contains(&key) || !rel_seen.insert(key) {
2464                continue;
2465            }
2466            redirect.push(Relation {
2467                from: self.interner.lookup(new_from).to_string(),
2468                to: self.interner.lookup(new_to).to_string(),
2469                relation_type: self.interner.lookup(r.relation_type).to_string(),
2470            });
2471        }
2472
2473        let added_count = obs_to_add.len();
2474        let redirected = redirect.len() as u32;
2475
2476        // Build every record up front so writing is the only fallible step.
2477        let mut records: Vec<(RecordKind, Vec<u8>)> = Vec::new();
2478        if !obs_to_add.is_empty() {
2479            let mut buf = Vec::new();
2480            store_enc::encode_add_observations(&mut buf, target, &obs_to_add)
2481                .map_err(MCSError::IoError)?;
2482            records.push((RecordKind::AddObservations, buf));
2483        }
2484        for r in &redirect {
2485            let mut buf = Vec::new();
2486            store_enc::encode_create_relation(&mut buf, &r.from, &r.to, &r.relation_type)
2487                .map_err(MCSError::IoError)?;
2488            records.push((RecordKind::CreateRelation, buf));
2489        }
2490        let mut del_buf = Vec::new();
2491        store_enc::encode_delete_entity(&mut del_buf, source).map_err(MCSError::IoError)?;
2492        records.push((RecordKind::DeleteEntity, del_buf));
2493
2494        // Write-ahead, transactionally: begin, all records, commit.
2495        self.store.write_record(RecordKind::TxnBegin, &[]).map_err(MCSError::IoError)?;
2496        for (kind, data) in &records {
2497            self.store.write_record(*kind, data).map_err(MCSError::IoError)?;
2498        }
2499        self.store.write_record(RecordKind::TxnCommit, &[]).map_err(MCSError::IoError)?;
2500
2501        // Logged and committed — now apply to in-memory state (no more logging).
2502        for (kind, data) in &records {
2503            Self::apply_record(
2504                *kind, data, &mut self.interner, &mut self.entity_slots, &mut self.search,
2505                &mut self.name_table, &mut self.relations,
2506            );
2507        }
2508
2509        Ok(serde_json::json!({
2510            "source": source,
2511            "target": target,
2512            "movedObservations": moved_obs_count,
2513            "addedObservations": added_count,
2514            "redirectedRelations": redirected,
2515        }))
2516    }
2517
2518    /// Extract a connected subgraph around one or more seed entity names,
2519    /// expanding out to `depth` hops along all relations (undirected). Returns
2520    /// the set of reached entities and the relations among them.
2521    pub fn extract_subgraph(&self, names: &[String], depth: u32) -> Result<KnowledgeGraphOut> {
2522        if names.is_empty() {
2523            return Ok(KnowledgeGraphOut {
2524                entities: Vec::new(),
2525                relations: Vec::new(),
2526            });
2527        }
2528        // Seed the BFS queue from any names that exist.
2529        let mut visited: AHashSet<StrId> = AHashSet::new();
2530        let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
2531        for name in names {
2532            if let Some(id) = self.interner.get_optional(name)
2533                && visited.insert(id)
2534            {
2535                queue.push_back((id, 0));
2536            }
2537        }
2538        // Build an undirected adjacency map from the incremental index.
2539        let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
2540        for (&node, edges) in &self.adjacency {
2541            let nb: Vec<StrId> = edges.iter().map(|(to, _)| *to).collect();
2542            adj.insert(node, nb);
2543        }
2544        while let Some((node, d)) = queue.pop_front() {
2545            if d >= depth {
2546                continue;
2547            }
2548            if let Some(nbrs) = adj.get(&node) {
2549                for &nb in nbrs {
2550                    if visited.insert(nb) {
2551                        queue.push_back((nb, d + 1));
2552                    }
2553                }
2554            }
2555        }
2556        let mut entities: Vec<Entity> = Vec::with_capacity(visited.len());
2557        for &nid in &visited {
2558            if let Some(e) = self.entity_by_name_id(nid) {
2559                entities.push(e);
2560            }
2561        }
2562        let relations: Vec<Relation> = self
2563            .relations
2564            .iter()
2565            .filter(|r| visited.contains(&r.from) && visited.contains(&r.to))
2566            .map(|r| self.relation_to_output(r))
2567            .collect();
2568        Ok(KnowledgeGraphOut { entities, relations })
2569    }
2570
2571    /// Return full entities for a list of names. Missing names yield `None`.
2572    pub fn batch_get_entities(&self, names: &[String]) -> Vec<Option<Entity>> {
2573        names.iter().map(|n| self.get_entity(n)).collect()
2574    }
2575
2576    /// Recursive DFS helper — collects every simple path from `current` to
2577    /// `target` up to `max_depth` hops, capped at `max_paths` results.
2578    #[allow(clippy::too_many_arguments)]
2579    fn dfs_all_paths(
2580        adj: &AHashMap<StrId, Vec<StrId>>,
2581        current: StrId,
2582        target: StrId,
2583        max_depth: usize,
2584        max_paths: usize,
2585        visited: &mut AHashSet<StrId>,
2586        current_path: &mut Vec<StrId>,
2587        all_paths: &mut Vec<Vec<StrId>>,
2588    ) {
2589        if all_paths.len() >= max_paths {
2590            return;
2591        }
2592        if current == target && current_path.len() > 1 {
2593            all_paths.push(current_path.clone());
2594            return;
2595        }
2596        if current_path.len() > max_depth {
2597            return;
2598        }
2599        if let Some(neighbors) = adj.get(&current) {
2600            for &nb in neighbors {
2601                if visited.insert(nb) {
2602                    current_path.push(nb);
2603                    Self::dfs_all_paths(
2604                        adj, nb, target, max_depth, max_paths, visited, current_path, all_paths,
2605                    );
2606                    current_path.pop();
2607                    visited.remove(&nb);
2608                }
2609            }
2610        }
2611    }
2612
2613    /// Find all simple paths between `from` and `to` up to `max_depth` hops,
2614    /// returning at most `max_paths` results. Paths are found via DFS with
2615    /// backtracking and include both endpoints.
2616    pub fn find_all_paths(
2617        &self,
2618        from: &str,
2619        to: &str,
2620        max_depth: usize,
2621        max_paths: usize,
2622    ) -> Result<Vec<Vec<String>>> {
2623        let from_id = self
2624            .interner
2625            .get_optional(from)
2626            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
2627        let to_id = self
2628            .interner
2629            .get_optional(to)
2630            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
2631        // Verify both are live.
2632        if self.lookup_live_slot(from).is_none() {
2633            return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
2634        }
2635        if self.lookup_live_slot(to).is_none() {
2636            return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
2637        }
2638        if from_id == to_id {
2639            return Ok(vec![vec![from.to_string()]]);
2640        }
2641        // Build undirected adjacency from the incremental index.
2642        let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::with_capacity(self.adjacency.len());
2643        for (&node, edges) in &self.adjacency {
2644            let nbrs: Vec<StrId> = edges.iter().map(|(to, _)| *to).collect();
2645            adj.insert(node, nbrs);
2646        }
2647        let mut all_paths: Vec<Vec<StrId>> = Vec::new();
2648        let mut current_path = Vec::new();
2649        let mut visited: AHashSet<StrId> = AHashSet::new();
2650        visited.insert(from_id);
2651        current_path.push(from_id);
2652        Self::dfs_all_paths(
2653            &adj,
2654            from_id,
2655            to_id,
2656            max_depth,
2657            max_paths,
2658            &mut visited,
2659            &mut current_path,
2660            &mut all_paths,
2661        );
2662        if all_paths.is_empty() {
2663            return Err(MCSError::MemoryError(format!(
2664                "No path found between '{from}' and '{to}'"
2665            )));
2666        }
2667        let result: Vec<Vec<String>> = all_paths
2668            .into_iter()
2669            .map(|path| {
2670                path.into_iter()
2671                    .map(|id| self.interner.lookup(id).to_string())
2672                    .collect()
2673            })
2674            .collect();
2675        Ok(result)
2676    }
2677
2678    // --- Snapshot ---
2679
2680    /// Create a wait-free read snapshot (item 2 in plan).
2681    /// Freezes entity_slots and relations into `Arc<[_]>` and clones the rest.
2682    pub fn snapshot(&self) -> ReadSnapshot {
2683        ReadSnapshot {
2684            interner: self.interner.clone(),
2685            entity_slots: Arc::from_iter(self.entity_slots.iter().cloned()),
2686            free_slots: self.free_slots.clone(),
2687            name_table: self.name_table.clone(),
2688            relations: Arc::from_iter(self.relations.iter().cloned()),
2689            adjacency: self.adjacency.clone(),
2690            search: self.search.clone(),
2691        }
2692    }
2693
2694    // --- Flush & sync ---
2695
2696    /// Flush the `BufWriter` to the kernel buffer (process-crash safe).
2697    pub fn flush(&mut self) -> Result<()> {
2698        self.store.flush().map_err(MCSError::IoError)
2699    }
2700
2701    /// `fsync` the log to disk (OS-crash safe). Called by the background sync
2702    /// thread in [`GraphHandle`]; most callers should use `flush()` instead.
2703    pub fn sync(&mut self) -> Result<()> {
2704        self.store.sync().map_err(MCSError::IoError)
2705    }
2706
2707    /// Flush + fsync (legacy; prefer [`flush`](Self::flush) for production use).
2708    pub fn flush_and_sync(&mut self) -> Result<()> {
2709        self.store.flush_and_sync().map_err(MCSError::IoError)
2710    }
2711}
2712
2713
2714
2715// ---------------------------------------------------------------------------
2716// GraphHandle – wait-free read / serialized-write handle.
2717// ---------------------------------------------------------------------------
2718
2719/// Wait-free read / serialized-write handle to the graph.
2720///
2721/// Readers load a frozen [`ReadSnapshot`] via [`read`](GraphHandle::read)
2722/// (lock-free via `ArcSwap`). Writers take a [`Mutex`] lock, mutate the
2723/// underlying [`KnowledgeGraph`], and publish a fresh snapshot on unlock
2724/// via the [`WriteGuard`] drop glue.
2725///
2726/// A background thread calls `fsync` on the WAL file every 1 second so that
2727/// write handlers never block on disk I/O. The thread is stopped on `Drop`.
2728///
2729/// The sync thread uses its own `Arc<File>` handle (cloned from the WAL file)
2730/// so that `fsync` never contends with the graph mutex. A [`Condvar`] notifies
2731/// the thread immediately after every write, ensuring low-latency sync without
2732/// polling.
2733pub struct GraphHandle {
2734    inner: Arc<parking_lot::Mutex<KnowledgeGraph>>,
2735    snapshot: ArcSwap<ReadSnapshot>,
2736    /// Cached JSON of the full `read_graph` output. Invalidated on every write.
2737    read_cache: ArcSwap<Option<Arc<str>>>,
2738    /// Notifies the background sync thread when a write has flushed data to the
2739    /// kernel buffer. The thread also wakes on a 1-second timeout as a fallback.
2740    /// The `bool` is `true` when there is pending data to sync.
2741    sync_notify: Arc<(StdMutex<bool>, Condvar)>,
2742    /// Signal the background sync thread to stop. Set on `Drop`.
2743    stop_sync: Arc<AtomicBool>,
2744}
2745
2746/// RAII guard that publishes a fresh [`ReadSnapshot`] on drop.
2747pub struct WriteGuard<'a> {
2748    guard: parking_lot::MutexGuard<'a, KnowledgeGraph>,
2749    snapshot: &'a ArcSwap<ReadSnapshot>,
2750    read_cache: &'a ArcSwap<Option<Arc<str>>>,
2751    sync_notify: &'a (StdMutex<bool>, Condvar),
2752    did_publish: bool,
2753}
2754
2755impl WriteGuard<'_> {
2756    /// Publish a snapshot now (eager, before drop). Also called by Drop.
2757    /// Invalidates the serialized read cache. Flushes the WAL to kernel buffer
2758    /// (the background sync thread in [`GraphHandle`] handles the actual `fsync`).
2759    pub fn publish(&mut self) {
2760        if let Err(e) = self.guard.flush() {
2761            tracing::error!("WAL flush failed: {e}");
2762        }
2763        let snap = Arc::new(self.guard.snapshot());
2764        self.snapshot.store(snap);
2765        self.read_cache.store(Arc::new(None));
2766        self.did_publish = true;
2767        // Wake the sync thread — data is in the kernel buffer waiting for fsync.
2768        let (lock, cvar) = self.sync_notify;
2769        let mut pending = lock.lock().unwrap_or_else(|e| e.into_inner());
2770        *pending = true;
2771        cvar.notify_one();
2772    }
2773
2774    /// Access the underlying `KnowledgeGraph` for mutation.
2775    pub fn graph(&mut self) -> &mut KnowledgeGraph {
2776        &mut self.guard
2777    }
2778}
2779
2780impl std::ops::Deref for WriteGuard<'_> {
2781    type Target = KnowledgeGraph;
2782    fn deref(&self) -> &KnowledgeGraph {
2783        &self.guard
2784    }
2785}
2786
2787impl std::ops::DerefMut for WriteGuard<'_> {
2788    fn deref_mut(&mut self) -> &mut KnowledgeGraph {
2789        &mut self.guard
2790    }
2791}
2792
2793impl Drop for WriteGuard<'_> {
2794    fn drop(&mut self) {
2795        if !self.did_publish {
2796            self.publish();
2797        }
2798    }
2799}
2800
2801impl Drop for GraphHandle {
2802    fn drop(&mut self) {
2803        self.stop_sync.store(true, Ordering::Relaxed);
2804        // Wake the sync thread by setting pending=true so the
2805        // wait_timeout_while(|p| !*p) condition breaks immediately.
2806        let (lock, cvar) = &*self.sync_notify;
2807        let mut pending = lock.lock().unwrap_or_else(|e| e.into_inner());
2808        *pending = true;
2809        cvar.notify_one();
2810    }
2811}
2812
2813impl GraphHandle {
2814    /// Open or create the graph at `path`, seeding the initial snapshot.
2815    /// Spawns a background thread that `fsync`s the WAL so that write handlers
2816    /// never block on disk I/O.
2817    pub fn new(path: &Path) -> std::io::Result<Self> {
2818        let kg = KnowledgeGraph::new(path)?;
2819        let snapshot = Arc::new(kg.snapshot());
2820        // Clone the sync file handle before moving `kg` into the Mutex.
2821        let sync_file = Arc::clone(&kg.store.sync_file);
2822        let inner = Arc::new(parking_lot::Mutex::new(kg));
2823
2824        let sync_notify: Arc<(StdMutex<bool>, Condvar)> =
2825            Arc::new((StdMutex::new(false), Condvar::new()));
2826        let notify = Arc::clone(&sync_notify);
2827        let stop_sync = Arc::new(AtomicBool::new(false));
2828
2829        // Background sync thread — calls fsync on a dedicated `Arc<File>`
2830        // handle, never touching the graph mutex. Woken by Condvar after every
2831        // write; falls back to a 1-second timeout.
2832        let sync_stop = Arc::clone(&stop_sync);
2833        std::thread::Builder::new()
2834            .name("mcp-memory-sync".into())
2835            .spawn(move || {
2836                let (lock, cvar) = &*notify;
2837                loop {
2838                    // Wait while there is nothing to sync. Woken by publish()
2839                    // or by the 1-second timeout.
2840                    let mut guard = cvar
2841                        .wait_timeout_while(
2842                            lock.lock().unwrap_or_else(|e| e.into_inner()),
2843                            std::time::Duration::from_secs(1),
2844                            |p| !*p,
2845                        )
2846                        .unwrap_or_else(|e| e.into_inner())
2847                        .0;
2848
2849                    let should_sync = *guard;
2850                    *guard = false;
2851                    // Release the StdMutex before fsync so publish() is not
2852                    // blocked on setting the next pending flag.
2853                    drop(guard);
2854
2855                    if should_sync {
2856                        if let Err(e) = sync_file.sync_data() {
2857                            tracing::error!("WAL fsync failed: {e}");
2858                        }
2859                    }
2860
2861                    if sync_stop.load(Ordering::Relaxed) {
2862                        // One final fsync before exiting.
2863                        if let Err(e) = sync_file.sync_data() {
2864                            tracing::error!("WAL final fsync failed: {e}");
2865                        }
2866                        break;
2867                    }
2868                }
2869            })
2870            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
2871
2872        Ok(Self {
2873            inner,
2874            snapshot: ArcSwap::new(snapshot),
2875            read_cache: ArcSwap::new(Arc::new(None)),
2876            sync_notify,
2877            stop_sync,
2878        })
2879    }
2880
2881    /// Return the cached full-graph JSON, or build and cache it on first call.
2882    /// Invalidated by any write (see [`WriteGuard::publish`]).
2883    pub fn read_graph_cached(&self) -> Arc<str> {
2884        if let Some(cached) = self.read_cache.load().as_ref() {
2885            return cached.clone();
2886        }
2887        let graph = self.read();
2888        let json: Arc<str> = Arc::from(graph.read_graph_json().into_boxed_str());
2889        self.read_cache.store(Arc::new(Some(json.clone())));
2890        json
2891    }
2892
2893    /// Lock-free read snapshot. Holds an `Arc` reference to the frozen graph data.
2894    pub fn read(&self) -> ReadSnapshot {
2895        (**self.snapshot.load()).clone()
2896    }
2897
2898    /// Serialised write access. Returns a guard that publishes a fresh snapshot
2899    /// when dropped (or when [`WriteGuard::publish`] is called eagerly).
2900    pub fn write(&self) -> WriteGuard<'_> {
2901        WriteGuard {
2902            guard: self.inner.lock(),
2903            snapshot: &self.snapshot,
2904            read_cache: &self.read_cache,
2905            sync_notify: &self.sync_notify,
2906            did_publish: false,
2907        }
2908    }
2909}
2910
2911