Skip to main content

mcp_memory/
kg.rs

1use std::collections::VecDeque;
2
3use ahash::{AHashMap, AHashSet};
4use std::path::Path;
5
6use serde::ser::{Serialize, SerializeSeq, SerializeStruct, Serializer};
7
8use crate::errors::{MCSError, Result};
9use crate::intern::{StrId, StringInterner};
10use crate::types::{Entity, Relation, KnowledgeGraphOut};
11use crate::search::SearchIndex;
12use crate::store::{self as store_enc, BinaryStore, RecordKind};
13
14const ENTITY_SLOT_LIVE: u8 = 1;
15const NAME_TABLE_SHARDS: usize = 4;
16
17// ---------------------------------------------------------------------------
18// Prefetch helper – issues a non-binding software prefetch hint to pull a
19// cache-line into L1/L2 while we finish probing the current entry.
20// ---------------------------------------------------------------------------
// x86_64: issue a PREFETCHT0 hint for `addr`.
//
// `_mm_prefetch` is declared over `*const i8`, and `*const u8` does not coerce
// to it, so the pointer is cast explicitly (the hint ignores the pointee type).
#[cfg(target_arch = "x86_64")]
#[inline(always)]
unsafe fn prefetch_addr(addr: *const u8) {
    use std::arch::x86_64::{_mm_prefetch, _MM_HINT_T0};
    // SAFETY: a prefetch is only a cache hint — it never dereferences `addr`
    // or faults on it — and SSE, which provides the instruction, is part of
    // the x86_64 baseline target.
    unsafe { _mm_prefetch::<_MM_HINT_T0>(addr.cast::<i8>()) };
}

// Other targets: no prefetch intrinsic is used, so this is a no-op.
#[cfg(not(target_arch = "x86_64"))]
#[inline(always)]
const unsafe fn prefetch_addr(_addr: *const u8) {}
31
32// ---------------------------------------------------------------------------
33// StoredEntity / StoredRelation – internal representations using StrId.
34// ---------------------------------------------------------------------------
35// Default layout: 40 B / align 8 (Rust packs `state` into the Vec's padding and
36// the `Option` niche is free). With `cache_align`, the slot is rounded to a full
37// 64-byte line so a point lookup/mutation (name_table -> slot index) touches
38// exactly one cache line instead of occasionally straddling two. Costs +60%
39// memory and a wider stride on bulk scans — measure before enabling.
40#[cfg_attr(feature = "cache_align", repr(align(64)))]
41struct StoredEntity {
42    state: u8,
43    name: StrId,
44    entity_type: StrId,
45    observations: Vec<StrId>,
46}
47
48impl StoredEntity {
49    const fn is_live(&self) -> bool {
50        self.state == ENTITY_SLOT_LIVE
51    }
52}
53
// Internal relation record `from -[relation_type]-> to`, all interned ids.
//
// Default layout: 12 B / align 4 → ~1 in 8 records straddles a 64-byte line
// (records 5 and 10 of every 16 cross a boundary when the buffer is
// line-aligned). With `cache_align`, align(16) rounds the size to 16 B so 4
// records fill a line exactly (no straddle, AVX2-load-friendly) for +33% memory.
#[cfg_attr(feature = "cache_align", repr(align(16)))]
struct StoredRelation {
    // Source entity name.
    from: StrId,
    // Target entity name.
    to: StrId,
    // Relation label.
    relation_type: StrId,
}
63
64// ---------------------------------------------------------------------------
65// Borrowing serialization views (M6).
66//
67// Read tools used to build owned `Entity`/`Relation` vecs (a fresh `String`
68// per name/type/observation) and *then* serialize them — roughly 2-3x the
69// graph resident at once. These views instead hold references to the selected
70// stored records and emit their interned `&str` directly during
71// serialization, with no intermediate owned strings. The emitted JSON is
72// byte-for-byte identical to serializing `KnowledgeGraphOut`.
73// ---------------------------------------------------------------------------
74
75/// A borrowing view over a selected slice of the graph. Serializes to
76/// `{"entities":[...],"relations":[...]}`.
77pub struct GraphView<'a> {
78    kg: &'a KnowledgeGraph,
79    entities: Vec<&'a StoredEntity>,
80    relations: Vec<&'a StoredRelation>,
81}
82
83impl GraphView<'_> {
84    /// Materialize into the owned [`KnowledgeGraphOut`]. Used by the direct
85    /// (non-serializing) callers and tests; the server's read handlers
86    /// serialize the view directly instead.
87    pub fn to_owned_out(&self) -> KnowledgeGraphOut {
88        KnowledgeGraphOut {
89            entities: self.entities.iter().map(|e| self.kg.entity_to_output(e)).collect(),
90            relations: self.relations.iter().map(|r| self.kg.relation_to_output(r)).collect(),
91        }
92    }
93}
94
95impl Serialize for GraphView<'_> {
96    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
97        let mut st = s.serialize_struct("KnowledgeGraphOut", 2)?;
98        st.serialize_field("entities", &EntityListRef { kg: self.kg, items: &self.entities })?;
99        st.serialize_field("relations", &RelationListRef { kg: self.kg, items: &self.relations })?;
100        st.end()
101    }
102}
103
104struct EntityListRef<'a> { kg: &'a KnowledgeGraph, items: &'a [&'a StoredEntity] }
105impl Serialize for EntityListRef<'_> {
106    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
107        let mut seq = s.serialize_seq(Some(self.items.len()))?;
108        for &e in self.items {
109            seq.serialize_element(&EntityRef { kg: self.kg, e })?;
110        }
111        seq.end()
112    }
113}
114
115struct RelationListRef<'a> { kg: &'a KnowledgeGraph, items: &'a [&'a StoredRelation] }
116impl Serialize for RelationListRef<'_> {
117    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
118        let mut seq = s.serialize_seq(Some(self.items.len()))?;
119        for &r in self.items {
120            seq.serialize_element(&RelationRef { kg: self.kg, r })?;
121        }
122        seq.end()
123    }
124}
125
126struct EntityRef<'a> { kg: &'a KnowledgeGraph, e: &'a StoredEntity }
127impl Serialize for EntityRef<'_> {
128    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
129        let mut st = s.serialize_struct("Entity", 3)?;
130        st.serialize_field("name", self.kg.interner.lookup(self.e.name))?;
131        st.serialize_field("entityType", self.kg.interner.lookup(self.e.entity_type))?;
132        st.serialize_field("observations", &ObsRef { kg: self.kg, obs: &self.e.observations })?;
133        st.end()
134    }
135}
136
137struct ObsRef<'a> { kg: &'a KnowledgeGraph, obs: &'a [StrId] }
138impl Serialize for ObsRef<'_> {
139    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
140        let mut seq = s.serialize_seq(Some(self.obs.len()))?;
141        for &o in self.obs {
142            seq.serialize_element(self.kg.interner.lookup(o))?;
143        }
144        seq.end()
145    }
146}
147
148struct RelationRef<'a> { kg: &'a KnowledgeGraph, r: &'a StoredRelation }
149impl Serialize for RelationRef<'_> {
150    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
151        let mut st = s.serialize_struct("Relation", 3)?;
152        st.serialize_field("from", self.kg.interner.lookup(self.r.from))?;
153        st.serialize_field("to", self.kg.interner.lookup(self.r.to))?;
154        st.serialize_field("relationType", self.kg.interner.lookup(self.r.relation_type))?;
155        st.end()
156    }
157}
158
/// Which way edges are followed in neighborhood queries.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum Direction {
    /// Follow `from -> to` (outgoing edges).
    Out,
    /// Follow `to -> from` (incoming edges).
    In,
    /// Follow edges regardless of orientation.
    Both,
}

impl Direction {
    /// Parse an optional direction string.
    ///
    /// Only the exact (case-sensitive) strings `"out"` and `"in"` select a
    /// single direction; `None` and any other value yield [`Direction::Both`].
    pub fn parse(s: Option<&str>) -> Self {
        s.map_or(Self::Both, |name| match name {
            "out" => Self::Out,
            "in" => Self::In,
            _ => Self::Both,
        })
    }
}
180
/// Make a string safe to embed inside a Mermaid/DOT double-quoted label.
///
/// This is a lossy character substitution, not an escape:
/// - `"` becomes `'`, so it cannot terminate the quoted label;
/// - `\` becomes `/`: in a DOT quoted string `\"` is an escaped quote, so a
///   label ending in a backslash would swallow the closing quote;
/// - `\n` and `\r` each become a space, keeping the label on one line.
///
/// All other characters (including non-ASCII) pass through unchanged.
fn sanitize_label(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    out.extend(s.chars().map(|c| match c {
        '"' => '\'',
        '\\' => '/',
        '\n' | '\r' => ' ',
        other => other,
    }));
    out
}
193
194// ---------------------------------------------------------------------------
195// ShardedNameTable – open-addressing hash map split into N independent shards.
196//
197// Each shard uses **ctrl-byte bucket** approach: a 1-byte metadata array
198// stores the 7-bit hash stamp (h2) for each slot, with `0xFF` = EMPTY.
199// On probe, the first memory access is a single byte (ctrl). The full key
200// (StrId) is only compared when the stamp matches — ~127/128 of probe steps
201// touch nothing but the ctrl byte.  See also SwissTable / hashbrown.
202// ---------------------------------------------------------------------------
/// Control-byte value for an unused slot. Bit 7 is set, which no stamp has.
const EMPTY_SLOT: u8 = 0xFF;

