Skip to main content

mcp_memory/
kg.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2use std::path::Path;
3
4use crate::errors::{MCSError, Result};
5use crate::intern::{StrId, StringInterner};
6use crate::types::{Entity, Relation, KnowledgeGraphOut};
7use crate::search::SearchIndex;
8use crate::store::{self as store_enc, BinaryStore, RecordKind};
9
/// State tag stored in `StoredEntity::state` for a slot that holds a live entity.
const ENTITY_SLOT_LIVE: u8 = 1;
/// Number of shards in `ShardedNameTable`. Shard selection masks the hash with
/// `NAME_TABLE_SHARDS - 1`, so this value has to stay a power of two.
const NAME_TABLE_SHARDS: usize = 4;
12
13// ---------------------------------------------------------------------------
14// Prefetch helper – issues a non-binding software prefetch hint to pull a
15// cache-line into L1/L2 while we finish probing the current entry.
16// ---------------------------------------------------------------------------
/// Issues a non-binding software prefetch hint (`_MM_HINT_T0`) for the cache
/// line that contains `addr`.
///
/// # Safety
///
/// The function only forwards `addr` to the prefetch intrinsic and never reads
/// through it. Callers in this file pass a pointer that lies inside a live
/// allocation; keep it that way so the pointer arithmetic at the call site
/// stays in bounds.
#[cfg(target_arch = "x86_64")]
#[inline(always)]
unsafe fn prefetch_addr(addr: *const u8) {
    use std::arch::x86_64::{_mm_prefetch, _MM_HINT_T0};

    // `_mm_prefetch` is declared with a `*const i8` parameter, so the byte
    // pointer has to be cast; passing `*const u8` directly is a type mismatch.
    // SAFETY: the intrinsic is a hint and does not dereference the pointer;
    // SSE is part of the x86_64 baseline.
    unsafe { _mm_prefetch::<_MM_HINT_T0>(addr.cast()) }
}

