Skip to main content

mcp_memory/
kg.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2use std::path::Path;
3
4use serde::ser::{Serialize, SerializeSeq, SerializeStruct, Serializer};
5
6use crate::errors::{MCSError, Result};
7use crate::intern::{StrId, StringInterner};
8use crate::types::{Entity, Relation, KnowledgeGraphOut};
9use crate::search::SearchIndex;
10use crate::store::{self as store_enc, BinaryStore, RecordKind};
11
12const ENTITY_SLOT_LIVE: u8 = 1;
13const NAME_TABLE_SHARDS: usize = 4;
14
15// ---------------------------------------------------------------------------
16// Prefetch helper – issues a non-binding software prefetch hint to pull a
17// cache-line into L1/L2 while we finish probing the current entry.
18// ---------------------------------------------------------------------------
/// Issue a non-binding `T0` software prefetch for the cache line holding `addr`.
///
/// # Safety
/// Prefetch is only a hint and never faults, but callers should still pass a
/// pointer derived from a live allocation (call sites here pass in-bounds
/// element pointers).
#[cfg(target_arch = "x86_64")]
#[inline(always)]
unsafe fn prefetch_addr(addr: *const u8) {
    // SAFETY: `_mm_prefetch` does not architecturally dereference `addr`, and
    // the SSE feature it needs is part of the x86_64 baseline. The intrinsic
    // takes `*const i8`, hence the cast; the explicit block is required by
    // `unsafe_op_in_unsafe_fn` (warn-by-default in edition 2024).
    unsafe {
        std::arch::x86_64::_mm_prefetch::<{ std::arch::x86_64::_MM_HINT_T0 }>(addr.cast::<i8>());
    }
}

/// No-op fallback on targets without an x86_64 prefetch intrinsic.
///
/// # Safety
/// Never touches `_addr`; it is `unsafe` only to mirror the x86_64 signature.
#[cfg(not(target_arch = "x86_64"))]
#[inline(always)]
const unsafe fn prefetch_addr(_addr: *const u8) {}
29
30// ---------------------------------------------------------------------------
31// StoredEntity / StoredRelation – internal representations using StrId.
32// ---------------------------------------------------------------------------
33// Default layout: 40 B / align 8 (Rust packs `state` into the Vec's padding and
34// the `Option` niche is free). With `cache_align`, the slot is rounded to a full
35// 64-byte line so a point lookup/mutation (name_table -> slot index) touches
36// exactly one cache line instead of occasionally straddling two. Costs +60%
37// memory and a wider stride on bulk scans — measure before enabling.
38#[cfg_attr(feature = "cache_align", repr(align(64)))]
39struct StoredEntity {
40    state: u8,
41    name: StrId,
42    entity_type: StrId,
43    observations: Vec<StrId>,
44}
45
46impl StoredEntity {
47    const fn is_live(&self) -> bool {
48        self.state == ENTITY_SLOT_LIVE
49    }
50}
51
52// Default layout: 12 B / align 4 → ~1 in 5 records straddles a 64-byte line.
53// With `cache_align`, align(16) rounds the size to 16 B so 4 records fill a line
54// exactly (no straddle, AVX2-load-friendly) for +33% memory.
55#[cfg_attr(feature = "cache_align", repr(align(16)))]
56struct StoredRelation {
57    from: StrId,
58    to: StrId,
59    relation_type: StrId,
60}
61
62// ---------------------------------------------------------------------------
63// Borrowing serialization views (M6).
64//
65// Read tools used to build owned `Entity`/`Relation` vecs (a fresh `String`
66// per name/type/observation) and *then* serialize them — roughly 2-3x the
67// graph resident at once. These views instead hold references to the selected
68// stored records and emit their interned `&str` directly during
69// serialization, with no intermediate owned strings. The emitted JSON is
70// byte-for-byte identical to serializing `KnowledgeGraphOut`.
71// ---------------------------------------------------------------------------
72
73/// A borrowing view over a selected slice of the graph. Serializes to
74/// `{"entities":[...],"relations":[...]}`.
75pub struct GraphView<'a> {
76    kg: &'a KnowledgeGraph,
77    entities: Vec<&'a StoredEntity>,
78    relations: Vec<&'a StoredRelation>,
79}
80
81impl GraphView<'_> {
82    /// Materialize into the owned [`KnowledgeGraphOut`]. Used by the direct
83    /// (non-serializing) callers and tests; the server's read handlers
84    /// serialize the view directly instead.
85    pub fn to_owned_out(&self) -> KnowledgeGraphOut {
86        KnowledgeGraphOut {
87            entities: self.entities.iter().map(|e| self.kg.entity_to_output(e)).collect(),
88            relations: self.relations.iter().map(|r| self.kg.relation_to_output(r)).collect(),
89        }
90    }
91}
92
93impl Serialize for GraphView<'_> {
94    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
95        let mut st = s.serialize_struct("KnowledgeGraphOut", 2)?;
96        st.serialize_field("entities", &EntityListRef { kg: self.kg, items: &self.entities })?;
97        st.serialize_field("relations", &RelationListRef { kg: self.kg, items: &self.relations })?;
98        st.end()
99    }
100}
101
102struct EntityListRef<'a> { kg: &'a KnowledgeGraph, items: &'a [&'a StoredEntity] }
103impl Serialize for EntityListRef<'_> {
104    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
105        let mut seq = s.serialize_seq(Some(self.items.len()))?;
106        for &e in self.items {
107            seq.serialize_element(&EntityRef { kg: self.kg, e })?;
108        }
109        seq.end()
110    }
111}
112
113struct RelationListRef<'a> { kg: &'a KnowledgeGraph, items: &'a [&'a StoredRelation] }
114impl Serialize for RelationListRef<'_> {
115    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
116        let mut seq = s.serialize_seq(Some(self.items.len()))?;
117        for &r in self.items {
118            seq.serialize_element(&RelationRef { kg: self.kg, r })?;
119        }
120        seq.end()
121    }
122}
123
124struct EntityRef<'a> { kg: &'a KnowledgeGraph, e: &'a StoredEntity }
125impl Serialize for EntityRef<'_> {
126    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
127        let mut st = s.serialize_struct("Entity", 3)?;
128        st.serialize_field("name", self.kg.interner.lookup(self.e.name))?;
129        st.serialize_field("entityType", self.kg.interner.lookup(self.e.entity_type))?;
130        st.serialize_field("observations", &ObsRef { kg: self.kg, obs: &self.e.observations })?;
131        st.end()
132    }
133}
134
135struct ObsRef<'a> { kg: &'a KnowledgeGraph, obs: &'a [StrId] }
136impl Serialize for ObsRef<'_> {
137    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
138        let mut seq = s.serialize_seq(Some(self.obs.len()))?;
139        for &o in self.obs {
140            seq.serialize_element(self.kg.interner.lookup(o))?;
141        }
142        seq.end()
143    }
144}
145
146struct RelationRef<'a> { kg: &'a KnowledgeGraph, r: &'a StoredRelation }
147impl Serialize for RelationRef<'_> {
148    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
149        let mut st = s.serialize_struct("Relation", 3)?;
150        st.serialize_field("from", self.kg.interner.lookup(self.r.from))?;
151        st.serialize_field("to", self.kg.interner.lookup(self.r.to))?;
152        st.serialize_field("relationType", self.kg.interner.lookup(self.r.relation_type))?;
153        st.end()
154    }
155}
156
/// Which way to traverse relations when exploring an entity's neighborhood.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum Direction {
    /// Outgoing edges: walk `from -> to`.
    Out,
    /// Incoming edges: walk `to -> from`.
    In,
    /// Either orientation.
    Both,
}