/// Per-slot stamp stored in the ctrl byte: the low 7 bits of the hash.
#[inline(always)]
const fn h2(hash: u64) -> u8 {
    hash as u8 & 0x7F
}

/// Home bucket: the hash with its 7 stamp bits shifted out, reduced by
/// `mask` (callers pass capacity - 1 for a power-of-two capacity).
#[inline(always)]
const fn h1(hash: u64, mask: usize) -> usize {
    mask & (hash >> 7) as usize
}
214
/// One shard of the name -> entity-slot map: open addressing with linear
/// probing over three parallel arrays of the same power-of-two length.
///
/// Slot `i` is occupied iff `ctrl[i]` has bit 7 clear. When occupied,
/// `ctrl[i]` holds the key's `h2` stamp, `names[i]` the interned key, and
/// `slots[i]` the index it maps to. Hashes are not stored; `remove` and
/// `grow` recompute them from the interner.
struct NameTableShard {
    ctrl: Vec<u8>,      // 0xFF = empty; 0x00-0x7F = h2 stamp (bit 7 always clear)
    names: Vec<StrId>,  // key per slot (`StrId::EMPTY` when unused)
    slots: Vec<u32>,    // mapped index per slot (`u32::MAX` when unused)
    mask: usize,        // capacity - 1
    count: usize,       // number of occupied slots
}

impl NameTableShard {
    /// Create an empty shard with at least `capacity` slots, rounded up to a
    /// power of two and never fewer than 16.
    fn new(capacity: usize) -> Self {
        let cap = capacity.next_power_of_two().max(16);
        Self {
            ctrl: vec![EMPTY_SLOT; cap],
            names: vec![StrId::EMPTY; cap],
            slots: vec![u32::MAX; cap],
            mask: cap - 1,
            count: 0,
        }
    }

    /// Return the value stored for `name` (whose hash is `hash`), if any.
    ///
    /// Probes linearly from the home bucket `h1(hash)` and stops at the first
    /// EMPTY ctrl byte: `insert` places keys at the first empty slot after
    /// their home bucket and `remove` re-packs the cluster, so a key never
    /// sits beyond a gap in its probe chain. The `len` bound is a backstop.
    #[inline(always)]
    fn lookup(&self, hash: u64, name: StrId) -> Option<u32> {
        let stamp = h2(hash);
        let mask = self.mask;
        let mut idx = h1(hash, mask);
        let ctrl = self.ctrl.as_ptr();
        let names = self.names.as_ptr();
        let slots = self.slots.as_ptr();
        let len = self.ctrl.len();

        for _ in 0..len {
            // Prefetch the ctrl byte 4 slots ahead — overlaps memory latency.
            // SAFETY: `& mask` keeps `prefetch_idx` inside `ctrl`, and the
            // prefetch hint itself never dereferences the pointer.
            let prefetch_idx = idx.wrapping_add(4) & mask;
            unsafe { prefetch_addr(ctrl.add(prefetch_idx)) };

            // SAFETY: idx always < len because of &mask on each iteration.
            // `names` and `slots` are always allocated with the same length
            // as `ctrl` (see `new` and `grow`), so `idx` is in bounds for all three.
            unsafe {
                let c = *ctrl.add(idx);
                // Bit 7 set → EMPTY → key not present.
                if c & 0x80 != 0 {
                    return None;
                }
                // Stamp match → compare full key (rare for non-matching keys).
                if c == stamp && *names.add(idx) == name {
                    return Some(*slots.add(idx));
                }
            }
            idx = (idx + 1) & mask;
        }
        None
    }

    /// Insert `name -> slot`, growing first once the load exceeds 75%.
    ///
    /// The growth check guarantees an empty slot exists, so the probe loop
    /// terminates. NOTE(review): no existing-key check is done here; inserting
    /// a name that is already present stores a second entry. Callers must
    /// ensure the key is absent.
    fn insert(&mut self, interner: &StringInterner, hash: u64, name: StrId, slot: u32) {
        if self.count * 4 > self.ctrl.len() * 3 {
            self.grow(interner);
        }
        // `mask` is read after a possible `grow`, so it matches the new size.
        let stamp = h2(hash);
        let mask = self.mask;
        let mut idx = h1(hash, mask);
        loop {
            // SAFETY: idx & mask always < len for power-of-two capacity.
            unsafe {
                if *self.ctrl.get_unchecked(idx) & 0x80 != 0 {
                    *self.ctrl.get_unchecked_mut(idx) = stamp;
                    *self.names.get_unchecked_mut(idx) = name;
                    *self.slots.get_unchecked_mut(idx) = slot;
                    self.count += 1;
                    return;
                }
            }
            idx = (idx + 1) & mask;
        }
    }

    /// Remove `name` (whose hash is `hash`) if present; otherwise a no-op.
    fn remove(&mut self, interner: &StringInterner, hash: u64, name: StrId) {
        let stamp = h2(hash);
        let mask = self.mask;
        let mut idx = h1(hash, mask);
        let len = self.ctrl.len();
        for _ in 0..len {
            if self.ctrl[idx] & 0x80 != 0 {
                return;
            }
            if self.ctrl[idx] == stamp && self.names[idx] == name {
                // Found — clear the slot, then take out every following entry
                // of the same cluster and re-insert it from its home bucket,
                // so no probe chain is broken by the new gap.
                self.ctrl[idx] = EMPTY_SLOT;
                self.names[idx] = StrId::EMPTY;
                self.slots[idx] = u32::MAX;
                self.count -= 1;

                let mut next = (idx + 1) & mask;
                while self.ctrl[next] & 0x80 == 0 {
                    let nn = self.names[next];
                    let ns = self.slots[next];
                    // Hash is no longer stored (M4) — recompute it from the
                    // interned name to find the entry's ideal bucket.
                    let nh = interner.get_hash(nn);
                    self.ctrl[next] = EMPTY_SLOT;
                    self.names[next] = StrId::EMPTY;
                    self.slots[next] = u32::MAX;
                    self.count -= 1;

                    // Re-insert at its ideal bucket (may land back at `next`).
                    let nstamp = h2(nh);
                    let mut re_idx = h1(nh, mask);
                    while self.ctrl[re_idx] & 0x80 == 0 {
                        re_idx = (re_idx + 1) & mask;
                    }
                    self.ctrl[re_idx] = nstamp;
                    self.names[re_idx] = nn;
                    self.slots[re_idx] = ns;
                    self.count += 1;

                    next = (next + 1) & mask;
                }
                return;
            }
            idx = (idx + 1) & mask;
        }
    }

