Skip to main content

mcp_memory/
kg.rs

1use std::collections::VecDeque;
2use std::sync::Arc;
3
4use ahash::{AHashMap, AHashSet};
5use arc_swap::ArcSwap;
6use parking_lot::Mutex;
7use std::path::Path;
8
9use serde::ser::{Serialize, SerializeSeq, SerializeStruct, Serializer};
10
11use crate::errors::{MCSError, Result};
12use crate::intern::{StrId, StringInterner};
13use crate::types::{Entity, Relation, KnowledgeGraphOut};
14use crate::search::SearchIndex;
15use crate::store::{self as store_enc, BinaryStore, RecordKind};
16
/// Value of `StoredEntity::state` that marks a slot as holding a live entity.
const ENTITY_SLOT_LIVE: u8 = 1;
/// Number of independent shards in `ShardedNameTable`. Must be a power of two:
/// the shard index is computed as `hash & (NAME_TABLE_SHARDS - 1)`.
const NAME_TABLE_SHARDS: usize = 4;
19
20// ---------------------------------------------------------------------------
21// Prefetch helper – issues a non-binding software prefetch hint to pull a
22// cache-line into L1/L2 while we finish probing the current entry.
23// ---------------------------------------------------------------------------
/// Hint the CPU to pull the cache line containing `addr` into every cache
/// level (`_MM_HINT_T0`). This is purely a performance hint with no
/// observable effect on program state.
///
/// # Safety
/// A prefetch never faults and does not read memory in the Rust abstract
/// machine, so any pointer value is acceptable. The function is `unsafe` only
/// because the underlying intrinsic is. Pass pointers into a live allocation
/// for the hint to be useful.
#[cfg(target_arch = "x86_64")]
#[inline(always)]
unsafe fn prefetch_addr(addr: *const u8) {
    use std::arch::x86_64::{_MM_HINT_T0, _mm_prefetch};
    // SAFETY: see the function-level contract. `_mm_prefetch` is declared
    // over `*const i8` (C `const char *`), so reinterpret the byte pointer.
    unsafe { _mm_prefetch::<{ _MM_HINT_T0 }>(addr.cast::<i8>()) };
}

/// Fallback for targets without the x86_64 prefetch intrinsic: does nothing.
#[cfg(not(target_arch = "x86_64"))]
#[inline(always)]
const unsafe fn prefetch_addr(_addr: *const u8) {}
34
/// fsync the directory containing `path` so that a file created or renamed
/// inside it is durable.
///
/// The directory is `path.parent()`. A bare file name (empty parent) or a
/// path with no parent falls back to the current directory `"."`.
///
/// Returns `Ok(())` without syncing when:
/// - the directory does not exist (`NotFound`);
/// - the filesystem rejects fsync on a directory handle (`InvalidInput`);
/// - the target is not Unix. Directory fsync is a POSIX durability idiom.
///   Elsewhere (e.g. Windows) opening or flushing a directory handle is
///   expected to fail with a permission-style error rather than sync anything.
///
/// # Errors
/// Any other I/O error from opening or syncing the directory.
fn sync_parent_dir(path: &Path) -> std::io::Result<()> {
    // Honour the documented "no-op where unsupported" contract explicitly
    // instead of relying on the platform reporting a tolerated error kind.
    if cfg!(not(unix)) {
        return Ok(());
    }
    let dir = path
        .parent()
        .filter(|p| !p.as_os_str().is_empty())
        .unwrap_or_else(|| Path::new("."));
    let handle = match std::fs::File::open(dir) {
        Ok(f) => f,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
        Err(e) => return Err(e),
    };
    match handle.sync_all() {
        // Some filesystems disallow fsync on a directory handle; tolerate it.
        Err(e) if e.kind() == std::io::ErrorKind::InvalidInput => Ok(()),
        other => other,
    }
}
54
55// ---------------------------------------------------------------------------
56// StoredEntity / StoredRelation – internal representations using StrId.
57// ---------------------------------------------------------------------------
58// Default layout: 40 B / align 8 (Rust packs `state` into the Vec's padding and
59// the `Option` niche is free). With `cache_align`, the slot is rounded to a full
60// 64-byte line so a point lookup/mutation (name_table -> slot index) touches
61// exactly one cache line instead of occasionally straddling two. Costs +60%
62// memory and a wider stride on bulk scans — measure before enabling.
63#[derive(Clone)]
64#[cfg_attr(feature = "cache_align", repr(align(64)))]
65pub(crate) struct StoredEntity {
66    state: u8,
67    pub(crate) name: StrId,
68    pub(crate) entity_type: StrId,
69    pub(crate) observations: Vec<StrId>,
70}
71
72impl StoredEntity {
73    pub(crate) const fn is_live(&self) -> bool {
74        self.state == ENTITY_SLOT_LIVE
75    }
76}
77
// Layout, assuming `StrId` is 4 bytes: 12 B / align 4 by default.
// - Consecutive records then start at every multiple of 4 modulo 64, so 2 of
//   every 16 (1 in 8) straddle a 64-byte cache line.
// - With `cache_align`, align(16) rounds the size to 16 B, so exactly 4
//   records fill a line (no straddling, AVX2-load-friendly).
// - The cost is +33% memory.
#[derive(Clone)]
#[cfg_attr(feature = "cache_align", repr(align(16)))]
pub(crate) struct StoredRelation {
    /// Interned name of the source entity (the `from` end of `from -> to`).
    pub(crate) from: StrId,
    /// Interned name of the target entity.
    pub(crate) to: StrId,
    /// Interned relation-type string.
    pub(crate) relation_type: StrId,
}
88
89// ---------------------------------------------------------------------------
90// Borrowing serialization views (M6).
91//
92// Read tools used to build owned `Entity`/`Relation` vecs (a fresh `String`
93// per name/type/observation) and *then* serialize them — roughly 2-3x the
94// graph resident at once. These views instead hold references to the selected
95// stored records and emit their interned `&str` directly during
96// serialization, with no intermediate owned strings. The emitted JSON is
97// byte-for-byte identical to serializing `KnowledgeGraphOut`.
98// ---------------------------------------------------------------------------
99
100/// A borrowing view over a selected slice of the graph. Serializes to
101/// `{"entities":[...],"relations":[...]}`.
102pub struct GraphView<'a> {
103    kg: &'a KnowledgeGraph,
104    entities: Vec<&'a StoredEntity>,
105    relations: Vec<&'a StoredRelation>,
106}
107
108impl GraphView<'_> {
109    /// Materialize into the owned [`KnowledgeGraphOut`]. Used by the direct
110    /// (non-serializing) callers and tests; the server's read handlers
111    /// serialize the view directly instead.
112    pub fn to_owned_out(&self) -> KnowledgeGraphOut {
113        KnowledgeGraphOut {
114            entities: self.entities.iter().map(|e| self.kg.entity_to_output(e)).collect(),
115            relations: self.relations.iter().map(|r| self.kg.relation_to_output(r)).collect(),
116        }
117    }
118}
119
120impl Serialize for GraphView<'_> {
121    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
122        let mut st = s.serialize_struct("KnowledgeGraphOut", 2)?;
123        st.serialize_field("entities", &EntityListRef { kg: self.kg, items: &self.entities })?;
124        st.serialize_field("relations", &RelationListRef { kg: self.kg, items: &self.relations })?;
125        st.end()
126    }
127}
128
129struct EntityListRef<'a> { kg: &'a KnowledgeGraph, items: &'a [&'a StoredEntity] }
130impl Serialize for EntityListRef<'_> {
131    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
132        let mut seq = s.serialize_seq(Some(self.items.len()))?;
133        for &e in self.items {
134            seq.serialize_element(&EntityRef { kg: self.kg, e })?;
135        }
136        seq.end()
137    }
138}
139
140struct RelationListRef<'a> { kg: &'a KnowledgeGraph, items: &'a [&'a StoredRelation] }
141impl Serialize for RelationListRef<'_> {
142    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
143        let mut seq = s.serialize_seq(Some(self.items.len()))?;
144        for &r in self.items {
145            seq.serialize_element(&RelationRef { kg: self.kg, r })?;
146        }
147        seq.end()
148    }
149}
150
151struct EntityRef<'a> { kg: &'a KnowledgeGraph, e: &'a StoredEntity }
152impl Serialize for EntityRef<'_> {
153    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
154        let mut st = s.serialize_struct("Entity", 3)?;
155        st.serialize_field("name", self.kg.interner.lookup(self.e.name))?;
156        st.serialize_field("entityType", self.kg.interner.lookup(self.e.entity_type))?;
157        st.serialize_field("observations", &ObsRef { kg: self.kg, obs: &self.e.observations })?;
158        st.end()
159    }
160}
161
162struct ObsRef<'a> { kg: &'a KnowledgeGraph, obs: &'a [StrId] }
163impl Serialize for ObsRef<'_> {
164    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
165        let mut seq = s.serialize_seq(Some(self.obs.len()))?;
166        for &o in self.obs {
167            seq.serialize_element(self.kg.interner.lookup(o))?;
168        }
169        seq.end()
170    }
171}
172
173struct RelationRef<'a> { kg: &'a KnowledgeGraph, r: &'a StoredRelation }
174impl Serialize for RelationRef<'_> {
175    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
176        let mut st = s.serialize_struct("Relation", 3)?;
177        st.serialize_field("from", self.kg.interner.lookup(self.r.from))?;
178        st.serialize_field("to", self.kg.interner.lookup(self.r.to))?;
179        st.serialize_field("relationType", self.kg.interner.lookup(self.r.relation_type))?;
180        st.end()
181    }
182}
183
/// Which edges a neighborhood query follows out of each node.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum Direction {
    /// Outgoing edges only: traverse `from -> to`.
    Out,
    /// Incoming edges only: traverse `to -> from`.
    In,
    /// Edges in either orientation.
    Both,
}

impl Direction {
    /// Map an optional direction string to a [`Direction`].
    ///
    /// Only the exact, case-sensitive strings `"out"` and `"in"` are
    /// recognised. `None` and every other string yield [`Direction::Both`].
    pub fn parse(s: Option<&str>) -> Self {
        match s.unwrap_or_default() {
            "out" => Self::Out,
            "in" => Self::In,
            _ => Self::Both,
        }
    }
}
205
/// Make `s` safe to place between double quotes in a Mermaid or DOT label.
///
/// Each `"` becomes `'`, and each `\n` or `\r` becomes a single space. Every
/// other character, including backslashes, is copied through unchanged.
fn sanitize_label(s: &str) -> String {
    s.chars()
        .map(|c| match c {
            '"' => '\'',
            '\n' | '\r' => ' ',
            other => other,
        })
        .collect()
}
218
// ---------------------------------------------------------------------------
// ShardedNameTable – open-addressing hash map split into N independent shards.
//
// Each shard uses a **ctrl-byte** scheme: a 1-byte metadata array stores the
// 7-bit hash stamp (h2) of every occupied slot, with `0xFF` = EMPTY. A probe
// first reads only that byte. The full key (StrId) is compared only when the
// stamp matches.
//
// With 7 stamp bits that would be ~1/128 of occupied probe steps. However,
// the shard is chosen from hash bits 0-1, which h2 also uses, so stamps
// within one shard differ in only 5 bits. Expect roughly 1/32 instead.
// See also SwissTable / hashbrown.
// ---------------------------------------------------------------------------
/// Control-byte value for an unoccupied slot (bit 7 set).
const EMPTY_SLOT: u8 = 0xFF;

/// Stamp stored in an occupied slot's control byte: the hash's low 7 bits,
/// so bit 7 is always clear and cannot collide with `EMPTY_SLOT`.
#[inline(always)]
const fn h2(hash: u64) -> u8 {
    (hash as u8) & 0x7F
}

/// Starting probe index: the hash with its 7 stamp bits shifted out, reduced
/// by `mask` (capacity - 1 for a power-of-two table).
#[inline(always)]
const fn h1(hash: u64, mask: usize) -> usize {
    let without_stamp = hash >> 7;
    (without_stamp as usize) & mask
}
239
/// One shard of the name table: an open-addressing, linear-probing map from an
/// interned entity name (`StrId`) to that entity's slot index (`u32`).
///
/// The three parallel vectors together describe slot `i`. They always share
/// the same power-of-two length `mask + 1`, set in `new` and `grow`.
///
/// The key's hash is not stored: `remove` and `grow` recompute it with
/// `StringInterner::get_hash`. Callers must therefore pass
/// `interner.get_hash(name)` as `hash`, or relocated entries will not be found.
#[derive(Clone)]
struct NameTableShard {
    ctrl: Vec<u8>,      // 0xFF = empty; 0x00-0x7F = h2 stamp (bit 7 always clear)
    names: Vec<StrId>,  // key of each occupied slot (`StrId::EMPTY` when empty)
    slots: Vec<u32>,    // value of each occupied slot (`u32::MAX` when empty)
    mask: usize,        // capacity - 1
    count: usize,       // number of occupied slots
}

impl NameTableShard {
    /// Create an empty shard with `capacity` rounded up to a power of two
    /// (minimum 16) slots.
    fn new(capacity: usize) -> Self {
        let cap = capacity.next_power_of_two().max(16);
        Self {
            ctrl: vec![EMPTY_SLOT; cap],
            names: vec![StrId::EMPTY; cap],
            slots: vec![u32::MAX; cap],
            mask: cap - 1,
            count: 0,
        }
    }

