Skip to main content

mcp_memory/
kg.rs

1use std::collections::VecDeque;
2
3use ahash::{AHashMap, AHashSet};
4use std::path::Path;
5
6use serde::ser::{Serialize, SerializeSeq, SerializeStruct, Serializer};
7
8use crate::errors::{MCSError, Result};
9use crate::intern::{StrId, StringInterner};
10use crate::types::{Entity, Relation, KnowledgeGraphOut};
11use crate::search::SearchIndex;
12use crate::store::{self as store_enc, BinaryStore, RecordKind};
13
/// Value of `StoredEntity::state` that marks a slot as holding a live entity
/// (this is what `StoredEntity::is_live` compares against).
const ENTITY_SLOT_LIVE: u8 = 1;
/// Number of independent shards in `ShardedNameTable`. Must stay a power of
/// two: the shard index is derived by masking with `NAME_TABLE_SHARDS - 1`.
const NAME_TABLE_SHARDS: usize = 4;
16
17// ---------------------------------------------------------------------------
18// Prefetch helper – issues a non-binding software prefetch hint to pull a
19// cache-line into L1/L2 while we finish probing the current entry.
20// ---------------------------------------------------------------------------
/// Hint the CPU to pull the cache line containing `addr` into the cache
/// hierarchy (`_MM_HINT_T0`: temporal data, all cache levels).
///
/// # Safety
///
/// The hint itself never dereferences `addr`. Callers in this file only pass
/// pointers derived from a live allocation with an in-bounds offset, and that
/// is the contract this function is written against.
#[cfg(target_arch = "x86_64")]
#[inline(always)]
unsafe fn prefetch_addr(addr: *const u8) {
    use std::arch::x86_64::{_MM_HINT_T0, _mm_prefetch};
    // `_mm_prefetch` is declared over `*const i8`, so the byte pointer has to
    // be cast; `cast()` lets the target type follow the intrinsic's signature.
    // SAFETY: prefetch is a non-faulting hint; see the function-level contract.
    unsafe { _mm_prefetch::<{ _MM_HINT_T0 }>(addr.cast()) };
}

/// Fallback for targets without the x86-64 prefetch intrinsic: does nothing.
///
/// # Safety
///
/// Always sound; the function is `unsafe` only so both variants share one
/// signature at the call sites.
#[cfg(not(target_arch = "x86_64"))]
#[inline(always)]
const unsafe fn prefetch_addr(_addr: *const u8) {}
31
/// fsync the directory containing `path` so a rename/create inside it is durable.
///
/// A `path` without a parent component (or with an empty one) is treated as
/// living in the current directory (`"."`).
///
/// The call is a no-op (returns `Ok(())`) when:
/// * the directory does not exist (`NotFound`),
/// * the filesystem rejects fsync on a directory handle (`InvalidInput`),
/// * the platform is Windows and the directory cannot be opened through
///   `File::open` (`PermissionDenied`) — plain `File::open` cannot open
///   directories there, so there is nothing to sync.
///
/// # Errors
///
/// Any other I/O error from opening or syncing the directory is returned.
fn sync_parent_dir(path: &Path) -> std::io::Result<()> {
    let dir = path
        .parent()
        .filter(|p| !p.as_os_str().is_empty())
        .unwrap_or_else(|| Path::new("."));

    let handle = match std::fs::File::open(dir) {
        Ok(file) => file,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
        // Only tolerated on Windows: on other platforms a permission error on
        // the parent directory is a real failure the caller should see.
        Err(e) if cfg!(windows) && e.kind() == std::io::ErrorKind::PermissionDenied => {
            return Ok(());
        }
        Err(e) => return Err(e),
    };

    match handle.sync_all() {
        // Some filesystems disallow fsync on a directory handle; tolerate it.
        Err(e) if e.kind() == std::io::ErrorKind::InvalidInput => Ok(()),
        outcome => outcome,
    }
}
51
52// ---------------------------------------------------------------------------
53// StoredEntity / StoredRelation – internal representations using StrId.
54// ---------------------------------------------------------------------------
55// Default layout: 40 B / align 8 (Rust packs `state` into the Vec's padding and
56// the `Option` niche is free). With `cache_align`, the slot is rounded to a full
57// 64-byte line so a point lookup/mutation (name_table -> slot index) touches
58// exactly one cache line instead of occasionally straddling two. Costs +60%
59// memory and a wider stride on bulk scans — measure before enabling.
60#[cfg_attr(feature = "cache_align", repr(align(64)))]
61struct StoredEntity {
62    state: u8,
63    name: StrId,
64    entity_type: StrId,
65    observations: Vec<StrId>,
66}
67
68impl StoredEntity {
69    const fn is_live(&self) -> bool {
70        self.state == ENTITY_SLOT_LIVE
71    }
72}
73
74// Default layout: 12 B / align 4 → ~1 in 5 records straddles a 64-byte line.
75// With `cache_align`, align(16) rounds the size to 16 B so 4 records fill a line
76// exactly (no straddle, AVX2-load-friendly) for +33% memory.
/// In-memory form of one directed relation; all three parts are interned ids.
#[cfg_attr(feature = "cache_align", repr(align(16)))]
struct StoredRelation {
    /// Interned name of the source endpoint.
    from: StrId,
    /// Interned name of the target endpoint.
    to: StrId,
    /// Interned relation type label.
    relation_type: StrId,
}
83
84// ---------------------------------------------------------------------------
85// Borrowing serialization views (M6).
86//
87// Read tools used to build owned `Entity`/`Relation` vecs (a fresh `String`
88// per name/type/observation) and *then* serialize them — roughly 2-3x the
89// graph resident at once. These views instead hold references to the selected
90// stored records and emit their interned `&str` directly during
91// serialization, with no intermediate owned strings. The emitted JSON is
92// byte-for-byte identical to serializing `KnowledgeGraphOut`.
93// ---------------------------------------------------------------------------
94
95/// A borrowing view over a selected slice of the graph. Serializes to
96/// `{"entities":[...],"relations":[...]}`.
97pub struct GraphView<'a> {
98    kg: &'a KnowledgeGraph,
99    entities: Vec<&'a StoredEntity>,
100    relations: Vec<&'a StoredRelation>,
101}
102
103impl GraphView<'_> {
104    /// Materialize into the owned [`KnowledgeGraphOut`]. Used by the direct
105    /// (non-serializing) callers and tests; the server's read handlers
106    /// serialize the view directly instead.
107    pub fn to_owned_out(&self) -> KnowledgeGraphOut {
108        KnowledgeGraphOut {
109            entities: self.entities.iter().map(|e| self.kg.entity_to_output(e)).collect(),
110            relations: self.relations.iter().map(|r| self.kg.relation_to_output(r)).collect(),
111        }
112    }
113}
114
115impl Serialize for GraphView<'_> {
116    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
117        let mut st = s.serialize_struct("KnowledgeGraphOut", 2)?;
118        st.serialize_field("entities", &EntityListRef { kg: self.kg, items: &self.entities })?;
119        st.serialize_field("relations", &RelationListRef { kg: self.kg, items: &self.relations })?;
120        st.end()
121    }
122}
123
124struct EntityListRef<'a> { kg: &'a KnowledgeGraph, items: &'a [&'a StoredEntity] }
125impl Serialize for EntityListRef<'_> {
126    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
127        let mut seq = s.serialize_seq(Some(self.items.len()))?;
128        for &e in self.items {
129            seq.serialize_element(&EntityRef { kg: self.kg, e })?;
130        }
131        seq.end()
132    }
133}
134
135struct RelationListRef<'a> { kg: &'a KnowledgeGraph, items: &'a [&'a StoredRelation] }
136impl Serialize for RelationListRef<'_> {
137    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
138        let mut seq = s.serialize_seq(Some(self.items.len()))?;
139        for &r in self.items {
140            seq.serialize_element(&RelationRef { kg: self.kg, r })?;
141        }
142        seq.end()
143    }
144}
145
146struct EntityRef<'a> { kg: &'a KnowledgeGraph, e: &'a StoredEntity }
147impl Serialize for EntityRef<'_> {
148    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
149        let mut st = s.serialize_struct("Entity", 3)?;
150        st.serialize_field("name", self.kg.interner.lookup(self.e.name))?;
151        st.serialize_field("entityType", self.kg.interner.lookup(self.e.entity_type))?;
152        st.serialize_field("observations", &ObsRef { kg: self.kg, obs: &self.e.observations })?;
153        st.end()
154    }
155}
156
157struct ObsRef<'a> { kg: &'a KnowledgeGraph, obs: &'a [StrId] }
158impl Serialize for ObsRef<'_> {
159    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
160        let mut seq = s.serialize_seq(Some(self.obs.len()))?;
161        for &o in self.obs {
162            seq.serialize_element(self.kg.interner.lookup(o))?;
163        }
164        seq.end()
165    }
166}
167
168struct RelationRef<'a> { kg: &'a KnowledgeGraph, r: &'a StoredRelation }
169impl Serialize for RelationRef<'_> {
170    fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
171        let mut st = s.serialize_struct("Relation", 3)?;
172        st.serialize_field("from", self.kg.interner.lookup(self.r.from))?;
173        st.serialize_field("to", self.kg.interner.lookup(self.r.to))?;
174        st.serialize_field("relationType", self.kg.interner.lookup(self.r.relation_type))?;
175        st.end()
176    }
177}
178
/// Which edge orientation a neighborhood query follows.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum Direction {
    /// Outgoing edges only (`from -> to`).
    Out,
    /// Incoming edges only (`to -> from`).
    In,
    /// Edges in either orientation.
    Both,
}