/// Fallback for targets without the x86_64 prefetch intrinsic: does nothing.
///
/// # Safety
///
/// Always sound to call; it is `unsafe` only so both variants share one
/// signature.
#[cfg(not(target_arch = "x86_64"))]
#[inline(always)]
const unsafe fn prefetch_addr(_addr: *const u8) {}
27
28// ---------------------------------------------------------------------------
29// StoredEntity / StoredRelation – internal representations using StrId.
30// ---------------------------------------------------------------------------
31struct StoredEntity {
32    state: u8,
33    name: StrId,
34    entity_type: StrId,
35    observations: Vec<StrId>,
36}
37
38impl StoredEntity {
39    const fn is_live(&self) -> bool {
40        self.state == ENTITY_SLOT_LIVE
41    }
42}
43
44struct StoredRelation {
45    from: StrId,
46    to: StrId,
47    relation_type: StrId,
48}
49
50// ---------------------------------------------------------------------------
51// ShardedNameTable – open-addressing hash map split into N independent shards.
52//
53// Each shard uses **ctrl-byte bucket** approach: a 1-byte metadata array
54// stores the 7-bit hash stamp (h2) for each slot, with `0xFF` = EMPTY.
55// On probe, the first memory access is a single byte (ctrl). The full key
56// (StrId) is only compared when the stamp matches — ~127/128 of probe steps
57// touch nothing but the ctrl byte.  See also SwissTable / hashbrown.
58// ---------------------------------------------------------------------------
/// Control byte of a vacant bucket. Bit 7 is set, which no `h2` stamp
/// (always `0x00..=0x7F`) can have, so `ctrl & 0x80 != 0` means "vacant".
const EMPTY_SLOT: u8 = 0xFF;
60
/// Returns the 7-bit stamp of `hash`: its lowest seven bits, so bit 7 of the
/// result is always clear.
#[inline(always)]
const fn h2(hash: u64) -> u8 {
    // Truncating first and masking afterwards keeps exactly bits 0..=6.
    (hash as u8) & 0x7F
}
65
/// Returns the home bucket for `hash`: the bits above the 7-bit stamp,
/// reduced with `mask` (capacity minus one for a power-of-two table).
#[inline(always)]
const fn h1(hash: u64, mask: usize) -> usize {
    let bucket_bits = hash >> 7;
    mask & (bucket_bits as usize)
}
70
71struct NameTableShard {
72    ctrl: Vec<u8>,      // 0xFF = empty; 0x00-0x7F = h2 stamp (bit 7 always clear)
73    hashes: Vec<u64>,   // full 64-bit hash (used only during grow/rehash)
74    names: Vec<StrId>,
75    slots: Vec<u32>,
76    mask: usize,
77    count: usize,
78}
79
80impl NameTableShard {
81    fn new(capacity: usize) -> Self {
82        let cap = capacity.next_power_of_two().max(16);
83        Self {
84            ctrl: vec![EMPTY_SLOT; cap],
85            hashes: vec![0; cap],
86            names: vec![StrId::EMPTY; cap],
87            slots: vec![u32::MAX; cap],
88            mask: cap - 1,
89            count: 0,
90        }
91    }
92
93    #[inline(always)]
94    fn lookup(&self, hash: u64, name: StrId) -> Option<u32> {
95        let stamp = h2(hash);
96        let mask = self.mask;
97        let mut idx = h1(hash, mask);
98        let ctrl = self.ctrl.as_ptr();
99        let names = self.names.as_ptr();
100        let slots = self.slots.as_ptr();
101        let len = self.ctrl.len();
102
103        for _ in 0..len {
104            // Prefetch the ctrl byte 4 slots ahead — overlaps memory latency.
105            let prefetch_idx = idx.wrapping_add(4) & mask;
106            unsafe { prefetch_addr(ctrl.add(prefetch_idx)) };
107
108            // SAFETY: idx always < len because of &mask on each iteration.
109            unsafe {
110                let c = *ctrl.add(idx);
111                // Bit 7 set → EMPTY → key not present.
112                if c & 0x80 != 0 {
113                    return None;
114                }
115                // Stamp match → compare full key (rare: ~1/128 probes).
116                if c == stamp && *names.add(idx) == name {
117                    return Some(*slots.add(idx));
118                }
119            }
120            idx = (idx + 1) & mask;
121        }
122        None
123    }
124
125    fn insert(&mut self, hash: u64, name: StrId, slot: u32) {
126        if self.count * 4 > self.ctrl.len() * 3 {
127            self.grow();
128        }
129        let stamp = h2(hash);
130        let mask = self.mask;
131        let mut idx = h1(hash, mask);
132        loop {
133            // SAFETY: idx & mask always < len for power-of-two capacity.
134            unsafe {
135                if *self.ctrl.get_unchecked(idx) & 0x80 != 0 {
136                    *self.ctrl.get_unchecked_mut(idx) = stamp;
137                    *self.hashes.get_unchecked_mut(idx) = hash;
138                    *self.names.get_unchecked_mut(idx) = name;
139                    *self.slots.get_unchecked_mut(idx) = slot;
140                    self.count += 1;
141                    return;
142                }
143            }
144            idx = (idx + 1) & mask;
145        }
146    }
147
148    fn remove(&mut self, hash: u64, name: StrId) {
149        let stamp = h2(hash);
150        let mask = self.mask;
151        let mut idx = h1(hash, mask);
152        let len = self.ctrl.len();
153        for _ in 0..len {
154            if self.ctrl[idx] & 0x80 != 0 {
155                return;
156            }
157            if self.ctrl[idx] == stamp && self.names[idx] == name {
158                // Found — remove with shift-back to preserve probe chains.
159                self.ctrl[idx] = EMPTY_SLOT;
160                self.hashes[idx] = 0;
161                self.names[idx] = StrId::EMPTY;
162                self.slots[idx] = u32::MAX;
163                self.count -= 1;
164
165                let mut next = (idx + 1) & mask;
166                while self.ctrl[next] & 0x80 == 0 {
167                    let nh = self.hashes[next];
168                    let nn = self.names[next];
169                    let ns = self.slots[next];
170                    self.ctrl[next] = EMPTY_SLOT;
171                    self.hashes[next] = 0;
172                    self.names[next] = StrId::EMPTY;
173                    self.slots[next] = u32::MAX;
174                    self.count -= 1;
175
176                    // Re-insert at its ideal bucket.
177                    let nstamp = h2(nh);
178                    let mut re_idx = h1(nh, mask);
179                    while self.ctrl[re_idx] & 0x80 == 0 {
180                        re_idx = (re_idx + 1) & mask;
181                    }
182                    self.ctrl[re_idx] = nstamp;
183                    self.hashes[re_idx] = nh;
184                    self.names[re_idx] = nn;
185                    self.slots[re_idx] = ns;
186                    self.count += 1;
187
188                    next = (next + 1) & mask;
189                }
190                return;
191            }
192            idx = (idx + 1) & mask;
193        }
194    }
195
196    fn grow(&mut self) {
197        let new_cap = self.ctrl.len() * 2;
198        let new_mask = new_cap - 1;
199        let mut new_ctrl = vec![EMPTY_SLOT; new_cap];
200        let mut new_hashes = vec![0u64; new_cap];
201        let mut new_names = vec![StrId::EMPTY; new_cap];
202        let mut new_slots = vec![u32::MAX; new_cap];
203
204        for i in 0..self.ctrl.len() {
205            if self.ctrl[i] & 0x80 == 0 {
206                let hash = self.hashes[i];
207                let stamp = h2(hash);
208                let mut idx = h1(hash, new_mask);
209                while new_ctrl[idx] & 0x80 == 0 {
210                    idx = (idx + 1) & new_mask;
211                }
212                new_ctrl[idx] = stamp;
213                new_hashes[idx] = hash;
214                new_names[idx] = self.names[i];
215                new_slots[idx] = self.slots[i];
216            }
217        }
218
219        self.ctrl = new_ctrl;
220        self.hashes = new_hashes;
221        self.names = new_names;
222        self.slots = new_slots;
223        self.mask = new_mask;
224    }
225}
226
227struct ShardedNameTable {
228    shards: [NameTableShard; NAME_TABLE_SHARDS],
229}
230
231impl ShardedNameTable {
232    fn new(capacity_per_shard: usize) -> Self {
233        Self {
234            shards: [
235                NameTableShard::new(capacity_per_shard),
236                NameTableShard::new(capacity_per_shard),
237                NameTableShard::new(capacity_per_shard),
238                NameTableShard::new(capacity_per_shard),
239            ],
240        }
241    }
242
243    #[inline(always)]
244    const fn shard(hash: u64) -> usize {
245        (hash as usize) & (NAME_TABLE_SHARDS - 1)
246    }
247
248    #[inline(always)]
249    fn lookup(&self, hash: u64, name: StrId) -> Option<u32> {
250        self.shards[Self::shard(hash)].lookup(hash, name)
251    }
252
253    #[inline(always)]
254    fn insert(&mut self, hash: u64, name: StrId, slot: u32) {
255        self.shards[Self::shard(hash)].insert(hash, name, slot);
256    }
257
258    #[inline(always)]
259    fn remove(&mut self, hash: u64, name: StrId) {
260        self.shards[Self::shard(hash)].remove(hash, name);
261    }
262}
263
264// ---------------------------------------------------------------------------
265// KnowledgeGraph – the central type.
266// ---------------------------------------------------------------------------
267pub struct KnowledgeGraph {
268    interner: StringInterner,
269    entity_slots: Vec<Option<StoredEntity>>,
270    name_table: ShardedNameTable,
271    relations: Vec<StoredRelation>,
272    search: SearchIndex,
273    store: BinaryStore,
274}
275
276impl KnowledgeGraph {
277    pub fn new(path: &Path) -> std::io::Result<Self> {
278        let store = BinaryStore::new(path)?;
279
280        // Replay into local collections, then assign into self — no raw pointers needed (X3).
281        let mut interner = StringInterner::with_capacity(65536, 1024);
282        let mut entity_slots: Vec<Option<StoredEntity>> = Vec::with_capacity(256);
283        let mut name_table = ShardedNameTable::new(64);
284        let mut relations: Vec<StoredRelation> = Vec::with_capacity(64);
285        let mut search = SearchIndex::new();
286
287        store.replay(|kind, data| {
288            match kind {
289                RecordKind::CreateEntity => {
290                    if let Some((name, etype, obs)) = store_enc::decode_create_entity(data) {
291                        Self::replay_create_entity(
292                            &mut interner, &mut entity_slots, &mut search, &mut name_table, name, etype, &obs,
293                        );
294                    }
295                }
296                RecordKind::CreateRelation => {
297                    if let Some((from, to, rtype)) = store_enc::decode_create_relation(data) {
298                        let from_id = interner.intern(from);
299                        let to_id = interner.intern(to);
300                        let type_id = interner.intern(rtype);
301                        relations.push(StoredRelation {
302                            from: from_id,
303                            to: to_id,
304                            relation_type: type_id,
305                        });
306                    }
307                }
308                RecordKind::AddObservations => {
309                    if let Some((name, obs)) = store_enc::decode_add_observations(data) {
310                        Self::replay_add_observations(
311                            &mut interner, &mut entity_slots, &mut search, &mut name_table, name, &obs,
312                        );
313                    }
314                }
315                RecordKind::DeleteEntity => {
316                    if let Some(name) = store_enc::decode_delete_entity(data) {
317                        Self::replay_delete_entity(
318                            &mut interner, &mut entity_slots, &mut relations, &mut search, &mut name_table, name,
319                        );
320                    }
321                }
322                RecordKind::DeleteObservations => {
323                    if let Some((name, obs)) = store_enc::decode_delete_observations(data) {
324                        Self::replay_delete_observations(
325                            &mut interner, &mut entity_slots, &mut search, &mut name_table, name, &obs,
326                        );
327                    }
328                }
329                RecordKind::DeleteRelation => {
330                    if let Some((from, to, rtype)) = store_enc::decode_delete_relation(data) {
331                        let from_id = interner.intern(from);
332                        let to_id = interner.intern(to);
333                        let type_id = interner.intern(rtype);
334                        relations.retain(|r| {
335                            !(r.from == from_id && r.to == to_id && r.relation_type == type_id)
336                        });
337                    }
338                }
339            }
340        })?;
341
342        Ok(Self {
343            interner,
344            entity_slots,
345            name_table,
346            relations,
347            search,
348            store,
349        })
350    }
351
352    // -----------------------------------------------------------------------
353    // Replay helpers (static to avoid borrow issues in the closure)
354    // -----------------------------------------------------------------------
355
356    #[allow(clippy::ptr_arg)]
357    fn replay_create_entity(
358        interner: &mut StringInterner,
359        entities: &mut Vec<Option<StoredEntity>>,
360        search: &mut SearchIndex,
361        name_table: &mut ShardedNameTable,
362        name: &str,
363        etype: &str,
364        observations: &[&str],
365    ) {
366        let name_id = interner.intern(name);
367        let type_id = interner.intern(etype);
368        let obs_ids: Vec<StrId> = observations.iter().map(|o| interner.intern(o)).collect();
369        let slot = entities.len() as u32;
370        entities.push(Some(StoredEntity {
371            state: ENTITY_SLOT_LIVE,
372            name: name_id,
373            entity_type: type_id,
374            observations: obs_ids.clone(),
375        }));
376        let hash = interner.get_hash(name_id);
377        name_table.insert(hash, name_id, slot);
378        search.index_entity(interner, slot, name_id, type_id, &obs_ids);
379    }
380
381    fn replay_add_observations(
382        interner: &mut StringInterner,
383        entities: &mut [Option<StoredEntity>],
384        search: &mut SearchIndex,
385        name_table: &mut ShardedNameTable,
386        name: &str,
387        observations: &[&str],
388    ) {
389        let name_id = interner.intern(name);
390        let hash = interner.get_hash(name_id);
391        if let Some(slot) = name_table.lookup(hash, name_id)
392            && let Some(Some(entity)) = entities.get_mut(slot as usize)
393        {
394            for &o in observations {
395                let oid = interner.intern(o);
396                if !entity.observations.contains(&oid) {
397                    entity.observations.push(oid);
398                }
399            }
400            search.remove_entity(slot);
401            search.index_entity(
402                interner,
403                slot,
404                entity.name,
405                entity.entity_type,
406                &entity.observations,
407            );
408        }
409    }
410
411    fn replay_delete_entity(
412        interner: &mut StringInterner,
413        entities: &mut [Option<StoredEntity>],
414        rels: &mut Vec<StoredRelation>,
415        search: &mut SearchIndex,
416        name_table: &mut ShardedNameTable,
417        name: &str,
418    ) {
419        let name_id = interner.intern(name);
420        let hash = interner.get_hash(name_id);
421        if let Some(slot) = name_table.lookup(hash, name_id)
422            && let Some(Some(_)) = entities.get(slot as usize)
423        {
424            entities[slot as usize] = None;
425            search.remove_entity(slot);
426            name_table.remove(hash, name_id);
427        }
428        rels.retain(|r| r.from != name_id && r.to != name_id);
429    }
430
431    fn replay_delete_observations(
432        interner: &mut StringInterner,
433        entities: &mut [Option<StoredEntity>],
434        search: &mut SearchIndex,
435        name_table: &mut ShardedNameTable,
436        name: &str,
437        observations: &[&str],
438    ) {
439        let name_id = interner.intern(name);
440        let hash = interner.get_hash(name_id);
441        if let Some(slot) = name_table.lookup(hash, name_id)
442            && let Some(Some(entity)) = entities.get_mut(slot as usize)
443        {
444            let remove_ids: Vec<StrId> = observations.iter().map(|o| interner.intern(o)).collect();
445            entity.observations.retain(|o| !remove_ids.contains(o));
446            search.remove_entity(slot);
447            search.index_entity(
448                interner,
449                slot,
450                entity.name,
451                entity.entity_type,
452                &entity.observations,
453            );
454        }
455    }
456
457    // -----------------------------------------------------------------------
458    // Public API
459    // -----------------------------------------------------------------------
460
461    pub const fn interner(&self) -> &StringInterner {
462        &self.interner
463    }
464
465    /// Return a single entity by exact name match.
466    pub fn get_entity(&self, name: &str) -> Option<Entity> {
467        let name_id = self.interner.get_optional(name)?;
468        let hash = self.interner.get_hash(name_id);
469        let slot = self.name_table.lookup(hash, name_id)?;
470        let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
471        if !stored.is_live() {
472            return None;
473        }
474        Some(self.entity_to_output(stored))
475    }
476
477    /// Return aggregate statistics about the graph.
478    pub fn graph_stats(&self) -> serde_json::Value {
479        let live_entities = self
480            .entity_slots
481            .iter()
482            .filter(|s| s.as_ref().is_some_and(|e| e.is_live()))
483            .count();
484        let total_relations = self.relations.len();
485        let index_entries = self.search.len();
486        let total_obs: usize = self
487            .entity_slots
488            .iter()
489            .filter_map(|s| s.as_ref())
490            .filter(|e| e.is_live())
491            .map(|e| e.observations.len())
492            .sum();
493
494        serde_json::json!({
495            "entities": live_entities,
496            "relations": total_relations,
497            "totalObservations": total_obs,
498            "searchIndexEntries": index_entries,
499            "internedStrings": self.interner.len(),
500            "internedBytes": self.interner.total_bytes(),
501        })
502    }
503
504    /// Search relations by optional filters: `from`, `to`, `relationType`.
505    /// Any filter that is absent matches everything. A filter value that does
506    /// not exist in the graph returns empty results.
507    pub fn search_relations(&self, from: Option<&str>, to: Option<&str>, rtype: Option<&str>) -> Vec<Relation> {
508        let from_id = match from {
509            Some(f) => match self.interner.get_optional(f) {
510                Some(id) => Some(id),
511                None => return Vec::new(),
512            },
513            None => None,
514        };
515        let to_id = match to {
516            Some(t) => match self.interner.get_optional(t) {
517                Some(id) => Some(id),
518                None => return Vec::new(),
519            },
520            None => None,
521        };
522        let rtype_id = match rtype {
523            Some(r) => match self.interner.get_optional(r) {
524                Some(id) => Some(id),
525                None => return Vec::new(),
526            },
527            None => None,
528        };
529
530        self.relations
531            .iter()
532            .filter(|r| {
533                from_id.is_none_or(|f| r.from == f)
534                    && to_id.is_none_or(|t| r.to == t)
535                    && rtype_id.is_none_or(|rt| r.relation_type == rt)
536            })
537            .map(|r| Relation {
538                from: self.interner.lookup(r.from).to_string(),
539                to: self.interner.lookup(r.to).to_string(),
540                relation_type: self.interner.lookup(r.relation_type).to_string(),
541            })
542            .collect()
543    }
544
545    /// BFS shortest-path between two entity names. Returns the sequence of
546    /// entity names along the path (inclusive of both endpoints).
547    pub fn find_path(&self, from: &str, to: &str) -> Result<Vec<String>> {
548        let from_id = self.interner.get_optional(from)
549            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
550        let to_id = self.interner.get_optional(to)
551            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
552        let hash_from = self.interner.get_hash(from_id);
553        let hash_to = self.interner.get_hash(to_id);
554
555        if self.name_table.lookup(hash_from, from_id).is_none() {
556            return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
557        }
558        if self.name_table.lookup(hash_to, to_id).is_none() {
559            return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
560        }
561        if from_id == to_id {
562            return Ok(vec![from.to_string()]);
563        }
564
565        // Build adjacency list (P4) — O(E) once, not O(V×E).
566        let mut adj: HashMap<StrId, Vec<(StrId, StrId)>> = HashMap::new();
567        for rel in &self.relations {
568            adj.entry(rel.from).or_default().push((rel.to, rel.relation_type));
569            adj.entry(rel.to).or_default().push((rel.from, rel.relation_type));
570        }
571
572        // BFS over adjacency list
573        let mut visited: HashSet<StrId> = HashSet::new();
574        let mut parent: HashMap<StrId, StrId> = HashMap::new();
575        let mut queue: VecDeque<StrId> = VecDeque::new();
576
577        visited.insert(from_id);
578        queue.push_back(from_id);
579
580        while let Some(current) = queue.pop_front() {
581            if current == to_id {
582                break;
583            }
584
585            if let Some(neighbors) = adj.get(&current) {
586                for &(neighbor, _) in neighbors {
587                    if visited.insert(neighbor) {
588                        parent.insert(neighbor, current);
589                        queue.push_back(neighbor);
590                    }
591                }
592            }
593        }
594
595        if !parent.contains_key(&to_id) && from_id != to_id {
596            return Err(MCSError::MemoryError(format!(
597                "No path found between '{from}' and '{to}'"
598            )));
599        }
600
601        // Reconstruct path
602        let mut path: Vec<String> = Vec::new();
603        let mut cur = to_id;
604        loop {
605            path.push(self.interner.lookup(cur).to_string());
606            if cur == from_id {
607                break;
608            }
609            cur = *parent.get(&cur).ok_or_else(|| {
610                MCSError::MemoryError("Path reconstruction failed".into())
611            })?;
612        }
613        path.reverse();
614        Ok(path)
615    }
616
617    /// Rewrite the binary log from the current in-memory state.
618    /// After compaction the log contains only the minimal set of records
619    /// needed to reconstruct the graph (all creates, no deletes).
620    /// Crash-safe: writes to a temp file, then atomically renames (C3).
621    pub fn compact(&mut self) -> Result<()> {
622        // 1. Collect current state as create-records
623        let mut create_entities: Vec<Entity> = Vec::new();
624        let mut create_relations: Vec<Relation> = Vec::new();
625
626        for slot in &self.entity_slots {
627            if let Some(stored) = slot.as_ref().filter(|e| e.is_live()) {
628                create_entities.push(self.entity_to_output(stored));
629            }
630        }
631        for rel in &self.relations {
632            create_relations.push(Relation {
633                from: self.interner.lookup(rel.from).to_string(),
634                to: self.interner.lookup(rel.to).to_string(),
635                relation_type: self.interner.lookup(rel.relation_type).to_string(),
636            });
637        }
638
639        // 2. Write to a temp file first
640        let tmp_path = self.store.path().with_extension("tmp");
641        let mut tmp_store = BinaryStore::new(&tmp_path).map_err(MCSError::IoError)?;
642        for entity in &create_entities {
643            let mut buf = Vec::new();
644            store_enc::encode_create_entity(&mut buf, &entity.name, &entity.entity_type, &entity.observations)
645                .map_err(MCSError::IoError)?;
646            tmp_store.write_record(RecordKind::CreateEntity, &buf).map_err(MCSError::IoError)?;
647        }
648        for relation in &create_relations {
649            let mut buf = Vec::new();
650            store_enc::encode_create_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
651                .map_err(MCSError::IoError)?;
652            tmp_store.write_record(RecordKind::CreateRelation, &buf).map_err(MCSError::IoError)?;
653        }
654        tmp_store.flush_and_sync().map_err(MCSError::IoError)?;
655        drop(tmp_store);
656
657        // 3. Atomically rename over the original (atomic on POSIX)
658        std::fs::rename(&tmp_path, self.store.path()).map_err(MCSError::IoError)?;
659
660        // 4. Reopen the store with the new file
661        self.store = BinaryStore::new(self.store.path()).map_err(MCSError::IoError)?;
662
663        Ok(())
664    }
665
666    // ---- Public API with write-ahead log (C1) and error propagation ----
667
668    pub fn create_entities(&mut self, entities: &[Entity]) -> Result<Vec<Entity>> {
669        // Validate up front so an invalid entity never produces partial writes.
670        for entity in entities {
671            if entity.name.is_empty() {
672                return Err(MCSError::InvalidParams(
673                    "Entity name must not be empty".into(),
674                ));
675            }
676        }
677        let mut created = Vec::new();
678        for entity in entities {
679            // Check dedup before writing (using non-interning lookup)
680            let existing = self.interner.get_optional(&entity.name)
681                .and_then(|id| {
682                    let hash = self.interner.get_hash(id);
683                    self.name_table.lookup(hash, id)
684                });
685            if existing.is_some() {
686                continue;
687            }
688            // Write-ahead: encode and log before mutating state
689            let mut buf = Vec::new();
690            store_enc::encode_create_entity(&mut buf, &entity.name, &entity.entity_type, &entity.observations)
691                .map_err(MCSError::IoError)?;
692            self.store.write_record(RecordKind::CreateEntity, &buf)
693                .map_err(MCSError::IoError)?;
694
695            let name_id = self.interner.intern(&entity.name);
696            let hash = self.interner.get_hash(name_id);
697            let type_id = self.interner.intern(&entity.entity_type);
698            let obs_ids: Vec<StrId> = entity
699                .observations
700                .iter()
701                .map(|o| self.interner.intern(o))
702                .collect();
703            let slot = self.entity_slots.len() as u32;
704            self.search
705                .index_entity(&mut self.interner, slot, name_id, type_id, &obs_ids);
706            self.entity_slots.push(Some(StoredEntity {
707                state: ENTITY_SLOT_LIVE,
708                name: name_id,
709                entity_type: type_id,
710                observations: obs_ids,
711            }));
712            self.name_table.insert(hash, name_id, slot);
713            created.push(Entity {
714                name: entity.name.clone(),
715                entity_type: entity.entity_type.clone(),
716                observations: entity.observations.clone(),
717            });
718        }
719        Ok(created)
720    }
721
722    pub fn create_relations(&mut self, relations: &[Relation]) -> Result<Vec<Relation>> {
723        // Validate up front so an invalid relation never produces partial writes.
724        for relation in relations {
725            if relation.from.is_empty() || relation.to.is_empty() {
726                return Err(MCSError::InvalidParams(
727                    "Relation endpoints must not be empty".into(),
728                ));
729            }
730        }
731        let mut created = Vec::new();
732        // Build a dedup set for O(1) duplicate checks (P5)
733        let mut rel_set: HashSet<(StrId, StrId, StrId)> = HashSet::new();
734        for rel in &self.relations {
735            rel_set.insert((rel.from, rel.to, rel.relation_type));
736        }
737        for relation in relations {
738            let from_id = self.interner.intern(&relation.from);
739            let to_id = self.interner.intern(&relation.to);
740            let type_id = self.interner.intern(&relation.relation_type);
741            if !rel_set.insert((from_id, to_id, type_id)) {
742                continue;
743            }
744            // Write-ahead: log before mutation
745            let mut buf = Vec::new();
746            store_enc::encode_create_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
747                .map_err(MCSError::IoError)?;
748            self.store.write_record(RecordKind::CreateRelation, &buf)
749                .map_err(MCSError::IoError)?;
750
751            self.relations.push(StoredRelation {
752                from: from_id,
753                to: to_id,
754                relation_type: type_id,
755            });
756            created.push(Relation {
757                from: relation.from.clone(),
758                to: relation.to.clone(),
759                relation_type: relation.relation_type.clone(),
760            });
761        }
762        Ok(created)
763    }
764
765    pub fn add_observations(&mut self, entity_name: &str, contents: &[String]) -> Result<Vec<String>> {
766        let name_id = self.interner.get_optional(entity_name)
767            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
768        let hash = self.interner.get_hash(name_id);
769        let slot = self
770            .name_table
771            .lookup(hash, name_id)
772            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
773        let stored = self
774            .entity_slots
775            .get_mut(slot as usize)
776            .and_then(|e| e.as_mut())
777            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
778
779        // Deduplicate new observations (P7) — use HashSet for O(1) lookups
780        let existing: HashSet<StrId> = stored.observations.iter().copied().collect();
781        let mut added = Vec::new();
782        let mut interned_added = Vec::new();
783        for content in contents {
784            let cid = self.interner.intern(content);
785            if existing.contains(&cid) {
786                continue;
787            }
788            stored.observations.push(cid);
789            interned_added.push(cid);
790            added.push(content.clone());
791        }
792        if !added.is_empty() {
793            // Write-ahead: log before re-indexing
794            let mut buf = Vec::new();
795            store_enc::encode_add_observations(&mut buf, entity_name, &added)
796                .map_err(MCSError::IoError)?;
797            self.store.write_record(RecordKind::AddObservations, &buf)
798                .map_err(MCSError::IoError)?;
799
800            self.search.remove_entity(slot);
801            self.search
802                .index_entity(&mut self.interner, slot, stored.name, stored.entity_type, &stored.observations);
803        }
804        Ok(added)
805    }
806
807    pub fn delete_entities(&mut self, entity_names: &[String]) -> Result<()> {
808        let mut deleted_names = Vec::new();
809        for name in entity_names {
810            let name_id_opt = self.interner.get_optional(name);
811            if let Some(name_id) = name_id_opt {
812                let hash = self.interner.get_hash(name_id);
813                if let Some(slot) = self.name_table.lookup(hash, name_id)
814                    && let Some(Some(_)) = self.entity_slots.get(slot as usize)
815                {
816                    // Write-ahead: log before mutation
817                    let mut buf = Vec::new();
818                    store_enc::encode_delete_entity(&mut buf, name)
819                        .map_err(MCSError::IoError)?;
820                    self.store.write_record(RecordKind::DeleteEntity, &buf)
821                        .map_err(MCSError::IoError)?;
822
823                    self.entity_slots[slot as usize] = None;
824                    self.search.remove_entity(slot);
825                    self.name_table.remove(hash, name_id);
826                    deleted_names.push(name.clone());
827                }
828            }
829        }
830        if !deleted_names.is_empty() {
831            // Use a HashSet for O(1) retain checks (P5)
832            let deleted_ids: HashSet<StrId> = deleted_names.iter()
833                .map(|n| self.interner.intern(n))
834                .collect();
835            self.relations
836                .retain(|r| !deleted_ids.contains(&r.from) && !deleted_ids.contains(&r.to));
837        }
838        Ok(())
839    }
840
841    pub fn delete_observations(&mut self, entity_name: &str, observations: &[String]) -> Result<()> {
842        let name_id = self.interner.get_optional(entity_name)
843            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
844        let hash = self.interner.get_hash(name_id);
845        let slot = self
846            .name_table
847            .lookup(hash, name_id)
848            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
849        let stored = self
850            .entity_slots
851            .get_mut(slot as usize)
852            .and_then(|e| e.as_mut())
853            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
854        let remove_ids: HashSet<StrId> = observations.iter().map(|o| self.interner.intern(o)).collect();
855        stored.observations.retain(|o| !remove_ids.contains(o));
856        // Write-ahead: log before re-indexing
857        let mut buf = Vec::new();
858        store_enc::encode_delete_observations(&mut buf, entity_name, observations)
859            .map_err(MCSError::IoError)?;
860        self.store.write_record(RecordKind::DeleteObservations, &buf)
861            .map_err(MCSError::IoError)?;
862
863        self.search.remove_entity(slot);
864        self.search
865            .index_entity(&mut self.interner, slot, stored.name, stored.entity_type, &stored.observations);
866        Ok(())
867    }
868
869    pub fn delete_relations(&mut self, relations: &[Relation]) -> Result<()> {
870        // Collect targets into a HashSet for O(1) retain checks (P5)
871        let rels: HashSet<(StrId, StrId, StrId)> = relations
872            .iter()
873            .map(|r| {
874                (
875                    self.interner.intern(&r.from),
876                    self.interner.intern(&r.to),
877                    self.interner.intern(&r.relation_type),
878                )
879            })
880            .collect();
881        self.relations
882            .retain(|r| !rels.contains(&(r.from, r.to, r.relation_type)));
883        for relation in relations {
884            let mut buf = Vec::new();
885            store_enc::encode_delete_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
886                .map_err(MCSError::IoError)?;
887            self.store.write_record(RecordKind::DeleteRelation, &buf)
888                .map_err(MCSError::IoError)?;
889        }
890        Ok(())
891    }
892
893    pub fn read_graph(&self) -> KnowledgeGraphOut {
894        let entities: Vec<Entity> = self
895            .entity_slots
896            .iter()
897            .filter_map(|s| s.as_ref().filter(|e| e.is_live()))
898            .map(|stored| self.entity_to_output(stored))
899            .collect();
900        let rels: Vec<Relation> = self
901            .relations
902            .iter()
903            .map(|r| Relation {
904                from: self.interner.lookup(r.from).to_string(),
905                to: self.interner.lookup(r.to).to_string(),
906                relation_type: self.interner.lookup(r.relation_type).to_string(),
907            })
908            .collect();
909        KnowledgeGraphOut { entities, relations: rels }
910    }
911
912    pub fn search_nodes(&self, query: &str) -> KnowledgeGraphOut {
913        let matched = self.search.search(query, &self.interner);
914        let entities: Vec<Entity> = matched
915            .iter()
916            .filter_map(|&slot| {
917                self.entity_slots
918                    .get(slot as usize)?
919                    .as_ref()
920                    .filter(|e| e.is_live())
921                    .map(|stored| self.entity_to_output(stored))
922            })
923            .collect();
924        let entity_names: HashSet<StrId> = entities.iter()
925            .filter_map(|e| self.interner.get_optional(&e.name))
926            .collect();
927        let rels: Vec<Relation> = self
928            .relations
929            .iter()
930            .filter(|r| entity_names.contains(&r.from) || entity_names.contains(&r.to))
931            .map(|r| Relation {
932                from: self.interner.lookup(r.from).to_string(),
933                to: self.interner.lookup(r.to).to_string(),
934                relation_type: self.interner.lookup(r.relation_type).to_string(),
935            })
936            .collect();
937        KnowledgeGraphOut { entities, relations: rels }
938    }
939
940    pub fn open_nodes(&self, names: &[String]) -> KnowledgeGraphOut {
941        let name_ids: HashSet<StrId> = names.iter()
942            .filter_map(|n| self.interner.get_optional(n))
943            .collect();
944        let entities: Vec<Entity> = self
945            .entity_slots
946            .iter()
947            .filter_map(|s| {
948                s.as_ref().and_then(|stored| {
949                    if stored.is_live() && name_ids.contains(&stored.name) {
950                        Some(self.entity_to_output(stored))
951                    } else {
952                        None
953                    }
954                })
955            })
956            .collect();
957        let matched_names: HashSet<StrId> = entities.iter()
958            .filter_map(|e| self.interner.get_optional(&e.name))
959            .collect();
960        let rels: Vec<Relation> = self
961            .relations
962            .iter()
963            .filter(|r| matched_names.contains(&r.from) || matched_names.contains(&r.to))
964            .map(|r| Relation {
965                from: self.interner.lookup(r.from).to_string(),
966                to: self.interner.lookup(r.to).to_string(),
967                relation_type: self.interner.lookup(r.relation_type).to_string(),
968            })
969            .collect();
970        KnowledgeGraphOut { entities, relations: rels }
971    }
972
973    // -----------------------------------------------------------------------
974    // Internal helpers
975    // -----------------------------------------------------------------------
976
977    fn entity_to_output(&self, stored: &StoredEntity) -> Entity {
978        Entity {
979            name: self.interner.lookup(stored.name).to_string(),
980            entity_type: self.interner.lookup(stored.entity_type).to_string(),
981            observations: stored
982                .observations
983                .iter()
984                .map(|o| self.interner.lookup(*o).to_string())
985                .collect(),
986        }
987    }
988
989    // --- Flush & sync ---
990
991    /// Flush and fsync the log to stable storage.
992    pub fn flush_and_sync(&mut self) -> Result<()> {
993        self.store.flush_and_sync().map_err(MCSError::IoError)
994    }
995}