impl Direction {
    /// Map an optional direction string to a [`Direction`].
    ///
    /// Only the exact, case-sensitive strings `"out"` and `"in"` select a single
    /// orientation. `None` and every other value fall back to [`Direction::Both`].
    pub fn parse(s: Option<&str>) -> Self {
        match s.unwrap_or_default() {
            "out" => Self::Out,
            "in" => Self::In,
            _ => Self::Both,
        }
    }
}
178
/// Make `s` safe to place between double quotes in a Mermaid/DOT label.
///
/// Each `"` becomes `'` and each CR or LF becomes a single space. Every other
/// character is copied unchanged.
fn sanitize_label(s: &str) -> String {
    // NOTE(review): backslashes pass through untouched. In DOT a trailing `\`
    // would escape the closing quote, and `\n`/`\l`/`\N`-style sequences are
    // interpreted inside labels. Confirm whether callers need backslash handling.
    s.chars()
        .map(|c| match c {
            '"' => '\'',
            '\n' | '\r' => ' ',
            other => other,
        })
        .collect()
}
191
// ---------------------------------------------------------------------------
// ShardedNameTable – open-addressing hash map split into N independent shards.
//
// Each shard uses a **ctrl-byte bucket** approach. A 1-byte metadata array
// stores a 7-bit hash stamp (h2) per slot, with `0xFF` = EMPTY. A probe first
// reads only that byte and compares the full key (StrId) only when the stamp
// matches. With 128 well-spread stamp values, an occupied non-matching slot is
// rejected on the ctrl byte alone ~127/128 of the time. See also SwissTable /
// hashbrown.
//
// Bit budget:
// - The shard index is `hash & (NAME_TABLE_SHARDS - 1)` (the low bits), and h1
//   uses bits 7 and up.
// - h2 therefore must NOT be the raw low 7 bits. Inside one shard the low
//   shard bits are identical for every key, which would collapse the stamp to
//   32 values (a 1/32 false-match rate).
// - Instead, h2 is taken from a multiplicative mix of the whole hash.
// ---------------------------------------------------------------------------
const EMPTY_SLOT: u8 = 0xFF;

/// 7-bit control stamp for `hash`, always in `0x00..=0x7F`, so bit 7 stays
/// free to mark `EMPTY_SLOT`.
#[inline(always)]
const fn h2(hash: u64) -> u8 {
    // 64-bit golden-ratio multiplier (Fibonacci hashing). The top 7 bits of
    // the product depend on every input bit, so stamps stay well spread even
    // though the low (shard-selecting) bits are fixed within a shard.
    const MIX: u64 = 0x9E37_79B9_7F4A_7C15;
    (hash.wrapping_mul(MIX) >> 57) as u8
}

/// Home bucket for `hash` in a table of capacity `mask + 1` (a power of two).
#[inline(always)]
const fn h1(hash: u64, mask: usize) -> usize {
    ((hash >> 7) as usize) & mask
}
212
/// One linear-probing shard of the name -> entity-slot table.
///
/// `ctrl`, `names` and `slots` are parallel arrays of length `mask + 1`. That
/// length is a power of two, at least 16, so `& mask` replaces `% len`. Hashes
/// are not stored (M4); `grow`/`remove` recompute them with
/// `StringInterner::get_hash`. They therefore assume every caller passes
/// `interner.get_hash(name)` as `hash`.
struct NameTableShard {
    ctrl: Vec<u8>,      // 0xFF = empty; 0x00-0x7F = h2 stamp (bit 7 always clear)
    names: Vec<StrId>,  // key per slot; StrId::EMPTY when vacant
    slots: Vec<u32>,    // entity-slot index per slot; u32::MAX when vacant
    mask: usize,        // capacity - 1
    count: usize,       // number of occupied slots
}

impl NameTableShard {
    /// Create an empty shard whose capacity is `capacity` rounded up to a
    /// power of two (minimum 16).
    fn new(capacity: usize) -> Self {
        let cap = capacity.next_power_of_two().max(16);
        Self {
            ctrl: vec![EMPTY_SLOT; cap],
            names: vec![StrId::EMPTY; cap],
            slots: vec![u32::MAX; cap],
            mask: cap - 1,
            count: 0,
        }
    }

    /// Return the entity-slot index stored for `name`, or `None` if absent.
    ///
    /// Probes linearly from the home bucket `h1(hash)` and stops at the first
    /// EMPTY ctrl byte. The loop is also bounded by the capacity as a safeguard;
    /// `insert`'s load-factor check keeps at least one EMPTY slot, so the early
    /// return normally fires first.
    #[inline(always)]
    fn lookup(&self, hash: u64, name: StrId) -> Option<u32> {
        let stamp = h2(hash);
        let mask = self.mask;
        let mut idx = h1(hash, mask);
        let ctrl = self.ctrl.as_ptr();
        let names = self.names.as_ptr();
        let slots = self.slots.as_ptr();
        let len = self.ctrl.len();

        for _ in 0..len {
            // Prefetch the ctrl byte 4 slots ahead — overlaps memory latency.
            // NOTE(review): `ctrl` is a byte array, so idx+4 is almost always in
            // the cache line already being read; this hint may be redundant. Measure.
            let prefetch_idx = idx.wrapping_add(4) & mask;
            // SAFETY: prefetch_idx is masked, so the pointer stays inside `ctrl`.
            unsafe { prefetch_addr(ctrl.add(prefetch_idx)) };

            // SAFETY: idx always < len because of &mask on each iteration.
            // `names` and `slots` are allocated and grown with the same length
            // as `ctrl`, so the same index is in bounds for all three.
            unsafe {
                let c = *ctrl.add(idx);
                // Bit 7 set → EMPTY → key not present.
                if c & 0x80 != 0 {
                    return None;
                }
                // Stamp match → compare full key (rare when stamps are well spread).
                if c == stamp && *names.add(idx) == name {
                    return Some(*slots.add(idx));
                }
            }
            idx = (idx + 1) & mask;
        }
        None
    }

    /// Insert `name -> slot` into the first EMPTY slot on `name`'s probe path.
    ///
    /// Before probing, doubles the capacity via `grow` once `count` exceeds
    /// 3/4 of it. It does not check for an existing entry: inserting the same
    /// `name` twice stores two entries, and `lookup` returns whichever comes
    /// first on the probe path.
    fn insert(&mut self, interner: &StringInterner, hash: u64, name: StrId, slot: u32) {
        if self.count * 4 > self.ctrl.len() * 3 {
            self.grow(interner);
        }
        let stamp = h2(hash);
        let mask = self.mask;
        let mut idx = h1(hash, mask);
        // Terminates: the load-factor check above guarantees an EMPTY slot.
        loop {
            // SAFETY: idx & mask always < len for power-of-two capacity.
            unsafe {
                if *self.ctrl.get_unchecked(idx) & 0x80 != 0 {
                    *self.ctrl.get_unchecked_mut(idx) = stamp;
                    *self.names.get_unchecked_mut(idx) = name;
                    *self.slots.get_unchecked_mut(idx) = slot;
                    self.count += 1;
                    return;
                }
            }
            idx = (idx + 1) & mask;
        }
    }

