Skip to main content

mcp_memory/
kg.rs

1use std::collections::VecDeque;
2use std::path::Path;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::{Arc, Condvar, Mutex as StdMutex};
5
6use ahash::{AHashMap, AHashSet};
7use arc_swap::ArcSwap;
8
9use serde::ser::{Serialize, SerializeSeq, SerializeStruct, Serializer};
10
11use crate::config::Durability;
12use crate::errors::{MCSError, Result};
13use crate::intern::{StrId, StringInterner};
14use crate::types::{Entity, Relation, KnowledgeGraphOut};
15use crate::search::SearchIndex;
16use crate::store::{self as store_enc, BinaryStore, RecordKind};
17
/// Number of independent shards in the entity name table.
///
/// Must be a power of two: a shard index is obtained by masking hash bits
/// with `NAME_TABLE_SHARDS - 1`.
const NAME_TABLE_SHARDS: usize = 4;

// Compile-time guard for the power-of-two requirement documented above.
const _: () = assert!(NAME_TABLE_SHARDS.is_power_of_two());
19
// ---------------------------------------------------------------------------
// Prefetch helper – issues a non-binding software prefetch hint to pull a
// cache-line into L1/L2 while we finish probing the current entry.
// ---------------------------------------------------------------------------
/// Hints the CPU to load the cache line containing `addr` into all cache
/// levels (`_MM_HINT_T0`).
///
/// # Safety
/// A prefetch is only a hint: it never dereferences `addr` and cannot fault,
/// so any pointer value is acceptable. The function is `unsafe` only because
/// the underlying intrinsic is declared that way.
#[cfg(target_arch = "x86_64")]
#[inline(always)]
unsafe fn prefetch_addr(addr: *const u8) {
    use std::arch::x86_64::{_mm_prefetch, _MM_HINT_T0};
    // SAFETY: SSE is part of the x86_64 baseline, and the prefetch does not
    // access memory through `addr`. The intrinsic takes `*const i8`, hence the cast.
    unsafe { _mm_prefetch::<_MM_HINT_T0>(addr.cast::<i8>()) };
}
30
/// Portable stand-in for targets without the x86_64 prefetch intrinsic.
/// The hint is dropped, so probe loops can call `prefetch_addr`
/// unconditionally on every architecture.
#[cfg(not(target_arch = "x86_64"))]
#[inline(always)]
const unsafe fn prefetch_addr(_ptr: *const u8) {}
34
/// fsync the directory containing `path` so a rename/create inside it is durable.
///
/// An empty or missing parent (e.g. a bare file name) means the current
/// directory `"."`. The call is best-effort and returns `Ok(())` without
/// syncing when a directory sync is impossible:
/// - on non-Unix targets, where directory fsync is not an available
///   durability mechanism (on Windows, opening a directory as a `File` fails);
/// - when the directory does not exist (`NotFound`);
/// - when the filesystem rejects fsync on a directory handle (`InvalidInput`).
///
/// # Errors
/// Any other I/O error from opening or syncing the directory.
fn sync_parent_dir(path: &Path) -> std::io::Result<()> {
    use std::io::ErrorKind;

    if !cfg!(unix) {
        return Ok(());
    }

    let dir = path
        .parent()
        .filter(|p| !p.as_os_str().is_empty())
        .unwrap_or_else(|| Path::new("."));

    let handle = match std::fs::File::open(dir) {
        Ok(f) => f,
        Err(e) if e.kind() == ErrorKind::NotFound => return Ok(()),
        Err(e) => return Err(e),
    };

    match handle.sync_all() {
        // Some filesystems disallow fsync on a directory handle; tolerate it.
        Err(e) if e.kind() == ErrorKind::InvalidInput => Ok(()),
        other => other,
    }
}
54
55// ---------------------------------------------------------------------------
56// StoredEntity / StoredRelation – internal representations using StrId.
57// ---------------------------------------------------------------------------
58// Default layout: 40 B / align 8 (Rust packs `state` into the Vec's padding and
59// the `Option` niche is free). With `cache_align`, the slot is rounded to a full
60// 64-byte line so a point lookup/mutation (name_table -> slot index) touches
61// exactly one cache line instead of occasionally straddling two. Costs +60%
62// memory and a wider stride on bulk scans — measure before enabling.
63#[derive(Clone)]
64#[cfg_attr(feature = "cache_align", repr(align(64)))]
65pub(crate) struct StoredEntity {
66    pub(crate) name: StrId,
67    pub(crate) entity_type: StrId,
68    pub(crate) observations: Vec<StrId>,
69}
70
71// Default layout: 12 B / align 4 → ~1 in 5 records straddles a 64-byte line.
72// With `cache_align`, align(16) rounds the size to 16 B so 4 records fill a line
73// exactly (no straddle, AVX2-load-friendly) for +33% memory.
74#[derive(Clone)]
75#[cfg_attr(feature = "cache_align", repr(align(16)))]
76pub(crate) struct StoredRelation {
77    pub(crate) from: StrId,
78    pub(crate) to: StrId,
79    pub(crate) relation_type: StrId,
80}
81
82// ---------------------------------------------------------------------------
83// Borrowing serialization views (M6).
84//
85// Read tools used to build owned `Entity`/`Relation` vecs (a fresh `String`
86// per name/type/observation) and *then* serialize them — roughly 2-3x the
87// graph resident at once. These views instead hold references to the selected
88// stored records and emit their interned `&str` directly during
89// serialization, with no intermediate owned strings. The emitted JSON is
90// byte-for-byte identical to serializing `KnowledgeGraphOut`.
91// ---------------------------------------------------------------------------
92
93/// A borrowing view over a selected slice of the graph. Serializes to
94/// `{"entities":[...],"relations":[...]}`.
95pub struct GraphView<'a> {
96    kg: &'a KnowledgeGraph,
97    entities: Vec<&'a StoredEntity>,
98    relations: Vec<&'a StoredRelation>,
99}
100
101impl GraphView<'_> {
102    /// Materialize into the owned [`KnowledgeGraphOut`]. Used by the direct
103    /// (non-serializing) callers and tests; the server's read handlers
104    /// serialize the view directly instead.
105    pub fn to_owned_out(&self) -> KnowledgeGraphOut {
106        KnowledgeGraphOut {
107            entities: self.entities.iter().map(|e| self.kg.entity_to_output(e)).collect(),
108            relations: self.relations.iter().map(|r| self.kg.relation_to_output(r)).collect(),
109        }
110    }
111}
112
113impl Serialize for GraphView<'_> {
114    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
115        let mut st = s.serialize_struct("KnowledgeGraphOut", 2)?;
116        st.serialize_field("entities", &EntityListRef { kg: self.kg, items: &self.entities })?;
117        st.serialize_field("relations", &RelationListRef { kg: self.kg, items: &self.relations })?;
118        st.end()
119    }
120}
121
122struct EntityListRef<'a> { kg: &'a KnowledgeGraph, items: &'a [&'a StoredEntity] }
123impl Serialize for EntityListRef<'_> {
124    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
125        let mut seq = s.serialize_seq(Some(self.items.len()))?;
126        for &e in self.items {
127            seq.serialize_element(&EntityRef { kg: self.kg, e })?;
128        }
129        seq.end()
130    }
131}
132
133struct RelationListRef<'a> { kg: &'a KnowledgeGraph, items: &'a [&'a StoredRelation] }
134impl Serialize for RelationListRef<'_> {
135    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
136        let mut seq = s.serialize_seq(Some(self.items.len()))?;
137        for &r in self.items {
138            seq.serialize_element(&RelationRef { kg: self.kg, r })?;
139        }
140        seq.end()
141    }
142}
143
144struct EntityRef<'a> { kg: &'a KnowledgeGraph, e: &'a StoredEntity }
145impl Serialize for EntityRef<'_> {
146    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
147        let mut st = s.serialize_struct("Entity", 3)?;
148        st.serialize_field("name", self.kg.interner.lookup(self.e.name))?;
149        st.serialize_field("entityType", self.kg.interner.lookup(self.e.entity_type))?;
150        st.serialize_field("observations", &ObsRef { kg: self.kg, obs: &self.e.observations })?;
151        st.end()
152    }
153}
154
155struct ObsRef<'a> { kg: &'a KnowledgeGraph, obs: &'a [StrId] }
156impl Serialize for ObsRef<'_> {
157    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
158        let mut seq = s.serialize_seq(Some(self.obs.len()))?;
159        for &o in self.obs {
160            seq.serialize_element(self.kg.interner.lookup(o))?;
161        }
162        seq.end()
163    }
164}
165
166struct RelationRef<'a> { kg: &'a KnowledgeGraph, r: &'a StoredRelation }
167impl Serialize for RelationRef<'_> {
168    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
169        let mut st = s.serialize_struct("Relation", 3)?;
170        st.serialize_field("from", self.kg.interner.lookup(self.r.from))?;
171        st.serialize_field("to", self.kg.interner.lookup(self.r.to))?;
172        st.serialize_field("relationType", self.kg.interner.lookup(self.r.relation_type))?;
173        st.end()
174    }
175}
176
/// Which way relations are followed during a neighborhood query.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Direction {
    /// Outgoing edges only: move from `from` to `to`.
    Out,
    /// Incoming edges only: move from `to` back to `from`.
    In,
    /// Either orientation.
    Both,
}

impl Direction {
    /// Interprets an optional direction keyword.
    ///
    /// Only the exact strings `"out"` and `"in"` select a single orientation.
    /// `None` and any other string (including other casings) mean
    /// [`Direction::Both`].
    pub fn parse(s: Option<&str>) -> Self {
        s.map_or(Self::Both, |word| match word {
            "out" => Self::Out,
            "in" => Self::In,
            _ => Self::Both,
        })
    }
}
198
/// Makes `s` safe to place inside a double-quoted Mermaid/DOT label.
///
/// Double quotes become single quotes, and each `\r` or `\n` becomes a space
/// (so `"\r\n"` turns into two spaces). Every other character is kept as is.
fn sanitize_label(s: &str) -> String {
    s.chars()
        .map(|ch| match ch {
            '"' => '\'',
            '\r' | '\n' => ' ',
            other => other,
        })
        .collect()
}
211
212// ---------------------------------------------------------------------------
213// ShardedNameTable – open-addressing hash map split into N independent shards.
214//
215// Each shard uses **ctrl-byte bucket** approach: a 1-byte metadata array
216// stores the 7-bit hash stamp (h2) for each slot, with `0xFF` = EMPTY.
217// On probe, the first memory access is a single byte (ctrl). The full key
218// (StrId) is only compared when the stamp matches — ~127/128 of probe steps
219// touch nothing but the ctrl byte.  See also SwissTable / hashbrown.
220// ---------------------------------------------------------------------------
/// Control byte of an unoccupied slot. Bit 7 is set, which a stamp never has.
const EMPTY_SLOT: u8 = 0xFF;

/// 7-bit stamp stored in a slot's control byte: the low 7 bits of `hash`.
/// Bit 7 is always clear, so a stamp can never be mistaken for `EMPTY_SLOT`.
#[inline(always)]
const fn h2(hash: u64) -> u8 {
    (hash as u8) & 0x7F
}

/// Home bucket for `hash` in a table whose capacity is `mask + 1` (a power
/// of two). It uses the bits just above the stamp.
#[inline(always)]
const fn h1(hash: u64, mask: usize) -> usize {
    let above_stamp = (hash >> 7) as usize;
    above_stamp & mask
}
232
/// One shard of the name table: an open-addressing (linear probing) hash map
/// from an interned name to a `u32` payload. It is stored as three parallel
/// arrays indexed by the same slot position.
///
/// Capacity is always a power of two (at least 16), so `idx & mask` wraps a
/// probe position. Hashes are not stored: they are recomputed through the
/// interner whenever entries have to move (`remove`, `grow`).
#[derive(Clone)]
struct NameTableShard {
    ctrl: Vec<u8>,      // 0xFF = empty; 0x00-0x7F = h2 stamp (bit 7 always clear)
    names: Vec<StrId>,  // key per slot; StrId::EMPTY in empty slots
    slots: Vec<u32>,    // payload per slot (u32::MAX when empty); readers use it as an entity-slot index
    mask: usize,        // capacity - 1
    count: usize,       // number of occupied slots
}

impl NameTableShard {
    /// Creates an empty shard with at least `capacity` slots, rounded up to a
    /// power of two with a minimum of 16.
    fn new(capacity: usize) -> Self {
        let cap = capacity.next_power_of_two().max(16);
        Self {
            ctrl: vec![EMPTY_SLOT; cap],
            names: vec![StrId::EMPTY; cap],
            slots: vec![u32::MAX; cap],
            mask: cap - 1,
            count: 0,
        }
    }

    /// Returns the payload stored for `name`, or `None` if it is absent.
    ///
    /// `hash` must be the interner's hash of `name`, the same value used to
    /// place it. Probing starts at `h1(hash)` and stops at the first empty
    /// slot. The full key is compared only when the control byte equals the
    /// stamp. The loop runs at most `len` steps, so it terminates even on a
    /// completely full table.
    #[inline(always)]
    fn lookup(&self, hash: u64, name: StrId) -> Option<u32> {
        let stamp = h2(hash);
        let mask = self.mask;
        let mut idx = h1(hash, mask);
        let ctrl = self.ctrl.as_ptr();
        let names = self.names.as_ptr();
        let slots = self.slots.as_ptr();
        let len = self.ctrl.len();

        for _ in 0..len {
            // Prefetch the ctrl byte 4 slots ahead — overlaps memory latency.
            // NOTE(review): `ctrl` is a byte array, so 4 slots ahead usually lies
            // in the same 64-byte line as `idx`; measure whether this hint helps.
            let prefetch_idx = idx.wrapping_add(4) & mask;
            // SAFETY: `prefetch_idx` is masked into 0..len, so the pointer stays
            // inside `ctrl`; a prefetch does not dereference it anyway.
            unsafe { prefetch_addr(ctrl.add(prefetch_idx)) };

            // SAFETY: idx always < len because of &mask on each iteration.
            unsafe {
                let c = *ctrl.add(idx);
                // Bit 7 set → EMPTY → key not present.
                if c & 0x80 != 0 {
                    return None;
                }
                // Stamp match → compare full key (rare: ~1/128 probes).
                if c == stamp && *names.add(idx) == name {
                    return Some(*slots.add(idx));
                }
            }
            idx = (idx + 1) & mask;
        }
        None
    }

    /// Stores `name -> slot`.
    ///
    /// The shard first grows if the load factor exceeds 3/4
    /// (`count * 4 > len * 3`). This does NOT check whether `name` is already
    /// present: inserting an existing name adds a second entry.
    /// NOTE(review): callers must remove the old mapping first; confirm this
    /// holds at every call site.
    fn insert(&mut self, interner: &StringInterner, hash: u64, name: StrId, slot: u32) {
        if self.count * 4 > self.ctrl.len() * 3 {
            self.grow(interner);
        }
        let stamp = h2(hash);
        let mask = self.mask;
        let mut idx = h1(hash, mask);
        // Terminates: after the load check above, count <= 3/4 of capacity, so
        // an empty slot always exists.
        loop {
            // SAFETY: idx & mask always < len for power-of-two capacity.
            unsafe {
                if *self.ctrl.get_unchecked(idx) & 0x80 != 0 {
                    *self.ctrl.get_unchecked_mut(idx) = stamp;
                    *self.names.get_unchecked_mut(idx) = name;
                    *self.slots.get_unchecked_mut(idx) = slot;
                    self.count += 1;
                    return;
                }
            }
            idx = (idx + 1) & mask;
        }
    }