    /// Slot index stored for `name`, or `None` if absent.
    ///
    /// Probes linearly from `h1(hash)` and stops at the first EMPTY control
    /// byte, or after visiting every slot.
    #[inline(always)]
    fn lookup(&self, hash: u64, name: StrId) -> Option<u32> {
        let stamp = h2(hash);
        let mask = self.mask;
        let mut idx = h1(hash, mask);
        let ctrl = self.ctrl.as_ptr();
        let names = self.names.as_ptr();
        let slots = self.slots.as_ptr();
        let len = self.ctrl.len();

        for _ in 0..len {
            // Prefetch the ctrl byte 4 slots ahead — overlaps memory latency.
            // NOTE(review): ctrl is one byte per slot, so idx + 4 usually lies in
            // the cache line already being read; measure whether this helps.
            let prefetch_idx = idx.wrapping_add(4) & mask;
            // SAFETY: prefetch_idx is masked, so the pointer stays inside `ctrl`.
            unsafe { prefetch_addr(ctrl.add(prefetch_idx)) };

            // SAFETY: idx always < len because of &mask on each iteration
            // (mask == len - 1, and ctrl/names/slots all have length len).
            unsafe {
                let c = *ctrl.add(idx);
                // Bit 7 set → EMPTY → key not present.
                if c & 0x80 != 0 {
                    return None;
                }
                // Stamp match → compare the full key.
                // NOTE(review): within one shard the stamp's low 2 bits are fixed
                // by `ShardedNameTable::shard`, so false matches are ~1/32, not 1/128.
                if c == stamp && *names.add(idx) == name {
                    return Some(*slots.add(idx));
                }
            }
            idx = (idx + 1) & mask;
        }
        None
    }

    /// Insert `name -> slot` at the first empty slot of its probe sequence.
    ///
    /// Does NOT check for an existing entry for `name`; callers must not insert
    /// duplicates. The shard grows first when occupancy exceeds 75%, which
    /// guarantees an empty slot exists, so the probe loop terminates.
    fn insert(&mut self, interner: &StringInterner, hash: u64, name: StrId, slot: u32) {
        if self.count * 4 > self.ctrl.len() * 3 {
            self.grow(interner);
        }
        let stamp = h2(hash);
        let mask = self.mask;
        let mut idx = h1(hash, mask);
        loop {
            // SAFETY: idx & mask always < len for power-of-two capacity.
            unsafe {
                if *self.ctrl.get_unchecked(idx) & 0x80 != 0 {
                    *self.ctrl.get_unchecked_mut(idx) = stamp;
                    *self.names.get_unchecked_mut(idx) = name;
                    *self.slots.get_unchecked_mut(idx) = slot;
                    self.count += 1;
                    return;
                }
            }
            idx = (idx + 1) & mask;
        }
    }

    /// Remove `name` if present; a no-op otherwise.
    fn remove(&mut self, interner: &StringInterner, hash: u64, name: StrId) {
        let stamp = h2(hash);
        let mask = self.mask;
        let mut idx = h1(hash, mask);
        let len = self.ctrl.len();
        for _ in 0..len {
            if self.ctrl[idx] & 0x80 != 0 {
                return;
            }
            if self.ctrl[idx] == stamp && self.names[idx] == name {
                // Found: clear the slot. Then take out and re-insert every entry
                // in the rest of this cluster (up to the next EMPTY). Each one
                // either stays put or moves back into an earlier hole, so no
                // lookup's probe chain is cut off by the new gap.
                self.ctrl[idx] = EMPTY_SLOT;
                self.names[idx] = StrId::EMPTY;
                self.slots[idx] = u32::MAX;
                self.count -= 1;

                let mut next = (idx + 1) & mask;
                while self.ctrl[next] & 0x80 == 0 {
                    let nn = self.names[next];
                    let ns = self.slots[next];
                    // Hash is no longer stored (M4) — recompute it from the
                    // interned name to find the entry's ideal bucket.
                    let nh = interner.get_hash(nn);
                    self.ctrl[next] = EMPTY_SLOT;
                    self.names[next] = StrId::EMPTY;
                    self.slots[next] = u32::MAX;
                    self.count -= 1;

                    // Re-insert at the first empty slot from its ideal bucket
                    // (never beyond `next`, which was just emptied).
                    let nstamp = h2(nh);
                    let mut re_idx = h1(nh, mask);
                    while self.ctrl[re_idx] & 0x80 == 0 {
                        re_idx = (re_idx + 1) & mask;
                    }
                    self.ctrl[re_idx] = nstamp;
                    self.names[re_idx] = nn;
                    self.slots[re_idx] = ns;
                    self.count += 1;

                    next = (next + 1) & mask;
                }
                return;
            }
            idx = (idx + 1) & mask;
        }
    }