    /// Remove the first entry for `name` on its probe path, if any.
    ///
    /// Uses no tombstones. After clearing the slot, each following occupied
    /// slot up to the next EMPTY is taken out and re-inserted from its own home
    /// bucket, so no remaining key's probe chain is broken by the new hole.
    fn remove(&mut self, interner: &StringInterner, hash: u64, name: StrId) {
        let stamp = h2(hash);
        let mask = self.mask;
        let mut idx = h1(hash, mask);
        let len = self.ctrl.len();
        for _ in 0..len {
            if self.ctrl[idx] & 0x80 != 0 {
                return;
            }
            if self.ctrl[idx] == stamp && self.names[idx] == name {
                // Found — clear it, then re-home the rest of the cluster so
                // probe chains stay intact.
                self.ctrl[idx] = EMPTY_SLOT;
                self.names[idx] = StrId::EMPTY;
                self.slots[idx] = u32::MAX;
                self.count -= 1;

                let mut next = (idx + 1) & mask;
                while self.ctrl[next] & 0x80 == 0 {
                    let nn = self.names[next];
                    let ns = self.slots[next];
                    // Hash is no longer stored (M4) — recompute it from the
                    // interned name to find the entry's ideal bucket.
                    let nh = interner.get_hash(nn);
                    self.ctrl[next] = EMPTY_SLOT;
                    self.names[next] = StrId::EMPTY;
                    self.slots[next] = u32::MAX;
                    self.count -= 1;

                    // Re-insert at the first EMPTY from its ideal bucket. This is
                    // at or before `next`, because `next` itself was just emptied.
                    let nstamp = h2(nh);
                    let mut re_idx = h1(nh, mask);
                    while self.ctrl[re_idx] & 0x80 == 0 {
                        re_idx = (re_idx + 1) & mask;
                    }
                    self.ctrl[re_idx] = nstamp;
                    self.names[re_idx] = nn;
                    self.slots[re_idx] = ns;
                    self.count += 1;

                    next = (next + 1) & mask;
                }
                return;
            }
            idx = (idx + 1) & mask;
        }
    }