    /// Removes `name` if present; otherwise this is a no-op.
    ///
    /// No tombstones are used. After the hole is opened, every entry that
    /// follows it up to the next empty slot is taken out and re-inserted from
    /// its home bucket (`h1` of its recomputed hash). Each one lands at or
    /// before its old position, which keeps every linear-probe chain unbroken.
    fn remove(&mut self, interner: &StringInterner, hash: u64, name: StrId) {
        let stamp = h2(hash);
        let mask = self.mask;
        let mut idx = h1(hash, mask);
        let len = self.ctrl.len();
        for _ in 0..len {
            if self.ctrl[idx] & 0x80 != 0 {
                return;
            }
            if self.ctrl[idx] == stamp && self.names[idx] == name {
                // Found — remove with shift-back to preserve probe chains.
                self.ctrl[idx] = EMPTY_SLOT;
                self.names[idx] = StrId::EMPTY;
                self.slots[idx] = u32::MAX;
                self.count -= 1;

                // Walk the rest of the cluster (until an empty slot).
                let mut next = (idx + 1) & mask;
                while self.ctrl[next] & 0x80 == 0 {
                    let nn = self.names[next];
                    let ns = self.slots[next];
                    // Hash is no longer stored (M4) — recompute it from the
                    // interned name to find the entry's ideal bucket.
                    let nh = interner.get_hash(nn);
                    self.ctrl[next] = EMPTY_SLOT;
                    self.names[next] = StrId::EMPTY;
                    self.slots[next] = u32::MAX;
                    self.count -= 1;

                    // Re-insert at its ideal bucket (first empty slot from h1).
                    let nstamp = h2(nh);
                    let mut re_idx = h1(nh, mask);
                    while self.ctrl[re_idx] & 0x80 == 0 {
                        re_idx = (re_idx + 1) & mask;
                    }
                    self.ctrl[re_idx] = nstamp;
                    self.names[re_idx] = nn;
                    self.slots[re_idx] = ns;
                    self.count += 1;

                    next = (next + 1) & mask;
                }
                return;
            }
            idx = (idx + 1) & mask;
        }
    }