    /// Double the capacity and re-insert every occupied entry into the new
    /// arrays. `count` is unchanged because the same entries are kept.
    fn grow(&mut self, interner: &StringInterner) {
        let new_cap = self.ctrl.len() * 2;
        let new_mask = new_cap - 1;
        let mut new_ctrl = vec![EMPTY_SLOT; new_cap];
        let mut new_names = vec![StrId::EMPTY; new_cap];
        let mut new_slots = vec![u32::MAX; new_cap];

        for i in 0..self.ctrl.len() {
            if self.ctrl[i] & 0x80 == 0 {
                // Recompute the hash from the interned name (M4: not stored).
                let name = self.names[i];
                let hash = interner.get_hash(name);
                let stamp = h2(hash);
                let mut idx = h1(hash, new_mask);
                while new_ctrl[idx] & 0x80 == 0 {
                    idx = (idx + 1) & new_mask;
                }
                new_ctrl[idx] = stamp;
                new_names[idx] = name;
                new_slots[idx] = self.slots[i];
            }
        }

        self.ctrl = new_ctrl;
        self.names = new_names;
        self.slots = new_slots;
        self.mask = new_mask;
    }
}
391
392#[derive(Clone)]
393struct ShardedNameTable {
394    shards: [NameTableShard; NAME_TABLE_SHARDS],
395}
396
397impl ShardedNameTable {
398    fn new(capacity_per_shard: usize) -> Self {
399        Self {
400            shards: [
401                NameTableShard::new(capacity_per_shard),
402                NameTableShard::new(capacity_per_shard),
403                NameTableShard::new(capacity_per_shard),
404                NameTableShard::new(capacity_per_shard),
405            ],
406        }
407    }
408
409    #[inline(always)]
410    const fn shard(hash: u64) -> usize {
411        (hash as usize) & (NAME_TABLE_SHARDS - 1)
412    }
413
414    #[inline(always)]
415    fn lookup(&self, hash: u64, name: StrId) -> Option<u32> {
416        self.shards[Self::shard(hash)].lookup(hash, name)
417    }
418
419    #[inline(always)]
420    fn insert(&mut self, interner: &StringInterner, hash: u64, name: StrId, slot: u32) {
421        self.shards[Self::shard(hash)].insert(interner, hash, name, slot);
422    }
423
424    #[inline(always)]
425    fn remove(&mut self, interner: &StringInterner, hash: u64, name: StrId) {
426        self.shards[Self::shard(hash)].remove(interner, hash, name);
427    }
428}
429
430// ---------------------------------------------------------------------------
431// KnowledgeGraph – the central type.
432// ---------------------------------------------------------------------------
/// Central knowledge-graph state. Holds the string interner, entity slots, the
/// name → slot index, relations, the adjacency index, the search index and
/// the backing binary store.
pub struct KnowledgeGraph {
    interner: StringInterner,
    /// Entity storage addressed by slot index (the `u32` values held in
    /// `name_table` and `free_slots`). Slots can be `None` or hold a
    /// non-live entity (see `StoredEntity::is_live`).
    entity_slots: Vec<Option<StoredEntity>>,
    /// Tombstoned slot indices available for reuse on the next create (M2),
    /// so create/delete churn doesn't grow `entity_slots` without bound.
    free_slots: Vec<u32>,
    /// Interned entity name → index into `entity_slots`.
    name_table: ShardedNameTable,
    relations: Vec<StoredRelation>,
    /// Incremental adjacency index: StrId → outgoing (to, type) pairs.
    /// Updated on every create_relations/delete_relations/delete_entities.
    /// Traversals use this instead of rebuilding from scratch (item 3 in plan).
    /// NOTE(review): some read paths (`find_path`, `extract_subgraph`) walk
    /// this map as their only edge source. Confirm whether it stores only
    /// outgoing edges or both orientations.
    adjacency: AHashMap<StrId, Vec<(StrId, StrId)>>,
    search: SearchIndex,
    store: BinaryStore,
}
448
449// ---------------------------------------------------------------------------
450// ReadSnapshot – wait-free, lock-free frozen view of the graph for readers.
451// Created by KnowledgeGraph::snapshot() after each write transaction.
452// ---------------------------------------------------------------------------
/// Read-only copy of the graph state used to answer queries.
///
/// Entity slots and relations live in `Arc` slices, so cloning a snapshot
/// shares them instead of copying.
#[derive(Clone)]
pub struct ReadSnapshot {
    pub(crate) interner: StringInterner,
    pub(crate) entity_slots: Arc<[Option<StoredEntity>]>,
    // Not read anywhere (hence the `dead_code` allowance). Presumably kept to
    // mirror `KnowledgeGraph`'s fields; NOTE(review): consider dropping it.
    #[allow(dead_code)]
    free_slots: Vec<u32>,
    /// Interned entity name → index into `entity_slots`.
    name_table: ShardedNameTable,
    pub(crate) relations: Arc<[StoredRelation]>,
    /// Copy of the graph's adjacency index (node → (neighbor, relation type)).
    adjacency: AHashMap<StrId, Vec<(StrId, StrId)>>,
    search: SearchIndex,
}
464
465// --- ReadSnapshot helpers (mirrors KnowledgeGraph helpers) ---
466impl ReadSnapshot {
467    fn lookup_live_slot(&self, name: &str) -> Option<u32> {
468        let name_id = self.interner.get_optional(name)?;
469        let hash = self.interner.get_hash(name_id);
470        let slot = self.name_table.lookup(hash, name_id)?;
471        self.entity_slots
472            .get(slot as usize)
473            .and_then(|s| s.as_ref())
474            .filter(|e| e.is_live())?;
475        Some(slot)
476    }
477
478    fn entity_by_name_id(&self, name_id: StrId) -> Option<Entity> {
479        let hash = self.interner.get_hash(name_id);
480        let slot = self.name_table.lookup(hash, name_id)?;
481        let e = self.entity_slots.get(slot as usize)?.as_ref()?;
482        Some(self.entity_to_output(e))
483    }
484
485    pub(crate) fn entity_to_output(&self, e: &StoredEntity) -> Entity {
486        Entity {
487            name: self.interner.lookup(e.name).to_string(),
488            entity_type: self.interner.lookup(e.entity_type).to_string(),
489            observations: e
490                .observations
491                .iter()
492                .map(|o| self.interner.lookup(*o).to_string())
493                .collect(),
494        }
495    }
496
497    pub(crate) fn relation_to_output(&self, r: &StoredRelation) -> Relation {
498        Relation {
499            from: self.interner.lookup(r.from).to_string(),
500            to: self.interner.lookup(r.to).to_string(),
501            relation_type: self.interner.lookup(r.relation_type).to_string(),
502        }
503    }
504
505    /// Open named entities + incident relations.
506    pub fn open_nodes(&self, names: &[String]) -> KnowledgeGraphOut {
507        let name_ids: std::collections::HashSet<StrId> = names
508            .iter()
509            .filter_map(|n| self.interner.get_optional(n))
510            .collect();
511        let entities: Vec<Entity> = self
512            .entity_slots
513            .iter()
514            .filter_map(|s| {
515                let e = s.as_ref()?;
516                if e.is_live() && name_ids.contains(&e.name) {
517                    Some(self.entity_to_output(e))
518                } else {
519                    None
520                }
521            })
522            .collect();
523        let matched: std::collections::HashSet<StrId> = entities.iter()
524            .filter_map(|e| self.interner.get_optional(&e.name))
525            .collect();
526        let relations: Vec<Relation> = self
527            .relations
528            .iter()
529            .filter(|r| matched.contains(&r.from) || matched.contains(&r.to))
530            .map(|r| self.relation_to_output(r))
531            .collect();
532        KnowledgeGraphOut { entities, relations }
533    }
534
535    /// Full graph read. Returns all entities and relations.
536    pub fn read_graph(&self) -> KnowledgeGraphOut {
537        let entities: Vec<Entity> = self
538            .entity_slots
539            .iter()
540            .filter_map(|s| s.as_ref().filter(|e| e.is_live()))
541            .map(|e| self.entity_to_output(e))
542            .collect();
543        let relations: Vec<Relation> = self
544            .relations
545            .iter()
546            .map(|r| self.relation_to_output(r))
547            .collect();
548        KnowledgeGraphOut { entities, relations }
549    }
550
551    /// Search entities by keyword.
552    pub fn search_entities(&self, query: &str) -> Result<Vec<Entity>> {
553        let token = query.to_lowercase();
554        let matching = self.search.search(&token, &self.interner);
555        Ok(matching
556            .iter()
557            .filter_map(|idx| {
558                self.entity_slots
559                    .get(*idx as usize)?
560                    .as_ref()
561                    .filter(|e| e.is_live())
562                    .map(|e| self.entity_to_output(e))
563            })
564            .collect())
565    }
566
567    /// Get a single entity by name.
568    pub fn get_entity(&self, name: &str) -> Option<Entity> {
569        self.lookup_live_slot(name)?;
570        let name_id = self.interner.get_optional(name)?;
571        self.entity_by_name_id(name_id)
572    }
573
574    /// Neighborhood expansion (same logic as KnowledgeGraph::neighbors but read-only).
575    pub fn neighbors(
576        &self,
577        name: &str,
578        direction: Direction,
579        rtype: Option<&str>,
580        depth: u32,
581    ) -> Result<KnowledgeGraphOut> {
582        self.lookup_live_slot(name)
583            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
584        let start = self.interner.get_optional(name).unwrap();
585
586        let rtype_id = match rtype {
587            Some(r) => match self.interner.get_optional(r) {
588                Some(id) => Some(id),
589                None => {
590                    let entities = self.entity_by_name_id(start).into_iter().collect();
591                    return Ok(KnowledgeGraphOut { entities, relations: Vec::new() });
592                }
593            },
594            None => None,
595        };
596
597        let mut visited: AHashSet<StrId> = AHashSet::new();
598        visited.insert(start);
599
600        let type_ok = |r: &StoredRelation, rt: Option<StrId>| rt.is_none_or(|rt_id| r.relation_type == rt_id);
601
602        if depth == 1 {
603            for r in self.relations.iter().filter(|r| type_ok(r, rtype_id)) {
604                match direction {
605                    Direction::Out => {
606                        if r.from == start { visited.insert(r.to); }
607                    }
608                    Direction::In => {
609                        if r.to == start { visited.insert(r.from); }
610                    }
611                    Direction::Both => {
612                        if r.from == start { visited.insert(r.to); }
613                        else if r.to == start { visited.insert(r.from); }
614                    }
615                }
616            }
617        } else if depth >= 2 {
618            let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
619            match direction {
620                Direction::Both => {
621                    for (&node, edges) in &self.adjacency {
622                        for &(nb, rt) in edges {
623                            if rtype_id.is_none_or(|rt_id| rt == rt_id) {
624                                adj.entry(node).or_default().push(nb);
625                            }
626                        }
627                    }
628                }
629                Direction::Out | Direction::In => {
630                    for r in self.relations.iter().filter(|r| type_ok(r, rtype_id)) {
631                        match direction {
632                            Direction::Out => adj.entry(r.from).or_default().push(r.to),
633                            Direction::In => adj.entry(r.to).or_default().push(r.from),
634                            _ => unreachable!(),
635                        }
636                    }
637                }
638            }
639            let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
640            queue.push_back((start, 0));
641            while let Some((node, d)) = queue.pop_front() {
642                if d >= depth { continue; }
643                if let Some(nbrs) = adj.get(&node) {
644                    for &nb in nbrs {
645                        if visited.insert(nb) {
646                            queue.push_back((nb, d + 1));
647                        }
648                    }
649                }
650            }
651        }
652
653        let mut entities = Vec::with_capacity(visited.len());
654        for &nid in &visited {
655            if let Some(e) = self.entity_by_name_id(nid) {
656                entities.push(e);
657            }
658        }
659        let relations: Vec<Relation> = self
660            .relations
661            .iter()
662            .filter(|r| type_ok(r, rtype_id) && visited.contains(&r.from) && visited.contains(&r.to))
663            .map(|r| self.relation_to_output(r))
664            .collect();
665        Ok(KnowledgeGraphOut { entities, relations })
666    }
667
668    /// Describe an entity: entity details, incident relations, neighbors, degree.
669    pub fn describe_entity(&self, name: &str) -> Result<serde_json::Value> {
670        let name_id = self
671            .interner
672            .get_optional(name)
673            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
674        let entity = self
675            .entity_by_name_id(name_id)
676            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
677
678        let mut incident: Vec<Relation> = Vec::new();
679        let mut neighbor_seen: AHashSet<StrId> = AHashSet::new();
680        let mut neighbors: Vec<&str> = Vec::new();
681        for r in self.relations.iter() {
682            if r.from == name_id || r.to == name_id {
683                incident.push(self.relation_to_output(r));
684                let other = if r.from == name_id { r.to } else { r.from };
685                if other != name_id && neighbor_seen.insert(other) {
686                    neighbors.push(self.interner.lookup(other));
687                }
688            }
689        }
690
691        Ok(serde_json::json!({
692            "entity": entity,
693            "relations": incident,
694            "neighbors": neighbors,
695            "degree": incident.len(),
696        }))
697    }
698
699    /// Find the shortest path between two entities.
700    pub fn find_path(&self, from: &str, to: &str) -> Result<Vec<String>> {
701        let from_id = self
702            .interner
703            .get_optional(from)
704            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
705        let to_id = self
706            .interner
707            .get_optional(to)
708            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
709        if self.lookup_live_slot(from).is_none() {
710            return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
711        }
712        if self.lookup_live_slot(to).is_none() {
713            return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
714        }
715
716        // BFS over incremental adjacency index.
717        let mut visited: AHashSet<StrId> = AHashSet::new();
718        let mut parent: AHashMap<StrId, StrId> = AHashMap::new();
719        let mut queue: VecDeque<StrId> = VecDeque::new();
720
721        visited.insert(from_id);
722        queue.push_back(from_id);
723
724        while let Some(current) = queue.pop_front() {
725            if current == to_id { break; }
726            if let Some(neighbors) = self.adjacency.get(&current) {
727                for &(neighbor, _) in neighbors {
728                    if visited.insert(neighbor) {
729                        parent.insert(neighbor, current);
730                        queue.push_back(neighbor);
731                    }
732                }
733            }
734        }
735
736        if !visited.contains(&to_id) {
737            return Err(MCSError::MemoryError(format!(
738                "No path found between '{from}' and '{to}'"
739            )));
740        }
741
742        let mut path = Vec::new();
743        let mut cur = to_id;
744        path.push(self.interner.lookup(cur).to_string());
745        while let Some(&p) = parent.get(&cur) {
746            path.push(self.interner.lookup(p).to_string());
747            cur = p;
748        }
749        path.reverse();
750        Ok(path)
751    }
752
753    /// Extract a subgraph around the given entities.
754    pub fn extract_subgraph(&self, names: &[String], depth: u32) -> Result<KnowledgeGraphOut> {
755        if names.is_empty() {
756            return Ok(KnowledgeGraphOut { entities: Vec::new(), relations: Vec::new() });
757        }
758        let mut visited: AHashSet<StrId> = AHashSet::new();
759        let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
760        for name in names {
761            if let Some(id) = self.interner.get_optional(name)
762                && visited.insert(id)
763            {
764                queue.push_back((id, 0));
765            }
766        }
767        let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
768        for (&node, edges) in &self.adjacency {
769            let nbrs: Vec<StrId> = edges.iter().map(|(to, _)| *to).collect();
770            adj.insert(node, nbrs);
771        }
772        while let Some((node, d)) = queue.pop_front() {
773            if d >= depth { continue; }
774            if let Some(nbrs) = adj.get(&node) {
775                for &nb in nbrs {
776                    if visited.insert(nb) {
777                        queue.push_back((nb, d + 1));
778                    }
779                }
780            }
781        }
782        let mut entities: Vec<Entity> = Vec::with_capacity(visited.len());
783        for &nid in &visited {
784            if let Some(e) = self.entity_by_name_id(nid) {
785                entities.push(e);
786            }
787        }
788        let relations: Vec<Relation> = self
789            .relations
790            .iter()
791            .filter(|r| visited.contains(&r.from) && visited.contains(&r.to))
792            .map(|r| self.relation_to_output(r))
793            .collect();
794        Ok(KnowledgeGraphOut { entities, relations })
795    }
796
797    /// Batch get entities.
798    pub fn batch_get_entities(&self, names: &[String]) -> Vec<Option<Entity>> {
799        names.iter().map(|n| self.get_entity(n)).collect()
800    }
801
802    /// Graph statistics.
803    pub fn graph_stats(&self) -> serde_json::Value {
804        let entity_count = self
805            .entity_slots
806            .iter()
807            .filter(|s| s.as_ref().is_some_and(|e| e.is_live()))
808            .count();
809        let relation_count = self.relations.len();
810        let type_counts = self.entity_type_counts();
811        let relation_type_counts = self.relation_type_counts();
812        serde_json::json!({
813            "entities": entity_count,
814            "relations": relation_count,
815            "entityTypes": type_counts,
816            "relationTypes": relation_type_counts,
817        })
818    }
819
820    /// Search relations by from/to/type filters.
821    pub fn search_relations(&self, from: Option<&str>, to: Option<&str>, rtype: Option<&str>) -> Vec<Relation> {
822        let from_id = from.and_then(|n| self.interner.get_optional(n));
823        let to_id = to.and_then(|n| self.interner.get_optional(n));
824        let rtype_id = rtype.and_then(|n| self.interner.get_optional(n));
825        self.relations
826            .iter()
827            .filter(|r| {
828                from_id.is_none_or(|id| r.from == id)
829                    && to_id.is_none_or(|id| r.to == id)
830                    && rtype_id.is_none_or(|id| r.relation_type == id)
831            })
832            .map(|r| self.relation_to_output(r))
833            .collect()
834    }
835
836    /// Count entities per type.
837    pub fn entity_type_counts(&self) -> Vec<(String, usize)> {
838        let mut counts: AHashMap<StrId, usize> = AHashMap::new();
839        for slot in self.entity_slots.iter() {
840            if let Some(e) = slot.as_ref().filter(|e| e.is_live()) {
841                *counts.entry(e.entity_type).or_default() += 1;
842            }
843        }
844        let mut result: Vec<(String, usize)> = counts
845            .into_iter()
846            .map(|(id, c)| (self.interner.lookup(id).to_string(), c))
847            .collect();
848        result.sort_by(|a, b| a.0.cmp(&b.0));
849        result
850    }
851
852    /// Count relations per type.
853    pub fn relation_type_counts(&self) -> Vec<(String, usize)> {
854        let mut counts: AHashMap<StrId, usize> = AHashMap::new();
855        for r in self.relations.iter() {
856            *counts.entry(r.relation_type).or_default() += 1;
857        }
858        let mut result: Vec<(String, usize)> = counts
859            .into_iter()
860            .map(|(id, c)| (self.interner.lookup(id).to_string(), c))
861            .collect();
862        result.sort_by(|a, b| a.0.cmp(&b.0));
863        result
864    }
865
866    /// Export the graph in one of: json, mermaid, dot.
867    pub fn export(&self, format: &str) -> Result<String> {
868        match format {
869            "json" => serde_json::to_string(&self.read_graph()).map_err(MCSError::JsonError),
870            "mermaid" => Ok(self.export_mermaid()),
871            "dot" => Ok(self.export_dot()),
872            other => Err(MCSError::InvalidParams(format!(
873                "Unknown export format '{other}' (expected json|mermaid|dot)"
874            ))),
875        }
876    }
877
878    fn export_mermaid(&self) -> String {
879        let mut out = String::with_capacity(4096);
880        out.push_str("graph LR\n");
881        for r in self.relations.iter() {
882            let from = sanitize_label(self.interner.lookup(r.from));
883            let to = sanitize_label(self.interner.lookup(r.to));
884            let rt = sanitize_label(self.interner.lookup(r.relation_type));
885            out.push_str(&format!("    {} -- \"{}\" --> {}\n", from, rt, to));
886        }
887        out
888    }
889
890    fn export_dot(&self) -> String {
891        let mut out = String::with_capacity(4096);
892        out.push_str("digraph KG {\n");
893        out.push_str("    rankdir=LR;\n");
894        for slot in self.entity_slots.iter() {
895            if let Some(e) = slot.as_ref().filter(|e| e.is_live()) {
896                let name = sanitize_label(self.interner.lookup(e.name));
897                let etype = sanitize_label(self.interner.lookup(e.entity_type));
898                out.push_str(&format!("    \"{}\" [label=\"{}\n({})\"];\n", name, name, etype));
899            }
900        }
901        for r in self.relations.iter() {
902            let from = sanitize_label(self.interner.lookup(r.from));
903            let to = sanitize_label(self.interner.lookup(r.to));
904            let rt = sanitize_label(self.interner.lookup(r.relation_type));
905            out.push_str(&format!("    \"{}\" -> \"{}\" [label=\"{}\"];\n", from, to, rt));
906        }
907        out.push_str("}\n");
908        out
909    }
910
911    /// Find all paths between two entities.
912    pub fn find_all_paths(
913        &self,
914        from: &str,
915        to: &str,
916        max_depth: usize,
917        max_paths: usize,
918    ) -> Result<Vec<Vec<String>>> {
919        let from_id = self
920            .interner
921            .get_optional(from)
922            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
923        let to_id = self
924            .interner
925            .get_optional(to)
926            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
927        if self.lookup_live_slot(from).is_none() {
928            return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
929        }
930        if self.lookup_live_slot(to).is_none() {
931            return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
932        }
933        if from_id == to_id {
934            return Ok(vec![vec![from.to_string()]]);
935        }
936        let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::with_capacity(self.adjacency.len());
937        for (&node, edges) in &self.adjacency {
938            let nbrs: Vec<StrId> = edges.iter().map(|(to, _)| *to).collect();
939            adj.insert(node, nbrs);
940        }
941        let mut all_paths: Vec<Vec<StrId>> = Vec::new();
942        let mut current_path = Vec::new();
943        let mut visited: AHashSet<StrId> = AHashSet::new();
944        visited.insert(from_id);
945        current_path.push(from_id);
946        Self::dfs_all_paths(
947            &adj,
948            from_id,
949            to_id,
950            max_depth,
951            max_paths,
952            &mut visited,
953            &mut current_path,
954            &mut all_paths,
955        );
956        if all_paths.is_empty() {
957            return Err(MCSError::MemoryError(format!(
958                "No path found between '{from}' and '{to}'"
959            )));
960        }
961        let result: Vec<Vec<String>> = all_paths
962            .into_iter()
963            .map(|path| {
964                path.into_iter()
965                    .map(|id| self.interner.lookup(id).to_string())
966                    .collect()
967            })
968            .collect();
969        Ok(result)
970    }
971
972    fn dfs_all_paths(
973        adj: &AHashMap<StrId, Vec<StrId>>,
974        current: StrId,
975        target: StrId,
976        max_depth: usize,
977        max_paths: usize,
978        visited: &mut AHashSet<StrId>,
979        current_path: &mut Vec<StrId>,
980        all_paths: &mut Vec<Vec<StrId>>,
981    ) {
982        if all_paths.len() >= max_paths { return; }
983        if current == target && current_path.len() > 1 {
984            all_paths.push(current_path.clone());
985            return;
986        }
987        if current_path.len() > max_depth { return; }
988        if let Some(neighbors) = adj.get(&current) {
989            for &nb in neighbors {
990                if !visited.contains(&nb) {
991                    visited.insert(nb);
992                    current_path.push(nb);
993                    Self::dfs_all_paths(adj, nb, target, max_depth, max_paths, visited, current_path, all_paths);
994                    current_path.pop();
995                    visited.remove(&nb);
996                }
997            }
998        }
999    }
1000}
1001
1002impl KnowledgeGraph {
1003    pub fn new(path: &Path) -> std::io::Result<Self> {
1004        let store = BinaryStore::new(path)?;
1005
1006        // Replay into local collections, then assign into self — no raw pointers needed (X3).
1007        let mut interner = StringInterner::with_capacity(65536, 1024);
1008        let mut entity_slots: Vec<Option<StoredEntity>> = Vec::with_capacity(256);
1009        let mut name_table = ShardedNameTable::new(64);
1010        let mut relations: Vec<StoredRelation> = Vec::with_capacity(64);
1011        let mut search = SearchIndex::new();
1012
1013        // Transaction buffer: while `Some`, records are accumulated and only
1014        // applied on `TxnCommit`. An unclosed transaction at EOF is discarded,
1015        // which is what makes multi-record operations (e.g. `merge_entities`)
1016        // crash-atomic.
1017        let mut pending: Option<Vec<(RecordKind, Vec<u8>)>> = None;
1018        store.replay(|kind, data| {
1019            match kind {
1020                RecordKind::TxnBegin => pending = Some(Vec::new()),
1021                RecordKind::TxnCommit => {
1022                    if let Some(buffered) = pending.take() {
1023                        for (k, d) in &buffered {
1024                            Self::apply_record(
1025                                *k, d, &mut interner, &mut entity_slots, &mut search,
1026                                &mut name_table, &mut relations,
1027                            );
1028                        }
1029                    }
1030                }
1031                other => match pending.as_mut() {
1032                    Some(buffered) => buffered.push((other, data.to_vec())),
1033                    None => Self::apply_record(
1034                        other, data, &mut interner, &mut entity_slots, &mut search,
1035                        &mut name_table, &mut relations,
1036                    ),
1037                },
1038            }
1039        })?;
1040
1041        // Slots tombstoned by deletes during replay are available for reuse (M2).
1042        let free_slots: Vec<u32> = entity_slots
1043            .iter()
1044            .enumerate()
1045            .filter(|(_, s)| s.is_none())
1046            .map(|(i, _)| i as u32)
1047            .collect();
1048
1049        let mut adjacency: AHashMap<StrId, Vec<(StrId, StrId)>> = AHashMap::new();
1050        for rel in &relations {
1051            adjacency.entry(rel.from).or_default().push((rel.to, rel.relation_type));
1052            adjacency.entry(rel.to).or_default().push((rel.from, rel.relation_type));
1053        }
1054
1055        Ok(Self {
1056            interner,
1057            entity_slots,
1058            free_slots,
1059            name_table,
1060            relations,
1061            adjacency,
1062            search,
1063            store,
1064        })
1065    }
1066
1067    // -----------------------------------------------------------------------
1068    // Replay helpers (static to avoid borrow issues in the closure)
1069    // -----------------------------------------------------------------------
1070
1071    /// Apply one already-decoded log record to the in-memory collections.
1072    /// Shared by direct replay and by transaction commit. `TxnBegin`/`TxnCommit`
1073    /// are handled by the caller and are no-ops here.
1074    #[allow(clippy::too_many_arguments)]
1075    fn apply_record(
1076        kind: RecordKind,
1077        data: &[u8],
1078        interner: &mut StringInterner,
1079        entity_slots: &mut Vec<Option<StoredEntity>>,
1080        search: &mut SearchIndex,
1081        name_table: &mut ShardedNameTable,
1082        relations: &mut Vec<StoredRelation>,
1083    ) {
1084        match kind {
1085            RecordKind::CreateEntity => {
1086                if let Some((name, etype, obs)) = store_enc::decode_create_entity(data) {
1087                    Self::replay_create_entity(
1088                        interner, entity_slots, search, name_table, name, etype, &obs,
1089                    );
1090                }
1091            }
1092            RecordKind::CreateRelation => {
1093                if let Some((from, to, rtype)) = store_enc::decode_create_relation(data) {
1094                    let from_id = interner.intern(from);
1095                    let to_id = interner.intern(to);
1096                    let type_id = interner.intern(rtype);
1097                    relations.push(StoredRelation {
1098                        from: from_id,
1099                        to: to_id,
1100                        relation_type: type_id,
1101                    });
1102                }
1103            }
1104            RecordKind::AddObservations => {
1105                if let Some((name, obs)) = store_enc::decode_add_observations(data) {
1106                    Self::replay_add_observations(
1107                        interner, entity_slots, search, name_table, name, &obs,
1108                    );
1109                }
1110            }
1111            RecordKind::DeleteEntity => {
1112                if let Some(name) = store_enc::decode_delete_entity(data) {
1113                    Self::replay_delete_entity(
1114                        interner, entity_slots, relations, search, name_table, name,
1115                    );
1116                }
1117            }
1118            RecordKind::DeleteObservations => {
1119                if let Some((name, obs)) = store_enc::decode_delete_observations(data) {
1120                    Self::replay_delete_observations(
1121                        interner, entity_slots, search, name_table, name, &obs,
1122                    );
1123                }
1124            }
1125            RecordKind::DeleteRelation => {
1126                if let Some((from, to, rtype)) = store_enc::decode_delete_relation(data) {
1127                    let from_id = interner.intern(from);
1128                    let to_id = interner.intern(to);
1129                    let type_id = interner.intern(rtype);
1130                    relations.retain(|r| {
1131                        !(r.from == from_id && r.to == to_id && r.relation_type == type_id)
1132                    });
1133                }
1134            }
1135            RecordKind::TxnBegin | RecordKind::TxnCommit => {}
1136        }
1137    }
1138
1139    #[allow(clippy::ptr_arg)]
1140    fn replay_create_entity(
1141        interner: &mut StringInterner,
1142        entities: &mut Vec<Option<StoredEntity>>,
1143        search: &mut SearchIndex,
1144        name_table: &mut ShardedNameTable,
1145        name: &str,
1146        etype: &str,
1147        observations: &[&str],
1148    ) {
1149        let name_id = interner.intern(name);
1150        let type_id = interner.intern(etype);
1151        let obs_ids: Vec<StrId> = observations.iter().map(|o| interner.intern(o)).collect();
1152        let slot = entities.len() as u32;
1153        entities.push(Some(StoredEntity {
1154            state: ENTITY_SLOT_LIVE,
1155            name: name_id,
1156            entity_type: type_id,
1157            observations: obs_ids.clone(),
1158        }));
1159        let hash = interner.get_hash(name_id);
1160        name_table.insert(&*interner, hash, name_id, slot);
1161        search.index_entity(interner, slot, name_id, type_id, &obs_ids);
1162    }
1163
1164    fn replay_add_observations(
1165        interner: &mut StringInterner,
1166        entities: &mut [Option<StoredEntity>],
1167        search: &mut SearchIndex,
1168        name_table: &mut ShardedNameTable,
1169        name: &str,
1170        observations: &[&str],
1171    ) {
1172        let name_id = interner.intern(name);
1173        let hash = interner.get_hash(name_id);
1174        if let Some(slot) = name_table.lookup(hash, name_id)
1175            && let Some(Some(entity)) = entities.get_mut(slot as usize)
1176        {
1177            for &o in observations {
1178                let oid = interner.intern(o);
1179                if !entity.observations.contains(&oid) {
1180                    entity.observations.push(oid);
1181                }
1182            }
1183            search.remove_entity(slot);
1184            search.index_entity(
1185                interner,
1186                slot,
1187                entity.name,
1188                entity.entity_type,
1189                &entity.observations,
1190            );
1191        }
1192    }
1193
1194    fn replay_delete_entity(
1195        interner: &mut StringInterner,
1196        entities: &mut [Option<StoredEntity>],
1197        rels: &mut Vec<StoredRelation>,
1198        search: &mut SearchIndex,
1199        name_table: &mut ShardedNameTable,
1200        name: &str,
1201    ) {
1202        let name_id = interner.intern(name);
1203        let hash = interner.get_hash(name_id);
1204        if let Some(slot) = name_table.lookup(hash, name_id)
1205            && let Some(Some(_)) = entities.get(slot as usize)
1206        {
1207            entities[slot as usize] = None;
1208            search.remove_entity(slot);
1209            name_table.remove(&*interner, hash, name_id);
1210        }
1211        rels.retain(|r| r.from != name_id && r.to != name_id);
1212    }
1213
1214    fn replay_delete_observations(
1215        interner: &mut StringInterner,
1216        entities: &mut [Option<StoredEntity>],
1217        search: &mut SearchIndex,
1218        name_table: &mut ShardedNameTable,
1219        name: &str,
1220        observations: &[&str],
1221    ) {
1222        let name_id = interner.intern(name);
1223        let hash = interner.get_hash(name_id);
1224        if let Some(slot) = name_table.lookup(hash, name_id)
1225            && let Some(Some(entity)) = entities.get_mut(slot as usize)
1226        {
1227            let remove_ids: Vec<StrId> = observations.iter().map(|o| interner.intern(o)).collect();
1228            entity.observations.retain(|o| !remove_ids.contains(o));
1229            search.remove_entity(slot);
1230            search.index_entity(
1231                interner,
1232                slot,
1233                entity.name,
1234                entity.entity_type,
1235                &entity.observations,
1236            );
1237        }
1238    }
1239
1240    // -----------------------------------------------------------------------
1241    // Public API
1242    // -----------------------------------------------------------------------
1243
1244    pub const fn interner(&self) -> &StringInterner {
1245        &self.interner
1246    }
1247
1248    /// Return a single entity by exact name match.
1249    pub fn get_entity(&self, name: &str) -> Option<Entity> {
1250        let name_id = self.interner.get_optional(name)?;
1251        let hash = self.interner.get_hash(name_id);
1252        let slot = self.name_table.lookup(hash, name_id)?;
1253        let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
1254        if !stored.is_live() {
1255            return None;
1256        }
1257        Some(self.entity_to_output(stored))
1258    }
1259
1260    /// Return aggregate statistics about the graph.
1261    pub fn graph_stats(&self) -> serde_json::Value {
1262        let live_entities = self
1263            .entity_slots
1264            .iter()
1265            .filter(|s| s.as_ref().is_some_and(|e| e.is_live()))
1266            .count();
1267        let total_relations = self.relations.len();
1268        let index_entries = self.search.len();
1269        let total_obs: usize = self
1270            .entity_slots
1271            .iter()
1272            .filter_map(|s| s.as_ref())
1273            .filter(|e| e.is_live())
1274            .map(|e| e.observations.len())
1275            .sum();
1276
1277        serde_json::json!({
1278            "entities": live_entities,
1279            "relations": total_relations,
1280            "totalObservations": total_obs,
1281            "searchIndexEntries": index_entries,
1282            "internedStrings": self.interner.len(),
1283            "internedBytes": self.interner.total_bytes(),
1284        })
1285    }
1286
1287    /// Search relations by optional filters: `from`, `to`, `relationType`.
1288    /// Any filter that is absent matches everything. A filter value that does
1289    /// not exist in the graph returns empty results.
1290    pub fn search_relations(&self, from: Option<&str>, to: Option<&str>, rtype: Option<&str>) -> Vec<Relation> {
1291        let from_id = match from {
1292            Some(f) => match self.interner.get_optional(f) {
1293                Some(id) => Some(id),
1294                None => return Vec::new(),
1295            },
1296            None => None,
1297        };
1298        let to_id = match to {
1299            Some(t) => match self.interner.get_optional(t) {
1300                Some(id) => Some(id),
1301                None => return Vec::new(),
1302            },
1303            None => None,
1304        };
1305        let rtype_id = match rtype {
1306            Some(r) => match self.interner.get_optional(r) {
1307                Some(id) => Some(id),
1308                None => return Vec::new(),
1309            },
1310            None => None,
1311        };
1312
1313        self.relations
1314            .iter()
1315            .filter(|r| {
1316                from_id.is_none_or(|f| r.from == f)
1317                    && to_id.is_none_or(|t| r.to == t)
1318                    && rtype_id.is_none_or(|rt| r.relation_type == rt)
1319            })
1320            .map(|r| Relation {
1321                from: self.interner.lookup(r.from).to_string(),
1322                to: self.interner.lookup(r.to).to_string(),
1323                relation_type: self.interner.lookup(r.relation_type).to_string(),
1324            })
1325            .collect()
1326    }
1327
1328    /// BFS shortest-path between two entity names. Returns the sequence of
1329    /// entity names along the path (inclusive of both endpoints).
1330    pub fn find_path(&self, from: &str, to: &str) -> Result<Vec<String>> {
1331        let from_id = self.interner.get_optional(from)
1332            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
1333        let to_id = self.interner.get_optional(to)
1334            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
1335        let hash_from = self.interner.get_hash(from_id);
1336        let hash_to = self.interner.get_hash(to_id);
1337
1338        if self.name_table.lookup(hash_from, from_id).is_none() {
1339            return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
1340        }
1341        if self.name_table.lookup(hash_to, to_id).is_none() {
1342            return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
1343        }
1344        if from_id == to_id {
1345            return Ok(vec![from.to_string()]);
1346        }
1347
1348        // Use incremental adjacency index — O(degree) per hop, no rebuild.
1349        let mut visited: AHashSet<StrId> = AHashSet::new();
1350        let mut parent: AHashMap<StrId, StrId> = AHashMap::new();
1351        let mut queue: VecDeque<StrId> = VecDeque::new();
1352
1353        visited.insert(from_id);
1354        queue.push_back(from_id);
1355
1356        while let Some(current) = queue.pop_front() {
1357            if current == to_id {
1358                break;
1359            }
1360
1361            if let Some(neighbors) = self.adjacency.get(&current) {
1362                for &(neighbor, _) in neighbors {
1363                    if visited.insert(neighbor) {
1364                        parent.insert(neighbor, current);
1365                        queue.push_back(neighbor);
1366                    }
1367                }
1368            }
1369        }
1370
1371        if !parent.contains_key(&to_id) && from_id != to_id {
1372            return Err(MCSError::MemoryError(format!(
1373                "No path found between '{from}' and '{to}'"
1374            )));
1375        }
1376
1377        // Reconstruct path
1378        let mut path: Vec<String> = Vec::new();
1379        let mut cur = to_id;
1380        loop {
1381            path.push(self.interner.lookup(cur).to_string());
1382            if cur == from_id {
1383                break;
1384            }
1385            cur = *parent.get(&cur).ok_or_else(|| {
1386                MCSError::MemoryError("Path reconstruction failed".into())
1387            })?;
1388        }
1389        path.reverse();
1390        Ok(path)
1391    }
1392
1393    /// Rewrite the binary log from the current in-memory state.
1394    /// After compaction the log contains only the minimal set of records
1395    /// needed to reconstruct the graph (all creates, no deletes).
1396    /// Crash-safe: writes to a temp file, then atomically renames (C3).
1397    pub fn compact(&mut self) -> Result<()> {
1398        // 1. Collect current state as create-records
1399        let mut create_entities: Vec<Entity> = Vec::new();
1400        let mut create_relations: Vec<Relation> = Vec::new();
1401
1402        for slot in &self.entity_slots {
1403            if let Some(stored) = slot.as_ref().filter(|e| e.is_live()) {
1404                create_entities.push(self.entity_to_output(stored));
1405            }
1406        }
1407        for rel in &self.relations {
1408            create_relations.push(Relation {
1409                from: self.interner.lookup(rel.from).to_string(),
1410                to: self.interner.lookup(rel.to).to_string(),
1411                relation_type: self.interner.lookup(rel.relation_type).to_string(),
1412            });
1413        }
1414
1415        // 2. Write to a temp file first.
1416        //    Remove any stale temp left by a previously-interrupted compact:
1417        //    `BinaryStore::new` opens in append mode and only writes the MAGIC
1418        //    header when the file does not already exist, so appending to a
1419        //    leftover temp would produce a duplicated, header-corrupted log
1420        //    once renamed over the real one (C1).
1421        let tmp_path = self.store.path().with_extension("tmp");
1422        if let Err(e) = std::fs::remove_file(&tmp_path)
1423            && e.kind() != std::io::ErrorKind::NotFound
1424        {
1425            return Err(MCSError::IoError(e));
1426        }
1427        let mut tmp_store = BinaryStore::new(&tmp_path).map_err(MCSError::IoError)?;
1428        for entity in &create_entities {
1429            let mut buf = Vec::new();
1430            store_enc::encode_create_entity(&mut buf, &entity.name, &entity.entity_type, &entity.observations)
1431                .map_err(MCSError::IoError)?;
1432            tmp_store.write_record(RecordKind::CreateEntity, &buf).map_err(MCSError::IoError)?;
1433        }
1434        for relation in &create_relations {
1435            let mut buf = Vec::new();
1436            store_enc::encode_create_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
1437                .map_err(MCSError::IoError)?;
1438            tmp_store.write_record(RecordKind::CreateRelation, &buf).map_err(MCSError::IoError)?;
1439        }
1440        tmp_store.flush_and_sync().map_err(MCSError::IoError)?;
1441        drop(tmp_store);
1442
1443        // 3. Atomically rename over the original (atomic on POSIX), then fsync
1444        //    the containing directory so the rename itself is durable across a
1445        //    crash — content swap is atomic, but the directory entry update is
1446        //    not durable until the dir is synced (C2).
1447        std::fs::rename(&tmp_path, self.store.path()).map_err(MCSError::IoError)?;
1448        sync_parent_dir(self.store.path()).map_err(MCSError::IoError)?;
1449
1450        // 4. Rebuild the entire in-memory graph from the compacted log (M1/M2).
1451        //    Replaying into fresh structures reclaims the interner arena (stale
1452        //    strings from deleted/edited entities), tombstoned entity slots, and
1453        //    stale search-index entries — none of which the old reopen-only path
1454        //    reclaimed.
1455        let path = self.store.path().clone();
1456        *self = KnowledgeGraph::new(&path).map_err(MCSError::IoError)?;
1457
1458        Ok(())
1459    }
1460
1461    // ---- Public API with write-ahead log (C1) and error propagation ----
1462
1463    pub fn create_entities(&mut self, entities: &[Entity]) -> Result<Vec<Entity>> {
1464        // Validate up front so an invalid entity never produces partial writes.
1465        for entity in entities {
1466            if entity.name.is_empty() {
1467                return Err(MCSError::InvalidParams(
1468                    "Entity name must not be empty".into(),
1469                ));
1470            }
1471        }
1472        let mut created = Vec::new();
1473        for entity in entities {
1474            // Check dedup before writing (using non-interning lookup)
1475            let existing = self.interner.get_optional(&entity.name)
1476                .and_then(|id| {
1477                    let hash = self.interner.get_hash(id);
1478                    self.name_table.lookup(hash, id)
1479                });
1480            if existing.is_some() {
1481                continue;
1482            }
1483            // Write-ahead: encode and log before mutating state
1484            let mut buf = Vec::new();
1485            store_enc::encode_create_entity(&mut buf, &entity.name, &entity.entity_type, &entity.observations)
1486                .map_err(MCSError::IoError)?;
1487            self.store.write_record(RecordKind::CreateEntity, &buf)
1488                .map_err(MCSError::IoError)?;
1489
1490            let name_id = self.interner.intern(&entity.name);
1491            let hash = self.interner.get_hash(name_id);
1492            let type_id = self.interner.intern(&entity.entity_type);
1493            let obs_ids: Vec<StrId> = entity
1494                .observations
1495                .iter()
1496                .map(|o| self.interner.intern(o))
1497                .collect();
1498            // Reuse a tombstoned slot if one is free (M2); its old search-index
1499            // entries were cleared on delete, so the slot starts clean.
1500            let reused = self.free_slots.pop();
1501            let slot = reused.unwrap_or(self.entity_slots.len() as u32);
1502            self.search
1503                .index_entity(&mut self.interner, slot, name_id, type_id, &obs_ids);
1504            let stored = Some(StoredEntity {
1505                state: ENTITY_SLOT_LIVE,
1506                name: name_id,
1507                entity_type: type_id,
1508                observations: obs_ids,
1509            });
1510            match reused {
1511                Some(s) => self.entity_slots[s as usize] = stored,
1512                None => self.entity_slots.push(stored),
1513            }
1514            self.name_table.insert(&self.interner, hash, name_id, slot);
1515            created.push(Entity {
1516                name: entity.name.clone(),
1517                entity_type: entity.entity_type.clone(),
1518                observations: entity.observations.clone(),
1519            });
1520        }
1521        Ok(created)
1522    }
1523
1524    pub fn create_relations(&mut self, relations: &[Relation]) -> Result<Vec<Relation>> {
1525        // Validate up front so an invalid relation never produces partial writes.
1526        for relation in relations {
1527            if relation.from.is_empty() || relation.to.is_empty() {
1528                return Err(MCSError::InvalidParams(
1529                    "Relation endpoints must not be empty".into(),
1530                ));
1531            }
1532        }
1533        let mut created = Vec::new();
1534        // Build a dedup set for O(1) duplicate checks (P5)
1535        let mut rel_set: AHashSet<(StrId, StrId, StrId)> = AHashSet::new();
1536        for rel in &self.relations {
1537            rel_set.insert((rel.from, rel.to, rel.relation_type));
1538        }
1539        for relation in relations {
1540            let from_id = self.interner.intern(&relation.from);
1541            let to_id = self.interner.intern(&relation.to);
1542            let type_id = self.interner.intern(&relation.relation_type);
1543            if !rel_set.insert((from_id, to_id, type_id)) {
1544                continue;
1545            }
1546            // Write-ahead: log before mutation
1547            let mut buf = Vec::new();
1548            store_enc::encode_create_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
1549                .map_err(MCSError::IoError)?;
1550            self.store.write_record(RecordKind::CreateRelation, &buf)
1551                .map_err(MCSError::IoError)?;
1552
1553            self.relations.push(StoredRelation {
1554                from: from_id,
1555                to: to_id,
1556                relation_type: type_id,
1557            });
1558            self.adjacency.entry(from_id).or_default().push((to_id, type_id));
1559            self.adjacency.entry(to_id).or_default().push((from_id, type_id));
1560            created.push(Relation {
1561                from: relation.from.clone(),
1562                to: relation.to.clone(),
1563                relation_type: relation.relation_type.clone(),
1564            });
1565        }
1566        Ok(created)
1567    }
1568
1569    pub fn add_observations(&mut self, entity_name: &str, contents: &[String]) -> Result<Vec<String>> {
1570        let name_id = self.interner.get_optional(entity_name)
1571            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1572        let hash = self.interner.get_hash(name_id);
1573        let slot = self
1574            .name_table
1575            .lookup(hash, name_id)
1576            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1577        // Snapshot the current observations so we can compute the deduplicated
1578        // additions *without* mutating in-memory state yet.
1579        let existing: AHashSet<StrId> = self
1580            .entity_slots
1581            .get(slot as usize)
1582            .and_then(|e| e.as_ref())
1583            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?
1584            .observations
1585            .iter()
1586            .copied()
1587            .collect();
1588
1589        // Deduplicate against existing observations *and* within this batch, so
1590        // the live result matches what replay (which dedups one-by-one) rebuilds.
1591        let mut added = Vec::new();
1592        let mut interned_added = Vec::new();
1593        let mut seen: AHashSet<StrId> = AHashSet::new();
1594        for content in contents {
1595            let cid = self.interner.intern(content);
1596            if existing.contains(&cid) || !seen.insert(cid) {
1597                continue;
1598            }
1599            interned_added.push(cid);
1600            added.push(content.clone());
1601        }
1602        if added.is_empty() {
1603            return Ok(added);
1604        }
1605
1606        // Write-ahead: the record must hit the log *before* any in-memory
1607        // mutation, so a failed write leaves memory and disk in agreement (C3).
1608        let mut buf = Vec::new();
1609        store_enc::encode_add_observations(&mut buf, entity_name, &added)
1610            .map_err(MCSError::IoError)?;
1611        self.store.write_record(RecordKind::AddObservations, &buf)
1612            .map_err(MCSError::IoError)?;
1613
1614        // Logged — now apply to in-memory state.
1615        let stored = self
1616            .entity_slots
1617            .get_mut(slot as usize)
1618            .and_then(|e| e.as_mut())
1619            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1620        stored.observations.extend_from_slice(&interned_added);
1621
1622        // Incrementally index only the new observation tokens (P3) — no
1623        // full remove + re-index of the whole entity.
1624        self.search
1625            .index_additional(&mut self.interner, slot, &interned_added);
1626        Ok(added)
1627    }
1628
1629    pub fn delete_entities(&mut self, entity_names: &[String]) -> Result<()> {
1630        let mut deleted_names = Vec::new();
1631        for name in entity_names {
1632            let name_id_opt = self.interner.get_optional(name);
1633            if let Some(name_id) = name_id_opt {
1634                let hash = self.interner.get_hash(name_id);
1635                if let Some(slot) = self.name_table.lookup(hash, name_id)
1636                    && let Some(Some(_)) = self.entity_slots.get(slot as usize)
1637                {
1638                    // Write-ahead: log before mutation
1639                    let mut buf = Vec::new();
1640                    store_enc::encode_delete_entity(&mut buf, name)
1641                        .map_err(MCSError::IoError)?;
1642                    self.store.write_record(RecordKind::DeleteEntity, &buf)
1643                        .map_err(MCSError::IoError)?;
1644
1645                    self.entity_slots[slot as usize] = None;
1646                    self.free_slots.push(slot);
1647                    self.search.remove_entity(slot);
1648                    self.name_table.remove(&self.interner, hash, name_id);
1649                    deleted_names.push(name.clone());
1650                }
1651            }
1652        }
1653        if !deleted_names.is_empty() {
1654            // Use a AHashSet for O(1) retain checks (P5)
1655            let deleted_ids: AHashSet<StrId> = deleted_names.iter()
1656                .map(|n| self.interner.intern(n))
1657                .collect();
1658            self.relations
1659                .retain(|r| !deleted_ids.contains(&r.from) && !deleted_ids.contains(&r.to));
1660            // Clean adjacency index
1661            for id in &deleted_ids {
1662                self.adjacency.remove(id);
1663                // Remove references from other entities' adjacency lists
1664                for list in self.adjacency.values_mut() {
1665                    list.retain(|(to, _)| !deleted_ids.contains(to));
1666                }
1667            }
1668        }
1669        Ok(())
1670    }
1671
1672    pub fn delete_observations(&mut self, entity_name: &str, observations: &[String]) -> Result<()> {
1673        let name_id = self.interner.get_optional(entity_name)
1674            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1675        let hash = self.interner.get_hash(name_id);
1676        let slot = self
1677            .name_table
1678            .lookup(hash, name_id)
1679            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1680        // Confirm the slot is live before logging.
1681        self.entity_slots
1682            .get(slot as usize)
1683            .and_then(|e| e.as_ref())
1684            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1685        let remove_ids: AHashSet<StrId> = observations.iter().map(|o| self.interner.intern(o)).collect();
1686
1687        // Write-ahead: log before touching in-memory state (C3).
1688        let mut buf = Vec::new();
1689        store_enc::encode_delete_observations(&mut buf, entity_name, observations)
1690            .map_err(MCSError::IoError)?;
1691        self.store.write_record(RecordKind::DeleteObservations, &buf)
1692            .map_err(MCSError::IoError)?;
1693
1694        // Logged — now apply.
1695        let stored = self
1696            .entity_slots
1697            .get_mut(slot as usize)
1698            .and_then(|e| e.as_mut())
1699            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1700        stored.observations.retain(|o| !remove_ids.contains(o));
1701        self.search.remove_entity(slot);
1702        self.search
1703            .index_entity(&mut self.interner, slot, stored.name, stored.entity_type, &stored.observations);
1704        Ok(())
1705    }
1706
1707    pub fn delete_relations(&mut self, relations: &[Relation]) -> Result<()> {
1708        // Collect targets into a AHashSet for O(1) retain checks (P5)
1709        let rels: AHashSet<(StrId, StrId, StrId)> = relations
1710            .iter()
1711            .map(|r| {
1712                (
1713                    self.interner.intern(&r.from),
1714                    self.interner.intern(&r.to),
1715                    self.interner.intern(&r.relation_type),
1716                )
1717            })
1718            .collect();
1719        // Write-ahead: log every deletion before mutating in-memory state (C3),
1720        // so a failed write can't leave memory ahead of the log.
1721        for relation in relations {
1722            let mut buf = Vec::new();
1723            store_enc::encode_delete_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
1724                .map_err(MCSError::IoError)?;
1725            self.store.write_record(RecordKind::DeleteRelation, &buf)
1726                .map_err(MCSError::IoError)?;
1727        }
1728        self.relations
1729            .retain(|r| !rels.contains(&(r.from, r.to, r.relation_type)));
1730        // Clean adjacency index
1731        for (f, t, rt) in &rels {
1732            if let Some(edges) = self.adjacency.get_mut(f) {
1733                edges.retain(|(to, rtype)| to != t || rtype != rt);
1734                if edges.is_empty() {
1735                    self.adjacency.remove(f);
1736                }
1737            }
1738            if let Some(edges) = self.adjacency.get_mut(t) {
1739                edges.retain(|(to, rtype)| to != f || rtype != rt);
1740                if edges.is_empty() {
1741                    self.adjacency.remove(t);
1742                }
1743            }
1744        }
1745        Ok(())
1746    }
1747
1748    pub fn read_graph(&self) -> KnowledgeGraphOut {
1749        self.read_graph_view().to_owned_out()
1750    }
1751
1752    /// Borrowing, allocation-light view of the full graph (M6). Serializing it
1753    /// streams interned `&str` directly instead of materializing a `String`
1754    /// per name/type/observation.
1755    pub fn read_graph_view(&self) -> GraphView<'_> {
1756        let entities: Vec<&StoredEntity> = self
1757            .entity_slots
1758            .iter()
1759            .filter_map(|s| s.as_ref().filter(|e| e.is_live()))
1760            .collect();
1761        let relations: Vec<&StoredRelation> = self.relations.iter().collect();
1762        GraphView { kg: self, entities, relations }
1763    }
1764
1765    /// Relevance-ranked substring search returning all matches (no pagination).
1766    /// Equivalent to `search_nodes_filtered(query, None, 0, usize::MAX)`.
1767    pub fn search_nodes(&self, query: &str) -> KnowledgeGraphOut {
1768        self.search_nodes_filtered(query, None, 0, usize::MAX)
1769    }
1770
1771    pub fn open_nodes(&self, names: &[String]) -> KnowledgeGraphOut {
1772        self.open_nodes_view(names).to_owned_out()
1773    }
1774
1775    /// Borrowing view variant of [`open_nodes`] (M6).
1776    pub fn open_nodes_view(&self, names: &[String]) -> GraphView<'_> {
1777        let name_ids: AHashSet<StrId> = names.iter()
1778            .filter_map(|n| self.interner.get_optional(n))
1779            .collect();
1780        let entities: Vec<&StoredEntity> = self
1781            .entity_slots
1782            .iter()
1783            .filter_map(|s| {
1784                s.as_ref()
1785                    .filter(|stored| stored.is_live() && name_ids.contains(&stored.name))
1786            })
1787            .collect();
1788        let matched_names: AHashSet<StrId> = entities.iter().map(|e| e.name).collect();
1789        let relations: Vec<&StoredRelation> = self
1790            .relations
1791            .iter()
1792            .filter(|r| matched_names.contains(&r.from) || matched_names.contains(&r.to))
1793            .collect();
1794        GraphView { kg: self, entities, relations }
1795    }
1796
1797    // -----------------------------------------------------------------------
1798    // Internal helpers
1799    // -----------------------------------------------------------------------
1800
1801    fn entity_to_output(&self, stored: &StoredEntity) -> Entity {
1802        Entity {
1803            name: self.interner.lookup(stored.name).to_string(),
1804            entity_type: self.interner.lookup(stored.entity_type).to_string(),
1805            observations: stored
1806                .observations
1807                .iter()
1808                .map(|o| self.interner.lookup(*o).to_string())
1809                .collect(),
1810        }
1811    }
1812
1813    #[inline]
1814    fn relation_to_output(&self, r: &StoredRelation) -> Relation {
1815        Relation {
1816            from: self.interner.lookup(r.from).to_string(),
1817            to: self.interner.lookup(r.to).to_string(),
1818            relation_type: self.interner.lookup(r.relation_type).to_string(),
1819        }
1820    }
1821
1822    /// Resolve a name to its live entity slot, or `None` if absent/deleted.
1823    fn lookup_live_slot(&self, name: &str) -> Option<u32> {
1824        let name_id = self.interner.get_optional(name)?;
1825        let hash = self.interner.get_hash(name_id);
1826        let slot = self.name_table.lookup(hash, name_id)?;
1827        let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
1828        stored.is_live().then_some(slot)
1829    }
1830
1831    /// Materialize a live entity from its interned name id.
1832    fn entity_by_name_id(&self, name_id: StrId) -> Option<Entity> {
1833        let hash = self.interner.get_hash(name_id);
1834        let slot = self.name_table.lookup(hash, name_id)?;
1835        let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
1836        stored.is_live().then(|| self.entity_to_output(stored))
1837    }
1838
1839    /// Tally distinct entity types and their live-entity counts, ranked by
1840    /// count descending (ties broken by name). One linear pass over the dense
1841    /// slot vec; only the final names are allocated.
1842    pub fn entity_type_counts(&self) -> Vec<(String, usize)> {
1843        let mut counts: AHashMap<StrId, usize> = AHashMap::new();
1844        for st in self
1845            .entity_slots
1846            .iter()
1847            .filter_map(|s| s.as_ref())
1848            .filter(|e| e.is_live())
1849        {
1850            *counts.entry(st.entity_type).or_insert(0) += 1;
1851        }
1852        self.rank_counts(counts)
1853    }
1854
1855    /// Tally distinct relation types and their counts, ranked by count desc.
1856    pub fn relation_type_counts(&self) -> Vec<(String, usize)> {
1857        let mut counts: AHashMap<StrId, usize> = AHashMap::new();
1858        for r in &self.relations {
1859            *counts.entry(r.relation_type).or_insert(0) += 1;
1860        }
1861        self.rank_counts(counts)
1862    }
1863
1864    fn rank_counts(&self, counts: AHashMap<StrId, usize>) -> Vec<(String, usize)> {
1865        let mut out: Vec<(String, usize)> = counts
1866            .into_iter()
1867            .map(|(id, c)| (self.interner.lookup(id).to_string(), c))
1868            .collect();
1869        out.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
1870        out
1871    }
1872
1873    /// Relevance-ranked, optionally type-filtered, paginated node search.
1874    /// Entities come back best-match-first (see [`SearchIndex::search_ranked`]).
1875    /// Relations touching any returned entity (either endpoint) are included.
1876    pub fn search_nodes_filtered(
1877        &self,
1878        query: &str,
1879        entity_type: Option<&str>,
1880        offset: usize,
1881        limit: usize,
1882    ) -> KnowledgeGraphOut {
1883        self.search_nodes_view(query, entity_type, offset, limit).to_owned_out()
1884    }
1885
1886    /// Borrowing view variant of [`search_nodes_filtered`] (M6).
1887    pub fn search_nodes_view(
1888        &self,
1889        query: &str,
1890        entity_type: Option<&str>,
1891        offset: usize,
1892        limit: usize,
1893    ) -> GraphView<'_> {
1894        let type_id = match entity_type {
1895            Some(t) => match self.interner.get_optional(t) {
1896                Some(id) => Some(id),
1897                None => return GraphView { kg: self, entities: Vec::new(), relations: Vec::new() },
1898            },
1899            None => None,
1900        };
1901
1902        let ranked = self.search.search_ranked(query, &self.interner);
1903        let mut selected: AHashSet<StrId> = AHashSet::new();
1904        let mut entities: Vec<&StoredEntity> = Vec::new();
1905        let mut skipped = 0usize;
1906        for (slot, _score) in ranked {
1907            let Some(st) = self
1908                .entity_slots
1909                .get(slot as usize)
1910                .and_then(|s| s.as_ref())
1911                .filter(|e| e.is_live())
1912            else {
1913                continue;
1914            };
1915            if type_id.is_some_and(|tid| st.entity_type != tid) {
1916                continue;
1917            }
1918            if skipped < offset {
1919                skipped += 1;
1920                continue;
1921            }
1922            if entities.len() >= limit {
1923                break;
1924            }
1925            selected.insert(st.name);
1926            entities.push(st);
1927        }
1928
1929        let relations: Vec<&StoredRelation> = self
1930            .relations
1931            .iter()
1932            .filter(|r| selected.contains(&r.from) || selected.contains(&r.to))
1933            .collect();
1934        GraphView { kg: self, entities, relations }
1935    }
1936
1937    /// Type-filtered, paginated view of the whole graph. Unlike [`read_graph`],
1938    /// relations are restricted to those whose **both** endpoints fall in the
1939    /// returned entity page, so the slice is internally consistent.
1940    pub fn read_graph_filtered(
1941        &self,
1942        entity_type: Option<&str>,
1943        offset: usize,
1944        limit: usize,
1945    ) -> KnowledgeGraphOut {
1946        self.read_graph_filtered_view(entity_type, offset, limit).to_owned_out()
1947    }
1948
1949    /// Borrowing view variant of [`read_graph_filtered`] (M6).
1950    pub fn read_graph_filtered_view(
1951        &self,
1952        entity_type: Option<&str>,
1953        offset: usize,
1954        limit: usize,
1955    ) -> GraphView<'_> {
1956        let type_id = match entity_type {
1957            Some(t) => match self.interner.get_optional(t) {
1958                Some(id) => Some(id),
1959                None => return GraphView { kg: self, entities: Vec::new(), relations: Vec::new() },
1960            },
1961            None => None,
1962        };
1963
1964        let mut selected: AHashSet<StrId> = AHashSet::new();
1965        let mut entities: Vec<&StoredEntity> = Vec::new();
1966        let mut skipped = 0usize;
1967        for st in self
1968            .entity_slots
1969            .iter()
1970            .filter_map(|s| s.as_ref())
1971            .filter(|e| e.is_live())
1972        {
1973            if type_id.is_some_and(|tid| st.entity_type != tid) {
1974                continue;
1975            }
1976            if skipped < offset {
1977                skipped += 1;
1978                continue;
1979            }
1980            if entities.len() >= limit {
1981                break;
1982            }
1983            selected.insert(st.name);
1984            entities.push(st);
1985        }
1986
1987        let relations: Vec<&StoredRelation> = self
1988            .relations
1989            .iter()
1990            .filter(|r| selected.contains(&r.from) && selected.contains(&r.to))
1991            .collect();
1992        GraphView { kg: self, entities, relations }
1993    }
1994
1995    /// Neighborhood expansion around `name` out to `depth` hops, following
1996    /// edges in the requested [`Direction`] and (optionally) of one relation
1997    /// type. Returns the origin plus reached entities, and every relation
1998    /// (passing the type filter) whose endpoints are both inside that set.
1999    ///
2000    /// `depth == 1` (the common case) is a single linear pass over the flat
2001    /// relation vec; deeper queries build an adjacency map once (O(E)) and BFS.
2002    pub fn neighbors(
2003        &self,
2004        name: &str,
2005        direction: Direction,
2006        rtype: Option<&str>,
2007        depth: u32,
2008    ) -> Result<KnowledgeGraphOut> {
2009        self.lookup_live_slot(name)
2010            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
2011        // Safe: lookup_live_slot succeeded, so the name is interned.
2012        let start = self.interner.get_optional(name).unwrap();
2013
2014        // An unknown relation-type filter can match nothing: return just origin.
2015        let rtype_id = match rtype {
2016            Some(r) => match self.interner.get_optional(r) {
2017                Some(id) => Some(id),
2018                None => {
2019                    let entities = self.entity_by_name_id(start).into_iter().collect();
2020                    return Ok(KnowledgeGraphOut { entities, relations: Vec::new() });
2021                }
2022            },
2023            None => None,
2024        };
2025
2026        let mut visited: AHashSet<StrId> = AHashSet::new();
2027        visited.insert(start);
2028
2029        let type_ok = |r: &StoredRelation| rtype_id.is_none_or(|rt| r.relation_type == rt);
2030
2031        if depth == 1 {
2032            for r in self.relations.iter().filter(|r| type_ok(r)) {
2033                match direction {
2034                    Direction::Out => {
2035                        if r.from == start {
2036                            visited.insert(r.to);
2037                        }
2038                    }
2039                    Direction::In => {
2040                        if r.to == start {
2041                            visited.insert(r.from);
2042                        }
2043                    }
2044                    Direction::Both => {
2045                        if r.from == start {
2046                            visited.insert(r.to);
2047                        } else if r.to == start {
2048                            visited.insert(r.from);
2049                        }
2050                    }
2051                }
2052            }
2053        } else if depth >= 2 {
2054            // Build a direction-aware adjacency map once, then BFS.
2055            // For Direction::Both we use the incremental adjacency index;
2056            // for Direction::Out/In we filter relations directly.
2057            let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
2058            match direction {
2059                Direction::Both => {
2060                    for (&node, edges) in &self.adjacency {
2061                        for &(nb, rt) in edges {
2062                            if rtype_id.is_none_or(|rt_id| rt == rt_id) {
2063                                adj.entry(node).or_default().push(nb);
2064                            }
2065                        }
2066                    }
2067                }
2068                Direction::Out | Direction::In => {
2069                    for r in self.relations.iter().filter(|r| type_ok(r)) {
2070                        match direction {
2071                            Direction::Out => adj.entry(r.from).or_default().push(r.to),
2072                            Direction::In => adj.entry(r.to).or_default().push(r.from),
2073                            _ => unreachable!(),
2074                        }
2075                    }
2076                }
2077            }
2078            let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
2079            queue.push_back((start, 0));
2080            while let Some((node, d)) = queue.pop_front() {
2081                if d >= depth {
2082                    continue;
2083                }
2084                if let Some(nbrs) = adj.get(&node) {
2085                    for &nb in nbrs {
2086                        if visited.insert(nb) {
2087                            queue.push_back((nb, d + 1));
2088                        }
2089                    }
2090                }
2091            }
2092        }
2093
2094        let mut entities = Vec::with_capacity(visited.len());
2095        for &nid in &visited {
2096            if let Some(e) = self.entity_by_name_id(nid) {
2097                entities.push(e);
2098            }
2099        }
2100        let relations = self
2101            .relations
2102            .iter()
2103            .filter(|r| type_ok(r) && visited.contains(&r.from) && visited.contains(&r.to))
2104            .map(|r| self.relation_to_output(r))
2105            .collect();
2106        Ok(KnowledgeGraphOut { entities, relations })
2107    }
2108
2109    /// One-shot context bundle for a single entity: the entity itself, every
2110    /// incident relation, its distinct neighbor names, and its degree. Saves an
2111    /// agent the get_entity + two search_relations round-trips.
2112    pub fn describe_entity(&self, name: &str) -> Result<serde_json::Value> {
2113        let name_id = self
2114            .interner
2115            .get_optional(name)
2116            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
2117        let entity = self
2118            .entity_by_name_id(name_id)
2119            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
2120
2121        let mut incident: Vec<Relation> = Vec::new();
2122        let mut neighbor_seen: AHashSet<StrId> = AHashSet::new();
2123        let mut neighbors: Vec<&str> = Vec::new();
2124        for r in &self.relations {
2125            if r.from == name_id || r.to == name_id {
2126                incident.push(self.relation_to_output(r));
2127                let other = if r.from == name_id { r.to } else { r.from };
2128                if other != name_id && neighbor_seen.insert(other) {
2129                    neighbors.push(self.interner.lookup(other));
2130                }
2131            }
2132        }
2133
2134        Ok(serde_json::json!({
2135            "entity": entity,
2136            "relations": incident,
2137            "neighbors": neighbors,
2138            "degree": incident.len(),
2139        }))
2140    }
2141
2142    /// Create-or-merge a batch of entities idempotently. Missing entities are
2143    /// created; existing ones keep their type and gain any new observations
2144    /// (deduplicated). Returns a per-entity outcome. The caller is responsible
2145    /// for flushing — every underlying op is already write-ahead logged.
2146    pub fn upsert_entities(&mut self, entities: &[Entity]) -> Result<Vec<serde_json::Value>> {
2147        for e in entities {
2148            if e.name.is_empty() {
2149                return Err(MCSError::InvalidParams(
2150                    "Entity name must not be empty".into(),
2151                ));
2152            }
2153        }
2154        let mut out = Vec::with_capacity(entities.len());
2155        for e in entities {
2156            if self.lookup_live_slot(&e.name).is_some() {
2157                let added = self.add_observations(&e.name, &e.observations)?;
2158                out.push(serde_json::json!({
2159                    "name": e.name,
2160                    "created": false,
2161                    "addedObservations": added,
2162                }));
2163            } else {
2164                let created = self.create_entities(std::slice::from_ref(e))?;
2165                out.push(serde_json::json!({
2166                    "name": e.name,
2167                    "created": !created.is_empty(),
2168                    "addedObservations": e.observations,
2169                }));
2170            }
2171        }
2172        Ok(out)
2173    }
2174
2175    /// Serialize the graph in one of: `json` (read_graph), `mermaid`, `dot`.
2176    pub fn export(&self, format: &str) -> Result<String> {
2177        match format {
2178            "json" => serde_json::to_string(&self.read_graph()).map_err(MCSError::JsonError),
2179            "mermaid" => Ok(self.export_mermaid()),
2180            "dot" => Ok(self.export_dot()),
2181            other => Err(MCSError::InvalidParams(format!(
2182                "Unknown export format '{other}' (expected json|mermaid|dot)"
2183            ))),
2184        }
2185    }
2186
2187    /// Assign each live entity a stable `n{k}` node id for diagram output.
2188    fn diagram_node_ids(&self) -> (AHashMap<StrId, usize>, Vec<(usize, StrId)>) {
2189        let mut ids: AHashMap<StrId, usize> = AHashMap::new();
2190        let mut order: Vec<(usize, StrId)> = Vec::new();
2191        for st in self
2192            .entity_slots
2193            .iter()
2194            .filter_map(|s| s.as_ref())
2195            .filter(|e| e.is_live())
2196        {
2197            let n = ids.len();
2198            ids.insert(st.name, n);
2199            order.push((n, st.name));
2200        }
2201        (ids, order)
2202    }
2203
2204    fn export_mermaid(&self) -> String {
2205        let (ids, order) = self.diagram_node_ids();
2206        let mut s = String::with_capacity(64 + order.len() * 32 + self.relations.len() * 32);
2207        s.push_str("graph LR\n");
2208        for (n, name_id) in &order {
2209            let label = sanitize_label(self.interner.lookup(*name_id));
2210            s.push_str(&format!("  n{n}[\"{label}\"]\n"));
2211        }
2212        for r in &self.relations {
2213            if let (Some(&a), Some(&b)) = (ids.get(&r.from), ids.get(&r.to)) {
2214                let rel = sanitize_label(self.interner.lookup(r.relation_type));
2215                s.push_str(&format!("  n{a} -->|{rel}| n{b}\n"));
2216            }
2217        }
2218        s
2219    }
2220
2221    fn export_dot(&self) -> String {
2222        let (ids, order) = self.diagram_node_ids();
2223        let mut s = String::with_capacity(64 + order.len() * 32 + self.relations.len() * 32);
2224        s.push_str("digraph G {\n");
2225        for (n, name_id) in &order {
2226            let label = sanitize_label(self.interner.lookup(*name_id));
2227            s.push_str(&format!("  n{n} [label=\"{label}\"];\n"));
2228        }
2229        for r in &self.relations {
2230            if let (Some(&a), Some(&b)) = (ids.get(&r.from), ids.get(&r.to)) {
2231                let rel = sanitize_label(self.interner.lookup(r.relation_type));
2232                s.push_str(&format!("  n{a} -> n{b} [label=\"{rel}\"];\n"));
2233            }
2234        }
2235        s.push_str("}\n");
2236        s
2237    }
2238
2239    // ------ High-level productivity tools ------
2240
    /// Merge `source` entity into `target` entity. All observations from
    /// source are moved to target (deduplicated), all relations involving
    /// source are redirected to target (deduplicated), and source is then
    /// deleted.
    ///
    /// The whole merge is written to the log inside a single
    /// `TxnBegin`…`TxnCommit` transaction *before* any in-memory mutation. If
    /// any encode or write step fails, the function returns early and memory is
    /// left untouched. The log may then hold an uncommitted transaction, which
    /// (per the store's replay contract) is discarded on replay. Caller flushes.
    ///
    /// NOTE(review): after a failed write the open transaction is never
    /// explicitly aborted. Confirm that records appended by later operations
    /// are not folded into it on replay.
    ///
    /// NOTE(review): `apply_record` below is not given `self.adjacency`, which
    /// `extract_subgraph` and `find_all_paths` read. Unless that index is
    /// refreshed elsewhere, it may be stale after a merge — confirm.
    ///
    /// # Returns
    ///
    /// A JSON object with `source`, `target`, `movedObservations` (how many
    /// observations source held), `addedObservations` (how many of those
    /// were new to target) and `redirectedRelations` (how many relations were
    /// recreated against target after dropping self-loops and duplicates).
    ///
    /// # Errors
    ///
    /// - `MCSError::InvalidParams` if `source == target` or either entity is
    ///   not live.
    /// - `MCSError::IoError` if encoding or writing any log record fails.
    pub fn merge_entities(&mut self, source: &str, target: &str) -> Result<serde_json::Value> {
        if source == target {
            return Err(MCSError::InvalidParams(
                "Source and target must be different entities".into(),
            ));
        }
        self.lookup_live_slot(source).ok_or_else(|| {
            MCSError::InvalidParams(format!("Source entity '{source}' not found"))
        })?;
        let target_slot = self.lookup_live_slot(target).ok_or_else(|| {
            MCSError::InvalidParams(format!("Target entity '{target}' not found"))
        })?;

        // Both names passed the liveness checks above, so these lookups are
        // expected to succeed.
        let source_entity = self.get_entity(source).unwrap();
        let moved_obs_count = source_entity.observations.len();
        let source_id = self.interner.get_optional(source).unwrap();
        let target_id = self.interner.get_optional(target).unwrap();

        // Observations to move: dedup against target's existing set and within
        // the batch (matching what `add_observations` would have done).
        let target_existing: AHashSet<StrId> = self.entity_slots[target_slot as usize]
            .as_ref()
            .unwrap()
            .observations
            .iter()
            .copied()
            .collect();
        let mut obs_seen: AHashSet<StrId> = AHashSet::new();
        let mut obs_to_add: Vec<String> = Vec::new();
        for o in &source_entity.observations {
            if let Some(oid) = self.interner.get_optional(o)
                && !target_existing.contains(&oid)
                && obs_seen.insert(oid)
            {
                obs_to_add.push(o.clone());
            }
        }

        // Relations to redirect: replace source with target, drop self-loops,
        // and dedup against existing relations and within the batch.
        // The original source relations themselves are not removed here; the
        // `DeleteEntity` record appended below is relied on for that.
        let existing_rels: AHashSet<(StrId, StrId, StrId)> =
            self.relations.iter().map(|r| (r.from, r.to, r.relation_type)).collect();
        let mut rel_seen: AHashSet<(StrId, StrId, StrId)> = AHashSet::new();
        let mut redirect: Vec<Relation> = Vec::new();
        for r in &self.relations {
            if r.from != source_id && r.to != source_id {
                continue;
            }
            let new_from = if r.from == source_id { target_id } else { r.from };
            let new_to = if r.to == source_id { target_id } else { r.to };
            if new_from == new_to {
                continue; // self-loop after redirect
            }
            let key = (new_from, new_to, r.relation_type);
            if existing_rels.contains(&key) || !rel_seen.insert(key) {
                continue;
            }
            redirect.push(Relation {
                from: self.interner.lookup(new_from).to_string(),
                to: self.interner.lookup(new_to).to_string(),
                relation_type: self.interner.lookup(r.relation_type).to_string(),
            });
        }

        let added_count = obs_to_add.len();
        let redirected = redirect.len() as u32;

        // Build every record up front so writing is the only fallible step.
        // Record order matters: they are applied in this same order below.
        let mut records: Vec<(RecordKind, Vec<u8>)> = Vec::new();
        if !obs_to_add.is_empty() {
            let mut buf = Vec::new();
            store_enc::encode_add_observations(&mut buf, target, &obs_to_add)
                .map_err(MCSError::IoError)?;
            records.push((RecordKind::AddObservations, buf));
        }
        for r in &redirect {
            let mut buf = Vec::new();
            store_enc::encode_create_relation(&mut buf, &r.from, &r.to, &r.relation_type)
                .map_err(MCSError::IoError)?;
            records.push((RecordKind::CreateRelation, buf));
        }
        let mut del_buf = Vec::new();
        store_enc::encode_delete_entity(&mut del_buf, source).map_err(MCSError::IoError)?;
        records.push((RecordKind::DeleteEntity, del_buf));

        // Write-ahead, transactionally: begin, all records, commit.
        // An early `?` return here leaves memory unchanged but the log with an
        // open transaction (see the NOTE in the doc comment).
        self.store.write_record(RecordKind::TxnBegin, &[]).map_err(MCSError::IoError)?;
        for (kind, data) in &records {
            self.store.write_record(*kind, data).map_err(MCSError::IoError)?;
        }
        self.store.write_record(RecordKind::TxnCommit, &[]).map_err(MCSError::IoError)?;

        // Logged and committed — now apply to in-memory state (no more logging).
        for (kind, data) in &records {
            Self::apply_record(
                *kind, data, &mut self.interner, &mut self.entity_slots, &mut self.search,
                &mut self.name_table, &mut self.relations,
            );
        }

        Ok(serde_json::json!({
            "source": source,
            "target": target,
            "movedObservations": moved_obs_count,
            "addedObservations": added_count,
            "redirectedRelations": redirected,
        }))
    }