    /// Double the capacity and re-insert every occupied slot into fresh arrays.
    ///
    /// `count` is left unchanged because every entry is carried over.
    fn grow(&mut self, interner: &StringInterner) {
        let new_cap = self.ctrl.len() * 2;
        let new_mask = new_cap - 1;
        let mut new_ctrl = vec![EMPTY_SLOT; new_cap];
        let mut new_names = vec![StrId::EMPTY; new_cap];
        let mut new_slots = vec![u32::MAX; new_cap];

        for i in 0..self.ctrl.len() {
            if self.ctrl[i] & 0x80 == 0 {
                // Recompute the hash from the interned name (M4: not stored).
                let name = self.names[i];
                let hash = interner.get_hash(name);
                let stamp = h2(hash);
                let mut idx = h1(hash, new_mask);
                while new_ctrl[idx] & 0x80 == 0 {
                    idx = (idx + 1) & new_mask;
                }
                new_ctrl[idx] = stamp;
                new_names[idx] = name;
                new_slots[idx] = self.slots[i];
            }
        }

        self.ctrl = new_ctrl;
        self.names = new_names;
        self.slots = new_slots;
        self.mask = new_mask;
    }
}
363
364struct ShardedNameTable {
365    shards: [NameTableShard; NAME_TABLE_SHARDS],
366}
367
368impl ShardedNameTable {
369    fn new(capacity_per_shard: usize) -> Self {
370        Self {
371            shards: [
372                NameTableShard::new(capacity_per_shard),
373                NameTableShard::new(capacity_per_shard),
374                NameTableShard::new(capacity_per_shard),
375                NameTableShard::new(capacity_per_shard),
376            ],
377        }
378    }
379
380    #[inline(always)]
381    const fn shard(hash: u64) -> usize {
382        (hash as usize) & (NAME_TABLE_SHARDS - 1)
383    }
384
385    #[inline(always)]
386    fn lookup(&self, hash: u64, name: StrId) -> Option<u32> {
387        self.shards[Self::shard(hash)].lookup(hash, name)
388    }
389
390    #[inline(always)]
391    fn insert(&mut self, interner: &StringInterner, hash: u64, name: StrId, slot: u32) {
392        self.shards[Self::shard(hash)].insert(interner, hash, name, slot);
393    }
394
395    #[inline(always)]
396    fn remove(&mut self, interner: &StringInterner, hash: u64, name: StrId) {
397        self.shards[Self::shard(hash)].remove(interner, hash, name);
398    }
399}
400
401// ---------------------------------------------------------------------------
402// KnowledgeGraph – the central type.
403// ---------------------------------------------------------------------------
404pub struct KnowledgeGraph {
405    interner: StringInterner,
406    entity_slots: Vec<Option<StoredEntity>>,
407    /// Tombstoned slot indices available for reuse on the next create (M2),
408    /// so create/delete churn doesn't grow `entity_slots` without bound.
409    free_slots: Vec<u32>,
410    name_table: ShardedNameTable,
411    relations: Vec<StoredRelation>,
412    search: SearchIndex,
413    store: BinaryStore,
414}
415
416impl KnowledgeGraph {
417    pub fn new(path: &Path) -> std::io::Result<Self> {
418        let store = BinaryStore::new(path)?;
419
420        // Replay into local collections, then assign into self — no raw pointers needed (X3).
421        let mut interner = StringInterner::with_capacity(65536, 1024);
422        let mut entity_slots: Vec<Option<StoredEntity>> = Vec::with_capacity(256);
423        let mut name_table = ShardedNameTable::new(64);
424        let mut relations: Vec<StoredRelation> = Vec::with_capacity(64);
425        let mut search = SearchIndex::new();
426
427        store.replay(|kind, data| {
428            match kind {
429                RecordKind::CreateEntity => {
430                    if let Some((name, etype, obs)) = store_enc::decode_create_entity(data) {
431                        Self::replay_create_entity(
432                            &mut interner, &mut entity_slots, &mut search, &mut name_table, name, etype, &obs,
433                        );
434                    }
435                }
436                RecordKind::CreateRelation => {
437                    if let Some((from, to, rtype)) = store_enc::decode_create_relation(data) {
438                        let from_id = interner.intern(from);
439                        let to_id = interner.intern(to);
440                        let type_id = interner.intern(rtype);
441                        relations.push(StoredRelation {
442                            from: from_id,
443                            to: to_id,
444                            relation_type: type_id,
445                        });
446                    }
447                }
448                RecordKind::AddObservations => {
449                    if let Some((name, obs)) = store_enc::decode_add_observations(data) {
450                        Self::replay_add_observations(
451                            &mut interner, &mut entity_slots, &mut search, &mut name_table, name, &obs,
452                        );
453                    }
454                }
455                RecordKind::DeleteEntity => {
456                    if let Some(name) = store_enc::decode_delete_entity(data) {
457                        Self::replay_delete_entity(
458                            &mut interner, &mut entity_slots, &mut relations, &mut search, &mut name_table, name,
459                        );
460                    }
461                }
462                RecordKind::DeleteObservations => {
463                    if let Some((name, obs)) = store_enc::decode_delete_observations(data) {
464                        Self::replay_delete_observations(
465                            &mut interner, &mut entity_slots, &mut search, &mut name_table, name, &obs,
466                        );
467                    }
468                }
469                RecordKind::DeleteRelation => {
470                    if let Some((from, to, rtype)) = store_enc::decode_delete_relation(data) {
471                        let from_id = interner.intern(from);
472                        let to_id = interner.intern(to);
473                        let type_id = interner.intern(rtype);
474                        relations.retain(|r| {
475                            !(r.from == from_id && r.to == to_id && r.relation_type == type_id)
476                        });
477                    }
478                }
479            }
480        })?;
481
482        // Slots tombstoned by deletes during replay are available for reuse (M2).
483        let free_slots: Vec<u32> = entity_slots
484            .iter()
485            .enumerate()
486            .filter(|(_, s)| s.is_none())
487            .map(|(i, _)| i as u32)
488            .collect();
489
490        Ok(Self {
491            interner,
492            entity_slots,
493            free_slots,
494            name_table,
495            relations,
496            search,
497            store,
498        })
499    }
500
501    // -----------------------------------------------------------------------
502    // Replay helpers (static to avoid borrow issues in the closure)
503    // -----------------------------------------------------------------------
504
505    #[allow(clippy::ptr_arg)]
506    fn replay_create_entity(
507        interner: &mut StringInterner,
508        entities: &mut Vec<Option<StoredEntity>>,
509        search: &mut SearchIndex,
510        name_table: &mut ShardedNameTable,
511        name: &str,
512        etype: &str,
513        observations: &[&str],
514    ) {
515        let name_id = interner.intern(name);
516        let type_id = interner.intern(etype);
517        let obs_ids: Vec<StrId> = observations.iter().map(|o| interner.intern(o)).collect();
518        let slot = entities.len() as u32;
519        entities.push(Some(StoredEntity {
520            state: ENTITY_SLOT_LIVE,
521            name: name_id,
522            entity_type: type_id,
523            observations: obs_ids.clone(),
524        }));
525        let hash = interner.get_hash(name_id);
526        name_table.insert(&*interner, hash, name_id, slot);
527        search.index_entity(interner, slot, name_id, type_id, &obs_ids);
528    }
529
530    fn replay_add_observations(
531        interner: &mut StringInterner,
532        entities: &mut [Option<StoredEntity>],
533        search: &mut SearchIndex,
534        name_table: &mut ShardedNameTable,
535        name: &str,
536        observations: &[&str],
537    ) {
538        let name_id = interner.intern(name);
539        let hash = interner.get_hash(name_id);
540        if let Some(slot) = name_table.lookup(hash, name_id)
541            && let Some(Some(entity)) = entities.get_mut(slot as usize)
542        {
543            for &o in observations {
544                let oid = interner.intern(o);
545                if !entity.observations.contains(&oid) {
546                    entity.observations.push(oid);
547                }
548            }
549            search.remove_entity(slot);
550            search.index_entity(
551                interner,
552                slot,
553                entity.name,
554                entity.entity_type,
555                &entity.observations,
556            );
557        }
558    }
559
560    fn replay_delete_entity(
561        interner: &mut StringInterner,
562        entities: &mut [Option<StoredEntity>],
563        rels: &mut Vec<StoredRelation>,
564        search: &mut SearchIndex,
565        name_table: &mut ShardedNameTable,
566        name: &str,
567    ) {
568        let name_id = interner.intern(name);
569        let hash = interner.get_hash(name_id);
570        if let Some(slot) = name_table.lookup(hash, name_id)
571            && let Some(Some(_)) = entities.get(slot as usize)
572        {
573            entities[slot as usize] = None;
574            search.remove_entity(slot);
575            name_table.remove(&*interner, hash, name_id);
576        }
577        rels.retain(|r| r.from != name_id && r.to != name_id);
578    }
579
580    fn replay_delete_observations(
581        interner: &mut StringInterner,
582        entities: &mut [Option<StoredEntity>],
583        search: &mut SearchIndex,
584        name_table: &mut ShardedNameTable,
585        name: &str,
586        observations: &[&str],
587    ) {
588        let name_id = interner.intern(name);
589        let hash = interner.get_hash(name_id);
590        if let Some(slot) = name_table.lookup(hash, name_id)
591            && let Some(Some(entity)) = entities.get_mut(slot as usize)
592        {
593            let remove_ids: Vec<StrId> = observations.iter().map(|o| interner.intern(o)).collect();
594            entity.observations.retain(|o| !remove_ids.contains(o));
595            search.remove_entity(slot);
596            search.index_entity(
597                interner,
598                slot,
599                entity.name,
600                entity.entity_type,
601                &entity.observations,
602            );
603        }
604    }
605
606    // -----------------------------------------------------------------------
607    // Public API
608    // -----------------------------------------------------------------------
609
610    pub const fn interner(&self) -> &StringInterner {
611        &self.interner
612    }
613
614    /// Return a single entity by exact name match.
615    pub fn get_entity(&self, name: &str) -> Option<Entity> {
616        let name_id = self.interner.get_optional(name)?;
617        let hash = self.interner.get_hash(name_id);
618        let slot = self.name_table.lookup(hash, name_id)?;
619        let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
620        if !stored.is_live() {
621            return None;
622        }
623        Some(self.entity_to_output(stored))
624    }
625
626    /// Return aggregate statistics about the graph.
627    pub fn graph_stats(&self) -> serde_json::Value {
628        let live_entities = self
629            .entity_slots
630            .iter()
631            .filter(|s| s.as_ref().is_some_and(|e| e.is_live()))
632            .count();
633        let total_relations = self.relations.len();
634        let index_entries = self.search.len();
635        let total_obs: usize = self
636            .entity_slots
637            .iter()
638            .filter_map(|s| s.as_ref())
639            .filter(|e| e.is_live())
640            .map(|e| e.observations.len())
641            .sum();
642
643        serde_json::json!({
644            "entities": live_entities,
645            "relations": total_relations,
646            "totalObservations": total_obs,
647            "searchIndexEntries": index_entries,
648            "internedStrings": self.interner.len(),
649            "internedBytes": self.interner.total_bytes(),
650        })
651    }
652
653    /// Search relations by optional filters: `from`, `to`, `relationType`.
654    /// Any filter that is absent matches everything. A filter value that does
655    /// not exist in the graph returns empty results.
656    pub fn search_relations(&self, from: Option<&str>, to: Option<&str>, rtype: Option<&str>) -> Vec<Relation> {
657        let from_id = match from {
658            Some(f) => match self.interner.get_optional(f) {
659                Some(id) => Some(id),
660                None => return Vec::new(),
661            },
662            None => None,
663        };
664        let to_id = match to {
665            Some(t) => match self.interner.get_optional(t) {
666                Some(id) => Some(id),
667                None => return Vec::new(),
668            },
669            None => None,
670        };
671        let rtype_id = match rtype {
672            Some(r) => match self.interner.get_optional(r) {
673                Some(id) => Some(id),
674                None => return Vec::new(),
675            },
676            None => None,
677        };
678
679        self.relations
680            .iter()
681            .filter(|r| {
682                from_id.is_none_or(|f| r.from == f)
683                    && to_id.is_none_or(|t| r.to == t)
684                    && rtype_id.is_none_or(|rt| r.relation_type == rt)
685            })
686            .map(|r| Relation {
687                from: self.interner.lookup(r.from).to_string(),
688                to: self.interner.lookup(r.to).to_string(),
689                relation_type: self.interner.lookup(r.relation_type).to_string(),
690            })
691            .collect()
692    }
693
694    /// BFS shortest-path between two entity names. Returns the sequence of
695    /// entity names along the path (inclusive of both endpoints).
696    pub fn find_path(&self, from: &str, to: &str) -> Result<Vec<String>> {
697        let from_id = self.interner.get_optional(from)
698            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
699        let to_id = self.interner.get_optional(to)
700            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
701        let hash_from = self.interner.get_hash(from_id);
702        let hash_to = self.interner.get_hash(to_id);
703
704        if self.name_table.lookup(hash_from, from_id).is_none() {
705            return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
706        }
707        if self.name_table.lookup(hash_to, to_id).is_none() {
708            return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
709        }
710        if from_id == to_id {
711            return Ok(vec![from.to_string()]);
712        }
713
714        // Build adjacency list (P4) — O(E) once, not O(V×E).
715        let mut adj: HashMap<StrId, Vec<(StrId, StrId)>> = HashMap::new();
716        for rel in &self.relations {
717            adj.entry(rel.from).or_default().push((rel.to, rel.relation_type));
718            adj.entry(rel.to).or_default().push((rel.from, rel.relation_type));
719        }
720
721        // BFS over adjacency list
722        let mut visited: HashSet<StrId> = HashSet::new();
723        let mut parent: HashMap<StrId, StrId> = HashMap::new();
724        let mut queue: VecDeque<StrId> = VecDeque::new();
725
726        visited.insert(from_id);
727        queue.push_back(from_id);
728
729        while let Some(current) = queue.pop_front() {
730            if current == to_id {
731                break;
732            }
733
734            if let Some(neighbors) = adj.get(&current) {
735                for &(neighbor, _) in neighbors {
736                    if visited.insert(neighbor) {
737                        parent.insert(neighbor, current);
738                        queue.push_back(neighbor);
739                    }
740                }
741            }
742        }
743
744        if !parent.contains_key(&to_id) && from_id != to_id {
745            return Err(MCSError::MemoryError(format!(
746                "No path found between '{from}' and '{to}'"
747            )));
748        }
749
750        // Reconstruct path
751        let mut path: Vec<String> = Vec::new();
752        let mut cur = to_id;
753        loop {
754            path.push(self.interner.lookup(cur).to_string());
755            if cur == from_id {
756                break;
757            }
758            cur = *parent.get(&cur).ok_or_else(|| {
759                MCSError::MemoryError("Path reconstruction failed".into())
760            })?;
761        }
762        path.reverse();
763        Ok(path)
764    }
765
766    /// Rewrite the binary log from the current in-memory state.
767    /// After compaction the log contains only the minimal set of records
768    /// needed to reconstruct the graph (all creates, no deletes).
769    /// Crash-safe: writes to a temp file, then atomically renames (C3).
770    pub fn compact(&mut self) -> Result<()> {
771        // 1. Collect current state as create-records
772        let mut create_entities: Vec<Entity> = Vec::new();
773        let mut create_relations: Vec<Relation> = Vec::new();
774
775        for slot in &self.entity_slots {
776            if let Some(stored) = slot.as_ref().filter(|e| e.is_live()) {
777                create_entities.push(self.entity_to_output(stored));
778            }
779        }
780        for rel in &self.relations {
781            create_relations.push(Relation {
782                from: self.interner.lookup(rel.from).to_string(),
783                to: self.interner.lookup(rel.to).to_string(),
784                relation_type: self.interner.lookup(rel.relation_type).to_string(),
785            });
786        }
787
788        // 2. Write to a temp file first
789        let tmp_path = self.store.path().with_extension("tmp");
790        let mut tmp_store = BinaryStore::new(&tmp_path).map_err(MCSError::IoError)?;
791        for entity in &create_entities {
792            let mut buf = Vec::new();
793            store_enc::encode_create_entity(&mut buf, &entity.name, &entity.entity_type, &entity.observations)
794                .map_err(MCSError::IoError)?;
795            tmp_store.write_record(RecordKind::CreateEntity, &buf).map_err(MCSError::IoError)?;
796        }
797        for relation in &create_relations {
798            let mut buf = Vec::new();
799            store_enc::encode_create_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
800                .map_err(MCSError::IoError)?;
801            tmp_store.write_record(RecordKind::CreateRelation, &buf).map_err(MCSError::IoError)?;
802        }
803        tmp_store.flush_and_sync().map_err(MCSError::IoError)?;
804        drop(tmp_store);
805
806        // 3. Atomically rename over the original (atomic on POSIX)
807        std::fs::rename(&tmp_path, self.store.path()).map_err(MCSError::IoError)?;
808
809        // 4. Rebuild the entire in-memory graph from the compacted log (M1/M2).
810        //    Replaying into fresh structures reclaims the interner arena (stale
811        //    strings from deleted/edited entities), tombstoned entity slots, and
812        //    stale search-index entries — none of which the old reopen-only path
813        //    reclaimed.
814        let path = self.store.path().clone();
815        *self = KnowledgeGraph::new(&path).map_err(MCSError::IoError)?;
816
817        Ok(())
818    }
819
820    // ---- Public API with write-ahead log (C1) and error propagation ----
821
822    pub fn create_entities(&mut self, entities: &[Entity]) -> Result<Vec<Entity>> {
823        // Validate up front so an invalid entity never produces partial writes.
824        for entity in entities {
825            if entity.name.is_empty() {
826                return Err(MCSError::InvalidParams(
827                    "Entity name must not be empty".into(),
828                ));
829            }
830        }
831        let mut created = Vec::new();
832        for entity in entities {
833            // Check dedup before writing (using non-interning lookup)
834            let existing = self.interner.get_optional(&entity.name)
835                .and_then(|id| {
836                    let hash = self.interner.get_hash(id);
837                    self.name_table.lookup(hash, id)
838                });
839            if existing.is_some() {
840                continue;
841            }
842            // Write-ahead: encode and log before mutating state
843            let mut buf = Vec::new();
844            store_enc::encode_create_entity(&mut buf, &entity.name, &entity.entity_type, &entity.observations)
845                .map_err(MCSError::IoError)?;
846            self.store.write_record(RecordKind::CreateEntity, &buf)
847                .map_err(MCSError::IoError)?;
848
849            let name_id = self.interner.intern(&entity.name);
850            let hash = self.interner.get_hash(name_id);
851            let type_id = self.interner.intern(&entity.entity_type);
852            let obs_ids: Vec<StrId> = entity
853                .observations
854                .iter()
855                .map(|o| self.interner.intern(o))
856                .collect();
857            // Reuse a tombstoned slot if one is free (M2); its old search-index
858            // entries were cleared on delete, so the slot starts clean.
859            let reused = self.free_slots.pop();
860            let slot = reused.unwrap_or(self.entity_slots.len() as u32);
861            self.search
862                .index_entity(&mut self.interner, slot, name_id, type_id, &obs_ids);
863            let stored = Some(StoredEntity {
864                state: ENTITY_SLOT_LIVE,
865                name: name_id,
866                entity_type: type_id,
867                observations: obs_ids,
868            });
869            match reused {
870                Some(s) => self.entity_slots[s as usize] = stored,
871                None => self.entity_slots.push(stored),
872            }
873            self.name_table.insert(&self.interner, hash, name_id, slot);
874            created.push(Entity {
875                name: entity.name.clone(),
876                entity_type: entity.entity_type.clone(),
877                observations: entity.observations.clone(),
878            });
879        }
880        Ok(created)
881    }
882
883    pub fn create_relations(&mut self, relations: &[Relation]) -> Result<Vec<Relation>> {
884        // Validate up front so an invalid relation never produces partial writes.
885        for relation in relations {
886            if relation.from.is_empty() || relation.to.is_empty() {
887                return Err(MCSError::InvalidParams(
888                    "Relation endpoints must not be empty".into(),
889                ));
890            }
891        }
892        let mut created = Vec::new();
893        // Build a dedup set for O(1) duplicate checks (P5)
894        let mut rel_set: HashSet<(StrId, StrId, StrId)> = HashSet::new();
895        for rel in &self.relations {
896            rel_set.insert((rel.from, rel.to, rel.relation_type));
897        }
898        for relation in relations {
899            let from_id = self.interner.intern(&relation.from);
900            let to_id = self.interner.intern(&relation.to);
901            let type_id = self.interner.intern(&relation.relation_type);
902            if !rel_set.insert((from_id, to_id, type_id)) {
903                continue;
904            }
905            // Write-ahead: log before mutation
906            let mut buf = Vec::new();
907            store_enc::encode_create_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
908                .map_err(MCSError::IoError)?;
909            self.store.write_record(RecordKind::CreateRelation, &buf)
910                .map_err(MCSError::IoError)?;
911
912            self.relations.push(StoredRelation {
913                from: from_id,
914                to: to_id,
915                relation_type: type_id,
916            });
917            created.push(Relation {
918                from: relation.from.clone(),
919                to: relation.to.clone(),
920                relation_type: relation.relation_type.clone(),
921            });
922        }
923        Ok(created)
924    }
925
926    pub fn add_observations(&mut self, entity_name: &str, contents: &[String]) -> Result<Vec<String>> {
927        let name_id = self.interner.get_optional(entity_name)
928            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
929        let hash = self.interner.get_hash(name_id);
930        let slot = self
931            .name_table
932            .lookup(hash, name_id)
933            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
934        let stored = self
935            .entity_slots
936            .get_mut(slot as usize)
937            .and_then(|e| e.as_mut())
938            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
939
940        // Deduplicate new observations (P7) — use HashSet for O(1) lookups
941        let existing: HashSet<StrId> = stored.observations.iter().copied().collect();
942        let mut added = Vec::new();
943        let mut interned_added = Vec::new();
944        for content in contents {
945            let cid = self.interner.intern(content);
946            if existing.contains(&cid) {
947                continue;
948            }
949            stored.observations.push(cid);
950            interned_added.push(cid);
951            added.push(content.clone());
952        }
953        if !added.is_empty() {
954            // Write-ahead: log before re-indexing
955            let mut buf = Vec::new();
956            store_enc::encode_add_observations(&mut buf, entity_name, &added)
957                .map_err(MCSError::IoError)?;
958            self.store.write_record(RecordKind::AddObservations, &buf)
959                .map_err(MCSError::IoError)?;
960
961            // Incrementally index only the new observation tokens (P3) — no
962            // full remove + re-index of the whole entity.
963            self.search
964                .index_additional(&mut self.interner, slot, &interned_added);
965        }
966        Ok(added)
967    }
968
969    pub fn delete_entities(&mut self, entity_names: &[String]) -> Result<()> {
970        let mut deleted_names = Vec::new();
971        for name in entity_names {
972            let name_id_opt = self.interner.get_optional(name);
973            if let Some(name_id) = name_id_opt {
974                let hash = self.interner.get_hash(name_id);
975                if let Some(slot) = self.name_table.lookup(hash, name_id)
976                    && let Some(Some(_)) = self.entity_slots.get(slot as usize)
977                {
978                    // Write-ahead: log before mutation
979                    let mut buf = Vec::new();
980                    store_enc::encode_delete_entity(&mut buf, name)
981                        .map_err(MCSError::IoError)?;
982                    self.store.write_record(RecordKind::DeleteEntity, &buf)
983                        .map_err(MCSError::IoError)?;
984
985                    self.entity_slots[slot as usize] = None;
986                    self.free_slots.push(slot);
987                    self.search.remove_entity(slot);
988                    self.name_table.remove(&self.interner, hash, name_id);
989                    deleted_names.push(name.clone());
990                }
991            }
992        }
993        if !deleted_names.is_empty() {
994            // Use a HashSet for O(1) retain checks (P5)
995            let deleted_ids: HashSet<StrId> = deleted_names.iter()
996                .map(|n| self.interner.intern(n))
997                .collect();
998            self.relations
999                .retain(|r| !deleted_ids.contains(&r.from) && !deleted_ids.contains(&r.to));
1000        }
1001        Ok(())
1002    }
1003
1004    pub fn delete_observations(&mut self, entity_name: &str, observations: &[String]) -> Result<()> {
1005        let name_id = self.interner.get_optional(entity_name)
1006            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1007        let hash = self.interner.get_hash(name_id);
1008        let slot = self
1009            .name_table
1010            .lookup(hash, name_id)
1011            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1012        let stored = self
1013            .entity_slots
1014            .get_mut(slot as usize)
1015            .and_then(|e| e.as_mut())
1016            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1017        let remove_ids: HashSet<StrId> = observations.iter().map(|o| self.interner.intern(o)).collect();
1018        stored.observations.retain(|o| !remove_ids.contains(o));
1019        // Write-ahead: log before re-indexing
1020        let mut buf = Vec::new();
1021        store_enc::encode_delete_observations(&mut buf, entity_name, observations)
1022            .map_err(MCSError::IoError)?;
1023        self.store.write_record(RecordKind::DeleteObservations, &buf)
1024            .map_err(MCSError::IoError)?;
1025
1026        self.search.remove_entity(slot);
1027        self.search
1028            .index_entity(&mut self.interner, slot, stored.name, stored.entity_type, &stored.observations);
1029        Ok(())
1030    }
1031
1032    pub fn delete_relations(&mut self, relations: &[Relation]) -> Result<()> {
1033        // Collect targets into a HashSet for O(1) retain checks (P5)
1034        let rels: HashSet<(StrId, StrId, StrId)> = relations
1035            .iter()
1036            .map(|r| {
1037                (
1038                    self.interner.intern(&r.from),
1039                    self.interner.intern(&r.to),
1040                    self.interner.intern(&r.relation_type),
1041                )
1042            })
1043            .collect();
1044        self.relations
1045            .retain(|r| !rels.contains(&(r.from, r.to, r.relation_type)));
1046        for relation in relations {
1047            let mut buf = Vec::new();
1048            store_enc::encode_delete_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
1049                .map_err(MCSError::IoError)?;
1050            self.store.write_record(RecordKind::DeleteRelation, &buf)
1051                .map_err(MCSError::IoError)?;
1052        }
1053        Ok(())
1054    }
1055
1056    pub fn read_graph(&self) -> KnowledgeGraphOut {
1057        self.read_graph_view().to_owned_out()
1058    }
1059
1060    /// Borrowing, allocation-light view of the full graph (M6). Serializing it
1061    /// streams interned `&str` directly instead of materializing a `String`
1062    /// per name/type/observation.
1063    pub fn read_graph_view(&self) -> GraphView<'_> {
1064        let entities: Vec<&StoredEntity> = self
1065            .entity_slots
1066            .iter()
1067            .filter_map(|s| s.as_ref().filter(|e| e.is_live()))
1068            .collect();
1069        let relations: Vec<&StoredRelation> = self.relations.iter().collect();
1070        GraphView { kg: self, entities, relations }
1071    }
1072
1073    /// Relevance-ranked substring search returning all matches (no pagination).
1074    /// Equivalent to `search_nodes_filtered(query, None, 0, usize::MAX)`.
1075    pub fn search_nodes(&self, query: &str) -> KnowledgeGraphOut {
1076        self.search_nodes_filtered(query, None, 0, usize::MAX)
1077    }
1078
1079    pub fn open_nodes(&self, names: &[String]) -> KnowledgeGraphOut {
1080        self.open_nodes_view(names).to_owned_out()
1081    }
1082
1083    /// Borrowing view variant of [`open_nodes`] (M6).
1084    pub fn open_nodes_view(&self, names: &[String]) -> GraphView<'_> {
1085        let name_ids: HashSet<StrId> = names.iter()
1086            .filter_map(|n| self.interner.get_optional(n))
1087            .collect();
1088        let entities: Vec<&StoredEntity> = self
1089            .entity_slots
1090            .iter()
1091            .filter_map(|s| {
1092                s.as_ref()
1093                    .filter(|stored| stored.is_live() && name_ids.contains(&stored.name))
1094            })
1095            .collect();
1096        let matched_names: HashSet<StrId> = entities.iter().map(|e| e.name).collect();
1097        let relations: Vec<&StoredRelation> = self
1098            .relations
1099            .iter()
1100            .filter(|r| matched_names.contains(&r.from) || matched_names.contains(&r.to))
1101            .collect();
1102        GraphView { kg: self, entities, relations }
1103    }
1104
1105    // -----------------------------------------------------------------------
1106    // Internal helpers
1107    // -----------------------------------------------------------------------
1108
1109    fn entity_to_output(&self, stored: &StoredEntity) -> Entity {
1110        Entity {
1111            name: self.interner.lookup(stored.name).to_string(),
1112            entity_type: self.interner.lookup(stored.entity_type).to_string(),
1113            observations: stored
1114                .observations
1115                .iter()
1116                .map(|o| self.interner.lookup(*o).to_string())
1117                .collect(),
1118        }
1119    }
1120
1121    #[inline]
1122    fn relation_to_output(&self, r: &StoredRelation) -> Relation {
1123        Relation {
1124            from: self.interner.lookup(r.from).to_string(),
1125            to: self.interner.lookup(r.to).to_string(),
1126            relation_type: self.interner.lookup(r.relation_type).to_string(),
1127        }
1128    }
1129
1130    /// Resolve a name to its live entity slot, or `None` if absent/deleted.
1131    fn lookup_live_slot(&self, name: &str) -> Option<u32> {
1132        let name_id = self.interner.get_optional(name)?;
1133        let hash = self.interner.get_hash(name_id);
1134        let slot = self.name_table.lookup(hash, name_id)?;
1135        let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
1136        stored.is_live().then_some(slot)
1137    }
1138
1139    /// Materialize a live entity from its interned name id.
1140    fn entity_by_name_id(&self, name_id: StrId) -> Option<Entity> {
1141        let hash = self.interner.get_hash(name_id);
1142        let slot = self.name_table.lookup(hash, name_id)?;
1143        let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
1144        stored.is_live().then(|| self.entity_to_output(stored))
1145    }
1146
1147    /// Tally distinct entity types and their live-entity counts, ranked by
1148    /// count descending (ties broken by name). One linear pass over the dense
1149    /// slot vec; only the final names are allocated.
1150    pub fn entity_type_counts(&self) -> Vec<(String, usize)> {
1151        let mut counts: HashMap<StrId, usize> = HashMap::new();
1152        for st in self
1153            .entity_slots
1154            .iter()
1155            .filter_map(|s| s.as_ref())
1156            .filter(|e| e.is_live())
1157        {
1158            *counts.entry(st.entity_type).or_insert(0) += 1;
1159        }
1160        self.rank_counts(counts)
1161    }
1162
1163    /// Tally distinct relation types and their counts, ranked by count desc.
1164    pub fn relation_type_counts(&self) -> Vec<(String, usize)> {
1165        let mut counts: HashMap<StrId, usize> = HashMap::new();
1166        for r in &self.relations {
1167            *counts.entry(r.relation_type).or_insert(0) += 1;
1168        }
1169        self.rank_counts(counts)
1170    }
1171
1172    fn rank_counts(&self, counts: HashMap<StrId, usize>) -> Vec<(String, usize)> {
1173        let mut out: Vec<(String, usize)> = counts
1174            .into_iter()
1175            .map(|(id, c)| (self.interner.lookup(id).to_string(), c))
1176            .collect();
1177        out.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
1178        out
1179    }
1180
1181    /// Relevance-ranked, optionally type-filtered, paginated node search.
1182    /// Entities come back best-match-first (see [`SearchIndex::search_ranked`]).
1183    /// Relations touching any returned entity (either endpoint) are included.
1184    pub fn search_nodes_filtered(
1185        &self,
1186        query: &str,
1187        entity_type: Option<&str>,
1188        offset: usize,
1189        limit: usize,
1190    ) -> KnowledgeGraphOut {
1191        self.search_nodes_view(query, entity_type, offset, limit).to_owned_out()
1192    }
1193
1194    /// Borrowing view variant of [`search_nodes_filtered`] (M6).
1195    pub fn search_nodes_view(
1196        &self,
1197        query: &str,
1198        entity_type: Option<&str>,
1199        offset: usize,
1200        limit: usize,
1201    ) -> GraphView<'_> {
1202        let type_id = match entity_type {
1203            Some(t) => match self.interner.get_optional(t) {
1204                Some(id) => Some(id),
1205                None => return GraphView { kg: self, entities: Vec::new(), relations: Vec::new() },
1206            },
1207            None => None,
1208        };
1209
1210        let ranked = self.search.search_ranked(query, &self.interner);
1211        let mut selected: HashSet<StrId> = HashSet::new();
1212        let mut entities: Vec<&StoredEntity> = Vec::new();
1213        let mut skipped = 0usize;
1214        for (slot, _score) in ranked {
1215            let Some(st) = self
1216                .entity_slots
1217                .get(slot as usize)
1218                .and_then(|s| s.as_ref())
1219                .filter(|e| e.is_live())
1220            else {
1221                continue;
1222            };
1223            if type_id.is_some_and(|tid| st.entity_type != tid) {
1224                continue;
1225            }
1226            if skipped < offset {
1227                skipped += 1;
1228                continue;
1229            }
1230            if entities.len() >= limit {
1231                break;
1232            }
1233            selected.insert(st.name);
1234            entities.push(st);
1235        }
1236
1237        let relations: Vec<&StoredRelation> = self
1238            .relations
1239            .iter()
1240            .filter(|r| selected.contains(&r.from) || selected.contains(&r.to))
1241            .collect();
1242        GraphView { kg: self, entities, relations }
1243    }
1244
1245    /// Type-filtered, paginated view of the whole graph. Unlike [`read_graph`],
1246    /// relations are restricted to those whose **both** endpoints fall in the
1247    /// returned entity page, so the slice is internally consistent.
1248    pub fn read_graph_filtered(
1249        &self,
1250        entity_type: Option<&str>,
1251        offset: usize,
1252        limit: usize,
1253    ) -> KnowledgeGraphOut {
1254        self.read_graph_filtered_view(entity_type, offset, limit).to_owned_out()
1255    }
1256
1257    /// Borrowing view variant of [`read_graph_filtered`] (M6).
1258    pub fn read_graph_filtered_view(
1259        &self,
1260        entity_type: Option<&str>,
1261        offset: usize,
1262        limit: usize,
1263    ) -> GraphView<'_> {
1264        let type_id = match entity_type {
1265            Some(t) => match self.interner.get_optional(t) {
1266                Some(id) => Some(id),
1267                None => return GraphView { kg: self, entities: Vec::new(), relations: Vec::new() },
1268            },
1269            None => None,
1270        };
1271
1272        let mut selected: HashSet<StrId> = HashSet::new();
1273        let mut entities: Vec<&StoredEntity> = Vec::new();
1274        let mut skipped = 0usize;
1275        for st in self
1276            .entity_slots
1277            .iter()
1278            .filter_map(|s| s.as_ref())
1279            .filter(|e| e.is_live())
1280        {
1281            if type_id.is_some_and(|tid| st.entity_type != tid) {
1282                continue;
1283            }
1284            if skipped < offset {
1285                skipped += 1;
1286                continue;
1287            }
1288            if entities.len() >= limit {
1289                break;
1290            }
1291            selected.insert(st.name);
1292            entities.push(st);
1293        }
1294
1295        let relations: Vec<&StoredRelation> = self
1296            .relations
1297            .iter()
1298            .filter(|r| selected.contains(&r.from) && selected.contains(&r.to))
1299            .collect();
1300        GraphView { kg: self, entities, relations }
1301    }
1302
1303    /// Neighborhood expansion around `name` out to `depth` hops, following
1304    /// edges in the requested [`Direction`] and (optionally) of one relation
1305    /// type. Returns the origin plus reached entities, and every relation
1306    /// (passing the type filter) whose endpoints are both inside that set.
1307    ///
1308    /// `depth == 1` (the common case) is a single linear pass over the flat
1309    /// relation vec; deeper queries build an adjacency map once (O(E)) and BFS.
1310    pub fn neighbors(
1311        &self,
1312        name: &str,
1313        direction: Direction,
1314        rtype: Option<&str>,
1315        depth: u32,
1316    ) -> Result<KnowledgeGraphOut> {
1317        self.lookup_live_slot(name)
1318            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
1319        // Safe: lookup_live_slot succeeded, so the name is interned.
1320        let start = self.interner.get_optional(name).unwrap();
1321
1322        // An unknown relation-type filter can match nothing: return just origin.
1323        let rtype_id = match rtype {
1324            Some(r) => match self.interner.get_optional(r) {
1325                Some(id) => Some(id),
1326                None => {
1327                    let entities = self.entity_by_name_id(start).into_iter().collect();
1328                    return Ok(KnowledgeGraphOut { entities, relations: Vec::new() });
1329                }
1330            },
1331            None => None,
1332        };
1333
1334        let mut visited: HashSet<StrId> = HashSet::new();
1335        visited.insert(start);
1336
1337        let type_ok = |r: &StoredRelation| rtype_id.is_none_or(|rt| r.relation_type == rt);
1338
1339        if depth == 1 {
1340            for r in self.relations.iter().filter(|r| type_ok(r)) {
1341                match direction {
1342                    Direction::Out => {
1343                        if r.from == start {
1344                            visited.insert(r.to);
1345                        }
1346                    }
1347                    Direction::In => {
1348                        if r.to == start {
1349                            visited.insert(r.from);
1350                        }
1351                    }
1352                    Direction::Both => {
1353                        if r.from == start {
1354                            visited.insert(r.to);
1355                        } else if r.to == start {
1356                            visited.insert(r.from);
1357                        }
1358                    }
1359                }
1360            }
1361        } else if depth >= 2 {
1362            // Build a direction-aware adjacency map once, then BFS.
1363            let mut adj: HashMap<StrId, Vec<StrId>> = HashMap::new();
1364            for r in self.relations.iter().filter(|r| type_ok(r)) {
1365                match direction {
1366                    Direction::Out => adj.entry(r.from).or_default().push(r.to),
1367                    Direction::In => adj.entry(r.to).or_default().push(r.from),
1368                    Direction::Both => {
1369                        adj.entry(r.from).or_default().push(r.to);
1370                        adj.entry(r.to).or_default().push(r.from);
1371                    }
1372                }
1373            }
1374            let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
1375            queue.push_back((start, 0));
1376            while let Some((node, d)) = queue.pop_front() {
1377                if d >= depth {
1378                    continue;
1379                }
1380                if let Some(nbrs) = adj.get(&node) {
1381                    for &nb in nbrs {
1382                        if visited.insert(nb) {
1383                            queue.push_back((nb, d + 1));
1384                        }
1385                    }
1386                }
1387            }
1388        }
1389
1390        let mut entities = Vec::with_capacity(visited.len());
1391        for &nid in &visited {
1392            if let Some(e) = self.entity_by_name_id(nid) {
1393                entities.push(e);
1394            }
1395        }
1396        let relations = self
1397            .relations
1398            .iter()
1399            .filter(|r| type_ok(r) && visited.contains(&r.from) && visited.contains(&r.to))
1400            .map(|r| self.relation_to_output(r))
1401            .collect();
1402        Ok(KnowledgeGraphOut { entities, relations })
1403    }
1404
1405    /// One-shot context bundle for a single entity: the entity itself, every
1406    /// incident relation, its distinct neighbor names, and its degree. Saves an
1407    /// agent the get_entity + two search_relations round-trips.
1408    pub fn describe_entity(&self, name: &str) -> Result<serde_json::Value> {
1409        let name_id = self
1410            .interner
1411            .get_optional(name)
1412            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
1413        let entity = self
1414            .entity_by_name_id(name_id)
1415            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
1416
1417        let mut incident: Vec<Relation> = Vec::new();
1418        let mut neighbor_seen: HashSet<StrId> = HashSet::new();
1419        let mut neighbors: Vec<&str> = Vec::new();
1420        for r in &self.relations {
1421            if r.from == name_id || r.to == name_id {
1422                incident.push(self.relation_to_output(r));
1423                let other = if r.from == name_id { r.to } else { r.from };
1424                if other != name_id && neighbor_seen.insert(other) {
1425                    neighbors.push(self.interner.lookup(other));
1426                }
1427            }
1428        }
1429
1430        Ok(serde_json::json!({
1431            "entity": entity,
1432            "relations": incident,
1433            "neighbors": neighbors,
1434            "degree": incident.len(),
1435        }))
1436    }
1437
1438    /// Create-or-merge a batch of entities idempotently. Missing entities are
1439    /// created; existing ones keep their type and gain any new observations
1440    /// (deduplicated). Returns a per-entity outcome. The caller is responsible
1441    /// for flushing — every underlying op is already write-ahead logged.
1442    pub fn upsert_entities(&mut self, entities: &[Entity]) -> Result<Vec<serde_json::Value>> {
1443        for e in entities {
1444            if e.name.is_empty() {
1445                return Err(MCSError::InvalidParams(
1446                    "Entity name must not be empty".into(),
1447                ));
1448            }
1449        }
1450        let mut out = Vec::with_capacity(entities.len());
1451        for e in entities {
1452            if self.lookup_live_slot(&e.name).is_some() {
1453                let added = self.add_observations(&e.name, &e.observations)?;
1454                out.push(serde_json::json!({
1455                    "name": e.name,
1456                    "created": false,
1457                    "addedObservations": added,
1458                }));
1459            } else {
1460                let created = self.create_entities(std::slice::from_ref(e))?;
1461                out.push(serde_json::json!({
1462                    "name": e.name,
1463                    "created": !created.is_empty(),
1464                    "addedObservations": e.observations,
1465                }));
1466            }
1467        }
1468        Ok(out)
1469    }
1470
1471    /// Serialize the graph in one of: `json` (read_graph), `mermaid`, `dot`.
1472    pub fn export(&self, format: &str) -> Result<String> {
1473        match format {
1474            "json" => serde_json::to_string(&self.read_graph()).map_err(MCSError::JsonError),
1475            "mermaid" => Ok(self.export_mermaid()),
1476            "dot" => Ok(self.export_dot()),
1477            other => Err(MCSError::InvalidParams(format!(
1478                "Unknown export format '{other}' (expected json|mermaid|dot)"
1479            ))),
1480        }
1481    }
1482
1483    /// Assign each live entity a stable `n{k}` node id for diagram output.
1484    fn diagram_node_ids(&self) -> (HashMap<StrId, usize>, Vec<(usize, StrId)>) {
1485        let mut ids: HashMap<StrId, usize> = HashMap::new();
1486        let mut order: Vec<(usize, StrId)> = Vec::new();
1487        for st in self
1488            .entity_slots
1489            .iter()
1490            .filter_map(|s| s.as_ref())
1491            .filter(|e| e.is_live())
1492        {
1493            let n = ids.len();
1494            ids.insert(st.name, n);
1495            order.push((n, st.name));
1496        }
1497        (ids, order)
1498    }
1499
1500    fn export_mermaid(&self) -> String {
1501        let (ids, order) = self.diagram_node_ids();
1502        let mut s = String::with_capacity(64 + order.len() * 32 + self.relations.len() * 32);
1503        s.push_str("graph LR\n");
1504        for (n, name_id) in &order {
1505            let label = sanitize_label(self.interner.lookup(*name_id));
1506            s.push_str(&format!("  n{n}[\"{label}\"]\n"));
1507        }
1508        for r in &self.relations {
1509            if let (Some(&a), Some(&b)) = (ids.get(&r.from), ids.get(&r.to)) {
1510                let rel = sanitize_label(self.interner.lookup(r.relation_type));
1511                s.push_str(&format!("  n{a} -->|{rel}| n{b}\n"));
1512            }
1513        }
1514        s
1515    }
1516
1517    fn export_dot(&self) -> String {
1518        let (ids, order) = self.diagram_node_ids();
1519        let mut s = String::with_capacity(64 + order.len() * 32 + self.relations.len() * 32);
1520        s.push_str("digraph G {\n");
1521        for (n, name_id) in &order {
1522            let label = sanitize_label(self.interner.lookup(*name_id));
1523            s.push_str(&format!("  n{n} [label=\"{label}\"];\n"));
1524        }
1525        for r in &self.relations {
1526            if let (Some(&a), Some(&b)) = (ids.get(&r.from), ids.get(&r.to)) {
1527                let rel = sanitize_label(self.interner.lookup(r.relation_type));
1528                s.push_str(&format!("  n{a} -> n{b} [label=\"{rel}\"];\n"));
1529            }
1530        }
1531        s.push_str("}\n");
1532        s
1533    }
1534
1535    // --- Flush & sync ---
1536
1537    /// Flush and fsync the log to stable storage.
1538    pub fn flush_and_sync(&mut self) -> Result<()> {
1539        self.store.flush_and_sync().map_err(MCSError::IoError)
1540    }
1541}