    /// Doubles the capacity and re-inserts every occupied entry, recomputing
    /// each hash through the interner. `count` is unchanged because the set of
    /// entries is the same.
    fn grow(&mut self, interner: &StringInterner) {
        let new_cap = self.ctrl.len() * 2;
        let new_mask = new_cap - 1;
        let mut new_ctrl = vec![EMPTY_SLOT; new_cap];
        let mut new_names = vec![StrId::EMPTY; new_cap];
        let mut new_slots = vec![u32::MAX; new_cap];

        for i in 0..self.ctrl.len() {
            if self.ctrl[i] & 0x80 == 0 {
                // Recompute the hash from the interned name (M4: not stored).
                let name = self.names[i];
                let hash = interner.get_hash(name);
                let stamp = h2(hash);
                let mut idx = h1(hash, new_mask);
                while new_ctrl[idx] & 0x80 == 0 {
                    idx = (idx + 1) & new_mask;
                }
                new_ctrl[idx] = stamp;
                new_names[idx] = name;
                new_slots[idx] = self.slots[i];
            }
        }

        self.ctrl = new_ctrl;
        self.names = new_names;
        self.slots = new_slots;
        self.mask = new_mask;
    }
}
384
385#[derive(Clone)]
386struct ShardedNameTable {
387    shards: [NameTableShard; NAME_TABLE_SHARDS],
388}
389
390impl ShardedNameTable {
391    fn new(capacity_per_shard: usize) -> Self {
392        Self {
393            shards: [
394                NameTableShard::new(capacity_per_shard),
395                NameTableShard::new(capacity_per_shard),
396                NameTableShard::new(capacity_per_shard),
397                NameTableShard::new(capacity_per_shard),
398            ],
399        }
400    }
401
402    #[inline(always)]
403    const fn shard(hash: u64) -> usize {
404        (hash as usize) & (NAME_TABLE_SHARDS - 1)
405    }
406
407    #[inline(always)]
408    fn lookup(&self, hash: u64, name: StrId) -> Option<u32> {
409        self.shards[Self::shard(hash)].lookup(hash, name)
410    }
411
412    #[inline(always)]
413    fn insert(&mut self, interner: &StringInterner, hash: u64, name: StrId, slot: u32) {
414        self.shards[Self::shard(hash)].insert(interner, hash, name, slot);
415    }
416
417    #[inline(always)]
418    fn remove(&mut self, interner: &StringInterner, hash: u64, name: StrId) {
419        self.shards[Self::shard(hash)].remove(interner, hash, name);
420    }
421}
422
// ---------------------------------------------------------------------------
// KnowledgeGraph – the central type.
// ---------------------------------------------------------------------------
/// The owning, mutable knowledge graph. Readers work on a `ReadSnapshot`
/// taken from it instead.
///
/// Every string (entity names and types, observations, relation endpoints
/// and types) is held as an interned [`StrId`] and resolved through `interner`.
pub struct KnowledgeGraph {
    /// Interns every string held by the graph; `StrId`s resolve via `lookup`.
    interner: StringInterner,
    /// Entity storage indexed by slot; `None` marks a tombstoned slot.
    entity_slots: Vec<Option<StoredEntity>>,
    /// Tombstoned slot indices available for reuse on the next create (M2),
    /// so create/delete churn doesn't grow `entity_slots` without bound.
    free_slots: Vec<u32>,
    /// Interned entity name → index into `entity_slots`.
    name_table: ShardedNameTable,
    /// Every relation, as a flat list.
    relations: Vec<StoredRelation>,
    /// Incremental adjacency index: StrId → outgoing (to, type) pairs.
    /// Updated on every create_relations/delete_relations/delete_entities.
    /// Traversals use this instead of rebuilding from scratch (item 3 in plan).
    adjacency: AHashMap<StrId, Vec<(StrId, StrId)>>,
    /// Search index over the graph (see `crate::search::SearchIndex`).
    search: SearchIndex,
    /// Persistence backend (see `crate::store::BinaryStore`).
    store: BinaryStore,
}
441
// ---------------------------------------------------------------------------
// ReadSnapshot – wait-free, lock-free frozen view of the graph for readers.
// Created by KnowledgeGraph::snapshot() after each write transaction.
// ---------------------------------------------------------------------------
/// Immutable view of the graph handed to readers.
///
/// Every field sits behind an `Arc`, so cloning a snapshot only bumps
/// reference counts and never copies graph data.
#[derive(Clone)]
pub struct ReadSnapshot {
    /// Resolves the `StrId`s held in the other fields.
    pub(crate) interner: Arc<StringInterner>,
    /// Entity slots; `None` entries are deleted slots and readers skip them.
    pub(crate) entity_slots: Arc<[Option<StoredEntity>]>,
    /// Interned entity name → index into `entity_slots`.
    name_table: Arc<ShardedNameTable>,
    /// Every relation in the snapshot.
    pub(crate) relations: Arc<[StoredRelation]>,
    /// Adjacency lists keyed by `StrId`, holding `(neighbor, relation type)` pairs.
    adjacency: Arc<AHashMap<StrId, Vec<(StrId, StrId)>>>,
    /// Search index captured with the snapshot.
    search: Arc<SearchIndex>,
}
455
/// Appends `s` to `buf` as a double-quoted JSON string literal.
///
/// Escaping rules:
/// - `"` and `\` are backslash-escaped;
/// - `\n`, `\r` and `\t` use their short escapes;
/// - every other control character (`char::is_control`: U+0000–U+001F and
///   U+007F–U+009F) becomes a lowercase `\uXXXX` escape.
///
/// Runs of characters that need no escaping are copied as whole slices, and
/// no intermediate string is allocated.
pub(crate) fn push_json_str(buf: &mut String, s: &str) {
    use std::fmt::Write;

    buf.push('"');
    let mut copied_up_to = 0;
    for (pos, ch) in s.char_indices() {
        let short_escape: Option<&str> = match ch {
            '"' => Some("\\\""),
            '\\' => Some("\\\\"),
            '\n' => Some("\\n"),
            '\r' => Some("\\r"),
            '\t' => Some("\\t"),
            c if c.is_control() => None,
            _ => continue,
        };
        // Flush the pending run of plain characters before writing the escape.
        buf.push_str(&s[copied_up_to..pos]);
        copied_up_to = pos + ch.len_utf8();
        match short_escape {
            Some(seq) => buf.push_str(seq),
            None => write!(buf, "\\u{:04x}", ch as u32).unwrap(),
        }
    }
    buf.push_str(&s[copied_up_to..]);
    buf.push('"');
}
476
477/// A borrowing view over a selection of entities/relations from a ReadSnapshot.
478/// Serializes without intermediate owned String allocations.
479pub struct ReadGraphView<'a> {
480    snap: &'a ReadSnapshot,
481    entities: Vec<&'a StoredEntity>,
482    relations: Vec<&'a StoredRelation>,
483}
484
485impl Serialize for ReadGraphView<'_> {
486    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
487        let mut st = s.serialize_struct("KnowledgeGraphOut", 2)?;
488        st.serialize_field("entities", &ReadEntityListRef { snap: self.snap, items: &self.entities })?;
489        st.serialize_field("relations", &ReadRelationListRef { snap: self.snap, items: &self.relations })?;
490        st.end()
491    }
492}
493
494struct ReadEntityListRef<'a> { snap: &'a ReadSnapshot, items: &'a [&'a StoredEntity] }
495impl Serialize for ReadEntityListRef<'_> {
496    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
497        let mut seq = s.serialize_seq(Some(self.items.len()))?;
498        for &e in self.items {
499            seq.serialize_element(&ReadEntityRef { snap: self.snap, e })?;
500        }
501        seq.end()
502    }
503}
504
505struct ReadRelationListRef<'a> { snap: &'a ReadSnapshot, items: &'a [&'a StoredRelation] }
506impl Serialize for ReadRelationListRef<'_> {
507    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
508        let mut seq = s.serialize_seq(Some(self.items.len()))?;
509        for &r in self.items {
510            seq.serialize_element(&ReadRelationRef { snap: self.snap, r })?;
511        }
512        seq.end()
513    }
514}
515
516struct ReadEntityRef<'a> { snap: &'a ReadSnapshot, e: &'a StoredEntity }
517impl Serialize for ReadEntityRef<'_> {
518    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
519        let mut st = s.serialize_struct("Entity", 3)?;
520        st.serialize_field("name", self.snap.interner.lookup(self.e.name))?;
521        st.serialize_field("entityType", self.snap.interner.lookup(self.e.entity_type))?;
522        st.serialize_field("observations", &ReadObsRef { snap: self.snap, obs: &self.e.observations })?;
523        st.end()
524    }
525}
526
527struct ReadObsRef<'a> { snap: &'a ReadSnapshot, obs: &'a [StrId] }
528impl Serialize for ReadObsRef<'_> {
529    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
530        let mut seq = s.serialize_seq(Some(self.obs.len()))?;
531        for &o in self.obs {
532            seq.serialize_element(self.snap.interner.lookup(o))?;
533        }
534        seq.end()
535    }
536}
537
538struct ReadRelationRef<'a> { snap: &'a ReadSnapshot, r: &'a StoredRelation }
539impl Serialize for ReadRelationRef<'_> {
540    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
541        let mut st = s.serialize_struct("Relation", 3)?;
542        st.serialize_field("from", self.snap.interner.lookup(self.r.from))?;
543        st.serialize_field("to", self.snap.interner.lookup(self.r.to))?;
544        st.serialize_field("relationType", self.snap.interner.lookup(self.r.relation_type))?;
545        st.end()
546    }
547}
548
549// --- ReadSnapshot helpers (mirrors KnowledgeGraph helpers) ---
550impl ReadSnapshot {
551
552    /// Serialize the full graph directly to a JSON string, avoiding intermediate
553    /// owned `Entity`/`Relation` allocations. This is the fast path for handlers.
554    pub fn read_graph_json(&self) -> String {
555        // Rough capacity: live entities × ~60 B + relations × ~55 B.
556        let live = self.entity_slots.iter().filter(|s| s.is_some()).count();
557        let cap = live * 64 + self.relations.len() * 60 + 128;
558        let mut buf = String::with_capacity(cap);
559
560        // entities array
561        buf.push_str(r#"{"entities":["#);
562        let mut first = true;
563        for slot in self.entity_slots.iter() {
564            let Some(e) = slot.as_ref() else { continue };
565            if first { first = false } else { buf.push(',') }
566            buf.push('{');
567            // name
568            buf.push_str(r#""name":"#);
569            push_json_str(&mut buf, self.interner.lookup(e.name));
570            buf.push(',');
571            // entityType
572            buf.push_str(r#""entityType":"#);
573            push_json_str(&mut buf, self.interner.lookup(e.entity_type));
574            buf.push(',');
575            // observations
576            buf.push_str(r#""observations":["#);
577            for (oi, o) in e.observations.iter().enumerate() {
578                if oi > 0 { buf.push(',') }
579                push_json_str(&mut buf, self.interner.lookup(*o));
580            }
581            buf.push_str("]}");
582        }
583
584        // relations array
585        buf.push_str(r#"],"relations":["#);
586        first = true;
587        for r in self.relations.iter() {
588            if first { first = false } else { buf.push(',') }
589            buf.push('{');
590            buf.push_str(r#""from":"#);
591            push_json_str(&mut buf, self.interner.lookup(r.from));
592            buf.push(',');
593            buf.push_str(r#""to":"#);
594            push_json_str(&mut buf, self.interner.lookup(r.to));
595            buf.push(',');
596            buf.push_str(r#""relationType":"#);
597            push_json_str(&mut buf, self.interner.lookup(r.relation_type));
598            buf.push('}');
599        }
600        buf.push_str("]}");
601
602        buf
603    }
604
605    /// Borrowing view of the full graph, suitable for serde-serialization without
606    /// intermediate owned String allocations.
607    pub fn read_graph_view(&self) -> ReadGraphView<'_> {
608        let entities: Vec<&StoredEntity> = self
609            .entity_slots
610            .iter()
611            .filter_map(|s| s.as_ref())
612            .collect();
613        let relations: Vec<&StoredRelation> = self.relations.iter().collect();
614        ReadGraphView { snap: self, entities, relations }
615    }
616
617    fn lookup_live_slot(&self, name: &str) -> Option<u32> {
618        let name_id = self.interner.get_optional(name)?;
619        let hash = self.interner.get_hash(name_id);
620        let slot = self.name_table.lookup(hash, name_id)?;
621        self.entity_slots
622            .get(slot as usize)
623            .and_then(|s| s.as_ref())
624            ?;
625        Some(slot)
626    }
627
628    fn entity_by_name_id(&self, name_id: StrId) -> Option<Entity> {
629        let hash = self.interner.get_hash(name_id);
630        let slot = self.name_table.lookup(hash, name_id)?;
631        let e = self.entity_slots.get(slot as usize)?.as_ref()?;
632        Some(self.entity_to_output(e))
633    }
634
635    pub(crate) fn entity_to_output(&self, e: &StoredEntity) -> Entity {
636        Entity {
637            name: self.interner.lookup(e.name).to_string(),
638            entity_type: self.interner.lookup(e.entity_type).to_string(),
639            observations: e
640                .observations
641                .iter()
642                .map(|o| self.interner.lookup(*o).to_string())
643                .collect(),
644        }
645    }
646
647    pub(crate) fn relation_to_output(&self, r: &StoredRelation) -> Relation {
648        Relation {
649            from: self.interner.lookup(r.from).to_string(),
650            to: self.interner.lookup(r.to).to_string(),
651            relation_type: self.interner.lookup(r.relation_type).to_string(),
652        }
653    }
654
655    /// Open named entities + incident relations.
656    pub fn open_nodes(&self, names: &[String]) -> KnowledgeGraphOut {
657        let name_ids: std::collections::HashSet<StrId> = names
658            .iter()
659            .filter_map(|n| self.interner.get_optional(n))
660            .collect();
661        let entities: Vec<Entity> = self
662            .entity_slots
663            .iter()
664            .filter_map(|s| {
665                let e = s.as_ref()?;
666                if name_ids.contains(&e.name) {
667                    Some(self.entity_to_output(e))
668                } else {
669                    None
670                }
671            })
672            .collect();
673        let matched: std::collections::HashSet<StrId> = entities.iter()
674            .filter_map(|e| self.interner.get_optional(&e.name))
675            .collect();
676        let relations: Vec<Relation> = self
677            .relations
678            .iter()
679            .filter(|r| matched.contains(&r.from) || matched.contains(&r.to))
680            .map(|r| self.relation_to_output(r))
681            .collect();
682        KnowledgeGraphOut { entities, relations }
683    }
684
685    /// Full graph read. Returns all entities and relations.
686    pub fn read_graph(&self) -> KnowledgeGraphOut {
687        let entities: Vec<Entity> = self
688            .entity_slots
689            .iter()
690            .filter_map(|s| s.as_ref())
691            .map(|e| self.entity_to_output(e))
692            .collect();
693        let relations: Vec<Relation> = self
694            .relations
695            .iter()
696            .map(|r| self.relation_to_output(r))
697            .collect();
698        KnowledgeGraphOut { entities, relations }
699    }
700
701    /// Get a single entity by name.
702    pub fn get_entity(&self, name: &str) -> Option<Entity> {
703        self.lookup_live_slot(name)?;
704        let name_id = self.interner.get_optional(name)?;
705        self.entity_by_name_id(name_id)
706    }
707
708    /// Neighborhood expansion (same logic as KnowledgeGraph::neighbors but read-only).
709    pub fn neighbors(
710        &self,
711        name: &str,
712        direction: Direction,
713        rtype: Option<&str>,
714        depth: u32,
715    ) -> Result<KnowledgeGraphOut> {
716        self.lookup_live_slot(name)
717            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
718        let start = self.interner.get_optional(name).unwrap();
719
720        let rtype_id = match rtype {
721            Some(r) => match self.interner.get_optional(r) {
722                Some(id) => Some(id),
723                None => {
724                    let entities = self.entity_by_name_id(start).into_iter().collect();
725                    return Ok(KnowledgeGraphOut { entities, relations: Vec::new() });
726                }
727            },
728            None => None,
729        };
730
731        let mut visited: AHashSet<StrId> = AHashSet::new();
732        visited.insert(start);
733
734        let type_ok = |r: &StoredRelation, rt: Option<StrId>| rt.is_none_or(|rt_id| r.relation_type == rt_id);
735
736        if depth == 1 {
737            for r in self.relations.iter().filter(|r| type_ok(r, rtype_id)) {
738                match direction {
739                    Direction::Out => {
740                        if r.from == start { visited.insert(r.to); }
741                    }
742                    Direction::In => {
743                        if r.to == start { visited.insert(r.from); }
744                    }
745                    Direction::Both => {
746                        if r.from == start { visited.insert(r.to); }
747                        else if r.to == start { visited.insert(r.from); }
748                    }
749                }
750            }
751        } else if depth >= 2 {
752            let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
753            match direction {
754                Direction::Both => {
755                    for (&node, edges) in &*self.adjacency {
756                        for &(nb, rt) in edges {
757                            if rtype_id.is_none_or(|rt_id| rt == rt_id) {
758                                adj.entry(node).or_default().push(nb);
759                            }
760                        }
761                    }
762                }
763                Direction::Out | Direction::In => {
764                    for r in self.relations.iter().filter(|r| type_ok(r, rtype_id)) {
765                        match direction {
766                            Direction::Out => adj.entry(r.from).or_default().push(r.to),
767                            Direction::In => adj.entry(r.to).or_default().push(r.from),
768                            _ => unreachable!(),
769                        }
770                    }
771                }
772            }
773            let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
774            queue.push_back((start, 0));
775            while let Some((node, d)) = queue.pop_front() {
776                if d >= depth { continue; }
777                if let Some(nbrs) = adj.get(&node) {
778                    for &nb in nbrs {
779                        if visited.insert(nb) {
780                            queue.push_back((nb, d + 1));
781                        }
782                    }
783                }
784            }
785        }
786
787        let mut entities = Vec::with_capacity(visited.len());
788        for &nid in &visited {
789            if let Some(e) = self.entity_by_name_id(nid) {
790                entities.push(e);
791            }
792        }
793        let relations: Vec<Relation> = self
794            .relations
795            .iter()
796            .filter(|r| type_ok(r, rtype_id) && visited.contains(&r.from) && visited.contains(&r.to))
797            .map(|r| self.relation_to_output(r))
798            .collect();
799        Ok(KnowledgeGraphOut { entities, relations })
800    }
801
802    /// Describe an entity: entity details, incident relations, neighbors, degree.
803    pub fn describe_entity(&self, name: &str) -> Result<serde_json::Value> {
804        let name_id = self
805            .interner
806            .get_optional(name)
807            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
808        let entity = self
809            .entity_by_name_id(name_id)
810            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
811
812        let mut incident: Vec<Relation> = Vec::new();
813        let mut neighbor_seen: AHashSet<StrId> = AHashSet::new();
814        let mut neighbors: Vec<&str> = Vec::new();
815        for r in self.relations.iter() {
816            if r.from == name_id || r.to == name_id {
817                incident.push(self.relation_to_output(r));
818                let other = if r.from == name_id { r.to } else { r.from };
819                if other != name_id && neighbor_seen.insert(other) {
820                    neighbors.push(self.interner.lookup(other));
821                }
822            }
823        }
824
825        Ok(serde_json::json!({
826            "entity": entity,
827            "relations": incident,
828            "neighbors": neighbors,
829            "degree": incident.len(),
830        }))
831    }
832
833    /// Find the shortest path between two entities.
834    pub fn find_path(&self, from: &str, to: &str) -> Result<Vec<String>> {
835        let from_id = self
836            .interner
837            .get_optional(from)
838            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
839        let to_id = self
840            .interner
841            .get_optional(to)
842            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
843        if self.lookup_live_slot(from).is_none() {
844            return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
845        }
846        if self.lookup_live_slot(to).is_none() {
847            return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
848        }
849
850        // BFS over incremental adjacency index.
851        let mut visited: AHashSet<StrId> = AHashSet::new();
852        let mut parent: AHashMap<StrId, StrId> = AHashMap::new();
853        let mut queue: VecDeque<StrId> = VecDeque::new();
854
855        visited.insert(from_id);
856        queue.push_back(from_id);
857
858        while let Some(current) = queue.pop_front() {
859            if current == to_id { break; }
860            if let Some(neighbors) = self.adjacency.get(&current) {
861                for &(neighbor, _) in neighbors {
862                    if visited.insert(neighbor) {
863                        parent.insert(neighbor, current);
864                        queue.push_back(neighbor);
865                    }
866                }
867            }
868        }
869
870        if !visited.contains(&to_id) {
871            return Err(MCSError::MemoryError(format!(
872                "No path found between '{from}' and '{to}'"
873            )));
874        }
875
876        let mut path = Vec::new();
877        let mut cur = to_id;
878        path.push(self.interner.lookup(cur).to_string());
879        while let Some(&p) = parent.get(&cur) {
880            path.push(self.interner.lookup(p).to_string());
881            cur = p;
882        }
883        path.reverse();
884        Ok(path)
885    }
886
887    /// Extract a subgraph around the given entities.
888    pub fn extract_subgraph(&self, names: &[String], depth: u32) -> Result<KnowledgeGraphOut> {
889        if names.is_empty() {
890            return Ok(KnowledgeGraphOut { entities: Vec::new(), relations: Vec::new() });
891        }
892        let mut visited: AHashSet<StrId> = AHashSet::new();
893        let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
894        for name in names {
895            if let Some(id) = self.interner.get_optional(name)
896                && visited.insert(id)
897            {
898                queue.push_back((id, 0));
899            }
900        }
901        let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
902        for (&node, edges) in &*self.adjacency {
903            let nbrs: Vec<StrId> = edges.iter().map(|(to, _)| *to).collect();
904            adj.insert(node, nbrs);
905        }
906        while let Some((node, d)) = queue.pop_front() {
907            if d >= depth { continue; }
908            if let Some(nbrs) = adj.get(&node) {
909                for &nb in nbrs {
910                    if visited.insert(nb) {
911                        queue.push_back((nb, d + 1));
912                    }
913                }
914            }
915        }
916        let mut entities: Vec<Entity> = Vec::with_capacity(visited.len());
917        for &nid in &visited {
918            if let Some(e) = self.entity_by_name_id(nid) {
919                entities.push(e);
920            }
921        }
922        let relations: Vec<Relation> = self
923            .relations
924            .iter()
925            .filter(|r| visited.contains(&r.from) && visited.contains(&r.to))
926            .map(|r| self.relation_to_output(r))
927            .collect();
928        Ok(KnowledgeGraphOut { entities, relations })
929    }
930
931    /// Batch get entities.
932    pub fn batch_get_entities(&self, names: &[String]) -> Vec<Option<Entity>> {
933        names.iter().map(|n| self.get_entity(n)).collect()
934    }
935
936    /// Graph statistics.
937    pub fn graph_stats(&self) -> serde_json::Value {
938        let entity_count = self
939            .entity_slots
940            .iter()
941            .filter(|s| s.is_some())
942            .count();
943        let relation_count = self.relations.len();
944        let type_counts = self.entity_type_counts();
945        let relation_type_counts = self.relation_type_counts();
946        serde_json::json!({
947            "entities": entity_count,
948            "relations": relation_count,
949            "entityTypes": type_counts,
950            "relationTypes": relation_type_counts,
951        })
952    }
953
954    /// Search relations by from/to/type filters.
955    pub fn search_relations(&self, from: Option<&str>, to: Option<&str>, rtype: Option<&str>) -> Vec<Relation> {
956        let from_id = from.and_then(|n| self.interner.get_optional(n));
957        let to_id = to.and_then(|n| self.interner.get_optional(n));
958        let rtype_id = rtype.and_then(|n| self.interner.get_optional(n));
959        self.relations
960            .iter()
961            .filter(|r| {
962                from_id.is_none_or(|id| r.from == id)
963                    && to_id.is_none_or(|id| r.to == id)
964                    && rtype_id.is_none_or(|id| r.relation_type == id)
965            })
966            .map(|r| self.relation_to_output(r))
967            .collect()
968    }
969
970    /// Count entities per type.
971    pub fn entity_type_counts(&self) -> Vec<(String, usize)> {
972        let mut counts: AHashMap<StrId, usize> = AHashMap::new();
973        for slot in self.entity_slots.iter() {
974            if let Some(e) = slot.as_ref() {
975                *counts.entry(e.entity_type).or_default() += 1;
976            }
977        }
978        let mut result: Vec<(String, usize)> = counts
979            .into_iter()
980            .map(|(id, c)| (self.interner.lookup(id).to_string(), c))
981            .collect();
982        result.sort_by(|a, b| a.0.cmp(&b.0));
983        result
984    }
985
986    /// Count relations per type.
987    pub fn relation_type_counts(&self) -> Vec<(String, usize)> {
988        let mut counts: AHashMap<StrId, usize> = AHashMap::new();
989        for r in self.relations.iter() {
990            *counts.entry(r.relation_type).or_default() += 1;
991        }
992        let mut result: Vec<(String, usize)> = counts
993            .into_iter()
994            .map(|(id, c)| (self.interner.lookup(id).to_string(), c))
995            .collect();
996        result.sort_by(|a, b| a.0.cmp(&b.0));
997        result
998    }
999
1000    /// Export the graph in one of: json, mermaid, dot.
1001    pub fn export(&self, format: &str) -> Result<String> {
1002        match format {
1003            "json" => serde_json::to_string(&self.read_graph()).map_err(MCSError::JsonError),
1004            "mermaid" => Ok(self.export_mermaid()),
1005            "dot" => Ok(self.export_dot()),
1006            other => Err(MCSError::InvalidParams(format!(
1007                "Unknown export format '{other}' (expected json|mermaid|dot)"
1008            ))),
1009        }
1010    }
1011
1012    fn export_mermaid(&self) -> String {
1013        let mut out = String::with_capacity(4096);
1014        out.push_str("graph LR\n");
1015        for r in self.relations.iter() {
1016            let from = sanitize_label(self.interner.lookup(r.from));
1017            let to = sanitize_label(self.interner.lookup(r.to));
1018            let rt = sanitize_label(self.interner.lookup(r.relation_type));
1019            out.push_str(&format!("    {} -- \"{}\" --> {}\n", from, rt, to));
1020        }
1021        out
1022    }
1023
1024    fn export_dot(&self) -> String {
1025        let mut out = String::with_capacity(4096);
1026        out.push_str("digraph KG {\n");
1027        out.push_str("    rankdir=LR;\n");
1028        for slot in self.entity_slots.iter() {
1029            if let Some(e) = slot.as_ref() {
1030                let name = sanitize_label(self.interner.lookup(e.name));
1031                let etype = sanitize_label(self.interner.lookup(e.entity_type));
1032                out.push_str(&format!("    \"{}\" [label=\"{}\n({})\"];\n", name, name, etype));
1033            }
1034        }
1035        for r in self.relations.iter() {
1036            let from = sanitize_label(self.interner.lookup(r.from));
1037            let to = sanitize_label(self.interner.lookup(r.to));
1038            let rt = sanitize_label(self.interner.lookup(r.relation_type));
1039            out.push_str(&format!("    \"{}\" -> \"{}\" [label=\"{}\"];\n", from, to, rt));
1040        }
1041        out.push_str("}\n");
1042        out
1043    }
1044
1045    /// Find all paths between two entities.
1046    pub fn find_all_paths(
1047        &self,
1048        from: &str,
1049        to: &str,
1050        max_depth: usize,
1051        max_paths: usize,
1052    ) -> Result<Vec<Vec<String>>> {
1053        let from_id = self
1054            .interner
1055            .get_optional(from)
1056            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
1057        let to_id = self
1058            .interner
1059            .get_optional(to)
1060            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
1061        if self.lookup_live_slot(from).is_none() {
1062            return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
1063        }
1064        if self.lookup_live_slot(to).is_none() {
1065            return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
1066        }
1067        if from_id == to_id {
1068            return Ok(vec![vec![from.to_string()]]);
1069        }
1070        let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::with_capacity(self.adjacency.len());
1071        for (&node, edges) in &*self.adjacency {
1072            let nbrs: Vec<StrId> = edges.iter().map(|(to, _)| *to).collect();
1073            adj.insert(node, nbrs);
1074        }
1075        let mut all_paths: Vec<Vec<StrId>> = Vec::new();
1076        let mut current_path = Vec::new();
1077        let mut visited: AHashSet<StrId> = AHashSet::new();
1078        visited.insert(from_id);
1079        current_path.push(from_id);
1080        Self::dfs_all_paths(
1081            &adj,
1082            from_id,
1083            to_id,
1084            max_depth,
1085            max_paths,
1086            &mut visited,
1087            &mut current_path,
1088            &mut all_paths,
1089        );
1090        if all_paths.is_empty() {
1091            return Err(MCSError::MemoryError(format!(
1092                "No path found between '{from}' and '{to}'"
1093            )));
1094        }
1095        let result: Vec<Vec<String>> = all_paths
1096            .into_iter()
1097            .map(|path| {
1098                path.into_iter()
1099                    .map(|id| self.interner.lookup(id).to_string())
1100                    .collect()
1101            })
1102            .collect();
1103        Ok(result)
1104    }
1105
1106    fn dfs_all_paths(
1107        adj: &AHashMap<StrId, Vec<StrId>>,
1108        current: StrId,
1109        target: StrId,
1110        max_depth: usize,
1111        max_paths: usize,
1112        visited: &mut AHashSet<StrId>,
1113        current_path: &mut Vec<StrId>,
1114        all_paths: &mut Vec<Vec<StrId>>,
1115    ) {
1116        if all_paths.len() >= max_paths { return; }
1117        if current == target && current_path.len() > 1 {
1118            all_paths.push(current_path.clone());
1119            return;
1120        }
1121        if current_path.len() > max_depth { return; }
1122        if let Some(neighbors) = adj.get(&current) {
1123            for &nb in neighbors {
1124                if !visited.contains(&nb) {
1125                    visited.insert(nb);
1126                    current_path.push(nb);
1127                    Self::dfs_all_paths(adj, nb, target, max_depth, max_paths, visited, current_path, all_paths);
1128                    current_path.pop();
1129                    visited.remove(&nb);
1130                }
1131            }
1132        }
1133    }
1134
1135    /// Search for entities whose name/type/observations contain `query`.
1136    pub fn search_entities(&self, query: &str) -> Result<Vec<Entity>> {
1137        let token = query.to_lowercase();
1138        let matching = self.search.search(&token, &self.interner);
1139        Ok(matching
1140            .iter()
1141            .filter_map(|idx| {
1142                self.entity_slots
1143                    .get(*idx as usize)?
1144                    .as_ref()
1145                    .map(|e| self.entity_to_output(e))
1146            })
1147            .collect())
1148    }
1149}
1150
1151impl KnowledgeGraph {
1152    pub fn new(path: &Path) -> std::io::Result<Self> {
1153        Self::open(path, None)
1154    }
1155
    /// Open the graph, optionally reusing an existing background-sync slot so the
    /// fsync thread tracks the current file across a `compact` (D1).
    ///
    /// Replays every record of the log at `path` into fresh in-memory
    /// collections. It then derives the free-slot list and the adjacency index
    /// from the replayed state.
    ///
    /// Records between `TxnBegin` and `TxnCommit` are buffered and applied only
    /// when the commit is seen. A transaction still open at end of log is
    /// dropped. A second `TxnBegin` before a commit discards the earlier
    /// uncommitted buffer, and a `TxnCommit` with no open transaction is ignored.
    ///
    /// # Errors
    /// Propagates I/O errors from opening the store and from replaying it.
    fn open(
        path: &Path,
        sync_slot: Option<Arc<arc_swap::ArcSwap<std::fs::File>>>,
    ) -> std::io::Result<Self> {
        let store = BinaryStore::new_with_slot(path, sync_slot)?;

        // Replay into local collections, then assign into self — no raw pointers needed (X3).
        // NOTE(review): the numeric arguments below look like initial capacity /
        // shard sizing hints rather than limits — confirm against their constructors.
        let mut interner = StringInterner::with_capacity(65536, 1024);
        let mut entity_slots: Vec<Option<StoredEntity>> = Vec::with_capacity(256);
        let mut name_table = ShardedNameTable::new(64);
        let mut relations: Vec<StoredRelation> = Vec::with_capacity(64);
        let mut search = SearchIndex::new();

        // Transaction buffer: while `Some`, records are accumulated and only
        // applied on `TxnCommit`. An unclosed transaction at EOF is discarded,
        // which is what makes multi-record operations (e.g. `merge_entities`)
        // crash-atomic.
        let mut pending: Option<Vec<(RecordKind, Vec<u8>)>> = None;
        store.replay(|kind, data| {
            match kind {
                // Starting a transaction replaces any buffer left by an earlier
                // transaction that never committed.
                RecordKind::TxnBegin => pending = Some(Vec::new()),
                RecordKind::TxnCommit => {
                    // Flush the buffered records in log order; a commit with no
                    // open transaction (`pending` is `None`) does nothing.
                    if let Some(buffered) = pending.take() {
                        for (k, d) in &buffered {
                            Self::apply_record(
                                *k, d, &mut interner, &mut entity_slots, &mut search,
                                &mut name_table, &mut relations,
                            );
                        }
                    }
                }
                // Ordinary record: buffer a copy of its payload while a transaction
                // is open, otherwise apply it immediately.
                other => match pending.as_mut() {
                    Some(buffered) => buffered.push((other, data.to_vec())),
                    None => Self::apply_record(
                        other, data, &mut interner, &mut entity_slots, &mut search,
                        &mut name_table, &mut relations,
                    ),
                },
            }
        })?;

        // Slots tombstoned by deletes during replay are available for reuse (M2).
        let free_slots: Vec<u32> = entity_slots
            .iter()
            .enumerate()
            .filter(|(_, s)| s.is_none())
            .map(|(i, _)| i as u32)
            .collect();

        // Index every relation under both endpoints as (neighbour, relation type),
        // so traversals over `adjacency` see each relation from either side.
        let mut adjacency: AHashMap<StrId, Vec<(StrId, StrId)>> = AHashMap::new();
        for rel in &relations {
            adjacency.entry(rel.from).or_default().push((rel.to, rel.relation_type));
            adjacency.entry(rel.to).or_default().push((rel.from, rel.relation_type));
        }

        Ok(Self {
            interner,
            entity_slots,
            free_slots,
            name_table,
            relations,
            adjacency,
            search,
            store,
        })
    }
1224
1225    // -----------------------------------------------------------------------
1226    // Replay helpers (static to avoid borrow issues in the closure)
1227    // -----------------------------------------------------------------------
1228
1229    /// Apply one already-decoded log record to the in-memory collections.
1230    /// Shared by direct replay and by transaction commit. `TxnBegin`/`TxnCommit`
1231    /// are handled by the caller and are no-ops here.
1232    #[allow(clippy::too_many_arguments)]
1233    fn apply_record(
1234        kind: RecordKind,
1235        data: &[u8],
1236        interner: &mut StringInterner,
1237        entity_slots: &mut Vec<Option<StoredEntity>>,
1238        search: &mut SearchIndex,
1239        name_table: &mut ShardedNameTable,
1240        relations: &mut Vec<StoredRelation>,
1241    ) {
1242        match kind {
1243            RecordKind::CreateEntity => {
1244                if let Some((name, etype, obs)) = store_enc::decode_create_entity(data) {
1245                    Self::replay_create_entity(
1246                        interner, entity_slots, search, name_table, name, etype, &obs,
1247                    );
1248                }
1249            }
1250            RecordKind::CreateRelation => {
1251                if let Some((from, to, rtype)) = store_enc::decode_create_relation(data) {
1252                    let from_id = interner.intern(from);
1253                    let to_id = interner.intern(to);
1254                    let type_id = interner.intern(rtype);
1255                    relations.push(StoredRelation {
1256                        from: from_id,
1257                        to: to_id,
1258                        relation_type: type_id,
1259                    });
1260                }
1261            }
1262            RecordKind::AddObservations => {
1263                if let Some((name, obs)) = store_enc::decode_add_observations(data) {
1264                    Self::replay_add_observations(
1265                        interner, entity_slots, search, name_table, name, &obs,
1266                    );
1267                }
1268            }
1269            RecordKind::DeleteEntity => {
1270                if let Some(name) = store_enc::decode_delete_entity(data) {
1271                    Self::replay_delete_entity(
1272                        interner, entity_slots, relations, search, name_table, name,
1273                    );
1274                }
1275            }
1276            RecordKind::DeleteObservations => {
1277                if let Some((name, obs)) = store_enc::decode_delete_observations(data) {
1278                    Self::replay_delete_observations(
1279                        interner, entity_slots, search, name_table, name, &obs,
1280                    );
1281                }
1282            }
1283            RecordKind::DeleteRelation => {
1284                if let Some((from, to, rtype)) = store_enc::decode_delete_relation(data) {
1285                    let from_id = interner.intern(from);
1286                    let to_id = interner.intern(to);
1287                    let type_id = interner.intern(rtype);
1288                    relations.retain(|r| {
1289                        !(r.from == from_id && r.to == to_id && r.relation_type == type_id)
1290                    });
1291                }
1292            }
1293            RecordKind::TxnBegin | RecordKind::TxnCommit => {}
1294        }
1295    }
1296
1297    #[allow(clippy::ptr_arg)]
1298    fn replay_create_entity(
1299        interner: &mut StringInterner,
1300        entities: &mut Vec<Option<StoredEntity>>,
1301        search: &mut SearchIndex,
1302        name_table: &mut ShardedNameTable,
1303        name: &str,
1304        etype: &str,
1305        observations: &[&str],
1306    ) {
1307        let name_id = interner.intern(name);
1308        let type_id = interner.intern(etype);
1309        let obs_ids: Vec<StrId> = observations.iter().map(|o| interner.intern(o)).collect();
1310        let slot = entities.len() as u32;
1311        entities.push(Some(StoredEntity {
1312            name: name_id,
1313            entity_type: type_id,
1314            observations: obs_ids.clone(),
1315        }));
1316        let hash = interner.get_hash(name_id);
1317        name_table.insert(&*interner, hash, name_id, slot);
1318        search.index_entity(interner, slot, name_id, type_id, &obs_ids);
1319    }
1320
1321    fn replay_add_observations(
1322        interner: &mut StringInterner,
1323        entities: &mut [Option<StoredEntity>],
1324        search: &mut SearchIndex,
1325        name_table: &mut ShardedNameTable,
1326        name: &str,
1327        observations: &[&str],
1328    ) {
1329        let name_id = interner.intern(name);
1330        let hash = interner.get_hash(name_id);
1331        if let Some(slot) = name_table.lookup(hash, name_id)
1332            && let Some(Some(entity)) = entities.get_mut(slot as usize)
1333        {
1334            for &o in observations {
1335                let oid = interner.intern(o);
1336                if !entity.observations.contains(&oid) {
1337                    entity.observations.push(oid);
1338                }
1339            }
1340            search.remove_entity(slot);
1341            search.index_entity(
1342                interner,
1343                slot,
1344                entity.name,
1345                entity.entity_type,
1346                &entity.observations,
1347            );
1348        }
1349    }
1350
1351    fn replay_delete_entity(
1352        interner: &mut StringInterner,
1353        entities: &mut [Option<StoredEntity>],
1354        rels: &mut Vec<StoredRelation>,
1355        search: &mut SearchIndex,
1356        name_table: &mut ShardedNameTable,
1357        name: &str,
1358    ) {
1359        let name_id = interner.intern(name);
1360        let hash = interner.get_hash(name_id);
1361        if let Some(slot) = name_table.lookup(hash, name_id)
1362            && let Some(Some(_)) = entities.get(slot as usize)
1363        {
1364            entities[slot as usize] = None;
1365            search.remove_entity(slot);
1366            name_table.remove(&*interner, hash, name_id);
1367        }
1368        rels.retain(|r| r.from != name_id && r.to != name_id);
1369    }
1370
1371    fn replay_delete_observations(
1372        interner: &mut StringInterner,
1373        entities: &mut [Option<StoredEntity>],
1374        search: &mut SearchIndex,
1375        name_table: &mut ShardedNameTable,
1376        name: &str,
1377        observations: &[&str],
1378    ) {
1379        let name_id = interner.intern(name);
1380        let hash = interner.get_hash(name_id);
1381        if let Some(slot) = name_table.lookup(hash, name_id)
1382            && let Some(Some(entity)) = entities.get_mut(slot as usize)
1383        {
1384            let remove_ids: Vec<StrId> = observations.iter().map(|o| interner.intern(o)).collect();
1385            entity.observations.retain(|o| !remove_ids.contains(o));
1386            search.remove_entity(slot);
1387            search.index_entity(
1388                interner,
1389                slot,
1390                entity.name,
1391                entity.entity_type,
1392                &entity.observations,
1393            );
1394        }
1395    }
1396
1397    // -----------------------------------------------------------------------
1398    // Public API
1399    // -----------------------------------------------------------------------
1400
    /// Borrow the string interner backing this graph.
    ///
    /// Entity names, entity types, relation types and observation texts are all
    /// interned into it during replay. Any `StrId` held by the graph can be
    /// resolved back to text through this interner's `lookup`.
    pub const fn interner(&self) -> &StringInterner {
        &self.interner
    }
1404
1405    /// Return a single entity by exact name match.
1406    pub fn get_entity(&self, name: &str) -> Option<Entity> {
1407        let name_id = self.interner.get_optional(name)?;
1408        let hash = self.interner.get_hash(name_id);
1409        let slot = self.name_table.lookup(hash, name_id)?;
1410        let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
1411        Some(self.entity_to_output(stored))
1412    }
1413
1414    /// Return aggregate statistics about the graph.
1415    pub fn graph_stats(&self) -> serde_json::Value {
1416        let live_entities = self
1417            .entity_slots
1418            .iter()
1419            .filter(|s| s.is_some())
1420            .count();
1421        let total_relations = self.relations.len();
1422        let index_entries = self.search.len();
1423        let total_obs: usize = self
1424            .entity_slots
1425            .iter()
1426            .filter_map(|s| s.as_ref())
1427            .map(|e| e.observations.len())
1428            .sum();
1429
1430        serde_json::json!({
1431            "entities": live_entities,
1432            "relations": total_relations,
1433            "totalObservations": total_obs,
1434            "searchIndexEntries": index_entries,
1435            "internedStrings": self.interner.len(),
1436            "internedBytes": self.interner.total_bytes(),
1437        })
1438    }
1439
1440    /// Search relations by optional filters: `from`, `to`, `relationType`.
1441    /// Any filter that is absent matches everything. A filter value that does
1442    /// not exist in the graph returns empty results.
1443    pub fn search_relations(&self, from: Option<&str>, to: Option<&str>, rtype: Option<&str>) -> Vec<Relation> {
1444        let from_id = match from {
1445            Some(f) => match self.interner.get_optional(f) {
1446                Some(id) => Some(id),
1447                None => return Vec::new(),
1448            },
1449            None => None,
1450        };
1451        let to_id = match to {
1452            Some(t) => match self.interner.get_optional(t) {
1453                Some(id) => Some(id),
1454                None => return Vec::new(),
1455            },
1456            None => None,
1457        };
1458        let rtype_id = match rtype {
1459            Some(r) => match self.interner.get_optional(r) {
1460                Some(id) => Some(id),
1461                None => return Vec::new(),
1462            },
1463            None => None,
1464        };
1465
1466        self.relations
1467            .iter()
1468            .filter(|r| {
1469                from_id.is_none_or(|f| r.from == f)
1470                    && to_id.is_none_or(|t| r.to == t)
1471                    && rtype_id.is_none_or(|rt| r.relation_type == rt)
1472            })
1473            .map(|r| Relation {
1474                from: self.interner.lookup(r.from).to_string(),
1475                to: self.interner.lookup(r.to).to_string(),
1476                relation_type: self.interner.lookup(r.relation_type).to_string(),
1477            })
1478            .collect()
1479    }
1480
1481    /// BFS shortest-path between two entity names. Returns the sequence of
1482    /// entity names along the path (inclusive of both endpoints).
1483    pub fn find_path(&self, from: &str, to: &str) -> Result<Vec<String>> {
1484        let from_id = self.interner.get_optional(from)
1485            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
1486        let to_id = self.interner.get_optional(to)
1487            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
1488        let hash_from = self.interner.get_hash(from_id);
1489        let hash_to = self.interner.get_hash(to_id);
1490
1491        if self.name_table.lookup(hash_from, from_id).is_none() {
1492            return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
1493        }
1494        if self.name_table.lookup(hash_to, to_id).is_none() {
1495            return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
1496        }
1497        if from_id == to_id {
1498            return Ok(vec![from.to_string()]);
1499        }
1500
1501        // Use incremental adjacency index — O(degree) per hop, no rebuild.
1502        let mut visited: AHashSet<StrId> = AHashSet::new();
1503        let mut parent: AHashMap<StrId, StrId> = AHashMap::new();
1504        let mut queue: VecDeque<StrId> = VecDeque::new();
1505
1506        visited.insert(from_id);
1507        queue.push_back(from_id);
1508
1509        while let Some(current) = queue.pop_front() {
1510            if current == to_id {
1511                break;
1512            }
1513
1514            if let Some(neighbors) = self.adjacency.get(&current) {
1515                for &(neighbor, _) in neighbors {
1516                    if visited.insert(neighbor) {
1517                        parent.insert(neighbor, current);
1518                        queue.push_back(neighbor);
1519                    }
1520                }
1521            }
1522        }
1523
1524        if !parent.contains_key(&to_id) && from_id != to_id {
1525            return Err(MCSError::MemoryError(format!(
1526                "No path found between '{from}' and '{to}'"
1527            )));
1528        }
1529
1530        // Reconstruct path
1531        let mut path: Vec<String> = Vec::new();
1532        let mut cur = to_id;
1533        loop {
1534            path.push(self.interner.lookup(cur).to_string());
1535            if cur == from_id {
1536                break;
1537            }
1538            cur = *parent.get(&cur).ok_or_else(|| {
1539                MCSError::MemoryError("Path reconstruction failed".into())
1540            })?;
1541        }
1542        path.reverse();
1543        Ok(path)
1544    }
1545
1546    /// Rewrite the binary log from the current in-memory state.
1547    /// After compaction the log contains only the minimal set of records
1548    /// needed to reconstruct the graph (all creates, no deletes).
1549    /// Crash-safe: writes to a temp file, then atomically renames (C3).
1550    pub fn compact(&mut self) -> Result<()> {
1551        // 1. Collect current state as create-records
1552        let mut create_entities: Vec<Entity> = Vec::new();
1553        let mut create_relations: Vec<Relation> = Vec::new();
1554
1555        for slot in &self.entity_slots {
1556            if let Some(stored) = slot.as_ref() {
1557                create_entities.push(self.entity_to_output(stored));
1558            }
1559        }
1560        for rel in &self.relations {
1561            create_relations.push(Relation {
1562                from: self.interner.lookup(rel.from).to_string(),
1563                to: self.interner.lookup(rel.to).to_string(),
1564                relation_type: self.interner.lookup(rel.relation_type).to_string(),
1565            });
1566        }
1567
1568        // 2. Write to a temp file first.
1569        //    Remove any stale temp left by a previously-interrupted compact:
1570        //    `BinaryStore::new` opens in append mode and only writes the MAGIC
1571        //    header when the file does not already exist, so appending to a
1572        //    leftover temp would produce a duplicated, header-corrupted log
1573        //    once renamed over the real one (C1).
1574        let tmp_path = self.store.path().with_extension("tmp");
1575        if let Err(e) = std::fs::remove_file(&tmp_path)
1576            && e.kind() != std::io::ErrorKind::NotFound
1577        {
1578            return Err(MCSError::IoError(e));
1579        }
1580        let mut tmp_store = BinaryStore::new(&tmp_path).map_err(MCSError::IoError)?;
1581        for entity in &create_entities {
1582            let mut buf = Vec::new();
1583            store_enc::encode_create_entity(&mut buf, &entity.name, &entity.entity_type, &entity.observations)
1584                .map_err(MCSError::IoError)?;
1585            tmp_store.write_record(RecordKind::CreateEntity, &buf).map_err(MCSError::IoError)?;
1586        }
1587        for relation in &create_relations {
1588            let mut buf = Vec::new();
1589            store_enc::encode_create_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
1590                .map_err(MCSError::IoError)?;
1591            tmp_store.write_record(RecordKind::CreateRelation, &buf).map_err(MCSError::IoError)?;
1592        }
1593        tmp_store.flush_and_sync().map_err(MCSError::IoError)?;
1594        drop(tmp_store);
1595
1596        // 3. Atomically rename over the original (atomic on POSIX), then fsync
1597        //    the containing directory so the rename itself is durable across a
1598        //    crash — content swap is atomic, but the directory entry update is
1599        //    not durable until the dir is synced (C2).
1600        std::fs::rename(&tmp_path, self.store.path()).map_err(MCSError::IoError)?;
1601        sync_parent_dir(self.store.path()).map_err(MCSError::IoError)?;
1602
1603        // 4. Rebuild the entire in-memory graph from the compacted log (M1/M2).
1604        //    Replaying into fresh structures reclaims the interner arena (stale
1605        //    strings from deleted/edited entities), tombstoned entity slots, and
1606        //    stale search-index entries — none of which the old reopen-only path
1607        //    reclaimed.
1608        let path = self.store.path().clone();
1609        // Reuse the existing sync slot so the background fsync thread follows the
1610        // compacted file instead of the renamed-away original (D1).
1611        let sync_slot = Arc::clone(&self.store.sync_slot);
1612        *self = KnowledgeGraph::open(&path, Some(sync_slot)).map_err(MCSError::IoError)?;
1613
1614        Ok(())
1615    }
1616
1617    /// Compact if the tombstones / deleted-string waste exceeds a heuristic
1618    /// threshold — otherwise do nothing.
1619    pub fn compact_if_needed(&mut self) -> Result<()> {
1620        // When more than 30% of entity slots are tombstones, compaction is
1621        // likely to reclaim significant interner arena space that is only
1622        // reachable from deleted entities. The check is O(1).
1623        let total = self.entity_slots.len();
1624        let tombstones = self.free_slots.len();
1625        if total > 16 && tombstones * 10 > total * 3 {
1626            self.compact()?;
1627        }
1628        Ok(())
1629    }
1630
1631    // ---- Public API with write-ahead log (C1) and error propagation ----
1632
1633    pub fn create_entities(&mut self, entities: &[Entity]) -> Result<Vec<Entity>> {
1634        // Validate up front so an invalid entity never produces partial writes.
1635        for entity in entities {
1636            if entity.name.is_empty() {
1637                return Err(MCSError::InvalidParams(
1638                    "Entity name must not be empty".into(),
1639                ));
1640            }
1641        }
1642        let mut records: Vec<(RecordKind, Vec<u8>)> = Vec::new();
1643        let mut created = Vec::new();
1644        for entity in entities {
1645            let existing = self.interner.get_optional(&entity.name)
1646                .and_then(|id| {
1647                    let hash = self.interner.get_hash(id);
1648                    self.name_table.lookup(hash, id)
1649                });
1650            if existing.is_some() {
1651                continue;
1652            }
1653            let mut buf = Vec::new();
1654            store_enc::encode_create_entity(&mut buf, &entity.name, &entity.entity_type, &entity.observations)
1655                .map_err(MCSError::IoError)?;
1656            records.push((RecordKind::CreateEntity, buf));
1657            created.push(entity.clone());
1658        }
1659        if records.is_empty() {
1660            return Ok(created);
1661        }
1662        // Write-ahead, transactionally: begin, all records, commit.
1663        self.store.write_record(RecordKind::TxnBegin, &[]).map_err(MCSError::IoError)?;
1664        for (kind, data) in &records {
1665            self.store.write_record(*kind, data).map_err(MCSError::IoError)?;
1666        }
1667        self.store.write_record(RecordKind::TxnCommit, &[]).map_err(MCSError::IoError)?;
1668
1669        // Logged and committed — now apply to in-memory state.
1670        for entity in &created {
1671            let name_id = self.interner.intern(&entity.name);
1672            let hash = self.interner.get_hash(name_id);
1673            let type_id = self.interner.intern(&entity.entity_type);
1674            let obs_ids: Vec<StrId> = entity
1675                .observations
1676                .iter()
1677                .map(|o| self.interner.intern(o))
1678                .collect();
1679            let reused = self.free_slots.pop();
1680            let slot = reused.unwrap_or(self.entity_slots.len() as u32);
1681            self.search
1682                .index_entity(&mut self.interner, slot, name_id, type_id, &obs_ids);
1683            let stored = Some(StoredEntity {
1684                name: name_id,
1685                entity_type: type_id,
1686                observations: obs_ids,
1687            });
1688            match reused {
1689                Some(s) => self.entity_slots[s as usize] = stored,
1690                None => self.entity_slots.push(stored),
1691            }
1692            self.name_table.insert(&self.interner, hash, name_id, slot);
1693        }
1694        Ok(created)
1695    }
1696
1697    pub fn create_relations(&mut self, relations: &[Relation]) -> Result<Vec<Relation>> {
1698        // Validate up front so an invalid relation never produces partial writes.
1699        for relation in relations {
1700            if relation.from.is_empty() || relation.to.is_empty() {
1701                return Err(MCSError::InvalidParams(
1702                    "Relation endpoints must not be empty".into(),
1703                ));
1704            }
1705        }
1706        // Build a dedup set for O(1) duplicate checks (P5)
1707        let mut rel_set: AHashSet<(StrId, StrId, StrId)> = AHashSet::new();
1708        for rel in &self.relations {
1709            rel_set.insert((rel.from, rel.to, rel.relation_type));
1710        }
1711        let mut records: Vec<(RecordKind, Vec<u8>)> = Vec::new();
1712        let mut interned: Vec<StoredRelation> = Vec::new();
1713        for relation in relations {
1714            let from_id = self.interner.intern(&relation.from);
1715            let to_id = self.interner.intern(&relation.to);
1716            let type_id = self.interner.intern(&relation.relation_type);
1717            if !rel_set.insert((from_id, to_id, type_id)) {
1718                continue;
1719            }
1720            let mut buf = Vec::new();
1721            store_enc::encode_create_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
1722                .map_err(MCSError::IoError)?;
1723            records.push((RecordKind::CreateRelation, buf));
1724            interned.push(StoredRelation {
1725                from: from_id,
1726                to: to_id,
1727                relation_type: type_id,
1728            });
1729        }
1730        if records.is_empty() {
1731            return Ok(Vec::new());
1732        }
1733        // Write-ahead, transactionally.
1734        self.store.write_record(RecordKind::TxnBegin, &[]).map_err(MCSError::IoError)?;
1735        for (kind, data) in &records {
1736            self.store.write_record(*kind, data).map_err(MCSError::IoError)?;
1737        }
1738        self.store.write_record(RecordKind::TxnCommit, &[]).map_err(MCSError::IoError)?;
1739
1740        // Logged and committed — now apply to in-memory state.
1741        let mut created = Vec::new();
1742        for sr in &interned {
1743            self.relations.push(StoredRelation {
1744                from: sr.from,
1745                to: sr.to,
1746                relation_type: sr.relation_type,
1747            });
1748            self.adjacency.entry(sr.from).or_default().push((sr.to, sr.relation_type));
1749            self.adjacency.entry(sr.to).or_default().push((sr.from, sr.relation_type));
1750            created.push(Relation {
1751                from: self.interner.lookup(sr.from).to_string(),
1752                to: self.interner.lookup(sr.to).to_string(),
1753                relation_type: self.interner.lookup(sr.relation_type).to_string(),
1754            });
1755        }
1756        Ok(created)
1757    }
1758
1759    pub fn add_observations(&mut self, entity_name: &str, contents: &[String]) -> Result<Vec<String>> {
1760        let name_id = self.interner.get_optional(entity_name)
1761            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1762        let hash = self.interner.get_hash(name_id);
1763        let slot = self
1764            .name_table
1765            .lookup(hash, name_id)
1766            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1767        // Snapshot the current observations so we can compute the deduplicated
1768        // additions *without* mutating in-memory state yet.
1769        let existing: AHashSet<StrId> = self
1770            .entity_slots
1771            .get(slot as usize)
1772            .and_then(|e| e.as_ref())
1773            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?
1774            .observations
1775            .iter()
1776            .copied()
1777            .collect();
1778
1779        // Deduplicate against existing observations *and* within this batch, so
1780        // the live result matches what replay (which dedups one-by-one) rebuilds.
1781        let mut added = Vec::new();
1782        let mut interned_added = Vec::new();
1783        let mut seen: AHashSet<StrId> = AHashSet::new();
1784        for content in contents {
1785            let cid = self.interner.intern(content);
1786            if existing.contains(&cid) || !seen.insert(cid) {
1787                continue;
1788            }
1789            interned_added.push(cid);
1790            added.push(content.clone());
1791        }
1792        if added.is_empty() {
1793            return Ok(added);
1794        }
1795
1796        // Write-ahead: the record must hit the log *before* any in-memory
1797        // mutation, so a failed write leaves memory and disk in agreement (C3).
1798        let mut buf = Vec::new();
1799        store_enc::encode_add_observations(&mut buf, entity_name, &added)
1800            .map_err(MCSError::IoError)?;
1801        self.store.write_record(RecordKind::AddObservations, &buf)
1802            .map_err(MCSError::IoError)?;
1803
1804        // Logged — now apply to in-memory state.
1805        let stored = self
1806            .entity_slots
1807            .get_mut(slot as usize)
1808            .and_then(|e| e.as_mut())
1809            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1810        stored.observations.extend_from_slice(&interned_added);
1811
1812        // Incrementally index only the new observation tokens (P3) — no
1813        // full remove + re-index of the whole entity.
1814        self.search
1815            .index_additional(&mut self.interner, slot, &interned_added);
1816        Ok(added)
1817    }
1818
1819    pub fn delete_entities(&mut self, entity_names: &[String]) -> Result<()> {
1820        let mut records: Vec<(RecordKind, Vec<u8>)> = Vec::new();
1821        let mut deleted_names = Vec::new();
1822        for name in entity_names {
1823            let name_id_opt = self.interner.get_optional(name);
1824            if let Some(name_id) = name_id_opt {
1825                let hash = self.interner.get_hash(name_id);
1826                if let Some(slot) = self.name_table.lookup(hash, name_id)
1827                    && let Some(Some(_)) = self.entity_slots.get(slot as usize)
1828                {
1829                    let mut buf = Vec::new();
1830                    store_enc::encode_delete_entity(&mut buf, name)
1831                        .map_err(MCSError::IoError)?;
1832                    records.push((RecordKind::DeleteEntity, buf));
1833                    deleted_names.push((name.clone(), name_id, hash, slot));
1834                }
1835            }
1836        }
1837        if records.is_empty() {
1838            return Ok(());
1839        }
1840        // Write-ahead, transactionally.
1841        self.store.write_record(RecordKind::TxnBegin, &[]).map_err(MCSError::IoError)?;
1842        for (kind, data) in &records {
1843            self.store.write_record(*kind, data).map_err(MCSError::IoError)?;
1844        }
1845        self.store.write_record(RecordKind::TxnCommit, &[]).map_err(MCSError::IoError)?;
1846
1847        // Logged and committed — now apply to in-memory state.
1848        for (_name, _name_id, _hash, slot) in &deleted_names {
1849            self.entity_slots[*slot as usize] = None;
1850            self.free_slots.push(*slot);
1851            self.search.remove_entity(*slot);
1852            self.name_table.remove(&self.interner, *_hash, *_name_id);
1853        }
1854        let deleted_ids: AHashSet<StrId> = deleted_names.iter()
1855            .map(|(_, id, _, _)| *id)
1856            .collect();
1857        self.relations
1858            .retain(|r| !deleted_ids.contains(&r.from) && !deleted_ids.contains(&r.to));
1859        for id in &deleted_ids {
1860            self.adjacency.remove(id);
1861        }
1862        for list in self.adjacency.values_mut() {
1863            list.retain(|(to, _)| !deleted_ids.contains(to));
1864        }
1865        self.compact_if_needed()?;
1866        Ok(())
1867    }
1868
1869    pub fn delete_observations(&mut self, entity_name: &str, observations: &[String]) -> Result<()> {
1870        let name_id = self.interner.get_optional(entity_name)
1871            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1872        let hash = self.interner.get_hash(name_id);
1873        let slot = self
1874            .name_table
1875            .lookup(hash, name_id)
1876            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1877        // Confirm the slot is live before logging.
1878        self.entity_slots
1879            .get(slot as usize)
1880            .and_then(|e| e.as_ref())
1881            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1882        let remove_ids: AHashSet<StrId> = observations.iter().map(|o| self.interner.intern(o)).collect();
1883
1884        // Write-ahead: log before touching in-memory state (C3).
1885        let mut buf = Vec::new();
1886        store_enc::encode_delete_observations(&mut buf, entity_name, observations)
1887            .map_err(MCSError::IoError)?;
1888        self.store.write_record(RecordKind::DeleteObservations, &buf)
1889            .map_err(MCSError::IoError)?;
1890
1891        // Logged — now apply.
1892        let stored = self
1893            .entity_slots
1894            .get_mut(slot as usize)
1895            .and_then(|e| e.as_mut())
1896            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1897        stored.observations.retain(|o| !remove_ids.contains(o));
1898        self.search.remove_entity(slot);
1899        self.search
1900            .index_entity(&mut self.interner, slot, stored.name, stored.entity_type, &stored.observations);
1901        Ok(())
1902    }
1903
1904    pub fn delete_relations(&mut self, relations: &[Relation]) -> Result<()> {
1905        // Collect targets into a AHashSet for O(1) retain checks (P5)
1906        let rels: AHashSet<(StrId, StrId, StrId)> = relations
1907            .iter()
1908            .map(|r| {
1909                (
1910                    self.interner.intern(&r.from),
1911                    self.interner.intern(&r.to),
1912                    self.interner.intern(&r.relation_type),
1913                )
1914            })
1915            .collect();
1916        if rels.is_empty() {
1917            return Ok(());
1918        }
1919        // Write-ahead, transactionally.
1920        let mut records: Vec<(RecordKind, Vec<u8>)> = Vec::new();
1921        for relation in relations {
1922            let mut buf = Vec::new();
1923            store_enc::encode_delete_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
1924                .map_err(MCSError::IoError)?;
1925            records.push((RecordKind::DeleteRelation, buf));
1926        }
1927        self.store.write_record(RecordKind::TxnBegin, &[]).map_err(MCSError::IoError)?;
1928        for (kind, data) in &records {
1929            self.store.write_record(*kind, data).map_err(MCSError::IoError)?;
1930        }
1931        self.store.write_record(RecordKind::TxnCommit, &[]).map_err(MCSError::IoError)?;
1932
1933        // Logged and committed — now apply to in-memory state.
1934        self.relations
1935            .retain(|r| !rels.contains(&(r.from, r.to, r.relation_type)));
1936        for (f, t, rt) in &rels {
1937            if let Some(edges) = self.adjacency.get_mut(f) {
1938                edges.retain(|(to, rtype)| to != t || rtype != rt);
1939                if edges.is_empty() {
1940                    self.adjacency.remove(f);
1941                }
1942            }
1943            if let Some(edges) = self.adjacency.get_mut(t) {
1944                edges.retain(|(to, rtype)| to != f || rtype != rt);
1945                if edges.is_empty() {
1946                    self.adjacency.remove(t);
1947                }
1948            }
1949        }
1950        Ok(())
1951    }
1952
1953    pub fn read_graph(&self) -> KnowledgeGraphOut {
1954        self.read_graph_view().to_owned_out()
1955    }
1956
1957    /// Borrowing, allocation-light view of the full graph (M6). Serializing it
1958    /// streams interned `&str` directly instead of materializing a `String`
1959    /// per name/type/observation.
1960    pub fn read_graph_view(&self) -> GraphView<'_> {
1961        let entities: Vec<&StoredEntity> = self
1962            .entity_slots
1963            .iter()
1964            .filter_map(|s| s.as_ref())
1965            .collect();
1966        let relations: Vec<&StoredRelation> = self.relations.iter().collect();
1967        GraphView { kg: self, entities, relations }
1968    }
1969
1970    /// Relevance-ranked substring search returning all matches (no pagination).
1971    /// Equivalent to `search_nodes_filtered(query, None, 0, usize::MAX)`.
1972    pub fn search_nodes(&self, query: &str) -> KnowledgeGraphOut {
1973        self.search_nodes_filtered(query, None, 0, usize::MAX)
1974    }
1975
1976    pub fn open_nodes(&self, names: &[String]) -> KnowledgeGraphOut {
1977        self.open_nodes_view(names).to_owned_out()
1978    }
1979
1980    /// Borrowing view variant of [`open_nodes`] (M6).
1981    pub fn open_nodes_view(&self, names: &[String]) -> GraphView<'_> {
1982        let name_ids: AHashSet<StrId> = names.iter()
1983            .filter_map(|n| self.interner.get_optional(n))
1984            .collect();
1985        let entities: Vec<&StoredEntity> = self
1986            .entity_slots
1987            .iter()
1988            .filter_map(|s| {
1989                s.as_ref()
1990                    .filter(|stored| name_ids.contains(&stored.name))
1991            })
1992            .collect();
1993        let matched_names: AHashSet<StrId> = entities.iter().map(|e| e.name).collect();
1994        let relations: Vec<&StoredRelation> = self
1995            .relations
1996            .iter()
1997            .filter(|r| matched_names.contains(&r.from) || matched_names.contains(&r.to))
1998            .collect();
1999        GraphView { kg: self, entities, relations }
2000    }
2001
2002    // -----------------------------------------------------------------------
2003    // Internal helpers
2004    // -----------------------------------------------------------------------
2005
2006    fn entity_to_output(&self, stored: &StoredEntity) -> Entity {
2007        Entity {
2008            name: self.interner.lookup(stored.name).to_string(),
2009            entity_type: self.interner.lookup(stored.entity_type).to_string(),
2010            observations: stored
2011                .observations
2012                .iter()
2013                .map(|o| self.interner.lookup(*o).to_string())
2014                .collect(),
2015        }
2016    }
2017
2018    #[inline]
2019    fn relation_to_output(&self, r: &StoredRelation) -> Relation {
2020        Relation {
2021            from: self.interner.lookup(r.from).to_string(),
2022            to: self.interner.lookup(r.to).to_string(),
2023            relation_type: self.interner.lookup(r.relation_type).to_string(),
2024        }
2025    }
2026
2027    /// Resolve a name to its live entity slot, or `None` if absent/deleted.
2028    fn lookup_live_slot(&self, name: &str) -> Option<u32> {
2029        let name_id = self.interner.get_optional(name)?;
2030        let hash = self.interner.get_hash(name_id);
2031        let slot = self.name_table.lookup(hash, name_id)?;
2032        self.entity_slots.get(slot as usize)?.as_ref()?;
2033        Some(slot)
2034    }
2035
2036    /// Materialize a live entity from its interned name id.
2037    fn entity_by_name_id(&self, name_id: StrId) -> Option<Entity> {
2038        let hash = self.interner.get_hash(name_id);
2039        let slot = self.name_table.lookup(hash, name_id)?;
2040        let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
2041        Some(self.entity_to_output(stored))
2042    }
2043
2044    /// Tally distinct entity types and their live-entity counts, ranked by
2045    /// count descending (ties broken by name). One linear pass over the dense
2046    /// slot vec; only the final names are allocated.
2047    pub fn entity_type_counts(&self) -> Vec<(String, usize)> {
2048        let mut counts: AHashMap<StrId, usize> = AHashMap::new();
2049        for st in self
2050            .entity_slots
2051            .iter()
2052            .filter_map(|s| s.as_ref())
2053        {
2054            *counts.entry(st.entity_type).or_insert(0) += 1;
2055        }
2056        self.rank_counts(counts)
2057    }
2058
2059    /// Tally distinct relation types and their counts, ranked by count desc.
2060    pub fn relation_type_counts(&self) -> Vec<(String, usize)> {
2061        let mut counts: AHashMap<StrId, usize> = AHashMap::new();
2062        for r in &self.relations {
2063            *counts.entry(r.relation_type).or_insert(0) += 1;
2064        }
2065        self.rank_counts(counts)
2066    }
2067
2068    fn rank_counts(&self, counts: AHashMap<StrId, usize>) -> Vec<(String, usize)> {
2069        let mut out: Vec<(String, usize)> = counts
2070            .into_iter()
2071            .map(|(id, c)| (self.interner.lookup(id).to_string(), c))
2072            .collect();
2073        out.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
2074        out
2075    }
2076
2077    /// Relevance-ranked, optionally type-filtered, paginated node search.
2078    /// Entities come back best-match-first (see [`SearchIndex::search_ranked`]).
2079    /// Relations touching any returned entity (either endpoint) are included.
2080    pub fn search_nodes_filtered(
2081        &self,
2082        query: &str,
2083        entity_type: Option<&str>,
2084        offset: usize,
2085        limit: usize,
2086    ) -> KnowledgeGraphOut {
2087        self.search_nodes_view(query, entity_type, offset, limit).to_owned_out()
2088    }
2089
2090    /// Borrowing view variant of [`search_nodes_filtered`] (M6).
2091    pub fn search_nodes_view(
2092        &self,
2093        query: &str,
2094        entity_type: Option<&str>,
2095        offset: usize,
2096        limit: usize,
2097    ) -> GraphView<'_> {
2098        let type_id = match entity_type {
2099            Some(t) => match self.interner.get_optional(t) {
2100                Some(id) => Some(id),
2101                None => return GraphView { kg: self, entities: Vec::new(), relations: Vec::new() },
2102            },
2103            None => None,
2104        };
2105
2106        let ranked = self.search.search_ranked(query, &self.interner);
2107        let mut selected: AHashSet<StrId> = AHashSet::new();
2108        let mut entities: Vec<&StoredEntity> = Vec::new();
2109        let mut skipped = 0usize;
2110        for (slot, _score) in ranked {
2111            let Some(st) = self
2112                .entity_slots
2113                .get(slot as usize)
2114                .and_then(|s| s.as_ref())
2115                
2116            else {
2117                continue;
2118            };
2119            if type_id.is_some_and(|tid| st.entity_type != tid) {
2120                continue;
2121            }
2122            if skipped < offset {
2123                skipped += 1;
2124                continue;
2125            }
2126            if entities.len() >= limit {
2127                break;
2128            }
2129            selected.insert(st.name);
2130            entities.push(st);
2131        }
2132
2133        let relations: Vec<&StoredRelation> = self
2134            .relations
2135            .iter()
2136            .filter(|r| selected.contains(&r.from) || selected.contains(&r.to))
2137            .collect();
2138        GraphView { kg: self, entities, relations }
2139    }
2140
2141    /// Type-filtered, paginated view of the whole graph. Unlike [`read_graph`],
2142    /// relations are restricted to those whose **both** endpoints fall in the
2143    /// returned entity page, so the slice is internally consistent.
2144    pub fn read_graph_filtered(
2145        &self,
2146        entity_type: Option<&str>,
2147        offset: usize,
2148        limit: usize,
2149    ) -> KnowledgeGraphOut {
2150        self.read_graph_filtered_view(entity_type, offset, limit).to_owned_out()
2151    }
2152
2153    /// Borrowing view variant of [`read_graph_filtered`] (M6).
2154    pub fn read_graph_filtered_view(
2155        &self,
2156        entity_type: Option<&str>,
2157        offset: usize,
2158        limit: usize,
2159    ) -> GraphView<'_> {
2160        let type_id = match entity_type {
2161            Some(t) => match self.interner.get_optional(t) {
2162                Some(id) => Some(id),
2163                None => return GraphView { kg: self, entities: Vec::new(), relations: Vec::new() },
2164            },
2165            None => None,
2166        };
2167
2168        let mut selected: AHashSet<StrId> = AHashSet::new();
2169        let mut entities: Vec<&StoredEntity> = Vec::new();
2170        let mut skipped = 0usize;
2171        for st in self
2172            .entity_slots
2173            .iter()
2174            .filter_map(|s| s.as_ref())
2175        {
2176            if type_id.is_some_and(|tid| st.entity_type != tid) {
2177                continue;
2178            }
2179            if skipped < offset {
2180                skipped += 1;
2181                continue;
2182            }
2183            if entities.len() >= limit {
2184                break;
2185            }
2186            selected.insert(st.name);
2187            entities.push(st);
2188        }
2189
2190        let relations: Vec<&StoredRelation> = self
2191            .relations
2192            .iter()
2193            .filter(|r| selected.contains(&r.from) && selected.contains(&r.to))
2194            .collect();
2195        GraphView { kg: self, entities, relations }
2196    }
2197
2198    /// Neighborhood expansion around `name` out to `depth` hops, following
2199    /// edges in the requested [`Direction`] and (optionally) of one relation
2200    /// type. Returns the origin plus reached entities, and every relation
2201    /// (passing the type filter) whose endpoints are both inside that set.
2202    ///
2203    /// `depth == 1` (the common case) is a single linear pass over the flat
2204    /// relation vec; deeper queries build an adjacency map once (O(E)) and BFS.
2205    pub fn neighbors(
2206        &self,
2207        name: &str,
2208        direction: Direction,
2209        rtype: Option<&str>,
2210        depth: u32,
2211    ) -> Result<KnowledgeGraphOut> {
2212        self.lookup_live_slot(name)
2213            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
2214        // Safe: lookup_live_slot succeeded, so the name is interned.
2215        let start = self.interner.get_optional(name).unwrap();
2216
2217        // An unknown relation-type filter can match nothing: return just origin.
2218        let rtype_id = match rtype {
2219            Some(r) => match self.interner.get_optional(r) {
2220                Some(id) => Some(id),
2221                None => {
2222                    let entities = self.entity_by_name_id(start).into_iter().collect();
2223                    return Ok(KnowledgeGraphOut { entities, relations: Vec::new() });
2224                }
2225            },
2226            None => None,
2227        };
2228
2229        let mut visited: AHashSet<StrId> = AHashSet::new();
2230        visited.insert(start);
2231
2232        let type_ok = |r: &StoredRelation| rtype_id.is_none_or(|rt| r.relation_type == rt);
2233
2234        if depth == 1 {
2235            for r in self.relations.iter().filter(|r| type_ok(r)) {
2236                match direction {
2237                    Direction::Out => {
2238                        if r.from == start {
2239                            visited.insert(r.to);
2240                        }
2241                    }
2242                    Direction::In => {
2243                        if r.to == start {
2244                            visited.insert(r.from);
2245                        }
2246                    }
2247                    Direction::Both => {
2248                        if r.from == start {
2249                            visited.insert(r.to);
2250                        } else if r.to == start {
2251                            visited.insert(r.from);
2252                        }
2253                    }
2254                }
2255            }
2256        } else if depth >= 2 {
2257            // Build a direction-aware adjacency map once, then BFS.
2258            // For Direction::Both we use the incremental adjacency index;
2259            // for Direction::Out/In we filter relations directly.
2260            let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
2261            match direction {
2262                Direction::Both => {
2263                    for (&node, edges) in &self.adjacency {
2264                        for &(nb, rt) in edges {
2265                            if rtype_id.is_none_or(|rt_id| rt == rt_id) {
2266                                adj.entry(node).or_default().push(nb);
2267                            }
2268                        }
2269                    }
2270                }
2271                Direction::Out | Direction::In => {
2272                    for r in self.relations.iter().filter(|r| type_ok(r)) {
2273                        match direction {
2274                            Direction::Out => adj.entry(r.from).or_default().push(r.to),
2275                            Direction::In => adj.entry(r.to).or_default().push(r.from),
2276                            _ => unreachable!(),
2277                        }
2278                    }
2279                }
2280            }
2281            let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
2282            queue.push_back((start, 0));
2283            while let Some((node, d)) = queue.pop_front() {
2284                if d >= depth {
2285                    continue;
2286                }
2287                if let Some(nbrs) = adj.get(&node) {
2288                    for &nb in nbrs {
2289                        if visited.insert(nb) {
2290                            queue.push_back((nb, d + 1));
2291                        }
2292                    }
2293                }
2294            }
2295        }
2296
2297        let mut entities = Vec::with_capacity(visited.len());
2298        for &nid in &visited {
2299            if let Some(e) = self.entity_by_name_id(nid) {
2300                entities.push(e);
2301            }
2302        }
2303        let relations = self
2304            .relations
2305            .iter()
2306            .filter(|r| type_ok(r) && visited.contains(&r.from) && visited.contains(&r.to))
2307            .map(|r| self.relation_to_output(r))
2308            .collect();
2309        Ok(KnowledgeGraphOut { entities, relations })
2310    }
2311
2312    /// One-shot context bundle for a single entity: the entity itself, every
2313    /// incident relation, its distinct neighbor names, and its degree. Saves an
2314    /// agent the get_entity + two search_relations round-trips.
2315    pub fn describe_entity(&self, name: &str) -> Result<serde_json::Value> {
2316        let name_id = self
2317            .interner
2318            .get_optional(name)
2319            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
2320        let entity = self
2321            .entity_by_name_id(name_id)
2322            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
2323
2324        let mut incident: Vec<Relation> = Vec::new();
2325        let mut neighbor_seen: AHashSet<StrId> = AHashSet::new();
2326        let mut neighbors: Vec<&str> = Vec::new();
2327        for r in &self.relations {
2328            if r.from == name_id || r.to == name_id {
2329                incident.push(self.relation_to_output(r));
2330                let other = if r.from == name_id { r.to } else { r.from };
2331                if other != name_id && neighbor_seen.insert(other) {
2332                    neighbors.push(self.interner.lookup(other));
2333                }
2334            }
2335        }
2336
2337        Ok(serde_json::json!({
2338            "entity": entity,
2339            "relations": incident,
2340            "neighbors": neighbors,
2341            "degree": incident.len(),
2342        }))
2343    }
2344
2345    /// Create-or-merge a batch of entities idempotently. Missing entities are
2346    /// created; existing ones keep their type and gain any new observations
2347    /// (deduplicated). Returns a per-entity outcome. The caller is responsible
2348    /// for flushing — every underlying op is already write-ahead logged.
2349    pub fn upsert_entities(&mut self, entities: &[Entity]) -> Result<Vec<serde_json::Value>> {
2350        for e in entities {
2351            if e.name.is_empty() {
2352                return Err(MCSError::InvalidParams(
2353                    "Entity name must not be empty".into(),
2354                ));
2355            }
2356        }
2357        let mut out = Vec::with_capacity(entities.len());
2358        for e in entities {
2359            if self.lookup_live_slot(&e.name).is_some() {
2360                let added = self.add_observations(&e.name, &e.observations)?;
2361                out.push(serde_json::json!({
2362                    "name": e.name,
2363                    "created": false,
2364                    "addedObservations": added,
2365                }));
2366            } else {
2367                let created = self.create_entities(std::slice::from_ref(e))?;
2368                out.push(serde_json::json!({
2369                    "name": e.name,
2370                    "created": !created.is_empty(),
2371                    "addedObservations": e.observations,
2372                }));
2373            }
2374        }
2375        Ok(out)
2376    }
2377
2378    /// Serialize the graph in one of: `json` (read_graph), `mermaid`, `dot`.
2379    pub fn export(&self, format: &str) -> Result<String> {
2380        match format {
2381            "json" => serde_json::to_string(&self.read_graph()).map_err(MCSError::JsonError),
2382            "mermaid" => Ok(self.export_mermaid()),
2383            "dot" => Ok(self.export_dot()),
2384            other => Err(MCSError::InvalidParams(format!(
2385                "Unknown export format '{other}' (expected json|mermaid|dot)"
2386            ))),
2387        }
2388    }
2389
2390    /// Assign each live entity a stable `n{k}` node id for diagram output.
2391    fn diagram_node_ids(&self) -> (AHashMap<StrId, usize>, Vec<(usize, StrId)>) {
2392        let mut ids: AHashMap<StrId, usize> = AHashMap::new();
2393        let mut order: Vec<(usize, StrId)> = Vec::new();
2394        for st in self
2395            .entity_slots
2396            .iter()
2397            .filter_map(|s| s.as_ref())
2398        {
2399            let n = ids.len();
2400            ids.insert(st.name, n);
2401            order.push((n, st.name));
2402        }
2403        (ids, order)
2404    }
2405
2406    fn export_mermaid(&self) -> String {
2407        let (ids, order) = self.diagram_node_ids();
2408        let mut s = String::with_capacity(64 + order.len() * 32 + self.relations.len() * 32);
2409        s.push_str("graph LR\n");
2410        for (n, name_id) in &order {
2411            let label = sanitize_label(self.interner.lookup(*name_id));
2412            s.push_str(&format!("  n{n}[\"{label}\"]\n"));
2413        }
2414        for r in &self.relations {
2415            if let (Some(&a), Some(&b)) = (ids.get(&r.from), ids.get(&r.to)) {
2416                let rel = sanitize_label(self.interner.lookup(r.relation_type));
2417                s.push_str(&format!("  n{a} -->|{rel}| n{b}\n"));
2418            }
2419        }
2420        s
2421    }
2422
2423    fn export_dot(&self) -> String {
2424        let (ids, order) = self.diagram_node_ids();
2425        let mut s = String::with_capacity(64 + order.len() * 32 + self.relations.len() * 32);
2426        s.push_str("digraph G {\n");
2427        for (n, name_id) in &order {
2428            let label = sanitize_label(self.interner.lookup(*name_id));
2429            s.push_str(&format!("  n{n} [label=\"{label}\"];\n"));
2430        }
2431        for r in &self.relations {
2432            if let (Some(&a), Some(&b)) = (ids.get(&r.from), ids.get(&r.to)) {
2433                let rel = sanitize_label(self.interner.lookup(r.relation_type));
2434                s.push_str(&format!("  n{a} -> n{b} [label=\"{rel}\"];\n"));
2435            }
2436        }
2437        s.push_str("}\n");
2438        s
2439    }
2440
2441    // ------ High-level productivity tools ------
2442
    /// Merge `source` entity into `target` entity. All observations from
    /// source are moved to target (deduplicated), all relations involving
    /// source are redirected to target (deduplicated), and source is then
    /// deleted.
    ///
    /// The whole merge is **atomic**: every sub-record is written to the log
    /// inside a single `TxnBegin`…`TxnCommit` transaction *before* any in-memory
    /// mutation. A failed or torn write therefore leaves memory untouched and the
    /// log without a `TxnCommit` for this merge (an uncommitted transaction is
    /// discarded on replay), so the graph can never observe a half-applied merge.
    /// Caller flushes.
    ///
    /// Redirected relations that would become self-loops (e.g. `source -> target`
    /// becoming `target -> target`) are dropped, as are ones that duplicate an
    /// existing relation.
    ///
    /// Returns a JSON object with `source`, `target`, `movedObservations` (all of
    /// source's observations), `addedObservations` (those new to target) and
    /// `redirectedRelations` (relations actually re-created on target).
    ///
    /// # Errors
    /// `InvalidParams` if `source == target` or either entity is not live;
    /// `IoError` if encoding or writing a log record fails (memory untouched);
    /// any error from `compact_if_needed`, which runs after the merge has
    /// already been committed and applied.
    pub fn merge_entities(&mut self, source: &str, target: &str) -> Result<serde_json::Value> {
        if source == target {
            return Err(MCSError::InvalidParams(
                "Source and target must be different entities".into(),
            ));
        }
        // Existence check only; the source slot index itself is not needed.
        self.lookup_live_slot(source).ok_or_else(|| {
            MCSError::InvalidParams(format!("Source entity '{source}' not found"))
        })?;
        let target_slot = self.lookup_live_slot(target).ok_or_else(|| {
            MCSError::InvalidParams(format!("Target entity '{target}' not found"))
        })?;

        // Both names passed `lookup_live_slot` above, so these unwraps are
        // expected to succeed.
        let source_entity = self.get_entity(source).unwrap();
        // Reported as `movedObservations`: counts every source observation,
        // including ones the target already had.
        let moved_obs_count = source_entity.observations.len();
        let source_id = self.interner.get_optional(source).unwrap();
        let target_id = self.interner.get_optional(target).unwrap();

        // Observations to move: dedup against target's existing set and within
        // the batch (matching what `add_observations` would have done).
        let target_existing: AHashSet<StrId> = self.entity_slots[target_slot as usize]
            .as_ref()
            .unwrap()
            .observations
            .iter()
            .copied()
            .collect();
        let mut obs_seen: AHashSet<StrId> = AHashSet::new();
        let mut obs_to_add: Vec<String> = Vec::new();
        for o in &source_entity.observations {
            // An observation string that is not interned is skipped outright.
            if let Some(oid) = self.interner.get_optional(o)
                && !target_existing.contains(&oid)
                && obs_seen.insert(oid)
            {
                obs_to_add.push(o.clone());
            }
        }

        // Relations to redirect: replace source with target, drop self-loops,
        // and dedup against existing relations and within the batch.
        let existing_rels: AHashSet<(StrId, StrId, StrId)> =
            self.relations.iter().map(|r| (r.from, r.to, r.relation_type)).collect();
        let mut rel_seen: AHashSet<(StrId, StrId, StrId)> = AHashSet::new();
        let mut redirect: Vec<Relation> = Vec::new();
        for r in &self.relations {
            if r.from != source_id && r.to != source_id {
                continue;
            }
            let new_from = if r.from == source_id { target_id } else { r.from };
            let new_to = if r.to == source_id { target_id } else { r.to };
            if new_from == new_to {
                continue; // self-loop after redirect
            }
            let key = (new_from, new_to, r.relation_type);
            if existing_rels.contains(&key) || !rel_seen.insert(key) {
                continue;
            }
            // Stored as owned strings because the log encoders below take names.
            redirect.push(Relation {
                from: self.interner.lookup(new_from).to_string(),
                to: self.interner.lookup(new_to).to_string(),
                relation_type: self.interner.lookup(r.relation_type).to_string(),
            });
        }

        let added_count = obs_to_add.len();
        let redirected = redirect.len() as u32;

        // Build every record up front so writing is the only fallible step.
        // Order: add observations to target, create redirected relations, then
        // delete source (whose own relations go away with it).
        let mut records: Vec<(RecordKind, Vec<u8>)> = Vec::new();
        if !obs_to_add.is_empty() {
            let mut buf = Vec::new();
            store_enc::encode_add_observations(&mut buf, target, &obs_to_add)
                .map_err(MCSError::IoError)?;
            records.push((RecordKind::AddObservations, buf));
        }
        for r in &redirect {
            let mut buf = Vec::new();
            store_enc::encode_create_relation(&mut buf, &r.from, &r.to, &r.relation_type)
                .map_err(MCSError::IoError)?;
            records.push((RecordKind::CreateRelation, buf));
        }
        let mut del_buf = Vec::new();
        store_enc::encode_delete_entity(&mut del_buf, source).map_err(MCSError::IoError)?;
        records.push((RecordKind::DeleteEntity, del_buf));

        // Write-ahead, transactionally: begin, all records, commit.
        // Any `?` below returns before memory is touched; whatever was already
        // written stays in the log without a TxnCommit.
        self.store.write_record(RecordKind::TxnBegin, &[]).map_err(MCSError::IoError)?;
        for (kind, data) in &records {
            self.store.write_record(*kind, data).map_err(MCSError::IoError)?;
        }
        self.store.write_record(RecordKind::TxnCommit, &[]).map_err(MCSError::IoError)?;

        // Logged and committed — now apply to in-memory state (no more logging).
        // Records are applied in the same order they were logged.
        for (kind, data) in &records {
            Self::apply_record(
                *kind, data, &mut self.interner, &mut self.entity_slots, &mut self.search,
                &mut self.name_table, &mut self.relations,
            );
        }

        // `apply_record` maintains `relations` but NOT the incremental
        // `adjacency` index (it is built from scratch on load, where this never
        // mattered). For a *live* merge we must keep adjacency in sync, or every
        // adjacency consumer — find_path, neighbors(depth>=2), extract_subgraph,
        // find_all_paths — would return stale results until the next restart
        // (M2). The merge removed `source` and added the `redirect` edges, so
        // mirror exactly those changes.
        self.adjacency.remove(&source_id);
        for list in self.adjacency.values_mut() {
            list.retain(|(to, _)| *to != source_id);
        }
        for r in &redirect {
            // All endpoints were interned above, so these lookups are infallible.
            let from_id = self.interner.get_optional(&r.from).unwrap();
            let to_id = self.interner.get_optional(&r.to).unwrap();
            let type_id = self.interner.get_optional(&r.relation_type).unwrap();
            // Each edge is indexed from both endpoints.
            self.adjacency.entry(from_id).or_default().push((to_id, type_id));
            self.adjacency.entry(to_id).or_default().push((from_id, type_id));
        }

        // NOTE(review): an error here is returned to the caller even though the
        // merge above is already committed and applied.
        self.compact_if_needed()?;

        Ok(serde_json::json!({
            "source": source,
            "target": target,
            "movedObservations": moved_obs_count,
            "addedObservations": added_count,
            "redirectedRelations": redirected,
        }))
    }
2583
2584    /// Extract a connected subgraph around one or more seed entity names,
2585    /// expanding out to `depth` hops along all relations (undirected). Returns
2586    /// the set of reached entities and the relations among them.
2587    pub fn extract_subgraph(&self, names: &[String], depth: u32) -> Result<KnowledgeGraphOut> {
2588        if names.is_empty() {
2589            return Ok(KnowledgeGraphOut {
2590                entities: Vec::new(),
2591                relations: Vec::new(),
2592            });
2593        }
2594        // Seed the BFS queue from any names that exist.
2595        let mut visited: AHashSet<StrId> = AHashSet::new();
2596        let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
2597        for name in names {
2598            if let Some(id) = self.interner.get_optional(name)
2599                && visited.insert(id)
2600            {
2601                queue.push_back((id, 0));
2602            }
2603        }
2604        // Build an undirected adjacency map from the incremental index.
2605        let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
2606        for (&node, edges) in &self.adjacency {
2607            let nb: Vec<StrId> = edges.iter().map(|(to, _)| *to).collect();
2608            adj.insert(node, nb);
2609        }
2610        while let Some((node, d)) = queue.pop_front() {
2611            if d >= depth {
2612                continue;
2613            }
2614            if let Some(nbrs) = adj.get(&node) {
2615                for &nb in nbrs {
2616                    if visited.insert(nb) {
2617                        queue.push_back((nb, d + 1));
2618                    }
2619                }
2620            }
2621        }
2622        let mut entities: Vec<Entity> = Vec::with_capacity(visited.len());
2623        for &nid in &visited {
2624            if let Some(e) = self.entity_by_name_id(nid) {
2625                entities.push(e);
2626            }
2627        }
2628        let relations: Vec<Relation> = self
2629            .relations
2630            .iter()
2631            .filter(|r| visited.contains(&r.from) && visited.contains(&r.to))
2632            .map(|r| self.relation_to_output(r))
2633            .collect();
2634        Ok(KnowledgeGraphOut { entities, relations })
2635    }
2636
2637    /// Return full entities for a list of names. Missing names yield `None`.
2638    pub fn batch_get_entities(&self, names: &[String]) -> Vec<Option<Entity>> {
2639        names.iter().map(|n| self.get_entity(n)).collect()
2640    }
2641
2642    /// Recursive DFS helper — collects every simple path from `current` to
2643    /// `target` up to `max_depth` hops, capped at `max_paths` results.
2644    #[allow(clippy::too_many_arguments)]
2645    fn dfs_all_paths(
2646        adj: &AHashMap<StrId, Vec<StrId>>,
2647        current: StrId,
2648        target: StrId,
2649        max_depth: usize,
2650        max_paths: usize,
2651        visited: &mut AHashSet<StrId>,
2652        current_path: &mut Vec<StrId>,
2653        all_paths: &mut Vec<Vec<StrId>>,
2654    ) {
2655        if all_paths.len() >= max_paths {
2656            return;
2657        }
2658        if current == target && current_path.len() > 1 {
2659            all_paths.push(current_path.clone());
2660            return;
2661        }
2662        if current_path.len() > max_depth {
2663            return;
2664        }
2665        if let Some(neighbors) = adj.get(&current) {
2666            for &nb in neighbors {
2667                if visited.insert(nb) {
2668                    current_path.push(nb);
2669                    Self::dfs_all_paths(
2670                        adj, nb, target, max_depth, max_paths, visited, current_path, all_paths,
2671                    );
2672                    current_path.pop();
2673                    visited.remove(&nb);
2674                }
2675            }
2676        }
2677    }
2678
2679    /// Find all simple paths between `from` and `to` up to `max_depth` hops,
2680    /// returning at most `max_paths` results. Paths are found via DFS with
2681    /// backtracking and include both endpoints.
2682    pub fn find_all_paths(
2683        &self,
2684        from: &str,
2685        to: &str,
2686        max_depth: usize,
2687        max_paths: usize,
2688    ) -> Result<Vec<Vec<String>>> {
2689        let from_id = self
2690            .interner
2691            .get_optional(from)
2692            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
2693        let to_id = self
2694            .interner
2695            .get_optional(to)
2696            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
2697        // Verify both are live.
2698        if self.lookup_live_slot(from).is_none() {
2699            return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
2700        }
2701        if self.lookup_live_slot(to).is_none() {
2702            return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
2703        }
2704        if from_id == to_id {
2705            return Ok(vec![vec![from.to_string()]]);
2706        }
2707        // Build undirected adjacency from the incremental index.
2708        let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::with_capacity(self.adjacency.len());
2709        for (&node, edges) in &self.adjacency {
2710            let nbrs: Vec<StrId> = edges.iter().map(|(to, _)| *to).collect();
2711            adj.insert(node, nbrs);
2712        }
2713        let mut all_paths: Vec<Vec<StrId>> = Vec::new();
2714        let mut current_path = Vec::new();
2715        let mut visited: AHashSet<StrId> = AHashSet::new();
2716        visited.insert(from_id);
2717        current_path.push(from_id);
2718        Self::dfs_all_paths(
2719            &adj,
2720            from_id,
2721            to_id,
2722            max_depth,
2723            max_paths,
2724            &mut visited,
2725            &mut current_path,
2726            &mut all_paths,
2727        );
2728        if all_paths.is_empty() {
2729            return Err(MCSError::MemoryError(format!(
2730                "No path found between '{from}' and '{to}'"
2731            )));
2732        }
2733        let result: Vec<Vec<String>> = all_paths
2734            .into_iter()
2735            .map(|path| {
2736                path.into_iter()
2737                    .map(|id| self.interner.lookup(id).to_string())
2738                    .collect()
2739            })
2740            .collect();
2741        Ok(result)
2742    }
2743
2744    // --- Snapshot ---
2745
2746    /// Create a wait-free read snapshot (item 2 in plan).
2747    /// Freezes entity_slots and relations into `Arc<[_]>` and clones the rest.
2748    pub fn snapshot(&self) -> ReadSnapshot {
2749        ReadSnapshot {
2750            interner: Arc::new(self.interner.clone()),
2751            entity_slots: Arc::from_iter(self.entity_slots.iter().cloned()),
2752            name_table: Arc::new(self.name_table.clone()),
2753            relations: Arc::from_iter(self.relations.iter().cloned()),
2754            adjacency: Arc::new(self.adjacency.clone()),
2755            search: Arc::new(self.search.clone()),
2756        }
2757    }
2758
2759    // --- Flush & sync ---
2760
2761    /// Flush the `BufWriter` to the kernel buffer (process-crash safe).
2762    pub fn flush(&mut self) -> Result<()> {
2763        self.store.flush().map_err(MCSError::IoError)
2764    }
2765
2766    /// `fsync` the log to disk (OS-crash safe). Called by the background sync
2767    /// thread in [`GraphHandle`]; most callers should use `flush()` instead.
2768    pub fn sync(&mut self) -> Result<()> {
2769        self.store.sync().map_err(MCSError::IoError)
2770    }
2771
2772    /// Flush + fsync (legacy; prefer [`flush`](Self::flush) for production use).
2773    pub fn flush_and_sync(&mut self) -> Result<()> {
2774        self.store.flush_and_sync().map_err(MCSError::IoError)
2775    }
2776}
2777
2778
2779
2780// ---------------------------------------------------------------------------
2781// GraphHandle – wait-free read / serialized-write handle.
2782// ---------------------------------------------------------------------------
2783
2784/// Wait-free read / serialized-write handle to the graph.
2785///
2786/// Readers load a frozen [`ReadSnapshot`] via [`read`](GraphHandle::read)
2787/// (lock-free via `ArcSwap`). Writers take a [`Mutex`] lock, mutate the
2788/// underlying [`KnowledgeGraph`], and publish a fresh snapshot on unlock
2789/// via the [`WriteGuard`] drop glue.
2790///
2791/// A background thread calls `fsync` on the WAL file every 1 second so that
2792/// write handlers never block on disk I/O. The thread is stopped on `Drop`.
2793///
2794/// The sync thread uses its own `Arc<File>` handle (cloned from the WAL file)
2795/// so that `fsync` never contends with the graph mutex. A [`Condvar`] notifies
2796/// the thread immediately after every write, ensuring low-latency sync without
2797/// polling.
2798pub struct GraphHandle {
2799    inner: Arc<parking_lot::Mutex<KnowledgeGraph>>,
2800    snapshot: ArcSwap<ReadSnapshot>,
2801    /// Cached JSON of the full `read_graph` output. Invalidated on every write.
2802    read_cache: ArcSwap<Option<Arc<str>>>,
2803    /// Notifies the background sync thread when a write has flushed data to the
2804    /// kernel buffer. The thread also wakes on a 1-second timeout as a fallback.
2805    /// The `bool` is `true` when there is pending data to sync.
2806    sync_notify: Arc<(StdMutex<bool>, Condvar)>,
2807    /// Signal the background sync thread to stop. Set on `Drop`.
2808    stop_sync: Arc<AtomicBool>,
2809    durability: Durability,
2810}
2811
2812/// RAII guard that publishes a fresh [`ReadSnapshot`] on drop.
2813pub struct WriteGuard<'a> {
2814    guard: parking_lot::MutexGuard<'a, KnowledgeGraph>,
2815    snapshot: &'a ArcSwap<ReadSnapshot>,
2816    read_cache: &'a ArcSwap<Option<Arc<str>>>,
2817    sync_notify: &'a (StdMutex<bool>, Condvar),
2818    durability: Durability,
2819    did_publish: bool,
2820}
2821
2822impl WriteGuard<'_> {
2823    /// Publish a snapshot now (eager, before drop). Also called by Drop.
2824    /// Invalidates the serialized read cache. Flushes the WAL to kernel buffer
2825    /// (the background sync thread in [`GraphHandle`] handles the actual `fsync`).
2826    pub fn publish(&mut self) {
2827        if self.durability.is_sync() {
2828            if let Err(e) = self.guard.flush_and_sync() {
2829                tracing::error!("WAL sync failed: {e}");
2830            }
2831        } else if let Err(e) = self.guard.flush() {
2832            tracing::error!("WAL flush failed: {e}");
2833        }
2834        let snap = Arc::new(self.guard.snapshot());
2835        self.snapshot.store(snap);
2836        self.read_cache.store(Arc::new(None));
2837        self.did_publish = true;
2838        // Wake the sync thread — data is in the kernel buffer waiting for fsync.
2839        let (lock, cvar) = self.sync_notify;
2840        let mut pending = lock.lock().unwrap_or_else(|e| e.into_inner());
2841        *pending = true;
2842        cvar.notify_one();
2843    }
2844
2845    /// Access the underlying `KnowledgeGraph` for mutation.
2846    pub fn graph(&mut self) -> &mut KnowledgeGraph {
2847        &mut self.guard
2848    }
2849}
2850
2851impl std::ops::Deref for WriteGuard<'_> {
2852    type Target = KnowledgeGraph;
2853    fn deref(&self) -> &KnowledgeGraph {
2854        &self.guard
2855    }
2856}
2857
2858impl std::ops::DerefMut for WriteGuard<'_> {
2859    fn deref_mut(&mut self) -> &mut KnowledgeGraph {
2860        &mut self.guard
2861    }
2862}
2863
2864impl Drop for WriteGuard<'_> {
2865    fn drop(&mut self) {
2866        if !self.did_publish {
2867            self.publish();
2868        }
2869    }
2870}
2871
2872impl Drop for GraphHandle {
2873    fn drop(&mut self) {
2874        self.stop_sync.store(true, Ordering::Relaxed);
2875        // Wake the sync thread by setting pending=true so the
2876        // wait_timeout_while(|p| !*p) condition breaks immediately.
2877        let (lock, cvar) = &*self.sync_notify;
2878        let mut pending = lock.lock().unwrap_or_else(|e| e.into_inner());
2879        *pending = true;
2880        cvar.notify_one();
2881    }
2882}
2883
2884impl GraphHandle {
2885    /// Open or create the graph at `path`, seeding the initial snapshot.
2886    /// Spawns a background thread that `fsync`s the WAL so that write handlers
2887    /// never block on disk I/O.
2888    pub fn new(path: &Path, durability: Durability) -> std::io::Result<Self> {
2889        let kg = KnowledgeGraph::new(path)?;
2890        let snapshot = Arc::new(kg.snapshot());
2891        // Clone the shared sync slot before moving `kg` into the Mutex. The slot
2892        // is updated in place by `compact`/`reopen_truncated`, so the thread
2893        // always fsyncs the current file (D1).
2894        let sync_slot = Arc::clone(&kg.store.sync_slot);
2895        let inner = Arc::new(parking_lot::Mutex::new(kg));
2896
2897        let sync_notify: Arc<(StdMutex<bool>, Condvar)> =
2898            Arc::new((StdMutex::new(false), Condvar::new()));
2899        let notify = Arc::clone(&sync_notify);
2900        let stop_sync = Arc::new(AtomicBool::new(false));
2901
2902        // Background sync thread — calls fsync on a dedicated `Arc<File>`
2903        // handle, never touching the graph mutex. Woken by Condvar after every
2904        // write; falls back to a 1-second timeout.
2905        let sync_stop = Arc::clone(&stop_sync);
2906        std::thread::Builder::new()
2907            .name("mcp-memory-sync".into())
2908            .spawn(move || {
2909                let (lock, cvar) = &*notify;
2910                loop {
2911                    // Wait while there is nothing to sync. Woken by publish()
2912                    // or by the 1-second timeout.
2913                    let mut guard = cvar
2914                        .wait_timeout_while(
2915                            lock.lock().unwrap_or_else(|e| e.into_inner()),
2916                            std::time::Duration::from_secs(1),
2917                            |p| !*p,
2918                        )
2919                        .unwrap_or_else(|e| e.into_inner())
2920                        .0;
2921
2922                    let should_sync = *guard;
2923                    *guard = false;
2924                    // Release the StdMutex before fsync so publish() is not
2925                    // blocked on setting the next pending flag.
2926                    drop(guard);
2927
2928                    if should_sync {
2929                        // Load the current handle each cycle so a compaction
2930                        // that swapped the underlying file is picked up (D1).
2931                        if let Err(e) = sync_slot.load().sync_data() {
2932                            tracing::error!("WAL fsync failed: {e}");
2933                        }
2934                    }
2935
2936                    if sync_stop.load(Ordering::Relaxed) {
2937                        // One final fsync before exiting.
2938                        if let Err(e) = sync_slot.load().sync_data() {
2939                            tracing::error!("WAL final fsync failed: {e}");
2940                        }
2941                        break;
2942                    }
2943                }
2944            })
2945            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
2946
2947        Ok(Self {
2948            inner,
2949            snapshot: ArcSwap::new(snapshot),
2950            read_cache: ArcSwap::new(Arc::new(None)),
2951            sync_notify,
2952            stop_sync,
2953            durability,
2954        })
2955    }
2956
2957    /// Return the cached full-graph JSON, or build and cache it on first call.
2958    /// Invalidated by any write (see [`WriteGuard::publish`]).
2959    pub fn read_graph_cached(&self) -> Arc<str> {
2960        if let Some(cached) = self.read_cache.load().as_ref() {
2961            return cached.clone();
2962        }
2963        let graph = self.read();
2964        let json: Arc<str> = Arc::from(graph.read_graph_json().into_boxed_str());
2965        self.read_cache.store(Arc::new(Some(json.clone())));
2966        json
2967    }
2968
2969    /// Lock-free read snapshot. Holds an `Arc` reference to the frozen graph data.
2970    pub fn read(&self) -> ReadSnapshot {
2971        (**self.snapshot.load()).clone()
2972    }
2973
2974    /// Serialised write access. Returns a guard that publishes a fresh snapshot
2975    /// when dropped (or when [`WriteGuard::publish`] is called eagerly).
2976    pub fn write(&self) -> WriteGuard<'_> {
2977        WriteGuard {
2978            guard: self.inner.lock(),
2979            snapshot: &self.snapshot,
2980            read_cache: &self.read_cache,
2981            sync_notify: &self.sync_notify,
2982            durability: self.durability,
2983            did_publish: false,
2984        }
2985    }
2986}
2987
2988