2359
2360    /// Extract a connected subgraph around one or more seed entity names,
2361    /// expanding out to `depth` hops along all relations (undirected). Returns
2362    /// the set of reached entities and the relations among them.
2363    pub fn extract_subgraph(&self, names: &[String], depth: u32) -> Result<KnowledgeGraphOut> {
2364        if names.is_empty() {
2365            return Ok(KnowledgeGraphOut {
2366                entities: Vec::new(),
2367                relations: Vec::new(),
2368            });
2369        }
2370        // Seed the BFS queue from any names that exist.
2371        let mut visited: AHashSet<StrId> = AHashSet::new();
2372        let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
2373        for name in names {
2374            if let Some(id) = self.interner.get_optional(name)
2375                && visited.insert(id)
2376            {
2377                queue.push_back((id, 0));
2378            }
2379        }
2380        // Build an undirected adjacency map from the incremental index.
2381        let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
2382        for (&node, edges) in &self.adjacency {
2383            let nb: Vec<StrId> = edges.iter().map(|(to, _)| *to).collect();
2384            adj.insert(node, nb);
2385        }
2386        while let Some((node, d)) = queue.pop_front() {
2387            if d >= depth {
2388                continue;
2389            }
2390            if let Some(nbrs) = adj.get(&node) {
2391                for &nb in nbrs {
2392                    if visited.insert(nb) {
2393                        queue.push_back((nb, d + 1));
2394                    }
2395                }
2396            }
2397        }
2398        let mut entities: Vec<Entity> = Vec::with_capacity(visited.len());
2399        for &nid in &visited {
2400            if let Some(e) = self.entity_by_name_id(nid) {
2401                entities.push(e);
2402            }
2403        }
2404        let relations: Vec<Relation> = self
2405            .relations
2406            .iter()
2407            .filter(|r| visited.contains(&r.from) && visited.contains(&r.to))
2408            .map(|r| self.relation_to_output(r))
2409            .collect();
2410        Ok(KnowledgeGraphOut { entities, relations })
2411    }
2412
2413    /// Return full entities for a list of names. Missing names yield `None`.
2414    pub fn batch_get_entities(&self, names: &[String]) -> Vec<Option<Entity>> {
2415        names.iter().map(|n| self.get_entity(n)).collect()
2416    }
2417
2418    /// Recursive DFS helper — collects every simple path from `current` to
2419    /// `target` up to `max_depth` hops, capped at `max_paths` results.
2420    #[allow(clippy::too_many_arguments)]
2421    fn dfs_all_paths(
2422        adj: &AHashMap<StrId, Vec<StrId>>,
2423        current: StrId,
2424        target: StrId,
2425        max_depth: usize,
2426        max_paths: usize,
2427        visited: &mut AHashSet<StrId>,
2428        current_path: &mut Vec<StrId>,
2429        all_paths: &mut Vec<Vec<StrId>>,
2430    ) {
2431        if all_paths.len() >= max_paths {
2432            return;
2433        }
2434        if current == target && current_path.len() > 1 {
2435            all_paths.push(current_path.clone());
2436            return;
2437        }
2438        if current_path.len() > max_depth {
2439            return;
2440        }
2441        if let Some(neighbors) = adj.get(&current) {
2442            for &nb in neighbors {
2443                if visited.insert(nb) {
2444                    current_path.push(nb);
2445                    Self::dfs_all_paths(
2446                        adj, nb, target, max_depth, max_paths, visited, current_path, all_paths,
2447                    );
2448                    current_path.pop();
2449                    visited.remove(&nb);
2450                }
2451            }
2452        }
2453    }
2454
2455    /// Find all simple paths between `from` and `to` up to `max_depth` hops,
2456    /// returning at most `max_paths` results. Paths are found via DFS with
2457    /// backtracking and include both endpoints.
2458    pub fn find_all_paths(
2459        &self,
2460        from: &str,
2461        to: &str,
2462        max_depth: usize,
2463        max_paths: usize,
2464    ) -> Result<Vec<Vec<String>>> {
2465        let from_id = self
2466            .interner
2467            .get_optional(from)
2468            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
2469        let to_id = self
2470            .interner
2471            .get_optional(to)
2472            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
2473        // Verify both are live.
2474        if self.lookup_live_slot(from).is_none() {
2475            return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
2476        }
2477        if self.lookup_live_slot(to).is_none() {
2478            return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
2479        }
2480        if from_id == to_id {
2481            return Ok(vec![vec![from.to_string()]]);
2482        }
2483        // Build undirected adjacency from the incremental index.
2484        let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::with_capacity(self.adjacency.len());
2485        for (&node, edges) in &self.adjacency {
2486            let nbrs: Vec<StrId> = edges.iter().map(|(to, _)| *to).collect();
2487            adj.insert(node, nbrs);
2488        }
2489        let mut all_paths: Vec<Vec<StrId>> = Vec::new();
2490        let mut current_path = Vec::new();
2491        let mut visited: AHashSet<StrId> = AHashSet::new();
2492        visited.insert(from_id);
2493        current_path.push(from_id);
2494        Self::dfs_all_paths(
2495            &adj,
2496            from_id,
2497            to_id,
2498            max_depth,
2499            max_paths,
2500            &mut visited,
2501            &mut current_path,
2502            &mut all_paths,
2503        );
2504        if all_paths.is_empty() {
2505            return Err(MCSError::MemoryError(format!(
2506                "No path found between '{from}' and '{to}'"
2507            )));
2508        }
2509        let result: Vec<Vec<String>> = all_paths
2510            .into_iter()
2511            .map(|path| {
2512                path.into_iter()
2513                    .map(|id| self.interner.lookup(id).to_string())
2514                    .collect()
2515            })
2516            .collect();
2517        Ok(result)
2518    }
2519
2520    // --- Snapshot ---
2521
2522    /// Create a wait-free read snapshot (item 2 in plan).
2523    /// Freezes entity_slots and relations into `Arc<[_]>` and clones the rest.
2524    pub fn snapshot(&self) -> ReadSnapshot {
2525        ReadSnapshot {
2526            interner: self.interner.clone(),
2527            entity_slots: Arc::from_iter(self.entity_slots.iter().cloned()),
2528            free_slots: self.free_slots.clone(),
2529            name_table: self.name_table.clone(),
2530            relations: Arc::from_iter(self.relations.iter().cloned()),
2531            adjacency: self.adjacency.clone(),
2532            search: self.search.clone(),
2533        }
2534    }
2535
2536    // --- Flush & sync ---
2537
2538    /// Flush and fsync the log to stable storage.
2539    pub fn flush_and_sync(&mut self) -> Result<()> {
2540        self.store.flush_and_sync().map_err(MCSError::IoError)
2541    }
2542}
2543
2544
2545
2546// ---------------------------------------------------------------------------
2547// GraphHandle – wait-free read / serialized-write handle.
2548// ---------------------------------------------------------------------------
2549
/// Wait-free read / serialized-write handle to the graph.
///
/// Readers load a frozen [`ReadSnapshot`] via [`read`](GraphHandle::read)
/// (lock-free via `ArcSwap`). Writers take a [`Mutex`] lock, mutate the
/// underlying [`KnowledgeGraph`], and publish a fresh snapshot on unlock
/// via the [`WriteGuard`] drop glue.
pub struct GraphHandle {
    // Authoritative, mutable graph; all writers serialize on this mutex.
    inner: Arc<Mutex<KnowledgeGraph>>,
    // Most recently published snapshot; readers load it without locking.
    snapshot: ArcSwap<ReadSnapshot>,
}
2560
2561/// RAII guard that publishes a fresh [`ReadSnapshot`] on drop.
2562pub struct WriteGuard<'a> {
2563    guard: parking_lot::MutexGuard<'a, KnowledgeGraph>,
2564    snapshot: &'a ArcSwap<ReadSnapshot>,
2565    did_publish: bool,
2566}
2567
2568impl WriteGuard<'_> {
2569    /// Publish a snapshot now (eager, before drop). Also called by Drop.
2570    pub fn publish(&mut self) {
2571        let snap = Arc::new(self.guard.snapshot());
2572        self.snapshot.store(snap);
2573        self.did_publish = true;
2574    }
2575
2576    /// Access the underlying `KnowledgeGraph` for mutation.
2577    pub fn graph(&mut self) -> &mut KnowledgeGraph {
2578        &mut self.guard
2579    }
2580}
2581
2582impl std::ops::Deref for WriteGuard<'_> {
2583    type Target = KnowledgeGraph;
2584    fn deref(&self) -> &KnowledgeGraph {
2585        &self.guard
2586    }
2587}
2588
2589impl std::ops::DerefMut for WriteGuard<'_> {
2590    fn deref_mut(&mut self) -> &mut KnowledgeGraph {
2591        &mut self.guard
2592    }
2593}
2594
2595impl Drop for WriteGuard<'_> {
2596    fn drop(&mut self) {
2597        if !self.did_publish {
2598            self.publish();
2599        }
2600    }
2601}
2602
2603impl GraphHandle {
2604    /// Open or create the graph at `path`, seeding the initial snapshot.
2605    pub fn new(path: &Path) -> std::io::Result<Self> {
2606        let kg = KnowledgeGraph::new(path)?;
2607        let snapshot = Arc::new(kg.snapshot());
2608        Ok(Self {
2609            inner: Arc::new(Mutex::new(kg)),
2610            snapshot: ArcSwap::new(snapshot),
2611        })
2612    }
2613
2614    /// Lock-free read snapshot. Holds an `Arc` reference to the frozen graph data.
2615    pub fn read(&self) -> ReadSnapshot {
2616        (**self.snapshot.load()).clone()
2617    }
2618
2619    /// Serialised write access. Returns a guard that publishes a fresh snapshot
2620    /// when dropped (or when [`WriteGuard::publish`] is called eagerly).
2621    pub fn write(&self) -> WriteGuard<'_> {
2622        WriteGuard {
2623            guard: self.inner.lock(),
2624            snapshot: &self.snapshot,
2625            did_publish: false,
2626        }
2627    }
2628}
2629
2630