    /// Double the capacity and re-insert every occupied slot into fresh
    /// arrays. `count` is unchanged.
    fn grow(&mut self, interner: &StringInterner) {
        let new_cap = self.ctrl.len() * 2;
        let new_mask = new_cap - 1;
        let mut new_ctrl = vec![EMPTY_SLOT; new_cap];
        let mut new_names = vec![StrId::EMPTY; new_cap];
        let mut new_slots = vec![u32::MAX; new_cap];

        for i in 0..self.ctrl.len() {
            if self.ctrl[i] & 0x80 == 0 {
                // Recompute the hash from the interned name (M4: not stored).
                let name = self.names[i];
                let hash = interner.get_hash(name);
                let stamp = h2(hash);
                let mut idx = h1(hash, new_mask);
                while new_ctrl[idx] & 0x80 == 0 {
                    idx = (idx + 1) & new_mask;
                }
                new_ctrl[idx] = stamp;
                new_names[idx] = name;
                new_slots[idx] = self.slots[i];
            }
        }

        self.ctrl = new_ctrl;
        self.names = new_names;
        self.slots = new_slots;
        self.mask = new_mask;
    }
}
365
366struct ShardedNameTable {
367    shards: [NameTableShard; NAME_TABLE_SHARDS],
368}
369
370impl ShardedNameTable {
371    fn new(capacity_per_shard: usize) -> Self {
372        Self {
373            shards: [
374                NameTableShard::new(capacity_per_shard),
375                NameTableShard::new(capacity_per_shard),
376                NameTableShard::new(capacity_per_shard),
377                NameTableShard::new(capacity_per_shard),
378            ],
379        }
380    }
381
382    #[inline(always)]
383    const fn shard(hash: u64) -> usize {
384        (hash as usize) & (NAME_TABLE_SHARDS - 1)
385    }
386
387    #[inline(always)]
388    fn lookup(&self, hash: u64, name: StrId) -> Option<u32> {
389        self.shards[Self::shard(hash)].lookup(hash, name)
390    }
391
392    #[inline(always)]
393    fn insert(&mut self, interner: &StringInterner, hash: u64, name: StrId, slot: u32) {
394        self.shards[Self::shard(hash)].insert(interner, hash, name, slot);
395    }
396
397    #[inline(always)]
398    fn remove(&mut self, interner: &StringInterner, hash: u64, name: StrId) {
399        self.shards[Self::shard(hash)].remove(interner, hash, name);
400    }
401}
402
403// ---------------------------------------------------------------------------
404// KnowledgeGraph – the central type.
405// ---------------------------------------------------------------------------
/// In-memory knowledge graph backed by a binary record log (`BinaryStore`).
///
/// Every field except `store` is derived state, rebuilt by replaying the log
/// in [`KnowledgeGraph::new`].
pub struct KnowledgeGraph {
    // Interns every name/type/observation string; other fields hold `StrId`s.
    interner: StringInterner,
    // Entity records indexed by slot number; `None` marks a deleted slot.
    entity_slots: Vec<Option<StoredEntity>>,
    /// Tombstoned slot indices available for reuse on the next create (M2),
    /// so create/delete churn doesn't grow `entity_slots` without bound.
    free_slots: Vec<u32>,
    // Interned entity name -> index into `entity_slots`.
    name_table: ShardedNameTable,
    // All relations in insertion order; queries scan this linearly.
    relations: Vec<StoredRelation>,
    // Search index over entity name/type/observations, keyed by slot.
    search: SearchIndex,
    // Handle to the on-disk log.
    store: BinaryStore,
}
417
418impl KnowledgeGraph {
419    pub fn new(path: &Path) -> std::io::Result<Self> {
420        let store = BinaryStore::new(path)?;
421
422        // Replay into local collections, then assign into self — no raw pointers needed (X3).
423        let mut interner = StringInterner::with_capacity(65536, 1024);
424        let mut entity_slots: Vec<Option<StoredEntity>> = Vec::with_capacity(256);
425        let mut name_table = ShardedNameTable::new(64);
426        let mut relations: Vec<StoredRelation> = Vec::with_capacity(64);
427        let mut search = SearchIndex::new();
428
429        store.replay(|kind, data| {
430            match kind {
431                RecordKind::CreateEntity => {
432                    if let Some((name, etype, obs)) = store_enc::decode_create_entity(data) {
433                        Self::replay_create_entity(
434                            &mut interner, &mut entity_slots, &mut search, &mut name_table, name, etype, &obs,
435                        );
436                    }
437                }
438                RecordKind::CreateRelation => {
439                    if let Some((from, to, rtype)) = store_enc::decode_create_relation(data) {
440                        let from_id = interner.intern(from);
441                        let to_id = interner.intern(to);
442                        let type_id = interner.intern(rtype);
443                        relations.push(StoredRelation {
444                            from: from_id,
445                            to: to_id,
446                            relation_type: type_id,
447                        });
448                    }
449                }
450                RecordKind::AddObservations => {
451                    if let Some((name, obs)) = store_enc::decode_add_observations(data) {
452                        Self::replay_add_observations(
453                            &mut interner, &mut entity_slots, &mut search, &mut name_table, name, &obs,
454                        );
455                    }
456                }
457                RecordKind::DeleteEntity => {
458                    if let Some(name) = store_enc::decode_delete_entity(data) {
459                        Self::replay_delete_entity(
460                            &mut interner, &mut entity_slots, &mut relations, &mut search, &mut name_table, name,
461                        );
462                    }
463                }
464                RecordKind::DeleteObservations => {
465                    if let Some((name, obs)) = store_enc::decode_delete_observations(data) {
466                        Self::replay_delete_observations(
467                            &mut interner, &mut entity_slots, &mut search, &mut name_table, name, &obs,
468                        );
469                    }
470                }
471                RecordKind::DeleteRelation => {
472                    if let Some((from, to, rtype)) = store_enc::decode_delete_relation(data) {
473                        let from_id = interner.intern(from);
474                        let to_id = interner.intern(to);
475                        let type_id = interner.intern(rtype);
476                        relations.retain(|r| {
477                            !(r.from == from_id && r.to == to_id && r.relation_type == type_id)
478                        });
479                    }
480                }
481            }
482        })?;
483
484        // Slots tombstoned by deletes during replay are available for reuse (M2).
485        let free_slots: Vec<u32> = entity_slots
486            .iter()
487            .enumerate()
488            .filter(|(_, s)| s.is_none())
489            .map(|(i, _)| i as u32)
490            .collect();
491
492        Ok(Self {
493            interner,
494            entity_slots,
495            free_slots,
496            name_table,
497            relations,
498            search,
499            store,
500        })
501    }
502
503    // -----------------------------------------------------------------------
504    // Replay helpers (static to avoid borrow issues in the closure)
505    // -----------------------------------------------------------------------
506
507    #[allow(clippy::ptr_arg)]
508    fn replay_create_entity(
509        interner: &mut StringInterner,
510        entities: &mut Vec<Option<StoredEntity>>,
511        search: &mut SearchIndex,
512        name_table: &mut ShardedNameTable,
513        name: &str,
514        etype: &str,
515        observations: &[&str],
516    ) {
517        let name_id = interner.intern(name);
518        let type_id = interner.intern(etype);
519        let obs_ids: Vec<StrId> = observations.iter().map(|o| interner.intern(o)).collect();
520        let slot = entities.len() as u32;
521        entities.push(Some(StoredEntity {
522            state: ENTITY_SLOT_LIVE,
523            name: name_id,
524            entity_type: type_id,
525            observations: obs_ids.clone(),
526        }));
527        let hash = interner.get_hash(name_id);
528        name_table.insert(&*interner, hash, name_id, slot);
529        search.index_entity(interner, slot, name_id, type_id, &obs_ids);
530    }
531
532    fn replay_add_observations(
533        interner: &mut StringInterner,
534        entities: &mut [Option<StoredEntity>],
535        search: &mut SearchIndex,
536        name_table: &mut ShardedNameTable,
537        name: &str,
538        observations: &[&str],
539    ) {
540        let name_id = interner.intern(name);
541        let hash = interner.get_hash(name_id);
542        if let Some(slot) = name_table.lookup(hash, name_id)
543            && let Some(Some(entity)) = entities.get_mut(slot as usize)
544        {
545            for &o in observations {
546                let oid = interner.intern(o);
547                if !entity.observations.contains(&oid) {
548                    entity.observations.push(oid);
549                }
550            }
551            search.remove_entity(slot);
552            search.index_entity(
553                interner,
554                slot,
555                entity.name,
556                entity.entity_type,
557                &entity.observations,
558            );
559        }
560    }
561
562    fn replay_delete_entity(
563        interner: &mut StringInterner,
564        entities: &mut [Option<StoredEntity>],
565        rels: &mut Vec<StoredRelation>,
566        search: &mut SearchIndex,
567        name_table: &mut ShardedNameTable,
568        name: &str,
569    ) {
570        let name_id = interner.intern(name);
571        let hash = interner.get_hash(name_id);
572        if let Some(slot) = name_table.lookup(hash, name_id)
573            && let Some(Some(_)) = entities.get(slot as usize)
574        {
575            entities[slot as usize] = None;
576            search.remove_entity(slot);
577            name_table.remove(&*interner, hash, name_id);
578        }
579        rels.retain(|r| r.from != name_id && r.to != name_id);
580    }
581
582    fn replay_delete_observations(
583        interner: &mut StringInterner,
584        entities: &mut [Option<StoredEntity>],
585        search: &mut SearchIndex,
586        name_table: &mut ShardedNameTable,
587        name: &str,
588        observations: &[&str],
589    ) {
590        let name_id = interner.intern(name);
591        let hash = interner.get_hash(name_id);
592        if let Some(slot) = name_table.lookup(hash, name_id)
593            && let Some(Some(entity)) = entities.get_mut(slot as usize)
594        {
595            let remove_ids: Vec<StrId> = observations.iter().map(|o| interner.intern(o)).collect();
596            entity.observations.retain(|o| !remove_ids.contains(o));
597            search.remove_entity(slot);
598            search.index_entity(
599                interner,
600                slot,
601                entity.name,
602                entity.entity_type,
603                &entity.observations,
604            );
605        }
606    }
607
608    // -----------------------------------------------------------------------
609    // Public API
610    // -----------------------------------------------------------------------
611
/// Borrow the graph's string interner, which resolves the `StrId`s held by
/// stored entities and relations.
pub const fn interner(&self) -> &StringInterner {
    &self.interner
}
615
616    /// Return a single entity by exact name match.
617    pub fn get_entity(&self, name: &str) -> Option<Entity> {
618        let name_id = self.interner.get_optional(name)?;
619        let hash = self.interner.get_hash(name_id);
620        let slot = self.name_table.lookup(hash, name_id)?;
621        let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
622        if !stored.is_live() {
623            return None;
624        }
625        Some(self.entity_to_output(stored))
626    }
627
628    /// Return aggregate statistics about the graph.
629    pub fn graph_stats(&self) -> serde_json::Value {
630        let live_entities = self
631            .entity_slots
632            .iter()
633            .filter(|s| s.as_ref().is_some_and(|e| e.is_live()))
634            .count();
635        let total_relations = self.relations.len();
636        let index_entries = self.search.len();
637        let total_obs: usize = self
638            .entity_slots
639            .iter()
640            .filter_map(|s| s.as_ref())
641            .filter(|e| e.is_live())
642            .map(|e| e.observations.len())
643            .sum();
644
645        serde_json::json!({
646            "entities": live_entities,
647            "relations": total_relations,
648            "totalObservations": total_obs,
649            "searchIndexEntries": index_entries,
650            "internedStrings": self.interner.len(),
651            "internedBytes": self.interner.total_bytes(),
652        })
653    }
654
655    /// Search relations by optional filters: `from`, `to`, `relationType`.
656    /// Any filter that is absent matches everything. A filter value that does
657    /// not exist in the graph returns empty results.
658    pub fn search_relations(&self, from: Option<&str>, to: Option<&str>, rtype: Option<&str>) -> Vec<Relation> {
659        let from_id = match from {
660            Some(f) => match self.interner.get_optional(f) {
661                Some(id) => Some(id),
662                None => return Vec::new(),
663            },
664            None => None,
665        };
666        let to_id = match to {
667            Some(t) => match self.interner.get_optional(t) {
668                Some(id) => Some(id),
669                None => return Vec::new(),
670            },
671            None => None,
672        };
673        let rtype_id = match rtype {
674            Some(r) => match self.interner.get_optional(r) {
675                Some(id) => Some(id),
676                None => return Vec::new(),
677            },
678            None => None,
679        };
680
681        self.relations
682            .iter()
683            .filter(|r| {
684                from_id.is_none_or(|f| r.from == f)
685                    && to_id.is_none_or(|t| r.to == t)
686                    && rtype_id.is_none_or(|rt| r.relation_type == rt)
687            })
688            .map(|r| Relation {
689                from: self.interner.lookup(r.from).to_string(),
690                to: self.interner.lookup(r.to).to_string(),
691                relation_type: self.interner.lookup(r.relation_type).to_string(),
692            })
693            .collect()
694    }
695
696    /// BFS shortest-path between two entity names. Returns the sequence of
697    /// entity names along the path (inclusive of both endpoints).
698    pub fn find_path(&self, from: &str, to: &str) -> Result<Vec<String>> {
699        let from_id = self.interner.get_optional(from)
700            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
701        let to_id = self.interner.get_optional(to)
702            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
703        let hash_from = self.interner.get_hash(from_id);
704        let hash_to = self.interner.get_hash(to_id);
705
706        if self.name_table.lookup(hash_from, from_id).is_none() {
707            return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
708        }
709        if self.name_table.lookup(hash_to, to_id).is_none() {
710            return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
711        }
712        if from_id == to_id {
713            return Ok(vec![from.to_string()]);
714        }
715
716        // Build adjacency list (P4) — O(E) once, not O(V×E).
717        let mut adj: AHashMap<StrId, Vec<(StrId, StrId)>> = AHashMap::new();
718        for rel in &self.relations {
719            adj.entry(rel.from).or_default().push((rel.to, rel.relation_type));
720            adj.entry(rel.to).or_default().push((rel.from, rel.relation_type));
721        }
722
723        // BFS over adjacency list
724        let mut visited: AHashSet<StrId> = AHashSet::new();
725        let mut parent: AHashMap<StrId, StrId> = AHashMap::new();
726        let mut queue: VecDeque<StrId> = VecDeque::new();
727
728        visited.insert(from_id);
729        queue.push_back(from_id);
730
731        while let Some(current) = queue.pop_front() {
732            if current == to_id {
733                break;
734            }
735
736            if let Some(neighbors) = adj.get(&current) {
737                for &(neighbor, _) in neighbors {
738                    if visited.insert(neighbor) {
739                        parent.insert(neighbor, current);
740                        queue.push_back(neighbor);
741                    }
742                }
743            }
744        }
745
746        if !parent.contains_key(&to_id) && from_id != to_id {
747            return Err(MCSError::MemoryError(format!(
748                "No path found between '{from}' and '{to}'"
749            )));
750        }
751
752        // Reconstruct path
753        let mut path: Vec<String> = Vec::new();
754        let mut cur = to_id;
755        loop {
756            path.push(self.interner.lookup(cur).to_string());
757            if cur == from_id {
758                break;
759            }
760            cur = *parent.get(&cur).ok_or_else(|| {
761                MCSError::MemoryError("Path reconstruction failed".into())
762            })?;
763        }
764        path.reverse();
765        Ok(path)
766    }
767
768    /// Rewrite the binary log from the current in-memory state.
769    /// After compaction the log contains only the minimal set of records
770    /// needed to reconstruct the graph (all creates, no deletes).
771    /// Crash-safe: writes to a temp file, then atomically renames (C3).
772    pub fn compact(&mut self) -> Result<()> {
773        // 1. Collect current state as create-records
774        let mut create_entities: Vec<Entity> = Vec::new();
775        let mut create_relations: Vec<Relation> = Vec::new();
776
777        for slot in &self.entity_slots {
778            if let Some(stored) = slot.as_ref().filter(|e| e.is_live()) {
779                create_entities.push(self.entity_to_output(stored));
780            }
781        }
782        for rel in &self.relations {
783            create_relations.push(Relation {
784                from: self.interner.lookup(rel.from).to_string(),
785                to: self.interner.lookup(rel.to).to_string(),
786                relation_type: self.interner.lookup(rel.relation_type).to_string(),
787            });
788        }
789
790        // 2. Write to a temp file first
791        let tmp_path = self.store.path().with_extension("tmp");
792        let mut tmp_store = BinaryStore::new(&tmp_path).map_err(MCSError::IoError)?;
793        for entity in &create_entities {
794            let mut buf = Vec::new();
795            store_enc::encode_create_entity(&mut buf, &entity.name, &entity.entity_type, &entity.observations)
796                .map_err(MCSError::IoError)?;
797            tmp_store.write_record(RecordKind::CreateEntity, &buf).map_err(MCSError::IoError)?;
798        }
799        for relation in &create_relations {
800            let mut buf = Vec::new();
801            store_enc::encode_create_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
802                .map_err(MCSError::IoError)?;
803            tmp_store.write_record(RecordKind::CreateRelation, &buf).map_err(MCSError::IoError)?;
804        }
805        tmp_store.flush_and_sync().map_err(MCSError::IoError)?;
806        drop(tmp_store);
807
808        // 3. Atomically rename over the original (atomic on POSIX)
809        std::fs::rename(&tmp_path, self.store.path()).map_err(MCSError::IoError)?;
810
811        // 4. Rebuild the entire in-memory graph from the compacted log (M1/M2).
812        //    Replaying into fresh structures reclaims the interner arena (stale
813        //    strings from deleted/edited entities), tombstoned entity slots, and
814        //    stale search-index entries — none of which the old reopen-only path
815        //    reclaimed.
816        let path = self.store.path().clone();
817        *self = KnowledgeGraph::new(&path).map_err(MCSError::IoError)?;
818
819        Ok(())
820    }
821
822    // ---- Public API with write-ahead log (C1) and error propagation ----
823
824    pub fn create_entities(&mut self, entities: &[Entity]) -> Result<Vec<Entity>> {
825        // Validate up front so an invalid entity never produces partial writes.
826        for entity in entities {
827            if entity.name.is_empty() {
828                return Err(MCSError::InvalidParams(
829                    "Entity name must not be empty".into(),
830                ));
831            }
832        }
833        let mut created = Vec::new();
834        for entity in entities {
835            // Check dedup before writing (using non-interning lookup)
836            let existing = self.interner.get_optional(&entity.name)
837                .and_then(|id| {
838                    let hash = self.interner.get_hash(id);
839                    self.name_table.lookup(hash, id)
840                });
841            if existing.is_some() {
842                continue;
843            }
844            // Write-ahead: encode and log before mutating state
845            let mut buf = Vec::new();
846            store_enc::encode_create_entity(&mut buf, &entity.name, &entity.entity_type, &entity.observations)
847                .map_err(MCSError::IoError)?;
848            self.store.write_record(RecordKind::CreateEntity, &buf)
849                .map_err(MCSError::IoError)?;
850
851            let name_id = self.interner.intern(&entity.name);
852            let hash = self.interner.get_hash(name_id);
853            let type_id = self.interner.intern(&entity.entity_type);
854            let obs_ids: Vec<StrId> = entity
855                .observations
856                .iter()
857                .map(|o| self.interner.intern(o))
858                .collect();
859            // Reuse a tombstoned slot if one is free (M2); its old search-index
860            // entries were cleared on delete, so the slot starts clean.
861            let reused = self.free_slots.pop();
862            let slot = reused.unwrap_or(self.entity_slots.len() as u32);
863            self.search
864                .index_entity(&mut self.interner, slot, name_id, type_id, &obs_ids);
865            let stored = Some(StoredEntity {
866                state: ENTITY_SLOT_LIVE,
867                name: name_id,
868                entity_type: type_id,
869                observations: obs_ids,
870            });
871            match reused {
872                Some(s) => self.entity_slots[s as usize] = stored,
873                None => self.entity_slots.push(stored),
874            }
875            self.name_table.insert(&self.interner, hash, name_id, slot);
876            created.push(Entity {
877                name: entity.name.clone(),
878                entity_type: entity.entity_type.clone(),
879                observations: entity.observations.clone(),
880            });
881        }
882        Ok(created)
883    }
884
885    pub fn create_relations(&mut self, relations: &[Relation]) -> Result<Vec<Relation>> {
886        // Validate up front so an invalid relation never produces partial writes.
887        for relation in relations {
888            if relation.from.is_empty() || relation.to.is_empty() {
889                return Err(MCSError::InvalidParams(
890                    "Relation endpoints must not be empty".into(),
891                ));
892            }
893        }
894        let mut created = Vec::new();
895        // Build a dedup set for O(1) duplicate checks (P5)
896        let mut rel_set: AHashSet<(StrId, StrId, StrId)> = AHashSet::new();
897        for rel in &self.relations {
898            rel_set.insert((rel.from, rel.to, rel.relation_type));
899        }
900        for relation in relations {
901            let from_id = self.interner.intern(&relation.from);
902            let to_id = self.interner.intern(&relation.to);
903            let type_id = self.interner.intern(&relation.relation_type);
904            if !rel_set.insert((from_id, to_id, type_id)) {
905                continue;
906            }
907            // Write-ahead: log before mutation
908            let mut buf = Vec::new();
909            store_enc::encode_create_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
910                .map_err(MCSError::IoError)?;
911            self.store.write_record(RecordKind::CreateRelation, &buf)
912                .map_err(MCSError::IoError)?;
913
914            self.relations.push(StoredRelation {
915                from: from_id,
916                to: to_id,
917                relation_type: type_id,
918            });
919            created.push(Relation {
920                from: relation.from.clone(),
921                to: relation.to.clone(),
922                relation_type: relation.relation_type.clone(),
923            });
924        }
925        Ok(created)
926    }
927
928    pub fn add_observations(&mut self, entity_name: &str, contents: &[String]) -> Result<Vec<String>> {
929        let name_id = self.interner.get_optional(entity_name)
930            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
931        let hash = self.interner.get_hash(name_id);
932        let slot = self
933            .name_table
934            .lookup(hash, name_id)
935            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
936        let stored = self
937            .entity_slots
938            .get_mut(slot as usize)
939            .and_then(|e| e.as_mut())
940            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
941
942        // Deduplicate new observations (P7) — use AHashSet for O(1) lookups
943        let existing: AHashSet<StrId> = stored.observations.iter().copied().collect();
944        let mut added = Vec::new();
945        let mut interned_added = Vec::new();
946        for content in contents {
947            let cid = self.interner.intern(content);
948            if existing.contains(&cid) {
949                continue;
950            }
951            stored.observations.push(cid);
952            interned_added.push(cid);
953            added.push(content.clone());
954        }
955        if !added.is_empty() {
956            // Write-ahead: log before re-indexing
957            let mut buf = Vec::new();
958            store_enc::encode_add_observations(&mut buf, entity_name, &added)
959                .map_err(MCSError::IoError)?;
960            self.store.write_record(RecordKind::AddObservations, &buf)
961                .map_err(MCSError::IoError)?;
962
963            // Incrementally index only the new observation tokens (P3) — no
964            // full remove + re-index of the whole entity.
965            self.search
966                .index_additional(&mut self.interner, slot, &interned_added);
967        }
968        Ok(added)
969    }
970
971    pub fn delete_entities(&mut self, entity_names: &[String]) -> Result<()> {
972        let mut deleted_names = Vec::new();
973        for name in entity_names {
974            let name_id_opt = self.interner.get_optional(name);
975            if let Some(name_id) = name_id_opt {
976                let hash = self.interner.get_hash(name_id);
977                if let Some(slot) = self.name_table.lookup(hash, name_id)
978                    && let Some(Some(_)) = self.entity_slots.get(slot as usize)
979                {
980                    // Write-ahead: log before mutation
981                    let mut buf = Vec::new();
982                    store_enc::encode_delete_entity(&mut buf, name)
983                        .map_err(MCSError::IoError)?;
984                    self.store.write_record(RecordKind::DeleteEntity, &buf)
985                        .map_err(MCSError::IoError)?;
986
987                    self.entity_slots[slot as usize] = None;
988                    self.free_slots.push(slot);
989                    self.search.remove_entity(slot);
990                    self.name_table.remove(&self.interner, hash, name_id);
991                    deleted_names.push(name.clone());
992                }
993            }
994        }
995        if !deleted_names.is_empty() {
996            // Use a AHashSet for O(1) retain checks (P5)
997            let deleted_ids: AHashSet<StrId> = deleted_names.iter()
998                .map(|n| self.interner.intern(n))
999                .collect();
1000            self.relations
1001                .retain(|r| !deleted_ids.contains(&r.from) && !deleted_ids.contains(&r.to));
1002        }
1003        Ok(())
1004    }
1005
1006    pub fn delete_observations(&mut self, entity_name: &str, observations: &[String]) -> Result<()> {
1007        let name_id = self.interner.get_optional(entity_name)
1008            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1009        let hash = self.interner.get_hash(name_id);
1010        let slot = self
1011            .name_table
1012            .lookup(hash, name_id)
1013            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1014        let stored = self
1015            .entity_slots
1016            .get_mut(slot as usize)
1017            .and_then(|e| e.as_mut())
1018            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1019        let remove_ids: AHashSet<StrId> = observations.iter().map(|o| self.interner.intern(o)).collect();
1020        stored.observations.retain(|o| !remove_ids.contains(o));
1021        // Write-ahead: log before re-indexing
1022        let mut buf = Vec::new();
1023        store_enc::encode_delete_observations(&mut buf, entity_name, observations)
1024            .map_err(MCSError::IoError)?;
1025        self.store.write_record(RecordKind::DeleteObservations, &buf)
1026            .map_err(MCSError::IoError)?;
1027
1028        self.search.remove_entity(slot);
1029        self.search
1030            .index_entity(&mut self.interner, slot, stored.name, stored.entity_type, &stored.observations);
1031        Ok(())
1032    }
1033
1034    pub fn delete_relations(&mut self, relations: &[Relation]) -> Result<()> {
1035        // Collect targets into a AHashSet for O(1) retain checks (P5)
1036        let rels: AHashSet<(StrId, StrId, StrId)> = relations
1037            .iter()
1038            .map(|r| {
1039                (
1040                    self.interner.intern(&r.from),
1041                    self.interner.intern(&r.to),
1042                    self.interner.intern(&r.relation_type),
1043                )
1044            })
1045            .collect();
1046        self.relations
1047            .retain(|r| !rels.contains(&(r.from, r.to, r.relation_type)));
1048        for relation in relations {
1049            let mut buf = Vec::new();
1050            store_enc::encode_delete_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
1051                .map_err(MCSError::IoError)?;
1052            self.store.write_record(RecordKind::DeleteRelation, &buf)
1053                .map_err(MCSError::IoError)?;
1054        }
1055        Ok(())
1056    }
1057
1058    pub fn read_graph(&self) -> KnowledgeGraphOut {
1059        self.read_graph_view().to_owned_out()
1060    }
1061
1062    /// Borrowing, allocation-light view of the full graph (M6). Serializing it
1063    /// streams interned `&str` directly instead of materializing a `String`
1064    /// per name/type/observation.
1065    pub fn read_graph_view(&self) -> GraphView<'_> {
1066        let entities: Vec<&StoredEntity> = self
1067            .entity_slots
1068            .iter()
1069            .filter_map(|s| s.as_ref().filter(|e| e.is_live()))
1070            .collect();
1071        let relations: Vec<&StoredRelation> = self.relations.iter().collect();
1072        GraphView { kg: self, entities, relations }
1073    }
1074
1075    /// Relevance-ranked substring search returning all matches (no pagination).
1076    /// Equivalent to `search_nodes_filtered(query, None, 0, usize::MAX)`.
1077    pub fn search_nodes(&self, query: &str) -> KnowledgeGraphOut {
1078        self.search_nodes_filtered(query, None, 0, usize::MAX)
1079    }
1080
1081    pub fn open_nodes(&self, names: &[String]) -> KnowledgeGraphOut {
1082        self.open_nodes_view(names).to_owned_out()
1083    }
1084
1085    /// Borrowing view variant of [`open_nodes`] (M6).
1086    pub fn open_nodes_view(&self, names: &[String]) -> GraphView<'_> {
1087        let name_ids: AHashSet<StrId> = names.iter()
1088            .filter_map(|n| self.interner.get_optional(n))
1089            .collect();
1090        let entities: Vec<&StoredEntity> = self
1091            .entity_slots
1092            .iter()
1093            .filter_map(|s| {
1094                s.as_ref()
1095                    .filter(|stored| stored.is_live() && name_ids.contains(&stored.name))
1096            })
1097            .collect();
1098        let matched_names: AHashSet<StrId> = entities.iter().map(|e| e.name).collect();
1099        let relations: Vec<&StoredRelation> = self
1100            .relations
1101            .iter()
1102            .filter(|r| matched_names.contains(&r.from) || matched_names.contains(&r.to))
1103            .collect();
1104        GraphView { kg: self, entities, relations }
1105    }
1106
1107    // -----------------------------------------------------------------------
1108    // Internal helpers
1109    // -----------------------------------------------------------------------
1110
1111    fn entity_to_output(&self, stored: &StoredEntity) -> Entity {
1112        Entity {
1113            name: self.interner.lookup(stored.name).to_string(),
1114            entity_type: self.interner.lookup(stored.entity_type).to_string(),
1115            observations: stored
1116                .observations
1117                .iter()
1118                .map(|o| self.interner.lookup(*o).to_string())
1119                .collect(),
1120        }
1121    }
1122
1123    #[inline]
1124    fn relation_to_output(&self, r: &StoredRelation) -> Relation {
1125        Relation {
1126            from: self.interner.lookup(r.from).to_string(),
1127            to: self.interner.lookup(r.to).to_string(),
1128            relation_type: self.interner.lookup(r.relation_type).to_string(),
1129        }
1130    }
1131
1132    /// Resolve a name to its live entity slot, or `None` if absent/deleted.
1133    fn lookup_live_slot(&self, name: &str) -> Option<u32> {
1134        let name_id = self.interner.get_optional(name)?;
1135        let hash = self.interner.get_hash(name_id);
1136        let slot = self.name_table.lookup(hash, name_id)?;
1137        let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
1138        stored.is_live().then_some(slot)
1139    }
1140
1141    /// Materialize a live entity from its interned name id.
1142    fn entity_by_name_id(&self, name_id: StrId) -> Option<Entity> {
1143        let hash = self.interner.get_hash(name_id);
1144        let slot = self.name_table.lookup(hash, name_id)?;
1145        let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
1146        stored.is_live().then(|| self.entity_to_output(stored))
1147    }
1148
1149    /// Tally distinct entity types and their live-entity counts, ranked by
1150    /// count descending (ties broken by name). One linear pass over the dense
1151    /// slot vec; only the final names are allocated.
1152    pub fn entity_type_counts(&self) -> Vec<(String, usize)> {
1153        let mut counts: AHashMap<StrId, usize> = AHashMap::new();
1154        for st in self
1155            .entity_slots
1156            .iter()
1157            .filter_map(|s| s.as_ref())
1158            .filter(|e| e.is_live())
1159        {
1160            *counts.entry(st.entity_type).or_insert(0) += 1;
1161        }
1162        self.rank_counts(counts)
1163    }
1164
1165    /// Tally distinct relation types and their counts, ranked by count desc.
1166    pub fn relation_type_counts(&self) -> Vec<(String, usize)> {
1167        let mut counts: AHashMap<StrId, usize> = AHashMap::new();
1168        for r in &self.relations {
1169            *counts.entry(r.relation_type).or_insert(0) += 1;
1170        }
1171        self.rank_counts(counts)
1172    }
1173
1174    fn rank_counts(&self, counts: AHashMap<StrId, usize>) -> Vec<(String, usize)> {
1175        let mut out: Vec<(String, usize)> = counts
1176            .into_iter()
1177            .map(|(id, c)| (self.interner.lookup(id).to_string(), c))
1178            .collect();
1179        out.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
1180        out
1181    }
1182
1183    /// Relevance-ranked, optionally type-filtered, paginated node search.
1184    /// Entities come back best-match-first (see [`SearchIndex::search_ranked`]).
1185    /// Relations touching any returned entity (either endpoint) are included.
1186    pub fn search_nodes_filtered(
1187        &self,
1188        query: &str,
1189        entity_type: Option<&str>,
1190        offset: usize,
1191        limit: usize,
1192    ) -> KnowledgeGraphOut {
1193        self.search_nodes_view(query, entity_type, offset, limit).to_owned_out()
1194    }
1195
1196    /// Borrowing view variant of [`search_nodes_filtered`] (M6).
1197    pub fn search_nodes_view(
1198        &self,
1199        query: &str,
1200        entity_type: Option<&str>,
1201        offset: usize,
1202        limit: usize,
1203    ) -> GraphView<'_> {
1204        let type_id = match entity_type {
1205            Some(t) => match self.interner.get_optional(t) {
1206                Some(id) => Some(id),
1207                None => return GraphView { kg: self, entities: Vec::new(), relations: Vec::new() },
1208            },
1209            None => None,
1210        };
1211
1212        let ranked = self.search.search_ranked(query, &self.interner);
1213        let mut selected: AHashSet<StrId> = AHashSet::new();
1214        let mut entities: Vec<&StoredEntity> = Vec::new();
1215        let mut skipped = 0usize;
1216        for (slot, _score) in ranked {
1217            let Some(st) = self
1218                .entity_slots
1219                .get(slot as usize)
1220                .and_then(|s| s.as_ref())
1221                .filter(|e| e.is_live())
1222            else {
1223                continue;
1224            };
1225            if type_id.is_some_and(|tid| st.entity_type != tid) {
1226                continue;
1227            }
1228            if skipped < offset {
1229                skipped += 1;
1230                continue;
1231            }
1232            if entities.len() >= limit {
1233                break;
1234            }
1235            selected.insert(st.name);
1236            entities.push(st);
1237        }
1238
1239        let relations: Vec<&StoredRelation> = self
1240            .relations
1241            .iter()
1242            .filter(|r| selected.contains(&r.from) || selected.contains(&r.to))
1243            .collect();
1244        GraphView { kg: self, entities, relations }
1245    }
1246
1247    /// Type-filtered, paginated view of the whole graph. Unlike [`read_graph`],
1248    /// relations are restricted to those whose **both** endpoints fall in the
1249    /// returned entity page, so the slice is internally consistent.
1250    pub fn read_graph_filtered(
1251        &self,
1252        entity_type: Option<&str>,
1253        offset: usize,
1254        limit: usize,
1255    ) -> KnowledgeGraphOut {
1256        self.read_graph_filtered_view(entity_type, offset, limit).to_owned_out()
1257    }
1258
1259    /// Borrowing view variant of [`read_graph_filtered`] (M6).
1260    pub fn read_graph_filtered_view(
1261        &self,
1262        entity_type: Option<&str>,
1263        offset: usize,
1264        limit: usize,
1265    ) -> GraphView<'_> {
1266        let type_id = match entity_type {
1267            Some(t) => match self.interner.get_optional(t) {
1268                Some(id) => Some(id),
1269                None => return GraphView { kg: self, entities: Vec::new(), relations: Vec::new() },
1270            },
1271            None => None,
1272        };
1273
1274        let mut selected: AHashSet<StrId> = AHashSet::new();
1275        let mut entities: Vec<&StoredEntity> = Vec::new();
1276        let mut skipped = 0usize;
1277        for st in self
1278            .entity_slots
1279            .iter()
1280            .filter_map(|s| s.as_ref())
1281            .filter(|e| e.is_live())
1282        {
1283            if type_id.is_some_and(|tid| st.entity_type != tid) {
1284                continue;
1285            }
1286            if skipped < offset {
1287                skipped += 1;
1288                continue;
1289            }
1290            if entities.len() >= limit {
1291                break;
1292            }
1293            selected.insert(st.name);
1294            entities.push(st);
1295        }
1296
1297        let relations: Vec<&StoredRelation> = self
1298            .relations
1299            .iter()
1300            .filter(|r| selected.contains(&r.from) && selected.contains(&r.to))
1301            .collect();
1302        GraphView { kg: self, entities, relations }
1303    }
1304
1305    /// Neighborhood expansion around `name` out to `depth` hops, following
1306    /// edges in the requested [`Direction`] and (optionally) of one relation
1307    /// type. Returns the origin plus reached entities, and every relation
1308    /// (passing the type filter) whose endpoints are both inside that set.
1309    ///
1310    /// `depth == 1` (the common case) is a single linear pass over the flat
1311    /// relation vec; deeper queries build an adjacency map once (O(E)) and BFS.
1312    pub fn neighbors(
1313        &self,
1314        name: &str,
1315        direction: Direction,
1316        rtype: Option<&str>,
1317        depth: u32,
1318    ) -> Result<KnowledgeGraphOut> {
1319        self.lookup_live_slot(name)
1320            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
1321        // Safe: lookup_live_slot succeeded, so the name is interned.
1322        let start = self.interner.get_optional(name).unwrap();
1323
1324        // An unknown relation-type filter can match nothing: return just origin.
1325        let rtype_id = match rtype {
1326            Some(r) => match self.interner.get_optional(r) {
1327                Some(id) => Some(id),
1328                None => {
1329                    let entities = self.entity_by_name_id(start).into_iter().collect();
1330                    return Ok(KnowledgeGraphOut { entities, relations: Vec::new() });
1331                }
1332            },
1333            None => None,
1334        };
1335
1336        let mut visited: AHashSet<StrId> = AHashSet::new();
1337        visited.insert(start);
1338
1339        let type_ok = |r: &StoredRelation| rtype_id.is_none_or(|rt| r.relation_type == rt);
1340
1341        if depth == 1 {
1342            for r in self.relations.iter().filter(|r| type_ok(r)) {
1343                match direction {
1344                    Direction::Out => {
1345                        if r.from == start {
1346                            visited.insert(r.to);
1347                        }
1348                    }
1349                    Direction::In => {
1350                        if r.to == start {
1351                            visited.insert(r.from);
1352                        }
1353                    }
1354                    Direction::Both => {
1355                        if r.from == start {
1356                            visited.insert(r.to);
1357                        } else if r.to == start {
1358                            visited.insert(r.from);
1359                        }
1360                    }
1361                }
1362            }
1363        } else if depth >= 2 {
1364            // Build a direction-aware adjacency map once, then BFS.
1365            let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
1366            for r in self.relations.iter().filter(|r| type_ok(r)) {
1367                match direction {
1368                    Direction::Out => adj.entry(r.from).or_default().push(r.to),
1369                    Direction::In => adj.entry(r.to).or_default().push(r.from),
1370                    Direction::Both => {
1371                        adj.entry(r.from).or_default().push(r.to);
1372                        adj.entry(r.to).or_default().push(r.from);
1373                    }
1374                }
1375            }
1376            let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
1377            queue.push_back((start, 0));
1378            while let Some((node, d)) = queue.pop_front() {
1379                if d >= depth {
1380                    continue;
1381                }
1382                if let Some(nbrs) = adj.get(&node) {
1383                    for &nb in nbrs {
1384                        if visited.insert(nb) {
1385                            queue.push_back((nb, d + 1));
1386                        }
1387                    }
1388                }
1389            }
1390        }
1391
1392        let mut entities = Vec::with_capacity(visited.len());
1393        for &nid in &visited {
1394            if let Some(e) = self.entity_by_name_id(nid) {
1395                entities.push(e);
1396            }
1397        }
1398        let relations = self
1399            .relations
1400            .iter()
1401            .filter(|r| type_ok(r) && visited.contains(&r.from) && visited.contains(&r.to))
1402            .map(|r| self.relation_to_output(r))
1403            .collect();
1404        Ok(KnowledgeGraphOut { entities, relations })
1405    }
1406
1407    /// One-shot context bundle for a single entity: the entity itself, every
1408    /// incident relation, its distinct neighbor names, and its degree. Saves an
1409    /// agent the get_entity + two search_relations round-trips.
1410    pub fn describe_entity(&self, name: &str) -> Result<serde_json::Value> {
1411        let name_id = self
1412            .interner
1413            .get_optional(name)
1414            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
1415        let entity = self
1416            .entity_by_name_id(name_id)
1417            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
1418
1419        let mut incident: Vec<Relation> = Vec::new();
1420        let mut neighbor_seen: AHashSet<StrId> = AHashSet::new();
1421        let mut neighbors: Vec<&str> = Vec::new();
1422        for r in &self.relations {
1423            if r.from == name_id || r.to == name_id {
1424                incident.push(self.relation_to_output(r));
1425                let other = if r.from == name_id { r.to } else { r.from };
1426                if other != name_id && neighbor_seen.insert(other) {
1427                    neighbors.push(self.interner.lookup(other));
1428                }
1429            }
1430        }
1431
1432        Ok(serde_json::json!({
1433            "entity": entity,
1434            "relations": incident,
1435            "neighbors": neighbors,
1436            "degree": incident.len(),
1437        }))
1438    }
1439
1440    /// Create-or-merge a batch of entities idempotently. Missing entities are
1441    /// created; existing ones keep their type and gain any new observations
1442    /// (deduplicated). Returns a per-entity outcome. The caller is responsible
1443    /// for flushing — every underlying op is already write-ahead logged.
1444    pub fn upsert_entities(&mut self, entities: &[Entity]) -> Result<Vec<serde_json::Value>> {
1445        for e in entities {
1446            if e.name.is_empty() {
1447                return Err(MCSError::InvalidParams(
1448                    "Entity name must not be empty".into(),
1449                ));
1450            }
1451        }
1452        let mut out = Vec::with_capacity(entities.len());
1453        for e in entities {
1454            if self.lookup_live_slot(&e.name).is_some() {
1455                let added = self.add_observations(&e.name, &e.observations)?;
1456                out.push(serde_json::json!({
1457                    "name": e.name,
1458                    "created": false,
1459                    "addedObservations": added,
1460                }));
1461            } else {
1462                let created = self.create_entities(std::slice::from_ref(e))?;
1463                out.push(serde_json::json!({
1464                    "name": e.name,
1465                    "created": !created.is_empty(),
1466                    "addedObservations": e.observations,
1467                }));
1468            }
1469        }
1470        Ok(out)
1471    }
1472
1473    /// Serialize the graph in one of: `json` (read_graph), `mermaid`, `dot`.
1474    pub fn export(&self, format: &str) -> Result<String> {
1475        match format {
1476            "json" => serde_json::to_string(&self.read_graph()).map_err(MCSError::JsonError),
1477            "mermaid" => Ok(self.export_mermaid()),
1478            "dot" => Ok(self.export_dot()),
1479            other => Err(MCSError::InvalidParams(format!(
1480                "Unknown export format '{other}' (expected json|mermaid|dot)"
1481            ))),
1482        }
1483    }
1484
1485    /// Assign each live entity a stable `n{k}` node id for diagram output.
1486    fn diagram_node_ids(&self) -> (AHashMap<StrId, usize>, Vec<(usize, StrId)>) {
1487        let mut ids: AHashMap<StrId, usize> = AHashMap::new();
1488        let mut order: Vec<(usize, StrId)> = Vec::new();
1489        for st in self
1490            .entity_slots
1491            .iter()
1492            .filter_map(|s| s.as_ref())
1493            .filter(|e| e.is_live())
1494        {
1495            let n = ids.len();
1496            ids.insert(st.name, n);
1497            order.push((n, st.name));
1498        }
1499        (ids, order)
1500    }
1501
1502    fn export_mermaid(&self) -> String {
1503        let (ids, order) = self.diagram_node_ids();
1504        let mut s = String::with_capacity(64 + order.len() * 32 + self.relations.len() * 32);
1505        s.push_str("graph LR\n");
1506        for (n, name_id) in &order {
1507            let label = sanitize_label(self.interner.lookup(*name_id));
1508            s.push_str(&format!("  n{n}[\"{label}\"]\n"));
1509        }
1510        for r in &self.relations {
1511            if let (Some(&a), Some(&b)) = (ids.get(&r.from), ids.get(&r.to)) {
1512                let rel = sanitize_label(self.interner.lookup(r.relation_type));
1513                s.push_str(&format!("  n{a} -->|{rel}| n{b}\n"));
1514            }
1515        }
1516        s
1517    }
1518
1519    fn export_dot(&self) -> String {
1520        let (ids, order) = self.diagram_node_ids();
1521        let mut s = String::with_capacity(64 + order.len() * 32 + self.relations.len() * 32);
1522        s.push_str("digraph G {\n");
1523        for (n, name_id) in &order {
1524            let label = sanitize_label(self.interner.lookup(*name_id));
1525            s.push_str(&format!("  n{n} [label=\"{label}\"];\n"));
1526        }
1527        for r in &self.relations {
1528            if let (Some(&a), Some(&b)) = (ids.get(&r.from), ids.get(&r.to)) {
1529                let rel = sanitize_label(self.interner.lookup(r.relation_type));
1530                s.push_str(&format!("  n{a} -> n{b} [label=\"{rel}\"];\n"));
1531            }
1532        }
1533        s.push_str("}\n");
1534        s
1535    }
1536
1537    // ------ High-level productivity tools ------
1538
1539    /// Merge `source` entity into `target` entity. All observations from
1540    /// source are moved to target (deduplicated), all relations involving
1541    /// source are redirected to target (deduplicated), and source is then
1542    /// deleted. Every underlying mutation goes through the write-ahead log.
1543    /// Caller is responsible for `flush_and_sync()`.
1544    pub fn merge_entities(&mut self, source: &str, target: &str) -> Result<serde_json::Value> {
1545        if source == target {
1546            return Err(MCSError::InvalidParams(
1547                "Source and target must be different entities".into(),
1548            ));
1549        }
1550        self.lookup_live_slot(source).ok_or_else(|| {
1551            MCSError::InvalidParams(format!("Source entity '{source}' not found"))
1552        })?;
1553        self.lookup_live_slot(target).ok_or_else(|| {
1554            MCSError::InvalidParams(format!("Target entity '{target}' not found"))
1555        })?;
1556
1557        let source_entity = self.get_entity(source).unwrap();
1558        let moved_obs_count = source_entity.observations.len();
1559
1560        // Move observations to target (dedup via add_observations).
1561        let added_count = if !source_entity.observations.is_empty() {
1562            self.add_observations(target, &source_entity.observations)?.len()
1563        } else {
1564            0
1565        };
1566
1567        // Redirect relations: create new ones with target in place of source.
1568        let source_id = self.interner.get_optional(source).unwrap();
1569        let source_rels: Vec<Relation> = self
1570            .relations
1571            .iter()
1572            .filter(|r| r.from == source_id || r.to == source_id)
1573            .filter_map(|r| {
1574                let new_from = if r.from == source_id {
1575                    target
1576                } else {
1577                    self.interner.lookup(r.from)
1578                };
1579                let new_to = if r.to == source_id {
1580                    target
1581                } else {
1582                    self.interner.lookup(r.to)
1583                };
1584                // Skip self-loops — they are meaningless after redirect.
1585                if new_from == new_to {
1586                    None
1587                } else {
1588                    Some(Relation {
1589                        from: new_from.to_string(),
1590                        to: new_to.to_string(),
1591                        relation_type: self.interner.lookup(r.relation_type).to_string(),
1592                    })
1593                }
1594            })
1595            .collect();
1596
1597        let redirected = self.create_relations(&source_rels)?.len() as u32;
1598
1599        // Delete source entity (also removes stale relations).
1600        self.delete_entities(&[source.to_string()])?;
1601
1602        Ok(serde_json::json!({
1603            "source": source,
1604            "target": target,
1605            "movedObservations": moved_obs_count,
1606            "addedObservations": added_count,
1607            "redirectedRelations": redirected,
1608        }))
1609    }
1610
1611    /// Extract a connected subgraph around one or more seed entity names,
1612    /// expanding out to `depth` hops along all relations (undirected). Returns
1613    /// the set of reached entities and the relations among them.
1614    pub fn extract_subgraph(&self, names: &[String], depth: u32) -> Result<KnowledgeGraphOut> {
1615        if names.is_empty() {
1616            return Ok(KnowledgeGraphOut {
1617                entities: Vec::new(),
1618                relations: Vec::new(),
1619            });
1620        }
1621        // Seed the BFS queue from any names that exist.
1622        let mut visited: AHashSet<StrId> = AHashSet::new();
1623        let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
1624        for name in names {
1625            if let Some(id) = self.interner.get_optional(name) {
1626                if visited.insert(id) {
1627                    queue.push_back((id, 0));
1628                }
1629            }
1630        }
1631        // Build an undirected adjacency map once.
1632        let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
1633        for r in &self.relations {
1634            adj.entry(r.from).or_default().push(r.to);
1635            adj.entry(r.to).or_default().push(r.from);
1636        }
1637        while let Some((node, d)) = queue.pop_front() {
1638            if d >= depth {
1639                continue;
1640            }
1641            if let Some(nbrs) = adj.get(&node) {
1642                for &nb in nbrs {
1643                    if visited.insert(nb) {
1644                        queue.push_back((nb, d + 1));
1645                    }
1646                }
1647            }
1648        }
1649        let mut entities: Vec<Entity> = Vec::with_capacity(visited.len());
1650        for &nid in &visited {
1651            if let Some(e) = self.entity_by_name_id(nid) {
1652                entities.push(e);
1653            }
1654        }
1655        let relations: Vec<Relation> = self
1656            .relations
1657            .iter()
1658            .filter(|r| visited.contains(&r.from) && visited.contains(&r.to))
1659            .map(|r| self.relation_to_output(r))
1660            .collect();
1661        Ok(KnowledgeGraphOut { entities, relations })
1662    }
1663
1664    /// Return full entities for a list of names. Missing names yield `None`.
1665    pub fn batch_get_entities(&self, names: &[String]) -> Vec<Option<Entity>> {
1666        names.iter().map(|n| self.get_entity(n)).collect()
1667    }
1668
1669    /// Recursive DFS helper — collects every simple path from `current` to
1670    /// `target` up to `max_depth` hops, capped at `max_paths` results.
1671    fn dfs_all_paths(
1672        adj: &AHashMap<StrId, Vec<StrId>>,
1673        current: StrId,
1674        target: StrId,
1675        max_depth: usize,
1676        max_paths: usize,
1677        visited: &mut AHashSet<StrId>,
1678        current_path: &mut Vec<StrId>,
1679        all_paths: &mut Vec<Vec<StrId>>,
1680    ) {
1681        if all_paths.len() >= max_paths {
1682            return;
1683        }
1684        if current == target && current_path.len() > 1 {
1685            all_paths.push(current_path.clone());
1686            return;
1687        }
1688        if current_path.len() > max_depth {
1689            return;
1690        }
1691        if let Some(neighbors) = adj.get(&current) {
1692            for &nb in neighbors {
1693                if visited.insert(nb) {
1694                    current_path.push(nb);
1695                    Self::dfs_all_paths(
1696                        adj, nb, target, max_depth, max_paths, visited, current_path, all_paths,
1697                    );
1698                    current_path.pop();
1699                    visited.remove(&nb);
1700                }
1701            }
1702        }
1703    }
1704
1705    /// Find all simple paths between `from` and `to` up to `max_depth` hops,
1706    /// returning at most `max_paths` results. Paths are found via DFS with
1707    /// backtracking and include both endpoints.
1708    pub fn find_all_paths(
1709        &self,
1710        from: &str,
1711        to: &str,
1712        max_depth: usize,
1713        max_paths: usize,
1714    ) -> Result<Vec<Vec<String>>> {
1715        let from_id = self
1716            .interner
1717            .get_optional(from)
1718            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
1719        let to_id = self
1720            .interner
1721            .get_optional(to)
1722            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
1723        // Verify both are live.
1724        if self.lookup_live_slot(from).is_none() {
1725            return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
1726        }
1727        if self.lookup_live_slot(to).is_none() {
1728            return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
1729        }
1730        if from_id == to_id {
1731            return Ok(vec![vec![from.to_string()]]);
1732        }
1733        // Build undirected adjacency.
1734        let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
1735        for r in &self.relations {
1736            adj.entry(r.from).or_default().push(r.to);
1737            adj.entry(r.to).or_default().push(r.from);
1738        }
1739        let mut all_paths: Vec<Vec<StrId>> = Vec::new();
1740        let mut current_path = Vec::new();
1741        let mut visited: AHashSet<StrId> = AHashSet::new();
1742        visited.insert(from_id);
1743        current_path.push(from_id);
1744        Self::dfs_all_paths(
1745            &adj,
1746            from_id,
1747            to_id,
1748            max_depth,
1749            max_paths,
1750            &mut visited,
1751            &mut current_path,
1752            &mut all_paths,
1753        );
1754        if all_paths.is_empty() {
1755            return Err(MCSError::MemoryError(format!(
1756                "No path found between '{from}' and '{to}'"
1757            )));
1758        }
1759        let result: Vec<Vec<String>> = all_paths
1760            .into_iter()
1761            .map(|path| {
1762                path.into_iter()
1763                    .map(|id| self.interner.lookup(id).to_string())
1764                    .collect()
1765            })
1766            .collect();
1767        Ok(result)
1768    }
1769
1770    // --- Flush & sync ---
1771
1772    /// Flush and fsync the log to stable storage.
1773    pub fn flush_and_sync(&mut self) -> Result<()> {
1774        self.store.flush_and_sync().map_err(MCSError::IoError)
1775    }
1776}