impl Direction {
    /// Map an optional direction string to a variant. Only the exact,
    /// case-sensitive strings `"out"` and `"in"` select a single orientation;
    /// `None` and every other string yield [`Direction::Both`].
    pub fn parse(s: Option<&str>) -> Self {
        s.map_or(Self::Both, |word| match word {
            "out" => Self::Out,
            "in" => Self::In,
            _ => Self::Both,
        })
    }
}
200
/// Make a string safe to embed inside a Mermaid/DOT quoted label: every `"`
/// becomes `'`, and every `\n` or `\r` becomes a single space. All other
/// characters pass through unchanged.
fn sanitize_label(s: &str) -> String {
    s.chars()
        .map(|ch| match ch {
            '"' => '\'',
            '\n' | '\r' => ' ',
            other => other,
        })
        .collect()
}
213
214// ---------------------------------------------------------------------------
215// ShardedNameTable – open-addressing hash map split into N independent shards.
216//
// Each shard uses a **ctrl-byte bucket** approach: a 1-byte metadata array
// stores the 7-bit hash stamp (h2) for each slot, with `0xFF` = EMPTY.
// On probe, the first memory access is a single byte (ctrl). The full key
// (StrId) is only compared when the stamp matches. Note that the shard index
// is taken from the same low hash bits as the stamp, so with 4 shards only 5
// of the 7 stamp bits vary inside one shard: expect ~31/32 (not 127/128) of
// mismatching probe steps to touch nothing but the ctrl byte.
// See also SwissTable / hashbrown.
222// ---------------------------------------------------------------------------
/// Control-byte value of an unused bucket. Bit 7 is set, which is what the
/// probe loops test (`ctrl & 0x80 != 0`); occupied buckets hold a 7-bit stamp.
const EMPTY_SLOT: u8 = 0xFF;
224
/// 7-bit stamp stored in a bucket's control byte: the low seven bits of `hash`.
#[inline(always)]
const fn h2(hash: u64) -> u8 {
    // Truncating to u8 keeps the low 8 bits; the mask then clears bit 7.
    (hash as u8) & 0x7F
}
229
/// Starting bucket for a probe: the hash bits above the 7-bit stamp, reduced
/// to the table size with `mask` (capacity - 1).
#[inline(always)]
const fn h1(hash: u64, mask: usize) -> usize {
    let above_stamp = hash >> 7;
    (above_stamp as usize) & mask
}
234
235struct NameTableShard {
236    ctrl: Vec<u8>,      // 0xFF = empty; 0x00-0x7F = h2 stamp (bit 7 always clear)
237    names: Vec<StrId>,
238    slots: Vec<u32>,
239    mask: usize,
240    count: usize,
241}
242
243impl NameTableShard {
244    fn new(capacity: usize) -> Self {
245        let cap = capacity.next_power_of_two().max(16);
246        Self {
247            ctrl: vec![EMPTY_SLOT; cap],
248            names: vec![StrId::EMPTY; cap],
249            slots: vec![u32::MAX; cap],
250            mask: cap - 1,
251            count: 0,
252        }
253    }
254
255    #[inline(always)]
256    fn lookup(&self, hash: u64, name: StrId) -> Option<u32> {
257        let stamp = h2(hash);
258        let mask = self.mask;
259        let mut idx = h1(hash, mask);
260        let ctrl = self.ctrl.as_ptr();
261        let names = self.names.as_ptr();
262        let slots = self.slots.as_ptr();
263        let len = self.ctrl.len();
264
265        for _ in 0..len {
266            // Prefetch the ctrl byte 4 slots ahead — overlaps memory latency.
267            let prefetch_idx = idx.wrapping_add(4) & mask;
268            unsafe { prefetch_addr(ctrl.add(prefetch_idx)) };
269
270            // SAFETY: idx always < len because of &mask on each iteration.
271            unsafe {
272                let c = *ctrl.add(idx);
273                // Bit 7 set → EMPTY → key not present.
274                if c & 0x80 != 0 {
275                    return None;
276                }
277                // Stamp match → compare full key (rare: ~1/128 probes).
278                if c == stamp && *names.add(idx) == name {
279                    return Some(*slots.add(idx));
280                }
281            }
282            idx = (idx + 1) & mask;
283        }
284        None
285    }
286
287    fn insert(&mut self, interner: &StringInterner, hash: u64, name: StrId, slot: u32) {
288        if self.count * 4 > self.ctrl.len() * 3 {
289            self.grow(interner);
290        }
291        let stamp = h2(hash);
292        let mask = self.mask;
293        let mut idx = h1(hash, mask);
294        loop {
295            // SAFETY: idx & mask always < len for power-of-two capacity.
296            unsafe {
297                if *self.ctrl.get_unchecked(idx) & 0x80 != 0 {
298                    *self.ctrl.get_unchecked_mut(idx) = stamp;
299                    *self.names.get_unchecked_mut(idx) = name;
300                    *self.slots.get_unchecked_mut(idx) = slot;
301                    self.count += 1;
302                    return;
303                }
304            }
305            idx = (idx + 1) & mask;
306        }
307    }
308
309    fn remove(&mut self, interner: &StringInterner, hash: u64, name: StrId) {
310        let stamp = h2(hash);
311        let mask = self.mask;
312        let mut idx = h1(hash, mask);
313        let len = self.ctrl.len();
314        for _ in 0..len {
315            if self.ctrl[idx] & 0x80 != 0 {
316                return;
317            }
318            if self.ctrl[idx] == stamp && self.names[idx] == name {
319                // Found — remove with shift-back to preserve probe chains.
320                self.ctrl[idx] = EMPTY_SLOT;
321                self.names[idx] = StrId::EMPTY;
322                self.slots[idx] = u32::MAX;
323                self.count -= 1;
324
325                let mut next = (idx + 1) & mask;
326                while self.ctrl[next] & 0x80 == 0 {
327                    let nn = self.names[next];
328                    let ns = self.slots[next];
329                    // Hash is no longer stored (M4) — recompute it from the
330                    // interned name to find the entry's ideal bucket.
331                    let nh = interner.get_hash(nn);
332                    self.ctrl[next] = EMPTY_SLOT;
333                    self.names[next] = StrId::EMPTY;
334                    self.slots[next] = u32::MAX;
335                    self.count -= 1;
336
337                    // Re-insert at its ideal bucket.
338                    let nstamp = h2(nh);
339                    let mut re_idx = h1(nh, mask);
340                    while self.ctrl[re_idx] & 0x80 == 0 {
341                        re_idx = (re_idx + 1) & mask;
342                    }
343                    self.ctrl[re_idx] = nstamp;
344                    self.names[re_idx] = nn;
345                    self.slots[re_idx] = ns;
346                    self.count += 1;
347
348                    next = (next + 1) & mask;
349                }
350                return;
351            }
352            idx = (idx + 1) & mask;
353        }
354    }
355
356    fn grow(&mut self, interner: &StringInterner) {
357        let new_cap = self.ctrl.len() * 2;
358        let new_mask = new_cap - 1;
359        let mut new_ctrl = vec![EMPTY_SLOT; new_cap];
360        let mut new_names = vec![StrId::EMPTY; new_cap];
361        let mut new_slots = vec![u32::MAX; new_cap];
362
363        for i in 0..self.ctrl.len() {
364            if self.ctrl[i] & 0x80 == 0 {
365                // Recompute the hash from the interned name (M4: not stored).
366                let name = self.names[i];
367                let hash = interner.get_hash(name);
368                let stamp = h2(hash);
369                let mut idx = h1(hash, new_mask);
370                while new_ctrl[idx] & 0x80 == 0 {
371                    idx = (idx + 1) & new_mask;
372                }
373                new_ctrl[idx] = stamp;
374                new_names[idx] = name;
375                new_slots[idx] = self.slots[i];
376            }
377        }
378
379        self.ctrl = new_ctrl;
380        self.names = new_names;
381        self.slots = new_slots;
382        self.mask = new_mask;
383    }
384}
385
386struct ShardedNameTable {
387    shards: [NameTableShard; NAME_TABLE_SHARDS],
388}
389
390impl ShardedNameTable {
391    fn new(capacity_per_shard: usize) -> Self {
392        Self {
393            shards: [
394                NameTableShard::new(capacity_per_shard),
395                NameTableShard::new(capacity_per_shard),
396                NameTableShard::new(capacity_per_shard),
397                NameTableShard::new(capacity_per_shard),
398            ],
399        }
400    }
401
402    #[inline(always)]
403    const fn shard(hash: u64) -> usize {
404        (hash as usize) & (NAME_TABLE_SHARDS - 1)
405    }
406
407    #[inline(always)]
408    fn lookup(&self, hash: u64, name: StrId) -> Option<u32> {
409        self.shards[Self::shard(hash)].lookup(hash, name)
410    }
411
412    #[inline(always)]
413    fn insert(&mut self, interner: &StringInterner, hash: u64, name: StrId, slot: u32) {
414        self.shards[Self::shard(hash)].insert(interner, hash, name, slot);
415    }
416
417    #[inline(always)]
418    fn remove(&mut self, interner: &StringInterner, hash: u64, name: StrId) {
419        self.shards[Self::shard(hash)].remove(interner, hash, name);
420    }
421}
422
423// ---------------------------------------------------------------------------
424// KnowledgeGraph – the central type.
425// ---------------------------------------------------------------------------
/// In-memory knowledge graph rebuilt from a binary log by replaying its
/// records (see `KnowledgeGraph::new`).
pub struct KnowledgeGraph {
    /// Interner resolving every `StrId` held by entities and relations.
    interner: StringInterner,
    /// Entity storage addressed by slot index; a deleted entity leaves `None`.
    entity_slots: Vec<Option<StoredEntity>>,
    /// Tombstoned slot indices available for reuse on the next create (M2),
    /// so create/delete churn doesn't grow `entity_slots` without bound.
    free_slots: Vec<u32>,
    /// Maps an interned entity name to its index in `entity_slots`.
    name_table: ShardedNameTable,
    /// All relations, in the order they were created.
    relations: Vec<StoredRelation>,
    /// Search index, re-indexed whenever an entity is created or changed.
    search: SearchIndex,
    /// Binary log the graph was replayed from.
    store: BinaryStore,
}
437
438impl KnowledgeGraph {
439    pub fn new(path: &Path) -> std::io::Result<Self> {
440        let store = BinaryStore::new(path)?;
441
442        // Replay into local collections, then assign into self — no raw pointers needed (X3).
443        let mut interner = StringInterner::with_capacity(65536, 1024);
444        let mut entity_slots: Vec<Option<StoredEntity>> = Vec::with_capacity(256);
445        let mut name_table = ShardedNameTable::new(64);
446        let mut relations: Vec<StoredRelation> = Vec::with_capacity(64);
447        let mut search = SearchIndex::new();
448
449        // Transaction buffer: while `Some`, records are accumulated and only
450        // applied on `TxnCommit`. An unclosed transaction at EOF is discarded,
451        // which is what makes multi-record operations (e.g. `merge_entities`)
452        // crash-atomic.
453        let mut pending: Option<Vec<(RecordKind, Vec<u8>)>> = None;
454        store.replay(|kind, data| {
455            match kind {
456                RecordKind::TxnBegin => pending = Some(Vec::new()),
457                RecordKind::TxnCommit => {
458                    if let Some(buffered) = pending.take() {
459                        for (k, d) in &buffered {
460                            Self::apply_record(
461                                *k, d, &mut interner, &mut entity_slots, &mut search,
462                                &mut name_table, &mut relations,
463                            );
464                        }
465                    }
466                }
467                other => match pending.as_mut() {
468                    Some(buffered) => buffered.push((other, data.to_vec())),
469                    None => Self::apply_record(
470                        other, data, &mut interner, &mut entity_slots, &mut search,
471                        &mut name_table, &mut relations,
472                    ),
473                },
474            }
475        })?;
476
477        // Slots tombstoned by deletes during replay are available for reuse (M2).
478        let free_slots: Vec<u32> = entity_slots
479            .iter()
480            .enumerate()
481            .filter(|(_, s)| s.is_none())
482            .map(|(i, _)| i as u32)
483            .collect();
484
485        Ok(Self {
486            interner,
487            entity_slots,
488            free_slots,
489            name_table,
490            relations,
491            search,
492            store,
493        })
494    }
495
496    // -----------------------------------------------------------------------
497    // Replay helpers (static to avoid borrow issues in the closure)
498    // -----------------------------------------------------------------------
499
500    /// Apply one already-decoded log record to the in-memory collections.
501    /// Shared by direct replay and by transaction commit. `TxnBegin`/`TxnCommit`
502    /// are handled by the caller and are no-ops here.
503    #[allow(clippy::too_many_arguments)]
504    fn apply_record(
505        kind: RecordKind,
506        data: &[u8],
507        interner: &mut StringInterner,
508        entity_slots: &mut Vec<Option<StoredEntity>>,
509        search: &mut SearchIndex,
510        name_table: &mut ShardedNameTable,
511        relations: &mut Vec<StoredRelation>,
512    ) {
513        match kind {
514            RecordKind::CreateEntity => {
515                if let Some((name, etype, obs)) = store_enc::decode_create_entity(data) {
516                    Self::replay_create_entity(
517                        interner, entity_slots, search, name_table, name, etype, &obs,
518                    );
519                }
520            }
521            RecordKind::CreateRelation => {
522                if let Some((from, to, rtype)) = store_enc::decode_create_relation(data) {
523                    let from_id = interner.intern(from);
524                    let to_id = interner.intern(to);
525                    let type_id = interner.intern(rtype);
526                    relations.push(StoredRelation {
527                        from: from_id,
528                        to: to_id,
529                        relation_type: type_id,
530                    });
531                }
532            }
533            RecordKind::AddObservations => {
534                if let Some((name, obs)) = store_enc::decode_add_observations(data) {
535                    Self::replay_add_observations(
536                        interner, entity_slots, search, name_table, name, &obs,
537                    );
538                }
539            }
540            RecordKind::DeleteEntity => {
541                if let Some(name) = store_enc::decode_delete_entity(data) {
542                    Self::replay_delete_entity(
543                        interner, entity_slots, relations, search, name_table, name,
544                    );
545                }
546            }
547            RecordKind::DeleteObservations => {
548                if let Some((name, obs)) = store_enc::decode_delete_observations(data) {
549                    Self::replay_delete_observations(
550                        interner, entity_slots, search, name_table, name, &obs,
551                    );
552                }
553            }
554            RecordKind::DeleteRelation => {
555                if let Some((from, to, rtype)) = store_enc::decode_delete_relation(data) {
556                    let from_id = interner.intern(from);
557                    let to_id = interner.intern(to);
558                    let type_id = interner.intern(rtype);
559                    relations.retain(|r| {
560                        !(r.from == from_id && r.to == to_id && r.relation_type == type_id)
561                    });
562                }
563            }
564            RecordKind::TxnBegin | RecordKind::TxnCommit => {}
565        }
566    }
567
568    #[allow(clippy::ptr_arg)]
569    fn replay_create_entity(
570        interner: &mut StringInterner,
571        entities: &mut Vec<Option<StoredEntity>>,
572        search: &mut SearchIndex,
573        name_table: &mut ShardedNameTable,
574        name: &str,
575        etype: &str,
576        observations: &[&str],
577    ) {
578        let name_id = interner.intern(name);
579        let type_id = interner.intern(etype);
580        let obs_ids: Vec<StrId> = observations.iter().map(|o| interner.intern(o)).collect();
581        let slot = entities.len() as u32;
582        entities.push(Some(StoredEntity {
583            state: ENTITY_SLOT_LIVE,
584            name: name_id,
585            entity_type: type_id,
586            observations: obs_ids.clone(),
587        }));
588        let hash = interner.get_hash(name_id);
589        name_table.insert(&*interner, hash, name_id, slot);
590        search.index_entity(interner, slot, name_id, type_id, &obs_ids);
591    }
592
593    fn replay_add_observations(
594        interner: &mut StringInterner,
595        entities: &mut [Option<StoredEntity>],
596        search: &mut SearchIndex,
597        name_table: &mut ShardedNameTable,
598        name: &str,
599        observations: &[&str],
600    ) {
601        let name_id = interner.intern(name);
602        let hash = interner.get_hash(name_id);
603        if let Some(slot) = name_table.lookup(hash, name_id)
604            && let Some(Some(entity)) = entities.get_mut(slot as usize)
605        {
606            for &o in observations {
607                let oid = interner.intern(o);
608                if !entity.observations.contains(&oid) {
609                    entity.observations.push(oid);
610                }
611            }
612            search.remove_entity(slot);
613            search.index_entity(
614                interner,
615                slot,
616                entity.name,
617                entity.entity_type,
618                &entity.observations,
619            );
620        }
621    }
622
623    fn replay_delete_entity(
624        interner: &mut StringInterner,
625        entities: &mut [Option<StoredEntity>],
626        rels: &mut Vec<StoredRelation>,
627        search: &mut SearchIndex,
628        name_table: &mut ShardedNameTable,
629        name: &str,
630    ) {
631        let name_id = interner.intern(name);
632        let hash = interner.get_hash(name_id);
633        if let Some(slot) = name_table.lookup(hash, name_id)
634            && let Some(Some(_)) = entities.get(slot as usize)
635        {
636            entities[slot as usize] = None;
637            search.remove_entity(slot);
638            name_table.remove(&*interner, hash, name_id);
639        }
640        rels.retain(|r| r.from != name_id && r.to != name_id);
641    }
642
643    fn replay_delete_observations(
644        interner: &mut StringInterner,
645        entities: &mut [Option<StoredEntity>],
646        search: &mut SearchIndex,
647        name_table: &mut ShardedNameTable,
648        name: &str,
649        observations: &[&str],
650    ) {
651        let name_id = interner.intern(name);
652        let hash = interner.get_hash(name_id);
653        if let Some(slot) = name_table.lookup(hash, name_id)
654            && let Some(Some(entity)) = entities.get_mut(slot as usize)
655        {
656            let remove_ids: Vec<StrId> = observations.iter().map(|o| interner.intern(o)).collect();
657            entity.observations.retain(|o| !remove_ids.contains(o));
658            search.remove_entity(slot);
659            search.index_entity(
660                interner,
661                slot,
662                entity.name,
663                entity.entity_type,
664                &entity.observations,
665            );
666        }
667    }
668
669    // -----------------------------------------------------------------------
670    // Public API
671    // -----------------------------------------------------------------------
672
    /// Borrow the graph's string interner, which resolves the `StrId`s held
    /// by its entities and relations.
    pub const fn interner(&self) -> &StringInterner {
        &self.interner
    }
676
677    /// Return a single entity by exact name match.
678    pub fn get_entity(&self, name: &str) -> Option<Entity> {
679        let name_id = self.interner.get_optional(name)?;
680        let hash = self.interner.get_hash(name_id);
681        let slot = self.name_table.lookup(hash, name_id)?;
682        let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
683        if !stored.is_live() {
684            return None;
685        }
686        Some(self.entity_to_output(stored))
687    }
688
689    /// Return aggregate statistics about the graph.
690    pub fn graph_stats(&self) -> serde_json::Value {
691        let live_entities = self
692            .entity_slots
693            .iter()
694            .filter(|s| s.as_ref().is_some_and(|e| e.is_live()))
695            .count();
696        let total_relations = self.relations.len();
697        let index_entries = self.search.len();
698        let total_obs: usize = self
699            .entity_slots
700            .iter()
701            .filter_map(|s| s.as_ref())
702            .filter(|e| e.is_live())
703            .map(|e| e.observations.len())
704            .sum();
705
706        serde_json::json!({
707            "entities": live_entities,
708            "relations": total_relations,
709            "totalObservations": total_obs,
710            "searchIndexEntries": index_entries,
711            "internedStrings": self.interner.len(),
712            "internedBytes": self.interner.total_bytes(),
713        })
714    }
715
716    /// Search relations by optional filters: `from`, `to`, `relationType`.
717    /// Any filter that is absent matches everything. A filter value that does
718    /// not exist in the graph returns empty results.
719    pub fn search_relations(&self, from: Option<&str>, to: Option<&str>, rtype: Option<&str>) -> Vec<Relation> {
720        let from_id = match from {
721            Some(f) => match self.interner.get_optional(f) {
722                Some(id) => Some(id),
723                None => return Vec::new(),
724            },
725            None => None,
726        };
727        let to_id = match to {
728            Some(t) => match self.interner.get_optional(t) {
729                Some(id) => Some(id),
730                None => return Vec::new(),
731            },
732            None => None,
733        };
734        let rtype_id = match rtype {
735            Some(r) => match self.interner.get_optional(r) {
736                Some(id) => Some(id),
737                None => return Vec::new(),
738            },
739            None => None,
740        };
741
742        self.relations
743            .iter()
744            .filter(|r| {
745                from_id.is_none_or(|f| r.from == f)
746                    && to_id.is_none_or(|t| r.to == t)
747                    && rtype_id.is_none_or(|rt| r.relation_type == rt)
748            })
749            .map(|r| Relation {
750                from: self.interner.lookup(r.from).to_string(),
751                to: self.interner.lookup(r.to).to_string(),
752                relation_type: self.interner.lookup(r.relation_type).to_string(),
753            })
754            .collect()
755    }
756
757    /// BFS shortest-path between two entity names. Returns the sequence of
758    /// entity names along the path (inclusive of both endpoints).
759    pub fn find_path(&self, from: &str, to: &str) -> Result<Vec<String>> {
760        let from_id = self.interner.get_optional(from)
761            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
762        let to_id = self.interner.get_optional(to)
763            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
764        let hash_from = self.interner.get_hash(from_id);
765        let hash_to = self.interner.get_hash(to_id);
766
767        if self.name_table.lookup(hash_from, from_id).is_none() {
768            return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
769        }
770        if self.name_table.lookup(hash_to, to_id).is_none() {
771            return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
772        }
773        if from_id == to_id {
774            return Ok(vec![from.to_string()]);
775        }
776
777        // Build adjacency list (P4) — O(E) once, not O(V×E).
778        let mut adj: AHashMap<StrId, Vec<(StrId, StrId)>> = AHashMap::new();
779        for rel in &self.relations {
780            adj.entry(rel.from).or_default().push((rel.to, rel.relation_type));
781            adj.entry(rel.to).or_default().push((rel.from, rel.relation_type));
782        }
783
784        // BFS over adjacency list
785        let mut visited: AHashSet<StrId> = AHashSet::new();
786        let mut parent: AHashMap<StrId, StrId> = AHashMap::new();
787        let mut queue: VecDeque<StrId> = VecDeque::new();
788
789        visited.insert(from_id);
790        queue.push_back(from_id);
791
792        while let Some(current) = queue.pop_front() {
793            if current == to_id {
794                break;
795            }
796
797            if let Some(neighbors) = adj.get(&current) {
798                for &(neighbor, _) in neighbors {
799                    if visited.insert(neighbor) {
800                        parent.insert(neighbor, current);
801                        queue.push_back(neighbor);
802                    }
803                }
804            }
805        }
806
807        if !parent.contains_key(&to_id) && from_id != to_id {
808            return Err(MCSError::MemoryError(format!(
809                "No path found between '{from}' and '{to}'"
810            )));
811        }
812
813        // Reconstruct path
814        let mut path: Vec<String> = Vec::new();
815        let mut cur = to_id;
816        loop {
817            path.push(self.interner.lookup(cur).to_string());
818            if cur == from_id {
819                break;
820            }
821            cur = *parent.get(&cur).ok_or_else(|| {
822                MCSError::MemoryError("Path reconstruction failed".into())
823            })?;
824        }
825        path.reverse();
826        Ok(path)
827    }
828
829    /// Rewrite the binary log from the current in-memory state.
830    /// After compaction the log contains only the minimal set of records
831    /// needed to reconstruct the graph (all creates, no deletes).
832    /// Crash-safe: writes to a temp file, then atomically renames (C3).
833    pub fn compact(&mut self) -> Result<()> {
834        // 1. Collect current state as create-records
835        let mut create_entities: Vec<Entity> = Vec::new();
836        let mut create_relations: Vec<Relation> = Vec::new();
837
838        for slot in &self.entity_slots {
839            if let Some(stored) = slot.as_ref().filter(|e| e.is_live()) {
840                create_entities.push(self.entity_to_output(stored));
841            }
842        }
843        for rel in &self.relations {
844            create_relations.push(Relation {
845                from: self.interner.lookup(rel.from).to_string(),
846                to: self.interner.lookup(rel.to).to_string(),
847                relation_type: self.interner.lookup(rel.relation_type).to_string(),
848            });
849        }
850
851        // 2. Write to a temp file first.
852        //    Remove any stale temp left by a previously-interrupted compact:
853        //    `BinaryStore::new` opens in append mode and only writes the MAGIC
854        //    header when the file does not already exist, so appending to a
855        //    leftover temp would produce a duplicated, header-corrupted log
856        //    once renamed over the real one (C1).
857        let tmp_path = self.store.path().with_extension("tmp");
858        if let Err(e) = std::fs::remove_file(&tmp_path)
859            && e.kind() != std::io::ErrorKind::NotFound
860        {
861            return Err(MCSError::IoError(e));
862        }
863        let mut tmp_store = BinaryStore::new(&tmp_path).map_err(MCSError::IoError)?;
864        for entity in &create_entities {
865            let mut buf = Vec::new();
866            store_enc::encode_create_entity(&mut buf, &entity.name, &entity.entity_type, &entity.observations)
867                .map_err(MCSError::IoError)?;
868            tmp_store.write_record(RecordKind::CreateEntity, &buf).map_err(MCSError::IoError)?;
869        }
870        for relation in &create_relations {
871            let mut buf = Vec::new();
872            store_enc::encode_create_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
873                .map_err(MCSError::IoError)?;
874            tmp_store.write_record(RecordKind::CreateRelation, &buf).map_err(MCSError::IoError)?;
875        }
876        tmp_store.flush_and_sync().map_err(MCSError::IoError)?;
877        drop(tmp_store);
878
879        // 3. Atomically rename over the original (atomic on POSIX), then fsync
880        //    the containing directory so the rename itself is durable across a
881        //    crash — content swap is atomic, but the directory entry update is
882        //    not durable until the dir is synced (C2).
883        std::fs::rename(&tmp_path, self.store.path()).map_err(MCSError::IoError)?;
884        sync_parent_dir(self.store.path()).map_err(MCSError::IoError)?;
885
886        // 4. Rebuild the entire in-memory graph from the compacted log (M1/M2).
887        //    Replaying into fresh structures reclaims the interner arena (stale
888        //    strings from deleted/edited entities), tombstoned entity slots, and
889        //    stale search-index entries — none of which the old reopen-only path
890        //    reclaimed.
891        let path = self.store.path().clone();
892        *self = KnowledgeGraph::new(&path).map_err(MCSError::IoError)?;
893
894        Ok(())
895    }
896
897    // ---- Public API with write-ahead log (C1) and error propagation ----
898
899    pub fn create_entities(&mut self, entities: &[Entity]) -> Result<Vec<Entity>> {
900        // Validate up front so an invalid entity never produces partial writes.
901        for entity in entities {
902            if entity.name.is_empty() {
903                return Err(MCSError::InvalidParams(
904                    "Entity name must not be empty".into(),
905                ));
906            }
907        }
908        let mut created = Vec::new();
909        for entity in entities {
910            // Check dedup before writing (using non-interning lookup)
911            let existing = self.interner.get_optional(&entity.name)
912                .and_then(|id| {
913                    let hash = self.interner.get_hash(id);
914                    self.name_table.lookup(hash, id)
915                });
916            if existing.is_some() {
917                continue;
918            }
919            // Write-ahead: encode and log before mutating state
920            let mut buf = Vec::new();
921            store_enc::encode_create_entity(&mut buf, &entity.name, &entity.entity_type, &entity.observations)
922                .map_err(MCSError::IoError)?;
923            self.store.write_record(RecordKind::CreateEntity, &buf)
924                .map_err(MCSError::IoError)?;
925
926            let name_id = self.interner.intern(&entity.name);
927            let hash = self.interner.get_hash(name_id);
928            let type_id = self.interner.intern(&entity.entity_type);
929            let obs_ids: Vec<StrId> = entity
930                .observations
931                .iter()
932                .map(|o| self.interner.intern(o))
933                .collect();
934            // Reuse a tombstoned slot if one is free (M2); its old search-index
935            // entries were cleared on delete, so the slot starts clean.
936            let reused = self.free_slots.pop();
937            let slot = reused.unwrap_or(self.entity_slots.len() as u32);
938            self.search
939                .index_entity(&mut self.interner, slot, name_id, type_id, &obs_ids);
940            let stored = Some(StoredEntity {
941                state: ENTITY_SLOT_LIVE,
942                name: name_id,
943                entity_type: type_id,
944                observations: obs_ids,
945            });
946            match reused {
947                Some(s) => self.entity_slots[s as usize] = stored,
948                None => self.entity_slots.push(stored),
949            }
950            self.name_table.insert(&self.interner, hash, name_id, slot);
951            created.push(Entity {
952                name: entity.name.clone(),
953                entity_type: entity.entity_type.clone(),
954                observations: entity.observations.clone(),
955            });
956        }
957        Ok(created)
958    }
959
960    pub fn create_relations(&mut self, relations: &[Relation]) -> Result<Vec<Relation>> {
961        // Validate up front so an invalid relation never produces partial writes.
962        for relation in relations {
963            if relation.from.is_empty() || relation.to.is_empty() {
964                return Err(MCSError::InvalidParams(
965                    "Relation endpoints must not be empty".into(),
966                ));
967            }
968        }
969        let mut created = Vec::new();
970        // Build a dedup set for O(1) duplicate checks (P5)
971        let mut rel_set: AHashSet<(StrId, StrId, StrId)> = AHashSet::new();
972        for rel in &self.relations {
973            rel_set.insert((rel.from, rel.to, rel.relation_type));
974        }
975        for relation in relations {
976            let from_id = self.interner.intern(&relation.from);
977            let to_id = self.interner.intern(&relation.to);
978            let type_id = self.interner.intern(&relation.relation_type);
979            if !rel_set.insert((from_id, to_id, type_id)) {
980                continue;
981            }
982            // Write-ahead: log before mutation
983            let mut buf = Vec::new();
984            store_enc::encode_create_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
985                .map_err(MCSError::IoError)?;
986            self.store.write_record(RecordKind::CreateRelation, &buf)
987                .map_err(MCSError::IoError)?;
988
989            self.relations.push(StoredRelation {
990                from: from_id,
991                to: to_id,
992                relation_type: type_id,
993            });
994            created.push(Relation {
995                from: relation.from.clone(),
996                to: relation.to.clone(),
997                relation_type: relation.relation_type.clone(),
998            });
999        }
1000        Ok(created)
1001    }
1002
1003    pub fn add_observations(&mut self, entity_name: &str, contents: &[String]) -> Result<Vec<String>> {
1004        let name_id = self.interner.get_optional(entity_name)
1005            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1006        let hash = self.interner.get_hash(name_id);
1007        let slot = self
1008            .name_table
1009            .lookup(hash, name_id)
1010            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1011        // Snapshot the current observations so we can compute the deduplicated
1012        // additions *without* mutating in-memory state yet.
1013        let existing: AHashSet<StrId> = self
1014            .entity_slots
1015            .get(slot as usize)
1016            .and_then(|e| e.as_ref())
1017            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?
1018            .observations
1019            .iter()
1020            .copied()
1021            .collect();
1022
1023        // Deduplicate against existing observations *and* within this batch, so
1024        // the live result matches what replay (which dedups one-by-one) rebuilds.
1025        let mut added = Vec::new();
1026        let mut interned_added = Vec::new();
1027        let mut seen: AHashSet<StrId> = AHashSet::new();
1028        for content in contents {
1029            let cid = self.interner.intern(content);
1030            if existing.contains(&cid) || !seen.insert(cid) {
1031                continue;
1032            }
1033            interned_added.push(cid);
1034            added.push(content.clone());
1035        }
1036        if added.is_empty() {
1037            return Ok(added);
1038        }
1039
1040        // Write-ahead: the record must hit the log *before* any in-memory
1041        // mutation, so a failed write leaves memory and disk in agreement (C3).
1042        let mut buf = Vec::new();
1043        store_enc::encode_add_observations(&mut buf, entity_name, &added)
1044            .map_err(MCSError::IoError)?;
1045        self.store.write_record(RecordKind::AddObservations, &buf)
1046            .map_err(MCSError::IoError)?;
1047
1048        // Logged — now apply to in-memory state.
1049        let stored = self
1050            .entity_slots
1051            .get_mut(slot as usize)
1052            .and_then(|e| e.as_mut())
1053            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1054        stored.observations.extend_from_slice(&interned_added);
1055
1056        // Incrementally index only the new observation tokens (P3) — no
1057        // full remove + re-index of the whole entity.
1058        self.search
1059            .index_additional(&mut self.interner, slot, &interned_added);
1060        Ok(added)
1061    }
1062
1063    pub fn delete_entities(&mut self, entity_names: &[String]) -> Result<()> {
1064        let mut deleted_names = Vec::new();
1065        for name in entity_names {
1066            let name_id_opt = self.interner.get_optional(name);
1067            if let Some(name_id) = name_id_opt {
1068                let hash = self.interner.get_hash(name_id);
1069                if let Some(slot) = self.name_table.lookup(hash, name_id)
1070                    && let Some(Some(_)) = self.entity_slots.get(slot as usize)
1071                {
1072                    // Write-ahead: log before mutation
1073                    let mut buf = Vec::new();
1074                    store_enc::encode_delete_entity(&mut buf, name)
1075                        .map_err(MCSError::IoError)?;
1076                    self.store.write_record(RecordKind::DeleteEntity, &buf)
1077                        .map_err(MCSError::IoError)?;
1078
1079                    self.entity_slots[slot as usize] = None;
1080                    self.free_slots.push(slot);
1081                    self.search.remove_entity(slot);
1082                    self.name_table.remove(&self.interner, hash, name_id);
1083                    deleted_names.push(name.clone());
1084                }
1085            }
1086        }
1087        if !deleted_names.is_empty() {
1088            // Use a AHashSet for O(1) retain checks (P5)
1089            let deleted_ids: AHashSet<StrId> = deleted_names.iter()
1090                .map(|n| self.interner.intern(n))
1091                .collect();
1092            self.relations
1093                .retain(|r| !deleted_ids.contains(&r.from) && !deleted_ids.contains(&r.to));
1094        }
1095        Ok(())
1096    }
1097
1098    pub fn delete_observations(&mut self, entity_name: &str, observations: &[String]) -> Result<()> {
1099        let name_id = self.interner.get_optional(entity_name)
1100            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1101        let hash = self.interner.get_hash(name_id);
1102        let slot = self
1103            .name_table
1104            .lookup(hash, name_id)
1105            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1106        // Confirm the slot is live before logging.
1107        self.entity_slots
1108            .get(slot as usize)
1109            .and_then(|e| e.as_ref())
1110            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1111        let remove_ids: AHashSet<StrId> = observations.iter().map(|o| self.interner.intern(o)).collect();
1112
1113        // Write-ahead: log before touching in-memory state (C3).
1114        let mut buf = Vec::new();
1115        store_enc::encode_delete_observations(&mut buf, entity_name, observations)
1116            .map_err(MCSError::IoError)?;
1117        self.store.write_record(RecordKind::DeleteObservations, &buf)
1118            .map_err(MCSError::IoError)?;
1119
1120        // Logged — now apply.
1121        let stored = self
1122            .entity_slots
1123            .get_mut(slot as usize)
1124            .and_then(|e| e.as_mut())
1125            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1126        stored.observations.retain(|o| !remove_ids.contains(o));
1127        self.search.remove_entity(slot);
1128        self.search
1129            .index_entity(&mut self.interner, slot, stored.name, stored.entity_type, &stored.observations);
1130        Ok(())
1131    }
1132
1133    pub fn delete_relations(&mut self, relations: &[Relation]) -> Result<()> {
1134        // Collect targets into a AHashSet for O(1) retain checks (P5)
1135        let rels: AHashSet<(StrId, StrId, StrId)> = relations
1136            .iter()
1137            .map(|r| {
1138                (
1139                    self.interner.intern(&r.from),
1140                    self.interner.intern(&r.to),
1141                    self.interner.intern(&r.relation_type),
1142                )
1143            })
1144            .collect();
1145        // Write-ahead: log every deletion before mutating in-memory state (C3),
1146        // so a failed write can't leave memory ahead of the log.
1147        for relation in relations {
1148            let mut buf = Vec::new();
1149            store_enc::encode_delete_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
1150                .map_err(MCSError::IoError)?;
1151            self.store.write_record(RecordKind::DeleteRelation, &buf)
1152                .map_err(MCSError::IoError)?;
1153        }
1154        self.relations
1155            .retain(|r| !rels.contains(&(r.from, r.to, r.relation_type)));
1156        Ok(())
1157    }
1158
1159    pub fn read_graph(&self) -> KnowledgeGraphOut {
1160        self.read_graph_view().to_owned_out()
1161    }
1162
1163    /// Borrowing, allocation-light view of the full graph (M6). Serializing it
1164    /// streams interned `&str` directly instead of materializing a `String`
1165    /// per name/type/observation.
1166    pub fn read_graph_view(&self) -> GraphView<'_> {
1167        let entities: Vec<&StoredEntity> = self
1168            .entity_slots
1169            .iter()
1170            .filter_map(|s| s.as_ref().filter(|e| e.is_live()))
1171            .collect();
1172        let relations: Vec<&StoredRelation> = self.relations.iter().collect();
1173        GraphView { kg: self, entities, relations }
1174    }
1175
1176    /// Relevance-ranked substring search returning all matches (no pagination).
1177    /// Equivalent to `search_nodes_filtered(query, None, 0, usize::MAX)`.
1178    pub fn search_nodes(&self, query: &str) -> KnowledgeGraphOut {
1179        self.search_nodes_filtered(query, None, 0, usize::MAX)
1180    }
1181
1182    pub fn open_nodes(&self, names: &[String]) -> KnowledgeGraphOut {
1183        self.open_nodes_view(names).to_owned_out()
1184    }
1185
1186    /// Borrowing view variant of [`open_nodes`] (M6).
1187    pub fn open_nodes_view(&self, names: &[String]) -> GraphView<'_> {
1188        let name_ids: AHashSet<StrId> = names.iter()
1189            .filter_map(|n| self.interner.get_optional(n))
1190            .collect();
1191        let entities: Vec<&StoredEntity> = self
1192            .entity_slots
1193            .iter()
1194            .filter_map(|s| {
1195                s.as_ref()
1196                    .filter(|stored| stored.is_live() && name_ids.contains(&stored.name))
1197            })
1198            .collect();
1199        let matched_names: AHashSet<StrId> = entities.iter().map(|e| e.name).collect();
1200        let relations: Vec<&StoredRelation> = self
1201            .relations
1202            .iter()
1203            .filter(|r| matched_names.contains(&r.from) || matched_names.contains(&r.to))
1204            .collect();
1205        GraphView { kg: self, entities, relations }
1206    }
1207
1208    // -----------------------------------------------------------------------
1209    // Internal helpers
1210    // -----------------------------------------------------------------------
1211
1212    fn entity_to_output(&self, stored: &StoredEntity) -> Entity {
1213        Entity {
1214            name: self.interner.lookup(stored.name).to_string(),
1215            entity_type: self.interner.lookup(stored.entity_type).to_string(),
1216            observations: stored
1217                .observations
1218                .iter()
1219                .map(|o| self.interner.lookup(*o).to_string())
1220                .collect(),
1221        }
1222    }
1223
1224    #[inline]
1225    fn relation_to_output(&self, r: &StoredRelation) -> Relation {
1226        Relation {
1227            from: self.interner.lookup(r.from).to_string(),
1228            to: self.interner.lookup(r.to).to_string(),
1229            relation_type: self.interner.lookup(r.relation_type).to_string(),
1230        }
1231    }
1232
1233    /// Resolve a name to its live entity slot, or `None` if absent/deleted.
1234    fn lookup_live_slot(&self, name: &str) -> Option<u32> {
1235        let name_id = self.interner.get_optional(name)?;
1236        let hash = self.interner.get_hash(name_id);
1237        let slot = self.name_table.lookup(hash, name_id)?;
1238        let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
1239        stored.is_live().then_some(slot)
1240    }
1241
1242    /// Materialize a live entity from its interned name id.
1243    fn entity_by_name_id(&self, name_id: StrId) -> Option<Entity> {
1244        let hash = self.interner.get_hash(name_id);
1245        let slot = self.name_table.lookup(hash, name_id)?;
1246        let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
1247        stored.is_live().then(|| self.entity_to_output(stored))
1248    }
1249
1250    /// Tally distinct entity types and their live-entity counts, ranked by
1251    /// count descending (ties broken by name). One linear pass over the dense
1252    /// slot vec; only the final names are allocated.
1253    pub fn entity_type_counts(&self) -> Vec<(String, usize)> {
1254        let mut counts: AHashMap<StrId, usize> = AHashMap::new();
1255        for st in self
1256            .entity_slots
1257            .iter()
1258            .filter_map(|s| s.as_ref())
1259            .filter(|e| e.is_live())
1260        {
1261            *counts.entry(st.entity_type).or_insert(0) += 1;
1262        }
1263        self.rank_counts(counts)
1264    }
1265
1266    /// Tally distinct relation types and their counts, ranked by count desc.
1267    pub fn relation_type_counts(&self) -> Vec<(String, usize)> {
1268        let mut counts: AHashMap<StrId, usize> = AHashMap::new();
1269        for r in &self.relations {
1270            *counts.entry(r.relation_type).or_insert(0) += 1;
1271        }
1272        self.rank_counts(counts)
1273    }
1274
1275    fn rank_counts(&self, counts: AHashMap<StrId, usize>) -> Vec<(String, usize)> {
1276        let mut out: Vec<(String, usize)> = counts
1277            .into_iter()
1278            .map(|(id, c)| (self.interner.lookup(id).to_string(), c))
1279            .collect();
1280        out.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
1281        out
1282    }
1283
1284    /// Relevance-ranked, optionally type-filtered, paginated node search.
1285    /// Entities come back best-match-first (see [`SearchIndex::search_ranked`]).
1286    /// Relations touching any returned entity (either endpoint) are included.
1287    pub fn search_nodes_filtered(
1288        &self,
1289        query: &str,
1290        entity_type: Option<&str>,
1291        offset: usize,
1292        limit: usize,
1293    ) -> KnowledgeGraphOut {
1294        self.search_nodes_view(query, entity_type, offset, limit).to_owned_out()
1295    }
1296
1297    /// Borrowing view variant of [`search_nodes_filtered`] (M6).
1298    pub fn search_nodes_view(
1299        &self,
1300        query: &str,
1301        entity_type: Option<&str>,
1302        offset: usize,
1303        limit: usize,
1304    ) -> GraphView<'_> {
1305        let type_id = match entity_type {
1306            Some(t) => match self.interner.get_optional(t) {
1307                Some(id) => Some(id),
1308                None => return GraphView { kg: self, entities: Vec::new(), relations: Vec::new() },
1309            },
1310            None => None,
1311        };
1312
1313        let ranked = self.search.search_ranked(query, &self.interner);
1314        let mut selected: AHashSet<StrId> = AHashSet::new();
1315        let mut entities: Vec<&StoredEntity> = Vec::new();
1316        let mut skipped = 0usize;
1317        for (slot, _score) in ranked {
1318            let Some(st) = self
1319                .entity_slots
1320                .get(slot as usize)
1321                .and_then(|s| s.as_ref())
1322                .filter(|e| e.is_live())
1323            else {
1324                continue;
1325            };
1326            if type_id.is_some_and(|tid| st.entity_type != tid) {
1327                continue;
1328            }
1329            if skipped < offset {
1330                skipped += 1;
1331                continue;
1332            }
1333            if entities.len() >= limit {
1334                break;
1335            }
1336            selected.insert(st.name);
1337            entities.push(st);
1338        }
1339
1340        let relations: Vec<&StoredRelation> = self
1341            .relations
1342            .iter()
1343            .filter(|r| selected.contains(&r.from) || selected.contains(&r.to))
1344            .collect();
1345        GraphView { kg: self, entities, relations }
1346    }
1347
1348    /// Type-filtered, paginated view of the whole graph. Unlike [`read_graph`],
1349    /// relations are restricted to those whose **both** endpoints fall in the
1350    /// returned entity page, so the slice is internally consistent.
1351    pub fn read_graph_filtered(
1352        &self,
1353        entity_type: Option<&str>,
1354        offset: usize,
1355        limit: usize,
1356    ) -> KnowledgeGraphOut {
1357        self.read_graph_filtered_view(entity_type, offset, limit).to_owned_out()
1358    }
1359
1360    /// Borrowing view variant of [`read_graph_filtered`] (M6).
1361    pub fn read_graph_filtered_view(
1362        &self,
1363        entity_type: Option<&str>,
1364        offset: usize,
1365        limit: usize,
1366    ) -> GraphView<'_> {
1367        let type_id = match entity_type {
1368            Some(t) => match self.interner.get_optional(t) {
1369                Some(id) => Some(id),
1370                None => return GraphView { kg: self, entities: Vec::new(), relations: Vec::new() },
1371            },
1372            None => None,
1373        };
1374
1375        let mut selected: AHashSet<StrId> = AHashSet::new();
1376        let mut entities: Vec<&StoredEntity> = Vec::new();
1377        let mut skipped = 0usize;
1378        for st in self
1379            .entity_slots
1380            .iter()
1381            .filter_map(|s| s.as_ref())
1382            .filter(|e| e.is_live())
1383        {
1384            if type_id.is_some_and(|tid| st.entity_type != tid) {
1385                continue;
1386            }
1387            if skipped < offset {
1388                skipped += 1;
1389                continue;
1390            }
1391            if entities.len() >= limit {
1392                break;
1393            }
1394            selected.insert(st.name);
1395            entities.push(st);
1396        }
1397
1398        let relations: Vec<&StoredRelation> = self
1399            .relations
1400            .iter()
1401            .filter(|r| selected.contains(&r.from) && selected.contains(&r.to))
1402            .collect();
1403        GraphView { kg: self, entities, relations }
1404    }
1405
1406    /// Neighborhood expansion around `name` out to `depth` hops, following
1407    /// edges in the requested [`Direction`] and (optionally) of one relation
1408    /// type. Returns the origin plus reached entities, and every relation
1409    /// (passing the type filter) whose endpoints are both inside that set.
1410    ///
1411    /// `depth == 1` (the common case) is a single linear pass over the flat
1412    /// relation vec; deeper queries build an adjacency map once (O(E)) and BFS.
1413    pub fn neighbors(
1414        &self,
1415        name: &str,
1416        direction: Direction,
1417        rtype: Option<&str>,
1418        depth: u32,
1419    ) -> Result<KnowledgeGraphOut> {
1420        self.lookup_live_slot(name)
1421            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
1422        // Safe: lookup_live_slot succeeded, so the name is interned.
1423        let start = self.interner.get_optional(name).unwrap();
1424
1425        // An unknown relation-type filter can match nothing: return just origin.
1426        let rtype_id = match rtype {
1427            Some(r) => match self.interner.get_optional(r) {
1428                Some(id) => Some(id),
1429                None => {
1430                    let entities = self.entity_by_name_id(start).into_iter().collect();
1431                    return Ok(KnowledgeGraphOut { entities, relations: Vec::new() });
1432                }
1433            },
1434            None => None,
1435        };
1436
1437        let mut visited: AHashSet<StrId> = AHashSet::new();
1438        visited.insert(start);
1439
1440        let type_ok = |r: &StoredRelation| rtype_id.is_none_or(|rt| r.relation_type == rt);
1441
1442        if depth == 1 {
1443            for r in self.relations.iter().filter(|r| type_ok(r)) {
1444                match direction {
1445                    Direction::Out => {
1446                        if r.from == start {
1447                            visited.insert(r.to);
1448                        }
1449                    }
1450                    Direction::In => {
1451                        if r.to == start {
1452                            visited.insert(r.from);
1453                        }
1454                    }
1455                    Direction::Both => {
1456                        if r.from == start {
1457                            visited.insert(r.to);
1458                        } else if r.to == start {
1459                            visited.insert(r.from);
1460                        }
1461                    }
1462                }
1463            }
1464        } else if depth >= 2 {
1465            // Build a direction-aware adjacency map once, then BFS.
1466            let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
1467            for r in self.relations.iter().filter(|r| type_ok(r)) {
1468                match direction {
1469                    Direction::Out => adj.entry(r.from).or_default().push(r.to),
1470                    Direction::In => adj.entry(r.to).or_default().push(r.from),
1471                    Direction::Both => {
1472                        adj.entry(r.from).or_default().push(r.to);
1473                        adj.entry(r.to).or_default().push(r.from);
1474                    }
1475                }
1476            }
1477            let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
1478            queue.push_back((start, 0));
1479            while let Some((node, d)) = queue.pop_front() {
1480                if d >= depth {
1481                    continue;
1482                }
1483                if let Some(nbrs) = adj.get(&node) {
1484                    for &nb in nbrs {
1485                        if visited.insert(nb) {
1486                            queue.push_back((nb, d + 1));
1487                        }
1488                    }
1489                }
1490            }
1491        }
1492
1493        let mut entities = Vec::with_capacity(visited.len());
1494        for &nid in &visited {
1495            if let Some(e) = self.entity_by_name_id(nid) {
1496                entities.push(e);
1497            }
1498        }
1499        let relations = self
1500            .relations
1501            .iter()
1502            .filter(|r| type_ok(r) && visited.contains(&r.from) && visited.contains(&r.to))
1503            .map(|r| self.relation_to_output(r))
1504            .collect();
1505        Ok(KnowledgeGraphOut { entities, relations })
1506    }
1507
1508    /// One-shot context bundle for a single entity: the entity itself, every
1509    /// incident relation, its distinct neighbor names, and its degree. Saves an
1510    /// agent the get_entity + two search_relations round-trips.
1511    pub fn describe_entity(&self, name: &str) -> Result<serde_json::Value> {
1512        let name_id = self
1513            .interner
1514            .get_optional(name)
1515            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
1516        let entity = self
1517            .entity_by_name_id(name_id)
1518            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
1519
1520        let mut incident: Vec<Relation> = Vec::new();
1521        let mut neighbor_seen: AHashSet<StrId> = AHashSet::new();
1522        let mut neighbors: Vec<&str> = Vec::new();
1523        for r in &self.relations {
1524            if r.from == name_id || r.to == name_id {
1525                incident.push(self.relation_to_output(r));
1526                let other = if r.from == name_id { r.to } else { r.from };
1527                if other != name_id && neighbor_seen.insert(other) {
1528                    neighbors.push(self.interner.lookup(other));
1529                }
1530            }
1531        }
1532
1533        Ok(serde_json::json!({
1534            "entity": entity,
1535            "relations": incident,
1536            "neighbors": neighbors,
1537            "degree": incident.len(),
1538        }))
1539    }
1540
1541    /// Create-or-merge a batch of entities idempotently. Missing entities are
1542    /// created; existing ones keep their type and gain any new observations
1543    /// (deduplicated). Returns a per-entity outcome. The caller is responsible
1544    /// for flushing — every underlying op is already write-ahead logged.
1545    pub fn upsert_entities(&mut self, entities: &[Entity]) -> Result<Vec<serde_json::Value>> {
1546        for e in entities {
1547            if e.name.is_empty() {
1548                return Err(MCSError::InvalidParams(
1549                    "Entity name must not be empty".into(),
1550                ));
1551            }
1552        }
1553        let mut out = Vec::with_capacity(entities.len());
1554        for e in entities {
1555            if self.lookup_live_slot(&e.name).is_some() {
1556                let added = self.add_observations(&e.name, &e.observations)?;
1557                out.push(serde_json::json!({
1558                    "name": e.name,
1559                    "created": false,
1560                    "addedObservations": added,
1561                }));
1562            } else {
1563                let created = self.create_entities(std::slice::from_ref(e))?;
1564                out.push(serde_json::json!({
1565                    "name": e.name,
1566                    "created": !created.is_empty(),
1567                    "addedObservations": e.observations,
1568                }));
1569            }
1570        }
1571        Ok(out)
1572    }
1573
1574    /// Serialize the graph in one of: `json` (read_graph), `mermaid`, `dot`.
1575    pub fn export(&self, format: &str) -> Result<String> {
1576        match format {
1577            "json" => serde_json::to_string(&self.read_graph()).map_err(MCSError::JsonError),
1578            "mermaid" => Ok(self.export_mermaid()),
1579            "dot" => Ok(self.export_dot()),
1580            other => Err(MCSError::InvalidParams(format!(
1581                "Unknown export format '{other}' (expected json|mermaid|dot)"
1582            ))),
1583        }
1584    }
1585
1586    /// Assign each live entity a stable `n{k}` node id for diagram output.
1587    fn diagram_node_ids(&self) -> (AHashMap<StrId, usize>, Vec<(usize, StrId)>) {
1588        let mut ids: AHashMap<StrId, usize> = AHashMap::new();
1589        let mut order: Vec<(usize, StrId)> = Vec::new();
1590        for st in self
1591            .entity_slots
1592            .iter()
1593            .filter_map(|s| s.as_ref())
1594            .filter(|e| e.is_live())
1595        {
1596            let n = ids.len();
1597            ids.insert(st.name, n);
1598            order.push((n, st.name));
1599        }
1600        (ids, order)
1601    }
1602
1603    fn export_mermaid(&self) -> String {
1604        let (ids, order) = self.diagram_node_ids();
1605        let mut s = String::with_capacity(64 + order.len() * 32 + self.relations.len() * 32);
1606        s.push_str("graph LR\n");
1607        for (n, name_id) in &order {
1608            let label = sanitize_label(self.interner.lookup(*name_id));
1609            s.push_str(&format!("  n{n}[\"{label}\"]\n"));
1610        }
1611        for r in &self.relations {
1612            if let (Some(&a), Some(&b)) = (ids.get(&r.from), ids.get(&r.to)) {
1613                let rel = sanitize_label(self.interner.lookup(r.relation_type));
1614                s.push_str(&format!("  n{a} -->|{rel}| n{b}\n"));
1615            }
1616        }
1617        s
1618    }
1619
1620    fn export_dot(&self) -> String {
1621        let (ids, order) = self.diagram_node_ids();
1622        let mut s = String::with_capacity(64 + order.len() * 32 + self.relations.len() * 32);
1623        s.push_str("digraph G {\n");
1624        for (n, name_id) in &order {
1625            let label = sanitize_label(self.interner.lookup(*name_id));
1626            s.push_str(&format!("  n{n} [label=\"{label}\"];\n"));
1627        }
1628        for r in &self.relations {
1629            if let (Some(&a), Some(&b)) = (ids.get(&r.from), ids.get(&r.to)) {
1630                let rel = sanitize_label(self.interner.lookup(r.relation_type));
1631                s.push_str(&format!("  n{a} -> n{b} [label=\"{rel}\"];\n"));
1632            }
1633        }
1634        s.push_str("}\n");
1635        s
1636    }
1637
1638    // ------ High-level productivity tools ------
1639
1640    /// Merge `source` entity into `target` entity. All observations from
1641    /// source are moved to target (deduplicated), all relations involving
1642    /// source are redirected to target (deduplicated), and source is then
1643    /// deleted.
1644    ///
1645    /// The whole merge is **atomic**: every sub-record is written to the log
1646    /// inside a single `TxnBegin`…`TxnCommit` transaction *before* any in-memory
1647    /// mutation. A failed or torn write therefore leaves both memory and the
1648    /// log untouched (an uncommitted transaction is discarded on replay), so the
1649    /// graph can never observe a half-applied merge. Caller flushes.
1650    pub fn merge_entities(&mut self, source: &str, target: &str) -> Result<serde_json::Value> {
1651        if source == target {
1652            return Err(MCSError::InvalidParams(
1653                "Source and target must be different entities".into(),
1654            ));
1655        }
1656        self.lookup_live_slot(source).ok_or_else(|| {
1657            MCSError::InvalidParams(format!("Source entity '{source}' not found"))
1658        })?;
1659        let target_slot = self.lookup_live_slot(target).ok_or_else(|| {
1660            MCSError::InvalidParams(format!("Target entity '{target}' not found"))
1661        })?;
1662
1663        let source_entity = self.get_entity(source).unwrap();
1664        let moved_obs_count = source_entity.observations.len();
1665        let source_id = self.interner.get_optional(source).unwrap();
1666        let target_id = self.interner.get_optional(target).unwrap();
1667
1668        // Observations to move: dedup against target's existing set and within
1669        // the batch (matching what `add_observations` would have done).
1670        let target_existing: AHashSet<StrId> = self.entity_slots[target_slot as usize]
1671            .as_ref()
1672            .unwrap()
1673            .observations
1674            .iter()
1675            .copied()
1676            .collect();
1677        let mut obs_seen: AHashSet<StrId> = AHashSet::new();
1678        let mut obs_to_add: Vec<String> = Vec::new();
1679        for o in &source_entity.observations {
1680            if let Some(oid) = self.interner.get_optional(o)
1681                && !target_existing.contains(&oid)
1682                && obs_seen.insert(oid)
1683            {
1684                obs_to_add.push(o.clone());
1685            }
1686        }
1687
1688        // Relations to redirect: replace source with target, drop self-loops,
1689        // and dedup against existing relations and within the batch.
1690        let existing_rels: AHashSet<(StrId, StrId, StrId)> =
1691            self.relations.iter().map(|r| (r.from, r.to, r.relation_type)).collect();
1692        let mut rel_seen: AHashSet<(StrId, StrId, StrId)> = AHashSet::new();
1693        let mut redirect: Vec<Relation> = Vec::new();
1694        for r in &self.relations {
1695            if r.from != source_id && r.to != source_id {
1696                continue;
1697            }
1698            let new_from = if r.from == source_id { target_id } else { r.from };
1699            let new_to = if r.to == source_id { target_id } else { r.to };
1700            if new_from == new_to {
1701                continue; // self-loop after redirect
1702            }
1703            let key = (new_from, new_to, r.relation_type);
1704            if existing_rels.contains(&key) || !rel_seen.insert(key) {
1705                continue;
1706            }
1707            redirect.push(Relation {
1708                from: self.interner.lookup(new_from).to_string(),
1709                to: self.interner.lookup(new_to).to_string(),
1710                relation_type: self.interner.lookup(r.relation_type).to_string(),
1711            });
1712        }
1713
1714        let added_count = obs_to_add.len();
1715        let redirected = redirect.len() as u32;
1716
1717        // Build every record up front so writing is the only fallible step.
1718        let mut records: Vec<(RecordKind, Vec<u8>)> = Vec::new();
1719        if !obs_to_add.is_empty() {
1720            let mut buf = Vec::new();
1721            store_enc::encode_add_observations(&mut buf, target, &obs_to_add)
1722                .map_err(MCSError::IoError)?;
1723            records.push((RecordKind::AddObservations, buf));
1724        }
1725        for r in &redirect {
1726            let mut buf = Vec::new();
1727            store_enc::encode_create_relation(&mut buf, &r.from, &r.to, &r.relation_type)
1728                .map_err(MCSError::IoError)?;
1729            records.push((RecordKind::CreateRelation, buf));
1730        }
1731        let mut del_buf = Vec::new();
1732        store_enc::encode_delete_entity(&mut del_buf, source).map_err(MCSError::IoError)?;
1733        records.push((RecordKind::DeleteEntity, del_buf));
1734
1735        // Write-ahead, transactionally: begin, all records, commit.
1736        self.store.write_record(RecordKind::TxnBegin, &[]).map_err(MCSError::IoError)?;
1737        for (kind, data) in &records {
1738            self.store.write_record(*kind, data).map_err(MCSError::IoError)?;
1739        }
1740        self.store.write_record(RecordKind::TxnCommit, &[]).map_err(MCSError::IoError)?;
1741
1742        // Logged and committed — now apply to in-memory state (no more logging).
1743        for (kind, data) in &records {
1744            Self::apply_record(
1745                *kind, data, &mut self.interner, &mut self.entity_slots, &mut self.search,
1746                &mut self.name_table, &mut self.relations,
1747            );
1748        }
1749
1750        Ok(serde_json::json!({
1751            "source": source,
1752            "target": target,
1753            "movedObservations": moved_obs_count,
1754            "addedObservations": added_count,
1755            "redirectedRelations": redirected,
1756        }))
1757    }
1758
1759    /// Extract a connected subgraph around one or more seed entity names,
1760    /// expanding out to `depth` hops along all relations (undirected). Returns
1761    /// the set of reached entities and the relations among them.
1762    pub fn extract_subgraph(&self, names: &[String], depth: u32) -> Result<KnowledgeGraphOut> {
1763        if names.is_empty() {
1764            return Ok(KnowledgeGraphOut {
1765                entities: Vec::new(),
1766                relations: Vec::new(),
1767            });
1768        }
1769        // Seed the BFS queue from any names that exist.
1770        let mut visited: AHashSet<StrId> = AHashSet::new();
1771        let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
1772        for name in names {
1773            if let Some(id) = self.interner.get_optional(name)
1774                && visited.insert(id)
1775            {
1776                queue.push_back((id, 0));
1777            }
1778        }
1779        // Build an undirected adjacency map once.
1780        let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
1781        for r in &self.relations {
1782            adj.entry(r.from).or_default().push(r.to);
1783            adj.entry(r.to).or_default().push(r.from);
1784        }
1785        while let Some((node, d)) = queue.pop_front() {
1786            if d >= depth {
1787                continue;
1788            }
1789            if let Some(nbrs) = adj.get(&node) {
1790                for &nb in nbrs {
1791                    if visited.insert(nb) {
1792                        queue.push_back((nb, d + 1));
1793                    }
1794                }
1795            }
1796        }
1797        let mut entities: Vec<Entity> = Vec::with_capacity(visited.len());
1798        for &nid in &visited {
1799            if let Some(e) = self.entity_by_name_id(nid) {
1800                entities.push(e);
1801            }
1802        }
1803        let relations: Vec<Relation> = self
1804            .relations
1805            .iter()
1806            .filter(|r| visited.contains(&r.from) && visited.contains(&r.to))
1807            .map(|r| self.relation_to_output(r))
1808            .collect();
1809        Ok(KnowledgeGraphOut { entities, relations })
1810    }
1811
1812    /// Return full entities for a list of names. Missing names yield `None`.
1813    pub fn batch_get_entities(&self, names: &[String]) -> Vec<Option<Entity>> {
1814        names.iter().map(|n| self.get_entity(n)).collect()
1815    }
1816
1817    /// Recursive DFS helper — collects every simple path from `current` to
1818    /// `target` up to `max_depth` hops, capped at `max_paths` results.
1819    #[allow(clippy::too_many_arguments)]
1820    fn dfs_all_paths(
1821        adj: &AHashMap<StrId, Vec<StrId>>,
1822        current: StrId,
1823        target: StrId,
1824        max_depth: usize,
1825        max_paths: usize,
1826        visited: &mut AHashSet<StrId>,
1827        current_path: &mut Vec<StrId>,
1828        all_paths: &mut Vec<Vec<StrId>>,
1829    ) {
1830        if all_paths.len() >= max_paths {
1831            return;
1832        }
1833        if current == target && current_path.len() > 1 {
1834            all_paths.push(current_path.clone());
1835            return;
1836        }
1837        if current_path.len() > max_depth {
1838            return;
1839        }
1840        if let Some(neighbors) = adj.get(&current) {
1841            for &nb in neighbors {
1842                if visited.insert(nb) {
1843                    current_path.push(nb);
1844                    Self::dfs_all_paths(
1845                        adj, nb, target, max_depth, max_paths, visited, current_path, all_paths,
1846                    );
1847                    current_path.pop();
1848                    visited.remove(&nb);
1849                }
1850            }
1851        }
1852    }
1853
1854    /// Find all simple paths between `from` and `to` up to `max_depth` hops,
1855    /// returning at most `max_paths` results. Paths are found via DFS with
1856    /// backtracking and include both endpoints.
1857    pub fn find_all_paths(
1858        &self,
1859        from: &str,
1860        to: &str,
1861        max_depth: usize,
1862        max_paths: usize,
1863    ) -> Result<Vec<Vec<String>>> {
1864        let from_id = self
1865            .interner
1866            .get_optional(from)
1867            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
1868        let to_id = self
1869            .interner
1870            .get_optional(to)
1871            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
1872        // Verify both are live.
1873        if self.lookup_live_slot(from).is_none() {
1874            return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
1875        }
1876        if self.lookup_live_slot(to).is_none() {
1877            return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
1878        }
1879        if from_id == to_id {
1880            return Ok(vec![vec![from.to_string()]]);
1881        }
1882        // Build undirected adjacency.
1883        let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
1884        for r in &self.relations {
1885            adj.entry(r.from).or_default().push(r.to);
1886            adj.entry(r.to).or_default().push(r.from);
1887        }
1888        let mut all_paths: Vec<Vec<StrId>> = Vec::new();
1889        let mut current_path = Vec::new();
1890        let mut visited: AHashSet<StrId> = AHashSet::new();
1891        visited.insert(from_id);
1892        current_path.push(from_id);
1893        Self::dfs_all_paths(
1894            &adj,
1895            from_id,
1896            to_id,
1897            max_depth,
1898            max_paths,
1899            &mut visited,
1900            &mut current_path,
1901            &mut all_paths,
1902        );
1903        if all_paths.is_empty() {
1904            return Err(MCSError::MemoryError(format!(
1905                "No path found between '{from}' and '{to}'"
1906            )));
1907        }
1908        let result: Vec<Vec<String>> = all_paths
1909            .into_iter()
1910            .map(|path| {
1911                path.into_iter()
1912                    .map(|id| self.interner.lookup(id).to_string())
1913                    .collect()
1914            })
1915            .collect();
1916        Ok(result)
1917    }
1918
1919    // --- Flush & sync ---
1920
1921    /// Flush and fsync the log to stable storage.
1922    pub fn flush_and_sync(&mut self) -> Result<()> {
1923        self.store.flush_and_sync().map_err(MCSError::IoError)
1924    }
1925}