1use std::collections::VecDeque;
2use std::path::Path;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::{Arc, Condvar, Mutex as StdMutex};
5
6use ahash::{AHashMap, AHashSet};
7use arc_swap::ArcSwap;
8
9use serde::ser::{Serialize, SerializeSeq, SerializeStruct, Serializer};
10
11use crate::errors::{MCSError, Result};
12use crate::intern::{StrId, StringInterner};
13use crate::types::{Entity, Relation, KnowledgeGraphOut};
14use crate::search::SearchIndex;
15use crate::store::{self as store_enc, BinaryStore, RecordKind};
16
/// State byte of a live entity; `StoredEntity::is_live` compares `state` against it.
const ENTITY_SLOT_LIVE: u8 = 1;
/// Number of shards in `ShardedNameTable`. The shard index is computed by masking
/// the hash with `NAME_TABLE_SHARDS - 1`, so this value must stay a power of two.
const NAME_TABLE_SHARDS: usize = 4;
19
/// Issues a cache prefetch hint for the line containing `addr`.
///
/// The const argument `3` is the value of `_MM_HINT_T0`.
///
/// # Safety
///
/// The hint never dereferences `addr` from Rust's point of view, but callers
/// should still pass a pointer derived from a live allocation so the call
/// stays meaningful.
#[cfg(target_arch = "x86_64")]
#[inline(always)]
unsafe fn prefetch_addr(addr: *const u8) {
    // `_mm_prefetch` is declared over `*const i8`; `cast` adapts the pointee type
    // without changing the address.
    // SAFETY: prefetching is only a hint to the CPU and performs no architectural
    // memory access; the caller upholds the contract documented above.
    unsafe { std::arch::x86_64::_mm_prefetch::<3>(addr.cast()) };
}

/// No-op stand-in for targets without the x86_64 prefetch intrinsic.
///
/// # Safety
///
/// Always safe to call; it is `unsafe` only to share a signature with the
/// x86_64 variant.
#[cfg(not(target_arch = "x86_64"))]
#[inline(always)]
const unsafe fn prefetch_addr(_addr: *const u8) {}
34
/// Flushes the directory that contains `path` to disk.
///
/// When `path` has no parent, or an empty one, the current directory (`.`) is
/// synced instead. Two failures are treated as success: the directory not
/// existing (`NotFound` on open) and the sync being rejected with
/// `InvalidInput`. Every other I/O error is returned to the caller.
fn sync_parent_dir(path: &Path) -> std::io::Result<()> {
    use std::io::ErrorKind;

    let parent = path
        .parent()
        .filter(|p| !p.as_os_str().is_empty())
        .unwrap_or_else(|| Path::new("."));

    let handle = match std::fs::File::open(parent) {
        Ok(handle) => handle,
        Err(err) if err.kind() == ErrorKind::NotFound => return Ok(()),
        Err(err) => return Err(err),
    };

    match handle.sync_all() {
        Err(err) if err.kind() != ErrorKind::InvalidInput => Err(err),
        _ => Ok(()),
    }
}
54
55#[derive(Clone)]
64#[cfg_attr(feature = "cache_align", repr(align(64)))]
65pub(crate) struct StoredEntity {
66 state: u8,
67 pub(crate) name: StrId,
68 pub(crate) entity_type: StrId,
69 pub(crate) observations: Vec<StrId>,
70}
71
72impl StoredEntity {
73 pub(crate) const fn is_live(&self) -> bool {
74 self.state == ENTITY_SLOT_LIVE
75 }
76}
77
/// Directed relation between two entities; all three parts are interned ids.
#[derive(Clone)]
#[cfg_attr(feature = "cache_align", repr(align(16)))]
pub(crate) struct StoredRelation {
    /// Interned name of the source entity.
    pub(crate) from: StrId,
    /// Interned name of the target entity.
    pub(crate) to: StrId,
    /// Interned relation type label.
    pub(crate) relation_type: StrId,
}
88
89pub struct GraphView<'a> {
103 kg: &'a KnowledgeGraph,
104 entities: Vec<&'a StoredEntity>,
105 relations: Vec<&'a StoredRelation>,
106}
107
108impl GraphView<'_> {
109 pub fn to_owned_out(&self) -> KnowledgeGraphOut {
113 KnowledgeGraphOut {
114 entities: self.entities.iter().map(|e| self.kg.entity_to_output(e)).collect(),
115 relations: self.relations.iter().map(|r| self.kg.relation_to_output(r)).collect(),
116 }
117 }
118}
119
120impl Serialize for GraphView<'_> {
121 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
122 let mut st = s.serialize_struct("KnowledgeGraphOut", 2)?;
123 st.serialize_field("entities", &EntityListRef { kg: self.kg, items: &self.entities })?;
124 st.serialize_field("relations", &RelationListRef { kg: self.kg, items: &self.relations })?;
125 st.end()
126 }
127}
128
129struct EntityListRef<'a> { kg: &'a KnowledgeGraph, items: &'a [&'a StoredEntity] }
130impl Serialize for EntityListRef<'_> {
131 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
132 let mut seq = s.serialize_seq(Some(self.items.len()))?;
133 for &e in self.items {
134 seq.serialize_element(&EntityRef { kg: self.kg, e })?;
135 }
136 seq.end()
137 }
138}
139
140struct RelationListRef<'a> { kg: &'a KnowledgeGraph, items: &'a [&'a StoredRelation] }
141impl Serialize for RelationListRef<'_> {
142 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
143 let mut seq = s.serialize_seq(Some(self.items.len()))?;
144 for &r in self.items {
145 seq.serialize_element(&RelationRef { kg: self.kg, r })?;
146 }
147 seq.end()
148 }
149}
150
151struct EntityRef<'a> { kg: &'a KnowledgeGraph, e: &'a StoredEntity }
152impl Serialize for EntityRef<'_> {
153 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
154 let mut st = s.serialize_struct("Entity", 3)?;
155 st.serialize_field("name", self.kg.interner.lookup(self.e.name))?;
156 st.serialize_field("entityType", self.kg.interner.lookup(self.e.entity_type))?;
157 st.serialize_field("observations", &ObsRef { kg: self.kg, obs: &self.e.observations })?;
158 st.end()
159 }
160}
161
162struct ObsRef<'a> { kg: &'a KnowledgeGraph, obs: &'a [StrId] }
163impl Serialize for ObsRef<'_> {
164 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
165 let mut seq = s.serialize_seq(Some(self.obs.len()))?;
166 for &o in self.obs {
167 seq.serialize_element(self.kg.interner.lookup(o))?;
168 }
169 seq.end()
170 }
171}
172
173struct RelationRef<'a> { kg: &'a KnowledgeGraph, r: &'a StoredRelation }
174impl Serialize for RelationRef<'_> {
175 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
176 let mut st = s.serialize_struct("Relation", 3)?;
177 st.serialize_field("from", self.kg.interner.lookup(self.r.from))?;
178 st.serialize_field("to", self.kg.interner.lookup(self.r.to))?;
179 st.serialize_field("relationType", self.kg.interner.lookup(self.r.relation_type))?;
180 st.end()
181 }
182}
183
/// Which relation endpoints to follow when walking from an entity.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum Direction {
    /// Follow relations whose `from` is the current entity.
    Out,
    /// Follow relations whose `to` is the current entity.
    In,
    /// Follow relations in either orientation.
    Both,
}

impl Direction {
    /// Parses an optional direction keyword.
    ///
    /// Exactly `"out"` and `"in"` select the matching variant. Anything else,
    /// including `None`, the empty string and differently-cased spellings,
    /// yields [`Direction::Both`].
    pub fn parse(s: Option<&str>) -> Self {
        match s.unwrap_or_default() {
            "out" => Self::Out,
            "in" => Self::In,
            _ => Self::Both,
        }
    }
}
205
/// Makes a string safe to embed in a double-quoted, single-line export label.
///
/// Double quotes become single quotes and CR/LF become spaces; every other
/// character is passed through unchanged.
fn sanitize_label(s: &str) -> String {
    s.chars()
        .map(|ch| match ch {
            '"' => '\'',
            '\n' | '\r' => ' ',
            other => other,
        })
        .collect()
}
218
/// Control byte of an unoccupied name-table slot. Occupied slots store a 7-bit
/// stamp (see `h2`), so probing code detects emptiness by testing the high bit (`0x80`).
const EMPTY_SLOT: u8 = 0xFF;
229
/// Returns the 7-bit stamp of a hash: its lowest seven bits.
///
/// The high bit of the result is always clear, which keeps stamps distinct
/// from the `0xFF` empty-slot marker.
#[inline(always)]
const fn h2(hash: u64) -> u8 {
    let low_byte = hash as u8;
    low_byte & 0x7F
}
234
/// Returns the starting probe index for a hash: the bits above the 7-bit stamp,
/// reduced by `mask` (expected to be `capacity - 1` for a power-of-two capacity).
#[inline(always)]
const fn h1(hash: u64, mask: usize) -> usize {
    let upper_bits = (hash >> 7) as usize;
    upper_bits & mask
}
239
/// One open-addressing shard mapping an interned entity name to its slot index.
///
/// `ctrl`, `names` and `slots` are parallel arrays of identical power-of-two
/// length; index `i` in each describes the same table slot.
#[derive(Clone)]
struct NameTableShard {
    /// Per-slot control byte: `EMPTY_SLOT` when free, otherwise the 7-bit stamp of the key's hash.
    ctrl: Vec<u8>,
    /// Per-slot key (`StrId::EMPTY` when free).
    names: Vec<StrId>,
    /// Per-slot value: the entity slot index (`u32::MAX` when free).
    slots: Vec<u32>,
    /// `ctrl.len() - 1`; used to wrap probe indices.
    mask: usize,
    /// Number of occupied slots.
    count: usize,
}
248
249impl NameTableShard {
250 fn new(capacity: usize) -> Self {
251 let cap = capacity.next_power_of_two().max(16);
252 Self {
253 ctrl: vec![EMPTY_SLOT; cap],
254 names: vec![StrId::EMPTY; cap],
255 slots: vec![u32::MAX; cap],
256 mask: cap - 1,
257 count: 0,
258 }
259 }
260
261 #[inline(always)]
262 fn lookup(&self, hash: u64, name: StrId) -> Option<u32> {
263 let stamp = h2(hash);
264 let mask = self.mask;
265 let mut idx = h1(hash, mask);
266 let ctrl = self.ctrl.as_ptr();
267 let names = self.names.as_ptr();
268 let slots = self.slots.as_ptr();
269 let len = self.ctrl.len();
270
271 for _ in 0..len {
272 let prefetch_idx = idx.wrapping_add(4) & mask;
274 unsafe { prefetch_addr(ctrl.add(prefetch_idx)) };
275
276 unsafe {
278 let c = *ctrl.add(idx);
279 if c & 0x80 != 0 {
281 return None;
282 }
283 if c == stamp && *names.add(idx) == name {
285 return Some(*slots.add(idx));
286 }
287 }
288 idx = (idx + 1) & mask;
289 }
290 None
291 }
292
293 fn insert(&mut self, interner: &StringInterner, hash: u64, name: StrId, slot: u32) {
294 if self.count * 4 > self.ctrl.len() * 3 {
295 self.grow(interner);
296 }
297 let stamp = h2(hash);
298 let mask = self.mask;
299 let mut idx = h1(hash, mask);
300 loop {
301 unsafe {
303 if *self.ctrl.get_unchecked(idx) & 0x80 != 0 {
304 *self.ctrl.get_unchecked_mut(idx) = stamp;
305 *self.names.get_unchecked_mut(idx) = name;
306 *self.slots.get_unchecked_mut(idx) = slot;
307 self.count += 1;
308 return;
309 }
310 }
311 idx = (idx + 1) & mask;
312 }
313 }
314
    /// Removes the entry for `name`, if present, and repairs the probe chain.
    ///
    /// After clearing the matching slot, every entry in the run of occupied
    /// slots that follows is taken out and re-inserted from its own home index,
    /// so no later lookup is cut short by the new hole. `interner` supplies the
    /// hashes of the entries being re-seated.
    fn remove(&mut self, interner: &StringInterner, hash: u64, name: StrId) {
        let stamp = h2(hash);
        let mask = self.mask;
        let mut idx = h1(hash, mask);
        let len = self.ctrl.len();
        for _ in 0..len {
            // An empty slot ends the probe chain: the key is not in the table.
            if self.ctrl[idx] & 0x80 != 0 {
                return;
            }
            if self.ctrl[idx] == stamp && self.names[idx] == name {
                // Clear the matching slot.
                self.ctrl[idx] = EMPTY_SLOT;
                self.names[idx] = StrId::EMPTY;
                self.slots[idx] = u32::MAX;
                self.count -= 1;

                // Re-seat each entry in the occupied run that follows the hole.
                let mut next = (idx + 1) & mask;
                while self.ctrl[next] & 0x80 == 0 {
                    let nn = self.names[next];
                    let ns = self.slots[next];
                    let nh = interner.get_hash(nn);
                    // Take the entry out before probing so its own slot counts as free.
                    self.ctrl[next] = EMPTY_SLOT;
                    self.names[next] = StrId::EMPTY;
                    self.slots[next] = u32::MAX;
                    self.count -= 1;

                    let nstamp = h2(nh);
                    // Probe from the entry's home index to the first free slot.
                    let mut re_idx = h1(nh, mask);
                    while self.ctrl[re_idx] & 0x80 == 0 {
                        re_idx = (re_idx + 1) & mask;
                    }
                    self.ctrl[re_idx] = nstamp;
                    self.names[re_idx] = nn;
                    self.slots[re_idx] = ns;
                    self.count += 1;

                    next = (next + 1) & mask;
                }
                return;
            }
            idx = (idx + 1) & mask;
        }
    }
361
362 fn grow(&mut self, interner: &StringInterner) {
363 let new_cap = self.ctrl.len() * 2;
364 let new_mask = new_cap - 1;
365 let mut new_ctrl = vec![EMPTY_SLOT; new_cap];
366 let mut new_names = vec![StrId::EMPTY; new_cap];
367 let mut new_slots = vec![u32::MAX; new_cap];
368
369 for i in 0..self.ctrl.len() {
370 if self.ctrl[i] & 0x80 == 0 {
371 let name = self.names[i];
373 let hash = interner.get_hash(name);
374 let stamp = h2(hash);
375 let mut idx = h1(hash, new_mask);
376 while new_ctrl[idx] & 0x80 == 0 {
377 idx = (idx + 1) & new_mask;
378 }
379 new_ctrl[idx] = stamp;
380 new_names[idx] = name;
381 new_slots[idx] = self.slots[i];
382 }
383 }
384
385 self.ctrl = new_ctrl;
386 self.names = new_names;
387 self.slots = new_slots;
388 self.mask = new_mask;
389 }
390}
391
392#[derive(Clone)]
393struct ShardedNameTable {
394 shards: [NameTableShard; NAME_TABLE_SHARDS],
395}
396
397impl ShardedNameTable {
398 fn new(capacity_per_shard: usize) -> Self {
399 Self {
400 shards: [
401 NameTableShard::new(capacity_per_shard),
402 NameTableShard::new(capacity_per_shard),
403 NameTableShard::new(capacity_per_shard),
404 NameTableShard::new(capacity_per_shard),
405 ],
406 }
407 }
408
409 #[inline(always)]
410 const fn shard(hash: u64) -> usize {
411 (hash as usize) & (NAME_TABLE_SHARDS - 1)
412 }
413
414 #[inline(always)]
415 fn lookup(&self, hash: u64, name: StrId) -> Option<u32> {
416 self.shards[Self::shard(hash)].lookup(hash, name)
417 }
418
419 #[inline(always)]
420 fn insert(&mut self, interner: &StringInterner, hash: u64, name: StrId, slot: u32) {
421 self.shards[Self::shard(hash)].insert(interner, hash, name, slot);
422 }
423
424 #[inline(always)]
425 fn remove(&mut self, interner: &StringInterner, hash: u64, name: StrId) {
426 self.shards[Self::shard(hash)].remove(interner, hash, name);
427 }
428}
429
/// In-memory knowledge graph backed by a `BinaryStore`.
pub struct KnowledgeGraph {
    /// Interner that owns every string referenced by `StrId`.
    interner: StringInterner,
    /// Entity storage indexed by slot number; `None` marks an unused slot.
    entity_slots: Vec<Option<StoredEntity>>,
    /// Indices of `entity_slots` entries that are `None`.
    free_slots: Vec<u32>,
    /// Maps an interned entity name to its index in `entity_slots`.
    name_table: ShardedNameTable,
    /// All relations, in stored order.
    relations: Vec<StoredRelation>,
    /// Per-node `(neighbor, relation_type)` list; `new` records each relation
    /// under both of its endpoints.
    adjacency: AHashMap<StrId, Vec<(StrId, StrId)>>,
    /// Search index over entities (semantics defined in `crate::search`).
    search: SearchIndex,
    /// Backing store the graph is replayed from on construction.
    store: BinaryStore,
}
448
/// Cloneable read-side copy of the graph state; every query method in its
/// `impl` takes `&self`.
#[derive(Clone)]
pub struct ReadSnapshot {
    /// Interner used to resolve every `StrId` in this snapshot.
    pub(crate) interner: StringInterner,
    /// Entity storage indexed by slot number, shared behind an `Arc`.
    pub(crate) entity_slots: Arc<[Option<StoredEntity>]>,
    /// Free-slot list carried along with the snapshot; not read by the query methods.
    #[allow(dead_code)]
    free_slots: Vec<u32>,
    /// Maps an interned entity name to its index in `entity_slots`.
    name_table: ShardedNameTable,
    /// All relations, shared behind an `Arc`.
    pub(crate) relations: Arc<[StoredRelation]>,
    /// Per-node `(neighbor, relation_type)` list used by the traversal queries.
    adjacency: AHashMap<StrId, Vec<(StrId, StrId)>>,
    /// Search index consulted by `search_entities`.
    search: SearchIndex,
}
464
/// Appends `s` to `buf` as a double-quoted JSON string literal.
///
/// `"` and `\` are backslash-escaped; newline, carriage return and tab use
/// their short escapes; any other control character is written as `\uXXXX`
/// with four lowercase hex digits. All remaining characters are copied as-is.
pub(crate) fn push_json_str(buf: &mut String, s: &str) {
    use std::fmt::Write;

    buf.push('"');
    for ch in s.chars() {
        let short_escape = match ch {
            '"' => Some("\\\""),
            '\\' => Some("\\\\"),
            '\n' => Some("\\n"),
            '\r' => Some("\\r"),
            '\t' => Some("\\t"),
            _ => None,
        };
        match short_escape {
            Some(escape) => buf.push_str(escape),
            // Writing into a `String` cannot fail, so the unwrap never panics.
            None if ch.is_control() => write!(buf, "\\u{:04x}", ch as u32).unwrap(),
            None => buf.push(ch),
        }
    }
    buf.push('"');
}
485
486pub struct ReadGraphView<'a> {
489 snap: &'a ReadSnapshot,
490 entities: Vec<&'a StoredEntity>,
491 relations: Vec<&'a StoredRelation>,
492}
493
494impl Serialize for ReadGraphView<'_> {
495 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
496 let mut st = s.serialize_struct("KnowledgeGraphOut", 2)?;
497 st.serialize_field("entities", &ReadEntityListRef { snap: self.snap, items: &self.entities })?;
498 st.serialize_field("relations", &ReadRelationListRef { snap: self.snap, items: &self.relations })?;
499 st.end()
500 }
501}
502
503struct ReadEntityListRef<'a> { snap: &'a ReadSnapshot, items: &'a [&'a StoredEntity] }
504impl Serialize for ReadEntityListRef<'_> {
505 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
506 let mut seq = s.serialize_seq(Some(self.items.len()))?;
507 for &e in self.items {
508 seq.serialize_element(&ReadEntityRef { snap: self.snap, e })?;
509 }
510 seq.end()
511 }
512}
513
514struct ReadRelationListRef<'a> { snap: &'a ReadSnapshot, items: &'a [&'a StoredRelation] }
515impl Serialize for ReadRelationListRef<'_> {
516 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
517 let mut seq = s.serialize_seq(Some(self.items.len()))?;
518 for &r in self.items {
519 seq.serialize_element(&ReadRelationRef { snap: self.snap, r })?;
520 }
521 seq.end()
522 }
523}
524
525struct ReadEntityRef<'a> { snap: &'a ReadSnapshot, e: &'a StoredEntity }
526impl Serialize for ReadEntityRef<'_> {
527 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
528 let mut st = s.serialize_struct("Entity", 3)?;
529 st.serialize_field("name", self.snap.interner.lookup(self.e.name))?;
530 st.serialize_field("entityType", self.snap.interner.lookup(self.e.entity_type))?;
531 st.serialize_field("observations", &ReadObsRef { snap: self.snap, obs: &self.e.observations })?;
532 st.end()
533 }
534}
535
536struct ReadObsRef<'a> { snap: &'a ReadSnapshot, obs: &'a [StrId] }
537impl Serialize for ReadObsRef<'_> {
538 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
539 let mut seq = s.serialize_seq(Some(self.obs.len()))?;
540 for &o in self.obs {
541 seq.serialize_element(self.snap.interner.lookup(o))?;
542 }
543 seq.end()
544 }
545}
546
547struct ReadRelationRef<'a> { snap: &'a ReadSnapshot, r: &'a StoredRelation }
548impl Serialize for ReadRelationRef<'_> {
549 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
550 let mut st = s.serialize_struct("Relation", 3)?;
551 st.serialize_field("from", self.snap.interner.lookup(self.r.from))?;
552 st.serialize_field("to", self.snap.interner.lookup(self.r.to))?;
553 st.serialize_field("relationType", self.snap.interner.lookup(self.r.relation_type))?;
554 st.end()
555 }
556}
557
558impl ReadSnapshot {
560
561 pub fn read_graph_json(&self) -> String {
564 let cap = self.entity_slots.len() * 64 + self.relations.len() * 60 + 128;
566 let mut buf = String::with_capacity(cap);
567
568 buf.push_str(r#"{"entities":["#);
570 let mut first = true;
571 for slot in self.entity_slots.iter() {
572 let Some(e) = slot.as_ref().filter(|e| e.is_live()) else { continue };
573 if first { first = false } else { buf.push(',') }
574 buf.push('{');
575 buf.push_str(r#""name":"#);
577 push_json_str(&mut buf, self.interner.lookup(e.name));
578 buf.push(',');
579 buf.push_str(r#""entityType":"#);
581 push_json_str(&mut buf, self.interner.lookup(e.entity_type));
582 buf.push(',');
583 buf.push_str(r#""observations":["#);
585 for (oi, o) in e.observations.iter().enumerate() {
586 if oi > 0 { buf.push(',') }
587 push_json_str(&mut buf, self.interner.lookup(*o));
588 }
589 buf.push_str("]}");
590 }
591
592 buf.push_str(r#"],"relations":["#);
594 first = true;
595 for r in self.relations.iter() {
596 if first { first = false } else { buf.push(',') }
597 buf.push('{');
598 buf.push_str(r#""from":"#);
599 push_json_str(&mut buf, self.interner.lookup(r.from));
600 buf.push(',');
601 buf.push_str(r#""to":"#);
602 push_json_str(&mut buf, self.interner.lookup(r.to));
603 buf.push(',');
604 buf.push_str(r#""relationType":"#);
605 push_json_str(&mut buf, self.interner.lookup(r.relation_type));
606 buf.push('}');
607 }
608 buf.push_str("]}");
609
610 buf
611 }
612
613 pub fn read_graph_view(&self) -> ReadGraphView<'_> {
616 let entities: Vec<&StoredEntity> = self
617 .entity_slots
618 .iter()
619 .filter_map(|s| s.as_ref().filter(|e| e.is_live()))
620 .collect();
621 let relations: Vec<&StoredRelation> = self.relations.iter().collect();
622 ReadGraphView { snap: self, entities, relations }
623 }
624
625 fn lookup_live_slot(&self, name: &str) -> Option<u32> {
626 let name_id = self.interner.get_optional(name)?;
627 let hash = self.interner.get_hash(name_id);
628 let slot = self.name_table.lookup(hash, name_id)?;
629 self.entity_slots
630 .get(slot as usize)
631 .and_then(|s| s.as_ref())
632 .filter(|e| e.is_live())?;
633 Some(slot)
634 }
635
636 fn entity_by_name_id(&self, name_id: StrId) -> Option<Entity> {
637 let hash = self.interner.get_hash(name_id);
638 let slot = self.name_table.lookup(hash, name_id)?;
639 let e = self.entity_slots.get(slot as usize)?.as_ref()?;
640 Some(self.entity_to_output(e))
641 }
642
643 pub(crate) fn entity_to_output(&self, e: &StoredEntity) -> Entity {
644 Entity {
645 name: self.interner.lookup(e.name).to_string(),
646 entity_type: self.interner.lookup(e.entity_type).to_string(),
647 observations: e
648 .observations
649 .iter()
650 .map(|o| self.interner.lookup(*o).to_string())
651 .collect(),
652 }
653 }
654
655 pub(crate) fn relation_to_output(&self, r: &StoredRelation) -> Relation {
656 Relation {
657 from: self.interner.lookup(r.from).to_string(),
658 to: self.interner.lookup(r.to).to_string(),
659 relation_type: self.interner.lookup(r.relation_type).to_string(),
660 }
661 }
662
663 pub fn open_nodes(&self, names: &[String]) -> KnowledgeGraphOut {
665 let name_ids: std::collections::HashSet<StrId> = names
666 .iter()
667 .filter_map(|n| self.interner.get_optional(n))
668 .collect();
669 let entities: Vec<Entity> = self
670 .entity_slots
671 .iter()
672 .filter_map(|s| {
673 let e = s.as_ref()?;
674 if e.is_live() && name_ids.contains(&e.name) {
675 Some(self.entity_to_output(e))
676 } else {
677 None
678 }
679 })
680 .collect();
681 let matched: std::collections::HashSet<StrId> = entities.iter()
682 .filter_map(|e| self.interner.get_optional(&e.name))
683 .collect();
684 let relations: Vec<Relation> = self
685 .relations
686 .iter()
687 .filter(|r| matched.contains(&r.from) || matched.contains(&r.to))
688 .map(|r| self.relation_to_output(r))
689 .collect();
690 KnowledgeGraphOut { entities, relations }
691 }
692
693 pub fn read_graph(&self) -> KnowledgeGraphOut {
695 let entities: Vec<Entity> = self
696 .entity_slots
697 .iter()
698 .filter_map(|s| s.as_ref().filter(|e| e.is_live()))
699 .map(|e| self.entity_to_output(e))
700 .collect();
701 let relations: Vec<Relation> = self
702 .relations
703 .iter()
704 .map(|r| self.relation_to_output(r))
705 .collect();
706 KnowledgeGraphOut { entities, relations }
707 }
708
709 pub fn get_entity(&self, name: &str) -> Option<Entity> {
711 self.lookup_live_slot(name)?;
712 let name_id = self.interner.get_optional(name)?;
713 self.entity_by_name_id(name_id)
714 }
715
716 pub fn neighbors(
718 &self,
719 name: &str,
720 direction: Direction,
721 rtype: Option<&str>,
722 depth: u32,
723 ) -> Result<KnowledgeGraphOut> {
724 self.lookup_live_slot(name)
725 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
726 let start = self.interner.get_optional(name).unwrap();
727
728 let rtype_id = match rtype {
729 Some(r) => match self.interner.get_optional(r) {
730 Some(id) => Some(id),
731 None => {
732 let entities = self.entity_by_name_id(start).into_iter().collect();
733 return Ok(KnowledgeGraphOut { entities, relations: Vec::new() });
734 }
735 },
736 None => None,
737 };
738
739 let mut visited: AHashSet<StrId> = AHashSet::new();
740 visited.insert(start);
741
742 let type_ok = |r: &StoredRelation, rt: Option<StrId>| rt.is_none_or(|rt_id| r.relation_type == rt_id);
743
744 if depth == 1 {
745 for r in self.relations.iter().filter(|r| type_ok(r, rtype_id)) {
746 match direction {
747 Direction::Out => {
748 if r.from == start { visited.insert(r.to); }
749 }
750 Direction::In => {
751 if r.to == start { visited.insert(r.from); }
752 }
753 Direction::Both => {
754 if r.from == start { visited.insert(r.to); }
755 else if r.to == start { visited.insert(r.from); }
756 }
757 }
758 }
759 } else if depth >= 2 {
760 let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
761 match direction {
762 Direction::Both => {
763 for (&node, edges) in &self.adjacency {
764 for &(nb, rt) in edges {
765 if rtype_id.is_none_or(|rt_id| rt == rt_id) {
766 adj.entry(node).or_default().push(nb);
767 }
768 }
769 }
770 }
771 Direction::Out | Direction::In => {
772 for r in self.relations.iter().filter(|r| type_ok(r, rtype_id)) {
773 match direction {
774 Direction::Out => adj.entry(r.from).or_default().push(r.to),
775 Direction::In => adj.entry(r.to).or_default().push(r.from),
776 _ => unreachable!(),
777 }
778 }
779 }
780 }
781 let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
782 queue.push_back((start, 0));
783 while let Some((node, d)) = queue.pop_front() {
784 if d >= depth { continue; }
785 if let Some(nbrs) = adj.get(&node) {
786 for &nb in nbrs {
787 if visited.insert(nb) {
788 queue.push_back((nb, d + 1));
789 }
790 }
791 }
792 }
793 }
794
795 let mut entities = Vec::with_capacity(visited.len());
796 for &nid in &visited {
797 if let Some(e) = self.entity_by_name_id(nid) {
798 entities.push(e);
799 }
800 }
801 let relations: Vec<Relation> = self
802 .relations
803 .iter()
804 .filter(|r| type_ok(r, rtype_id) && visited.contains(&r.from) && visited.contains(&r.to))
805 .map(|r| self.relation_to_output(r))
806 .collect();
807 Ok(KnowledgeGraphOut { entities, relations })
808 }
809
810 pub fn describe_entity(&self, name: &str) -> Result<serde_json::Value> {
812 let name_id = self
813 .interner
814 .get_optional(name)
815 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
816 let entity = self
817 .entity_by_name_id(name_id)
818 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
819
820 let mut incident: Vec<Relation> = Vec::new();
821 let mut neighbor_seen: AHashSet<StrId> = AHashSet::new();
822 let mut neighbors: Vec<&str> = Vec::new();
823 for r in self.relations.iter() {
824 if r.from == name_id || r.to == name_id {
825 incident.push(self.relation_to_output(r));
826 let other = if r.from == name_id { r.to } else { r.from };
827 if other != name_id && neighbor_seen.insert(other) {
828 neighbors.push(self.interner.lookup(other));
829 }
830 }
831 }
832
833 Ok(serde_json::json!({
834 "entity": entity,
835 "relations": incident,
836 "neighbors": neighbors,
837 "degree": incident.len(),
838 }))
839 }
840
841 pub fn find_path(&self, from: &str, to: &str) -> Result<Vec<String>> {
843 let from_id = self
844 .interner
845 .get_optional(from)
846 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
847 let to_id = self
848 .interner
849 .get_optional(to)
850 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
851 if self.lookup_live_slot(from).is_none() {
852 return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
853 }
854 if self.lookup_live_slot(to).is_none() {
855 return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
856 }
857
858 let mut visited: AHashSet<StrId> = AHashSet::new();
860 let mut parent: AHashMap<StrId, StrId> = AHashMap::new();
861 let mut queue: VecDeque<StrId> = VecDeque::new();
862
863 visited.insert(from_id);
864 queue.push_back(from_id);
865
866 while let Some(current) = queue.pop_front() {
867 if current == to_id { break; }
868 if let Some(neighbors) = self.adjacency.get(¤t) {
869 for &(neighbor, _) in neighbors {
870 if visited.insert(neighbor) {
871 parent.insert(neighbor, current);
872 queue.push_back(neighbor);
873 }
874 }
875 }
876 }
877
878 if !visited.contains(&to_id) {
879 return Err(MCSError::MemoryError(format!(
880 "No path found between '{from}' and '{to}'"
881 )));
882 }
883
884 let mut path = Vec::new();
885 let mut cur = to_id;
886 path.push(self.interner.lookup(cur).to_string());
887 while let Some(&p) = parent.get(&cur) {
888 path.push(self.interner.lookup(p).to_string());
889 cur = p;
890 }
891 path.reverse();
892 Ok(path)
893 }
894
895 pub fn extract_subgraph(&self, names: &[String], depth: u32) -> Result<KnowledgeGraphOut> {
897 if names.is_empty() {
898 return Ok(KnowledgeGraphOut { entities: Vec::new(), relations: Vec::new() });
899 }
900 let mut visited: AHashSet<StrId> = AHashSet::new();
901 let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
902 for name in names {
903 if let Some(id) = self.interner.get_optional(name)
904 && visited.insert(id)
905 {
906 queue.push_back((id, 0));
907 }
908 }
909 let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
910 for (&node, edges) in &self.adjacency {
911 let nbrs: Vec<StrId> = edges.iter().map(|(to, _)| *to).collect();
912 adj.insert(node, nbrs);
913 }
914 while let Some((node, d)) = queue.pop_front() {
915 if d >= depth { continue; }
916 if let Some(nbrs) = adj.get(&node) {
917 for &nb in nbrs {
918 if visited.insert(nb) {
919 queue.push_back((nb, d + 1));
920 }
921 }
922 }
923 }
924 let mut entities: Vec<Entity> = Vec::with_capacity(visited.len());
925 for &nid in &visited {
926 if let Some(e) = self.entity_by_name_id(nid) {
927 entities.push(e);
928 }
929 }
930 let relations: Vec<Relation> = self
931 .relations
932 .iter()
933 .filter(|r| visited.contains(&r.from) && visited.contains(&r.to))
934 .map(|r| self.relation_to_output(r))
935 .collect();
936 Ok(KnowledgeGraphOut { entities, relations })
937 }
938
939 pub fn batch_get_entities(&self, names: &[String]) -> Vec<Option<Entity>> {
941 names.iter().map(|n| self.get_entity(n)).collect()
942 }
943
944 pub fn graph_stats(&self) -> serde_json::Value {
946 let entity_count = self
947 .entity_slots
948 .iter()
949 .filter(|s| s.as_ref().is_some_and(|e| e.is_live()))
950 .count();
951 let relation_count = self.relations.len();
952 let type_counts = self.entity_type_counts();
953 let relation_type_counts = self.relation_type_counts();
954 serde_json::json!({
955 "entities": entity_count,
956 "relations": relation_count,
957 "entityTypes": type_counts,
958 "relationTypes": relation_type_counts,
959 })
960 }
961
962 pub fn search_relations(&self, from: Option<&str>, to: Option<&str>, rtype: Option<&str>) -> Vec<Relation> {
964 let from_id = from.and_then(|n| self.interner.get_optional(n));
965 let to_id = to.and_then(|n| self.interner.get_optional(n));
966 let rtype_id = rtype.and_then(|n| self.interner.get_optional(n));
967 self.relations
968 .iter()
969 .filter(|r| {
970 from_id.is_none_or(|id| r.from == id)
971 && to_id.is_none_or(|id| r.to == id)
972 && rtype_id.is_none_or(|id| r.relation_type == id)
973 })
974 .map(|r| self.relation_to_output(r))
975 .collect()
976 }
977
978 pub fn entity_type_counts(&self) -> Vec<(String, usize)> {
980 let mut counts: AHashMap<StrId, usize> = AHashMap::new();
981 for slot in self.entity_slots.iter() {
982 if let Some(e) = slot.as_ref().filter(|e| e.is_live()) {
983 *counts.entry(e.entity_type).or_default() += 1;
984 }
985 }
986 let mut result: Vec<(String, usize)> = counts
987 .into_iter()
988 .map(|(id, c)| (self.interner.lookup(id).to_string(), c))
989 .collect();
990 result.sort_by(|a, b| a.0.cmp(&b.0));
991 result
992 }
993
994 pub fn relation_type_counts(&self) -> Vec<(String, usize)> {
996 let mut counts: AHashMap<StrId, usize> = AHashMap::new();
997 for r in self.relations.iter() {
998 *counts.entry(r.relation_type).or_default() += 1;
999 }
1000 let mut result: Vec<(String, usize)> = counts
1001 .into_iter()
1002 .map(|(id, c)| (self.interner.lookup(id).to_string(), c))
1003 .collect();
1004 result.sort_by(|a, b| a.0.cmp(&b.0));
1005 result
1006 }
1007
1008 pub fn export(&self, format: &str) -> Result<String> {
1010 match format {
1011 "json" => serde_json::to_string(&self.read_graph()).map_err(MCSError::JsonError),
1012 "mermaid" => Ok(self.export_mermaid()),
1013 "dot" => Ok(self.export_dot()),
1014 other => Err(MCSError::InvalidParams(format!(
1015 "Unknown export format '{other}' (expected json|mermaid|dot)"
1016 ))),
1017 }
1018 }
1019
1020 fn export_mermaid(&self) -> String {
1021 let mut out = String::with_capacity(4096);
1022 out.push_str("graph LR\n");
1023 for r in self.relations.iter() {
1024 let from = sanitize_label(self.interner.lookup(r.from));
1025 let to = sanitize_label(self.interner.lookup(r.to));
1026 let rt = sanitize_label(self.interner.lookup(r.relation_type));
1027 out.push_str(&format!(" {} -- \"{}\" --> {}\n", from, rt, to));
1028 }
1029 out
1030 }
1031
1032 fn export_dot(&self) -> String {
1033 let mut out = String::with_capacity(4096);
1034 out.push_str("digraph KG {\n");
1035 out.push_str(" rankdir=LR;\n");
1036 for slot in self.entity_slots.iter() {
1037 if let Some(e) = slot.as_ref().filter(|e| e.is_live()) {
1038 let name = sanitize_label(self.interner.lookup(e.name));
1039 let etype = sanitize_label(self.interner.lookup(e.entity_type));
1040 out.push_str(&format!(" \"{}\" [label=\"{}\n({})\"];\n", name, name, etype));
1041 }
1042 }
1043 for r in self.relations.iter() {
1044 let from = sanitize_label(self.interner.lookup(r.from));
1045 let to = sanitize_label(self.interner.lookup(r.to));
1046 let rt = sanitize_label(self.interner.lookup(r.relation_type));
1047 out.push_str(&format!(" \"{}\" -> \"{}\" [label=\"{}\"];\n", from, to, rt));
1048 }
1049 out.push_str("}\n");
1050 out
1051 }
1052
1053 pub fn find_all_paths(
1055 &self,
1056 from: &str,
1057 to: &str,
1058 max_depth: usize,
1059 max_paths: usize,
1060 ) -> Result<Vec<Vec<String>>> {
1061 let from_id = self
1062 .interner
1063 .get_optional(from)
1064 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
1065 let to_id = self
1066 .interner
1067 .get_optional(to)
1068 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
1069 if self.lookup_live_slot(from).is_none() {
1070 return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
1071 }
1072 if self.lookup_live_slot(to).is_none() {
1073 return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
1074 }
1075 if from_id == to_id {
1076 return Ok(vec![vec![from.to_string()]]);
1077 }
1078 let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::with_capacity(self.adjacency.len());
1079 for (&node, edges) in &self.adjacency {
1080 let nbrs: Vec<StrId> = edges.iter().map(|(to, _)| *to).collect();
1081 adj.insert(node, nbrs);
1082 }
1083 let mut all_paths: Vec<Vec<StrId>> = Vec::new();
1084 let mut current_path = Vec::new();
1085 let mut visited: AHashSet<StrId> = AHashSet::new();
1086 visited.insert(from_id);
1087 current_path.push(from_id);
1088 Self::dfs_all_paths(
1089 &adj,
1090 from_id,
1091 to_id,
1092 max_depth,
1093 max_paths,
1094 &mut visited,
1095 &mut current_path,
1096 &mut all_paths,
1097 );
1098 if all_paths.is_empty() {
1099 return Err(MCSError::MemoryError(format!(
1100 "No path found between '{from}' and '{to}'"
1101 )));
1102 }
1103 let result: Vec<Vec<String>> = all_paths
1104 .into_iter()
1105 .map(|path| {
1106 path.into_iter()
1107 .map(|id| self.interner.lookup(id).to_string())
1108 .collect()
1109 })
1110 .collect();
1111 Ok(result)
1112 }
1113
1114 fn dfs_all_paths(
1115 adj: &AHashMap<StrId, Vec<StrId>>,
1116 current: StrId,
1117 target: StrId,
1118 max_depth: usize,
1119 max_paths: usize,
1120 visited: &mut AHashSet<StrId>,
1121 current_path: &mut Vec<StrId>,
1122 all_paths: &mut Vec<Vec<StrId>>,
1123 ) {
1124 if all_paths.len() >= max_paths { return; }
1125 if current == target && current_path.len() > 1 {
1126 all_paths.push(current_path.clone());
1127 return;
1128 }
1129 if current_path.len() > max_depth { return; }
1130 if let Some(neighbors) = adj.get(¤t) {
1131 for &nb in neighbors {
1132 if !visited.contains(&nb) {
1133 visited.insert(nb);
1134 current_path.push(nb);
1135 Self::dfs_all_paths(adj, nb, target, max_depth, max_paths, visited, current_path, all_paths);
1136 current_path.pop();
1137 visited.remove(&nb);
1138 }
1139 }
1140 }
1141 }
1142
1143 pub fn search_entities(&self, query: &str) -> Result<Vec<Entity>> {
1145 let token = query.to_lowercase();
1146 let matching = self.search.search(&token, &self.interner);
1147 Ok(matching
1148 .iter()
1149 .filter_map(|idx| {
1150 self.entity_slots
1151 .get(*idx as usize)?
1152 .as_ref()
1153 .filter(|e| e.is_live())
1154 .map(|e| self.entity_to_output(e))
1155 })
1156 .collect())
1157 }
1158}
1159
1160impl KnowledgeGraph {
1161 pub fn new(path: &Path) -> std::io::Result<Self> {
1162 let store = BinaryStore::new(path)?;
1163
1164 let mut interner = StringInterner::with_capacity(65536, 1024);
1166 let mut entity_slots: Vec<Option<StoredEntity>> = Vec::with_capacity(256);
1167 let mut name_table = ShardedNameTable::new(64);
1168 let mut relations: Vec<StoredRelation> = Vec::with_capacity(64);
1169 let mut search = SearchIndex::new();
1170
1171 let mut pending: Option<Vec<(RecordKind, Vec<u8>)>> = None;
1176 store.replay(|kind, data| {
1177 match kind {
1178 RecordKind::TxnBegin => pending = Some(Vec::new()),
1179 RecordKind::TxnCommit => {
1180 if let Some(buffered) = pending.take() {
1181 for (k, d) in &buffered {
1182 Self::apply_record(
1183 *k, d, &mut interner, &mut entity_slots, &mut search,
1184 &mut name_table, &mut relations,
1185 );
1186 }
1187 }
1188 }
1189 other => match pending.as_mut() {
1190 Some(buffered) => buffered.push((other, data.to_vec())),
1191 None => Self::apply_record(
1192 other, data, &mut interner, &mut entity_slots, &mut search,
1193 &mut name_table, &mut relations,
1194 ),
1195 },
1196 }
1197 })?;
1198
1199 let free_slots: Vec<u32> = entity_slots
1201 .iter()
1202 .enumerate()
1203 .filter(|(_, s)| s.is_none())
1204 .map(|(i, _)| i as u32)
1205 .collect();
1206
1207 let mut adjacency: AHashMap<StrId, Vec<(StrId, StrId)>> = AHashMap::new();
1208 for rel in &relations {
1209 adjacency.entry(rel.from).or_default().push((rel.to, rel.relation_type));
1210 adjacency.entry(rel.to).or_default().push((rel.from, rel.relation_type));
1211 }
1212
1213 Ok(Self {
1214 interner,
1215 entity_slots,
1216 free_slots,
1217 name_table,
1218 relations,
1219 adjacency,
1220 search,
1221 store,
1222 })
1223 }
1224
1225 #[allow(clippy::too_many_arguments)]
1233 fn apply_record(
1234 kind: RecordKind,
1235 data: &[u8],
1236 interner: &mut StringInterner,
1237 entity_slots: &mut Vec<Option<StoredEntity>>,
1238 search: &mut SearchIndex,
1239 name_table: &mut ShardedNameTable,
1240 relations: &mut Vec<StoredRelation>,
1241 ) {
1242 match kind {
1243 RecordKind::CreateEntity => {
1244 if let Some((name, etype, obs)) = store_enc::decode_create_entity(data) {
1245 Self::replay_create_entity(
1246 interner, entity_slots, search, name_table, name, etype, &obs,
1247 );
1248 }
1249 }
1250 RecordKind::CreateRelation => {
1251 if let Some((from, to, rtype)) = store_enc::decode_create_relation(data) {
1252 let from_id = interner.intern(from);
1253 let to_id = interner.intern(to);
1254 let type_id = interner.intern(rtype);
1255 relations.push(StoredRelation {
1256 from: from_id,
1257 to: to_id,
1258 relation_type: type_id,
1259 });
1260 }
1261 }
1262 RecordKind::AddObservations => {
1263 if let Some((name, obs)) = store_enc::decode_add_observations(data) {
1264 Self::replay_add_observations(
1265 interner, entity_slots, search, name_table, name, &obs,
1266 );
1267 }
1268 }
1269 RecordKind::DeleteEntity => {
1270 if let Some(name) = store_enc::decode_delete_entity(data) {
1271 Self::replay_delete_entity(
1272 interner, entity_slots, relations, search, name_table, name,
1273 );
1274 }
1275 }
1276 RecordKind::DeleteObservations => {
1277 if let Some((name, obs)) = store_enc::decode_delete_observations(data) {
1278 Self::replay_delete_observations(
1279 interner, entity_slots, search, name_table, name, &obs,
1280 );
1281 }
1282 }
1283 RecordKind::DeleteRelation => {
1284 if let Some((from, to, rtype)) = store_enc::decode_delete_relation(data) {
1285 let from_id = interner.intern(from);
1286 let to_id = interner.intern(to);
1287 let type_id = interner.intern(rtype);
1288 relations.retain(|r| {
1289 !(r.from == from_id && r.to == to_id && r.relation_type == type_id)
1290 });
1291 }
1292 }
1293 RecordKind::TxnBegin | RecordKind::TxnCommit => {}
1294 }
1295 }
1296
1297 #[allow(clippy::ptr_arg)]
1298 fn replay_create_entity(
1299 interner: &mut StringInterner,
1300 entities: &mut Vec<Option<StoredEntity>>,
1301 search: &mut SearchIndex,
1302 name_table: &mut ShardedNameTable,
1303 name: &str,
1304 etype: &str,
1305 observations: &[&str],
1306 ) {
1307 let name_id = interner.intern(name);
1308 let type_id = interner.intern(etype);
1309 let obs_ids: Vec<StrId> = observations.iter().map(|o| interner.intern(o)).collect();
1310 let slot = entities.len() as u32;
1311 entities.push(Some(StoredEntity {
1312 state: ENTITY_SLOT_LIVE,
1313 name: name_id,
1314 entity_type: type_id,
1315 observations: obs_ids.clone(),
1316 }));
1317 let hash = interner.get_hash(name_id);
1318 name_table.insert(&*interner, hash, name_id, slot);
1319 search.index_entity(interner, slot, name_id, type_id, &obs_ids);
1320 }
1321
1322 fn replay_add_observations(
1323 interner: &mut StringInterner,
1324 entities: &mut [Option<StoredEntity>],
1325 search: &mut SearchIndex,
1326 name_table: &mut ShardedNameTable,
1327 name: &str,
1328 observations: &[&str],
1329 ) {
1330 let name_id = interner.intern(name);
1331 let hash = interner.get_hash(name_id);
1332 if let Some(slot) = name_table.lookup(hash, name_id)
1333 && let Some(Some(entity)) = entities.get_mut(slot as usize)
1334 {
1335 for &o in observations {
1336 let oid = interner.intern(o);
1337 if !entity.observations.contains(&oid) {
1338 entity.observations.push(oid);
1339 }
1340 }
1341 search.remove_entity(slot);
1342 search.index_entity(
1343 interner,
1344 slot,
1345 entity.name,
1346 entity.entity_type,
1347 &entity.observations,
1348 );
1349 }
1350 }
1351
1352 fn replay_delete_entity(
1353 interner: &mut StringInterner,
1354 entities: &mut [Option<StoredEntity>],
1355 rels: &mut Vec<StoredRelation>,
1356 search: &mut SearchIndex,
1357 name_table: &mut ShardedNameTable,
1358 name: &str,
1359 ) {
1360 let name_id = interner.intern(name);
1361 let hash = interner.get_hash(name_id);
1362 if let Some(slot) = name_table.lookup(hash, name_id)
1363 && let Some(Some(_)) = entities.get(slot as usize)
1364 {
1365 entities[slot as usize] = None;
1366 search.remove_entity(slot);
1367 name_table.remove(&*interner, hash, name_id);
1368 }
1369 rels.retain(|r| r.from != name_id && r.to != name_id);
1370 }
1371
1372 fn replay_delete_observations(
1373 interner: &mut StringInterner,
1374 entities: &mut [Option<StoredEntity>],
1375 search: &mut SearchIndex,
1376 name_table: &mut ShardedNameTable,
1377 name: &str,
1378 observations: &[&str],
1379 ) {
1380 let name_id = interner.intern(name);
1381 let hash = interner.get_hash(name_id);
1382 if let Some(slot) = name_table.lookup(hash, name_id)
1383 && let Some(Some(entity)) = entities.get_mut(slot as usize)
1384 {
1385 let remove_ids: Vec<StrId> = observations.iter().map(|o| interner.intern(o)).collect();
1386 entity.observations.retain(|o| !remove_ids.contains(o));
1387 search.remove_entity(slot);
1388 search.index_entity(
1389 interner,
1390 slot,
1391 entity.name,
1392 entity.entity_type,
1393 &entity.observations,
1394 );
1395 }
1396 }
1397
    /// Returns a shared reference to the string interner owned by this graph.
    pub const fn interner(&self) -> &StringInterner {
        &self.interner
    }
1405
1406 pub fn get_entity(&self, name: &str) -> Option<Entity> {
1408 let name_id = self.interner.get_optional(name)?;
1409 let hash = self.interner.get_hash(name_id);
1410 let slot = self.name_table.lookup(hash, name_id)?;
1411 let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
1412 if !stored.is_live() {
1413 return None;
1414 }
1415 Some(self.entity_to_output(stored))
1416 }
1417
1418 pub fn graph_stats(&self) -> serde_json::Value {
1420 let live_entities = self
1421 .entity_slots
1422 .iter()
1423 .filter(|s| s.as_ref().is_some_and(|e| e.is_live()))
1424 .count();
1425 let total_relations = self.relations.len();
1426 let index_entries = self.search.len();
1427 let total_obs: usize = self
1428 .entity_slots
1429 .iter()
1430 .filter_map(|s| s.as_ref())
1431 .filter(|e| e.is_live())
1432 .map(|e| e.observations.len())
1433 .sum();
1434
1435 serde_json::json!({
1436 "entities": live_entities,
1437 "relations": total_relations,
1438 "totalObservations": total_obs,
1439 "searchIndexEntries": index_entries,
1440 "internedStrings": self.interner.len(),
1441 "internedBytes": self.interner.total_bytes(),
1442 })
1443 }
1444
1445 pub fn search_relations(&self, from: Option<&str>, to: Option<&str>, rtype: Option<&str>) -> Vec<Relation> {
1449 let from_id = match from {
1450 Some(f) => match self.interner.get_optional(f) {
1451 Some(id) => Some(id),
1452 None => return Vec::new(),
1453 },
1454 None => None,
1455 };
1456 let to_id = match to {
1457 Some(t) => match self.interner.get_optional(t) {
1458 Some(id) => Some(id),
1459 None => return Vec::new(),
1460 },
1461 None => None,
1462 };
1463 let rtype_id = match rtype {
1464 Some(r) => match self.interner.get_optional(r) {
1465 Some(id) => Some(id),
1466 None => return Vec::new(),
1467 },
1468 None => None,
1469 };
1470
1471 self.relations
1472 .iter()
1473 .filter(|r| {
1474 from_id.is_none_or(|f| r.from == f)
1475 && to_id.is_none_or(|t| r.to == t)
1476 && rtype_id.is_none_or(|rt| r.relation_type == rt)
1477 })
1478 .map(|r| Relation {
1479 from: self.interner.lookup(r.from).to_string(),
1480 to: self.interner.lookup(r.to).to_string(),
1481 relation_type: self.interner.lookup(r.relation_type).to_string(),
1482 })
1483 .collect()
1484 }
1485
1486 pub fn find_path(&self, from: &str, to: &str) -> Result<Vec<String>> {
1489 let from_id = self.interner.get_optional(from)
1490 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
1491 let to_id = self.interner.get_optional(to)
1492 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
1493 let hash_from = self.interner.get_hash(from_id);
1494 let hash_to = self.interner.get_hash(to_id);
1495
1496 if self.name_table.lookup(hash_from, from_id).is_none() {
1497 return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
1498 }
1499 if self.name_table.lookup(hash_to, to_id).is_none() {
1500 return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
1501 }
1502 if from_id == to_id {
1503 return Ok(vec![from.to_string()]);
1504 }
1505
1506 let mut visited: AHashSet<StrId> = AHashSet::new();
1508 let mut parent: AHashMap<StrId, StrId> = AHashMap::new();
1509 let mut queue: VecDeque<StrId> = VecDeque::new();
1510
1511 visited.insert(from_id);
1512 queue.push_back(from_id);
1513
1514 while let Some(current) = queue.pop_front() {
1515 if current == to_id {
1516 break;
1517 }
1518
1519 if let Some(neighbors) = self.adjacency.get(¤t) {
1520 for &(neighbor, _) in neighbors {
1521 if visited.insert(neighbor) {
1522 parent.insert(neighbor, current);
1523 queue.push_back(neighbor);
1524 }
1525 }
1526 }
1527 }
1528
1529 if !parent.contains_key(&to_id) && from_id != to_id {
1530 return Err(MCSError::MemoryError(format!(
1531 "No path found between '{from}' and '{to}'"
1532 )));
1533 }
1534
1535 let mut path: Vec<String> = Vec::new();
1537 let mut cur = to_id;
1538 loop {
1539 path.push(self.interner.lookup(cur).to_string());
1540 if cur == from_id {
1541 break;
1542 }
1543 cur = *parent.get(&cur).ok_or_else(|| {
1544 MCSError::MemoryError("Path reconstruction failed".into())
1545 })?;
1546 }
1547 path.reverse();
1548 Ok(path)
1549 }
1550
1551 pub fn compact(&mut self) -> Result<()> {
1556 let mut create_entities: Vec<Entity> = Vec::new();
1558 let mut create_relations: Vec<Relation> = Vec::new();
1559
1560 for slot in &self.entity_slots {
1561 if let Some(stored) = slot.as_ref().filter(|e| e.is_live()) {
1562 create_entities.push(self.entity_to_output(stored));
1563 }
1564 }
1565 for rel in &self.relations {
1566 create_relations.push(Relation {
1567 from: self.interner.lookup(rel.from).to_string(),
1568 to: self.interner.lookup(rel.to).to_string(),
1569 relation_type: self.interner.lookup(rel.relation_type).to_string(),
1570 });
1571 }
1572
1573 let tmp_path = self.store.path().with_extension("tmp");
1580 if let Err(e) = std::fs::remove_file(&tmp_path)
1581 && e.kind() != std::io::ErrorKind::NotFound
1582 {
1583 return Err(MCSError::IoError(e));
1584 }
1585 let mut tmp_store = BinaryStore::new(&tmp_path).map_err(MCSError::IoError)?;
1586 for entity in &create_entities {
1587 let mut buf = Vec::new();
1588 store_enc::encode_create_entity(&mut buf, &entity.name, &entity.entity_type, &entity.observations)
1589 .map_err(MCSError::IoError)?;
1590 tmp_store.write_record(RecordKind::CreateEntity, &buf).map_err(MCSError::IoError)?;
1591 }
1592 for relation in &create_relations {
1593 let mut buf = Vec::new();
1594 store_enc::encode_create_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
1595 .map_err(MCSError::IoError)?;
1596 tmp_store.write_record(RecordKind::CreateRelation, &buf).map_err(MCSError::IoError)?;
1597 }
1598 tmp_store.flush_and_sync().map_err(MCSError::IoError)?;
1599 drop(tmp_store);
1600
1601 std::fs::rename(&tmp_path, self.store.path()).map_err(MCSError::IoError)?;
1606 sync_parent_dir(self.store.path()).map_err(MCSError::IoError)?;
1607
1608 let path = self.store.path().clone();
1614 *self = KnowledgeGraph::new(&path).map_err(MCSError::IoError)?;
1615
1616 Ok(())
1617 }
1618
1619 pub fn create_entities(&mut self, entities: &[Entity]) -> Result<Vec<Entity>> {
1622 for entity in entities {
1624 if entity.name.is_empty() {
1625 return Err(MCSError::InvalidParams(
1626 "Entity name must not be empty".into(),
1627 ));
1628 }
1629 }
1630 let mut created = Vec::new();
1631 for entity in entities {
1632 let existing = self.interner.get_optional(&entity.name)
1634 .and_then(|id| {
1635 let hash = self.interner.get_hash(id);
1636 self.name_table.lookup(hash, id)
1637 });
1638 if existing.is_some() {
1639 continue;
1640 }
1641 let mut buf = Vec::new();
1643 store_enc::encode_create_entity(&mut buf, &entity.name, &entity.entity_type, &entity.observations)
1644 .map_err(MCSError::IoError)?;
1645 self.store.write_record(RecordKind::CreateEntity, &buf)
1646 .map_err(MCSError::IoError)?;
1647
1648 let name_id = self.interner.intern(&entity.name);
1649 let hash = self.interner.get_hash(name_id);
1650 let type_id = self.interner.intern(&entity.entity_type);
1651 let obs_ids: Vec<StrId> = entity
1652 .observations
1653 .iter()
1654 .map(|o| self.interner.intern(o))
1655 .collect();
1656 let reused = self.free_slots.pop();
1659 let slot = reused.unwrap_or(self.entity_slots.len() as u32);
1660 self.search
1661 .index_entity(&mut self.interner, slot, name_id, type_id, &obs_ids);
1662 let stored = Some(StoredEntity {
1663 state: ENTITY_SLOT_LIVE,
1664 name: name_id,
1665 entity_type: type_id,
1666 observations: obs_ids,
1667 });
1668 match reused {
1669 Some(s) => self.entity_slots[s as usize] = stored,
1670 None => self.entity_slots.push(stored),
1671 }
1672 self.name_table.insert(&self.interner, hash, name_id, slot);
1673 created.push(Entity {
1674 name: entity.name.clone(),
1675 entity_type: entity.entity_type.clone(),
1676 observations: entity.observations.clone(),
1677 });
1678 }
1679 Ok(created)
1680 }
1681
1682 pub fn create_relations(&mut self, relations: &[Relation]) -> Result<Vec<Relation>> {
1683 for relation in relations {
1685 if relation.from.is_empty() || relation.to.is_empty() {
1686 return Err(MCSError::InvalidParams(
1687 "Relation endpoints must not be empty".into(),
1688 ));
1689 }
1690 }
1691 let mut created = Vec::new();
1692 let mut rel_set: AHashSet<(StrId, StrId, StrId)> = AHashSet::new();
1694 for rel in &self.relations {
1695 rel_set.insert((rel.from, rel.to, rel.relation_type));
1696 }
1697 for relation in relations {
1698 let from_id = self.interner.intern(&relation.from);
1699 let to_id = self.interner.intern(&relation.to);
1700 let type_id = self.interner.intern(&relation.relation_type);
1701 if !rel_set.insert((from_id, to_id, type_id)) {
1702 continue;
1703 }
1704 let mut buf = Vec::new();
1706 store_enc::encode_create_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
1707 .map_err(MCSError::IoError)?;
1708 self.store.write_record(RecordKind::CreateRelation, &buf)
1709 .map_err(MCSError::IoError)?;
1710
1711 self.relations.push(StoredRelation {
1712 from: from_id,
1713 to: to_id,
1714 relation_type: type_id,
1715 });
1716 self.adjacency.entry(from_id).or_default().push((to_id, type_id));
1717 self.adjacency.entry(to_id).or_default().push((from_id, type_id));
1718 created.push(Relation {
1719 from: relation.from.clone(),
1720 to: relation.to.clone(),
1721 relation_type: relation.relation_type.clone(),
1722 });
1723 }
1724 Ok(created)
1725 }
1726
1727 pub fn add_observations(&mut self, entity_name: &str, contents: &[String]) -> Result<Vec<String>> {
1728 let name_id = self.interner.get_optional(entity_name)
1729 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1730 let hash = self.interner.get_hash(name_id);
1731 let slot = self
1732 .name_table
1733 .lookup(hash, name_id)
1734 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1735 let existing: AHashSet<StrId> = self
1738 .entity_slots
1739 .get(slot as usize)
1740 .and_then(|e| e.as_ref())
1741 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?
1742 .observations
1743 .iter()
1744 .copied()
1745 .collect();
1746
1747 let mut added = Vec::new();
1750 let mut interned_added = Vec::new();
1751 let mut seen: AHashSet<StrId> = AHashSet::new();
1752 for content in contents {
1753 let cid = self.interner.intern(content);
1754 if existing.contains(&cid) || !seen.insert(cid) {
1755 continue;
1756 }
1757 interned_added.push(cid);
1758 added.push(content.clone());
1759 }
1760 if added.is_empty() {
1761 return Ok(added);
1762 }
1763
1764 let mut buf = Vec::new();
1767 store_enc::encode_add_observations(&mut buf, entity_name, &added)
1768 .map_err(MCSError::IoError)?;
1769 self.store.write_record(RecordKind::AddObservations, &buf)
1770 .map_err(MCSError::IoError)?;
1771
1772 let stored = self
1774 .entity_slots
1775 .get_mut(slot as usize)
1776 .and_then(|e| e.as_mut())
1777 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1778 stored.observations.extend_from_slice(&interned_added);
1779
1780 self.search
1783 .index_additional(&mut self.interner, slot, &interned_added);
1784 Ok(added)
1785 }
1786
1787 pub fn delete_entities(&mut self, entity_names: &[String]) -> Result<()> {
1788 let mut deleted_names = Vec::new();
1789 for name in entity_names {
1790 let name_id_opt = self.interner.get_optional(name);
1791 if let Some(name_id) = name_id_opt {
1792 let hash = self.interner.get_hash(name_id);
1793 if let Some(slot) = self.name_table.lookup(hash, name_id)
1794 && let Some(Some(_)) = self.entity_slots.get(slot as usize)
1795 {
1796 let mut buf = Vec::new();
1798 store_enc::encode_delete_entity(&mut buf, name)
1799 .map_err(MCSError::IoError)?;
1800 self.store.write_record(RecordKind::DeleteEntity, &buf)
1801 .map_err(MCSError::IoError)?;
1802
1803 self.entity_slots[slot as usize] = None;
1804 self.free_slots.push(slot);
1805 self.search.remove_entity(slot);
1806 self.name_table.remove(&self.interner, hash, name_id);
1807 deleted_names.push(name.clone());
1808 }
1809 }
1810 }
1811 if !deleted_names.is_empty() {
1812 let deleted_ids: AHashSet<StrId> = deleted_names.iter()
1814 .map(|n| self.interner.intern(n))
1815 .collect();
1816 self.relations
1817 .retain(|r| !deleted_ids.contains(&r.from) && !deleted_ids.contains(&r.to));
1818 for id in &deleted_ids {
1820 self.adjacency.remove(id);
1821 for list in self.adjacency.values_mut() {
1823 list.retain(|(to, _)| !deleted_ids.contains(to));
1824 }
1825 }
1826 }
1827 Ok(())
1828 }
1829
1830 pub fn delete_observations(&mut self, entity_name: &str, observations: &[String]) -> Result<()> {
1831 let name_id = self.interner.get_optional(entity_name)
1832 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1833 let hash = self.interner.get_hash(name_id);
1834 let slot = self
1835 .name_table
1836 .lookup(hash, name_id)
1837 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1838 self.entity_slots
1840 .get(slot as usize)
1841 .and_then(|e| e.as_ref())
1842 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1843 let remove_ids: AHashSet<StrId> = observations.iter().map(|o| self.interner.intern(o)).collect();
1844
1845 let mut buf = Vec::new();
1847 store_enc::encode_delete_observations(&mut buf, entity_name, observations)
1848 .map_err(MCSError::IoError)?;
1849 self.store.write_record(RecordKind::DeleteObservations, &buf)
1850 .map_err(MCSError::IoError)?;
1851
1852 let stored = self
1854 .entity_slots
1855 .get_mut(slot as usize)
1856 .and_then(|e| e.as_mut())
1857 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1858 stored.observations.retain(|o| !remove_ids.contains(o));
1859 self.search.remove_entity(slot);
1860 self.search
1861 .index_entity(&mut self.interner, slot, stored.name, stored.entity_type, &stored.observations);
1862 Ok(())
1863 }
1864
1865 pub fn delete_relations(&mut self, relations: &[Relation]) -> Result<()> {
1866 let rels: AHashSet<(StrId, StrId, StrId)> = relations
1868 .iter()
1869 .map(|r| {
1870 (
1871 self.interner.intern(&r.from),
1872 self.interner.intern(&r.to),
1873 self.interner.intern(&r.relation_type),
1874 )
1875 })
1876 .collect();
1877 for relation in relations {
1880 let mut buf = Vec::new();
1881 store_enc::encode_delete_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
1882 .map_err(MCSError::IoError)?;
1883 self.store.write_record(RecordKind::DeleteRelation, &buf)
1884 .map_err(MCSError::IoError)?;
1885 }
1886 self.relations
1887 .retain(|r| !rels.contains(&(r.from, r.to, r.relation_type)));
1888 for (f, t, rt) in &rels {
1890 if let Some(edges) = self.adjacency.get_mut(f) {
1891 edges.retain(|(to, rtype)| to != t || rtype != rt);
1892 if edges.is_empty() {
1893 self.adjacency.remove(f);
1894 }
1895 }
1896 if let Some(edges) = self.adjacency.get_mut(t) {
1897 edges.retain(|(to, rtype)| to != f || rtype != rt);
1898 if edges.is_empty() {
1899 self.adjacency.remove(t);
1900 }
1901 }
1902 }
1903 Ok(())
1904 }
1905
1906 pub fn read_graph(&self) -> KnowledgeGraphOut {
1907 self.read_graph_view().to_owned_out()
1908 }
1909
1910 pub fn read_graph_view(&self) -> GraphView<'_> {
1914 let entities: Vec<&StoredEntity> = self
1915 .entity_slots
1916 .iter()
1917 .filter_map(|s| s.as_ref().filter(|e| e.is_live()))
1918 .collect();
1919 let relations: Vec<&StoredRelation> = self.relations.iter().collect();
1920 GraphView { kg: self, entities, relations }
1921 }
1922
1923 pub fn search_nodes(&self, query: &str) -> KnowledgeGraphOut {
1926 self.search_nodes_filtered(query, None, 0, usize::MAX)
1927 }
1928
1929 pub fn open_nodes(&self, names: &[String]) -> KnowledgeGraphOut {
1930 self.open_nodes_view(names).to_owned_out()
1931 }
1932
1933 pub fn open_nodes_view(&self, names: &[String]) -> GraphView<'_> {
1935 let name_ids: AHashSet<StrId> = names.iter()
1936 .filter_map(|n| self.interner.get_optional(n))
1937 .collect();
1938 let entities: Vec<&StoredEntity> = self
1939 .entity_slots
1940 .iter()
1941 .filter_map(|s| {
1942 s.as_ref()
1943 .filter(|stored| stored.is_live() && name_ids.contains(&stored.name))
1944 })
1945 .collect();
1946 let matched_names: AHashSet<StrId> = entities.iter().map(|e| e.name).collect();
1947 let relations: Vec<&StoredRelation> = self
1948 .relations
1949 .iter()
1950 .filter(|r| matched_names.contains(&r.from) || matched_names.contains(&r.to))
1951 .collect();
1952 GraphView { kg: self, entities, relations }
1953 }
1954
1955 fn entity_to_output(&self, stored: &StoredEntity) -> Entity {
1960 Entity {
1961 name: self.interner.lookup(stored.name).to_string(),
1962 entity_type: self.interner.lookup(stored.entity_type).to_string(),
1963 observations: stored
1964 .observations
1965 .iter()
1966 .map(|o| self.interner.lookup(*o).to_string())
1967 .collect(),
1968 }
1969 }
1970
1971 #[inline]
1972 fn relation_to_output(&self, r: &StoredRelation) -> Relation {
1973 Relation {
1974 from: self.interner.lookup(r.from).to_string(),
1975 to: self.interner.lookup(r.to).to_string(),
1976 relation_type: self.interner.lookup(r.relation_type).to_string(),
1977 }
1978 }
1979
1980 fn lookup_live_slot(&self, name: &str) -> Option<u32> {
1982 let name_id = self.interner.get_optional(name)?;
1983 let hash = self.interner.get_hash(name_id);
1984 let slot = self.name_table.lookup(hash, name_id)?;
1985 let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
1986 stored.is_live().then_some(slot)
1987 }
1988
1989 fn entity_by_name_id(&self, name_id: StrId) -> Option<Entity> {
1991 let hash = self.interner.get_hash(name_id);
1992 let slot = self.name_table.lookup(hash, name_id)?;
1993 let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
1994 stored.is_live().then(|| self.entity_to_output(stored))
1995 }
1996
1997 pub fn entity_type_counts(&self) -> Vec<(String, usize)> {
2001 let mut counts: AHashMap<StrId, usize> = AHashMap::new();
2002 for st in self
2003 .entity_slots
2004 .iter()
2005 .filter_map(|s| s.as_ref())
2006 .filter(|e| e.is_live())
2007 {
2008 *counts.entry(st.entity_type).or_insert(0) += 1;
2009 }
2010 self.rank_counts(counts)
2011 }
2012
2013 pub fn relation_type_counts(&self) -> Vec<(String, usize)> {
2015 let mut counts: AHashMap<StrId, usize> = AHashMap::new();
2016 for r in &self.relations {
2017 *counts.entry(r.relation_type).or_insert(0) += 1;
2018 }
2019 self.rank_counts(counts)
2020 }
2021
2022 fn rank_counts(&self, counts: AHashMap<StrId, usize>) -> Vec<(String, usize)> {
2023 let mut out: Vec<(String, usize)> = counts
2024 .into_iter()
2025 .map(|(id, c)| (self.interner.lookup(id).to_string(), c))
2026 .collect();
2027 out.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
2028 out
2029 }
2030
2031 pub fn search_nodes_filtered(
2035 &self,
2036 query: &str,
2037 entity_type: Option<&str>,
2038 offset: usize,
2039 limit: usize,
2040 ) -> KnowledgeGraphOut {
2041 self.search_nodes_view(query, entity_type, offset, limit).to_owned_out()
2042 }
2043
2044 pub fn search_nodes_view(
2046 &self,
2047 query: &str,
2048 entity_type: Option<&str>,
2049 offset: usize,
2050 limit: usize,
2051 ) -> GraphView<'_> {
2052 let type_id = match entity_type {
2053 Some(t) => match self.interner.get_optional(t) {
2054 Some(id) => Some(id),
2055 None => return GraphView { kg: self, entities: Vec::new(), relations: Vec::new() },
2056 },
2057 None => None,
2058 };
2059
2060 let ranked = self.search.search_ranked(query, &self.interner);
2061 let mut selected: AHashSet<StrId> = AHashSet::new();
2062 let mut entities: Vec<&StoredEntity> = Vec::new();
2063 let mut skipped = 0usize;
2064 for (slot, _score) in ranked {
2065 let Some(st) = self
2066 .entity_slots
2067 .get(slot as usize)
2068 .and_then(|s| s.as_ref())
2069 .filter(|e| e.is_live())
2070 else {
2071 continue;
2072 };
2073 if type_id.is_some_and(|tid| st.entity_type != tid) {
2074 continue;
2075 }
2076 if skipped < offset {
2077 skipped += 1;
2078 continue;
2079 }
2080 if entities.len() >= limit {
2081 break;
2082 }
2083 selected.insert(st.name);
2084 entities.push(st);
2085 }
2086
2087 let relations: Vec<&StoredRelation> = self
2088 .relations
2089 .iter()
2090 .filter(|r| selected.contains(&r.from) || selected.contains(&r.to))
2091 .collect();
2092 GraphView { kg: self, entities, relations }
2093 }
2094
2095 pub fn read_graph_filtered(
2099 &self,
2100 entity_type: Option<&str>,
2101 offset: usize,
2102 limit: usize,
2103 ) -> KnowledgeGraphOut {
2104 self.read_graph_filtered_view(entity_type, offset, limit).to_owned_out()
2105 }
2106
2107 pub fn read_graph_filtered_view(
2109 &self,
2110 entity_type: Option<&str>,
2111 offset: usize,
2112 limit: usize,
2113 ) -> GraphView<'_> {
2114 let type_id = match entity_type {
2115 Some(t) => match self.interner.get_optional(t) {
2116 Some(id) => Some(id),
2117 None => return GraphView { kg: self, entities: Vec::new(), relations: Vec::new() },
2118 },
2119 None => None,
2120 };
2121
2122 let mut selected: AHashSet<StrId> = AHashSet::new();
2123 let mut entities: Vec<&StoredEntity> = Vec::new();
2124 let mut skipped = 0usize;
2125 for st in self
2126 .entity_slots
2127 .iter()
2128 .filter_map(|s| s.as_ref())
2129 .filter(|e| e.is_live())
2130 {
2131 if type_id.is_some_and(|tid| st.entity_type != tid) {
2132 continue;
2133 }
2134 if skipped < offset {
2135 skipped += 1;
2136 continue;
2137 }
2138 if entities.len() >= limit {
2139 break;
2140 }
2141 selected.insert(st.name);
2142 entities.push(st);
2143 }
2144
2145 let relations: Vec<&StoredRelation> = self
2146 .relations
2147 .iter()
2148 .filter(|r| selected.contains(&r.from) && selected.contains(&r.to))
2149 .collect();
2150 GraphView { kg: self, entities, relations }
2151 }
2152
2153 pub fn neighbors(
2161 &self,
2162 name: &str,
2163 direction: Direction,
2164 rtype: Option<&str>,
2165 depth: u32,
2166 ) -> Result<KnowledgeGraphOut> {
2167 self.lookup_live_slot(name)
2168 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
2169 let start = self.interner.get_optional(name).unwrap();
2171
2172 let rtype_id = match rtype {
2174 Some(r) => match self.interner.get_optional(r) {
2175 Some(id) => Some(id),
2176 None => {
2177 let entities = self.entity_by_name_id(start).into_iter().collect();
2178 return Ok(KnowledgeGraphOut { entities, relations: Vec::new() });
2179 }
2180 },
2181 None => None,
2182 };
2183
2184 let mut visited: AHashSet<StrId> = AHashSet::new();
2185 visited.insert(start);
2186
2187 let type_ok = |r: &StoredRelation| rtype_id.is_none_or(|rt| r.relation_type == rt);
2188
2189 if depth == 1 {
2190 for r in self.relations.iter().filter(|r| type_ok(r)) {
2191 match direction {
2192 Direction::Out => {
2193 if r.from == start {
2194 visited.insert(r.to);
2195 }
2196 }
2197 Direction::In => {
2198 if r.to == start {
2199 visited.insert(r.from);
2200 }
2201 }
2202 Direction::Both => {
2203 if r.from == start {
2204 visited.insert(r.to);
2205 } else if r.to == start {
2206 visited.insert(r.from);
2207 }
2208 }
2209 }
2210 }
2211 } else if depth >= 2 {
2212 let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
2216 match direction {
2217 Direction::Both => {
2218 for (&node, edges) in &self.adjacency {
2219 for &(nb, rt) in edges {
2220 if rtype_id.is_none_or(|rt_id| rt == rt_id) {
2221 adj.entry(node).or_default().push(nb);
2222 }
2223 }
2224 }
2225 }
2226 Direction::Out | Direction::In => {
2227 for r in self.relations.iter().filter(|r| type_ok(r)) {
2228 match direction {
2229 Direction::Out => adj.entry(r.from).or_default().push(r.to),
2230 Direction::In => adj.entry(r.to).or_default().push(r.from),
2231 _ => unreachable!(),
2232 }
2233 }
2234 }
2235 }
2236 let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
2237 queue.push_back((start, 0));
2238 while let Some((node, d)) = queue.pop_front() {
2239 if d >= depth {
2240 continue;
2241 }
2242 if let Some(nbrs) = adj.get(&node) {
2243 for &nb in nbrs {
2244 if visited.insert(nb) {
2245 queue.push_back((nb, d + 1));
2246 }
2247 }
2248 }
2249 }
2250 }
2251
2252 let mut entities = Vec::with_capacity(visited.len());
2253 for &nid in &visited {
2254 if let Some(e) = self.entity_by_name_id(nid) {
2255 entities.push(e);
2256 }
2257 }
2258 let relations = self
2259 .relations
2260 .iter()
2261 .filter(|r| type_ok(r) && visited.contains(&r.from) && visited.contains(&r.to))
2262 .map(|r| self.relation_to_output(r))
2263 .collect();
2264 Ok(KnowledgeGraphOut { entities, relations })
2265 }
2266
2267 pub fn describe_entity(&self, name: &str) -> Result<serde_json::Value> {
2271 let name_id = self
2272 .interner
2273 .get_optional(name)
2274 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
2275 let entity = self
2276 .entity_by_name_id(name_id)
2277 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
2278
2279 let mut incident: Vec<Relation> = Vec::new();
2280 let mut neighbor_seen: AHashSet<StrId> = AHashSet::new();
2281 let mut neighbors: Vec<&str> = Vec::new();
2282 for r in &self.relations {
2283 if r.from == name_id || r.to == name_id {
2284 incident.push(self.relation_to_output(r));
2285 let other = if r.from == name_id { r.to } else { r.from };
2286 if other != name_id && neighbor_seen.insert(other) {
2287 neighbors.push(self.interner.lookup(other));
2288 }
2289 }
2290 }
2291
2292 Ok(serde_json::json!({
2293 "entity": entity,
2294 "relations": incident,
2295 "neighbors": neighbors,
2296 "degree": incident.len(),
2297 }))
2298 }
2299
2300 pub fn upsert_entities(&mut self, entities: &[Entity]) -> Result<Vec<serde_json::Value>> {
2305 for e in entities {
2306 if e.name.is_empty() {
2307 return Err(MCSError::InvalidParams(
2308 "Entity name must not be empty".into(),
2309 ));
2310 }
2311 }
2312 let mut out = Vec::with_capacity(entities.len());
2313 for e in entities {
2314 if self.lookup_live_slot(&e.name).is_some() {
2315 let added = self.add_observations(&e.name, &e.observations)?;
2316 out.push(serde_json::json!({
2317 "name": e.name,
2318 "created": false,
2319 "addedObservations": added,
2320 }));
2321 } else {
2322 let created = self.create_entities(std::slice::from_ref(e))?;
2323 out.push(serde_json::json!({
2324 "name": e.name,
2325 "created": !created.is_empty(),
2326 "addedObservations": e.observations,
2327 }));
2328 }
2329 }
2330 Ok(out)
2331 }
2332
2333 pub fn export(&self, format: &str) -> Result<String> {
2335 match format {
2336 "json" => serde_json::to_string(&self.read_graph()).map_err(MCSError::JsonError),
2337 "mermaid" => Ok(self.export_mermaid()),
2338 "dot" => Ok(self.export_dot()),
2339 other => Err(MCSError::InvalidParams(format!(
2340 "Unknown export format '{other}' (expected json|mermaid|dot)"
2341 ))),
2342 }
2343 }
2344
2345 fn diagram_node_ids(&self) -> (AHashMap<StrId, usize>, Vec<(usize, StrId)>) {
2347 let mut ids: AHashMap<StrId, usize> = AHashMap::new();
2348 let mut order: Vec<(usize, StrId)> = Vec::new();
2349 for st in self
2350 .entity_slots
2351 .iter()
2352 .filter_map(|s| s.as_ref())
2353 .filter(|e| e.is_live())
2354 {
2355 let n = ids.len();
2356 ids.insert(st.name, n);
2357 order.push((n, st.name));
2358 }
2359 (ids, order)
2360 }
2361
2362 fn export_mermaid(&self) -> String {
2363 let (ids, order) = self.diagram_node_ids();
2364 let mut s = String::with_capacity(64 + order.len() * 32 + self.relations.len() * 32);
2365 s.push_str("graph LR\n");
2366 for (n, name_id) in &order {
2367 let label = sanitize_label(self.interner.lookup(*name_id));
2368 s.push_str(&format!(" n{n}[\"{label}\"]\n"));
2369 }
2370 for r in &self.relations {
2371 if let (Some(&a), Some(&b)) = (ids.get(&r.from), ids.get(&r.to)) {
2372 let rel = sanitize_label(self.interner.lookup(r.relation_type));
2373 s.push_str(&format!(" n{a} -->|{rel}| n{b}\n"));
2374 }
2375 }
2376 s
2377 }
2378
2379 fn export_dot(&self) -> String {
2380 let (ids, order) = self.diagram_node_ids();
2381 let mut s = String::with_capacity(64 + order.len() * 32 + self.relations.len() * 32);
2382 s.push_str("digraph G {\n");
2383 for (n, name_id) in &order {
2384 let label = sanitize_label(self.interner.lookup(*name_id));
2385 s.push_str(&format!(" n{n} [label=\"{label}\"];\n"));
2386 }
2387 for r in &self.relations {
2388 if let (Some(&a), Some(&b)) = (ids.get(&r.from), ids.get(&r.to)) {
2389 let rel = sanitize_label(self.interner.lookup(r.relation_type));
2390 s.push_str(&format!(" n{a} -> n{b} [label=\"{rel}\"];\n"));
2391 }
2392 }
2393 s.push_str("}\n");
2394 s
2395 }
2396
/// Merges the entity `source` into the entity `target` and deletes `source`.
///
/// Observations of `source` that `target` does not already have are added
/// to `target`, relations touching `source` are re-created with `target`
/// in its place, and finally `source` is deleted. The changes are written
/// to the store between a `TxnBegin` and a `TxnCommit` record and only
/// then applied to the in-memory state.
///
/// Returns a JSON object with `source`, `target`, `movedObservations`
/// (number of observations on `source`), `addedObservations` (number
/// actually added to `target`) and `redirectedRelations`.
///
/// # Errors
///
/// Returns `MCSError::InvalidParams` if `source == target` or either entity
/// is not live, and `MCSError::IoError` if encoding or writing a record
/// fails.
pub fn merge_entities(&mut self, source: &str, target: &str) -> Result<serde_json::Value> {
    if source == target {
        return Err(MCSError::InvalidParams(
            "Source and target must be different entities".into(),
        ));
    }
    // Both entities must currently be live; only the target slot is needed later.
    self.lookup_live_slot(source).ok_or_else(|| {
        MCSError::InvalidParams(format!("Source entity '{source}' not found"))
    })?;
    let target_slot = self.lookup_live_slot(target).ok_or_else(|| {
        MCSError::InvalidParams(format!("Target entity '{target}' not found"))
    })?;

    // NOTE(review): these unwraps rely on a live slot implying that the
    // entity is retrievable and its name is interned — confirm.
    let source_entity = self.get_entity(source).unwrap();
    let moved_obs_count = source_entity.observations.len();
    let source_id = self.interner.get_optional(source).unwrap();
    let target_id = self.interner.get_optional(target).unwrap();

    // Observations already present on the target, as interned ids.
    let target_existing: AHashSet<StrId> = self.entity_slots[target_slot as usize]
        .as_ref()
        .unwrap()
        .observations
        .iter()
        .copied()
        .collect();
    // Keep each source observation once, and only if the target lacks it.
    // Observations unknown to the interner are skipped.
    let mut obs_seen: AHashSet<StrId> = AHashSet::new();
    let mut obs_to_add: Vec<String> = Vec::new();
    for o in &source_entity.observations {
        if let Some(oid) = self.interner.get_optional(o)
            && !target_existing.contains(&oid)
            && obs_seen.insert(oid)
        {
            obs_to_add.push(o.clone());
        }
    }

    // Work out which relations must be re-created with `target` substituted
    // for `source`.
    let existing_rels: AHashSet<(StrId, StrId, StrId)> =
        self.relations.iter().map(|r| (r.from, r.to, r.relation_type)).collect();
    let mut rel_seen: AHashSet<(StrId, StrId, StrId)> = AHashSet::new();
    let mut redirect: Vec<Relation> = Vec::new();
    for r in &self.relations {
        // Only relations that touch the source are affected.
        if r.from != source_id && r.to != source_id {
            continue;
        }
        let new_from = if r.from == source_id { target_id } else { r.from };
        let new_to = if r.to == source_id { target_id } else { r.to };
        // Redirecting would produce a self-loop (e.g. a source<->target
        // relation or a source self-loop); drop it.
        if new_from == new_to {
            continue;
        }
        // Skip relations that already exist or were already scheduled.
        let key = (new_from, new_to, r.relation_type);
        if existing_rels.contains(&key) || !rel_seen.insert(key) {
            continue;
        }
        redirect.push(Relation {
            from: self.interner.lookup(new_from).to_string(),
            to: self.interner.lookup(new_to).to_string(),
            relation_type: self.interner.lookup(r.relation_type).to_string(),
        });
    }

    let added_count = obs_to_add.len();
    let redirected = redirect.len() as u32;

    // Encode every change up front so an encoding failure aborts before
    // anything is written to the store.
    let mut records: Vec<(RecordKind, Vec<u8>)> = Vec::new();
    if !obs_to_add.is_empty() {
        let mut buf = Vec::new();
        store_enc::encode_add_observations(&mut buf, target, &obs_to_add)
            .map_err(MCSError::IoError)?;
        records.push((RecordKind::AddObservations, buf));
    }
    for r in &redirect {
        let mut buf = Vec::new();
        store_enc::encode_create_relation(&mut buf, &r.from, &r.to, &r.relation_type)
            .map_err(MCSError::IoError)?;
        records.push((RecordKind::CreateRelation, buf));
    }
    // The source is deleted last, after its data has been carried over.
    let mut del_buf = Vec::new();
    store_enc::encode_delete_entity(&mut del_buf, source).map_err(MCSError::IoError)?;
    records.push((RecordKind::DeleteEntity, del_buf));

    // Write the records to the store bracketed by transaction markers.
    self.store.write_record(RecordKind::TxnBegin, &[]).map_err(MCSError::IoError)?;
    for (kind, data) in &records {
        self.store.write_record(*kind, data).map_err(MCSError::IoError)?;
    }
    self.store.write_record(RecordKind::TxnCommit, &[]).map_err(MCSError::IoError)?;

    // Replay the same records against the in-memory state.
    // NOTE(review): `adjacency` and `free_slots` are not passed to
    // `apply_record` here — confirm they are refreshed elsewhere.
    for (kind, data) in &records {
        Self::apply_record(
            *kind, data, &mut self.interner, &mut self.entity_slots, &mut self.search,
            &mut self.name_table, &mut self.relations,
        );
    }

    Ok(serde_json::json!({
        "source": source,
        "target": target,
        "movedObservations": moved_obs_count,
        "addedObservations": added_count,
        "redirectedRelations": redirected,
    }))
}
2517
2518 pub fn extract_subgraph(&self, names: &[String], depth: u32) -> Result<KnowledgeGraphOut> {
2522 if names.is_empty() {
2523 return Ok(KnowledgeGraphOut {
2524 entities: Vec::new(),
2525 relations: Vec::new(),
2526 });
2527 }
2528 let mut visited: AHashSet<StrId> = AHashSet::new();
2530 let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
2531 for name in names {
2532 if let Some(id) = self.interner.get_optional(name)
2533 && visited.insert(id)
2534 {
2535 queue.push_back((id, 0));
2536 }
2537 }
2538 let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
2540 for (&node, edges) in &self.adjacency {
2541 let nb: Vec<StrId> = edges.iter().map(|(to, _)| *to).collect();
2542 adj.insert(node, nb);
2543 }
2544 while let Some((node, d)) = queue.pop_front() {
2545 if d >= depth {
2546 continue;
2547 }
2548 if let Some(nbrs) = adj.get(&node) {
2549 for &nb in nbrs {
2550 if visited.insert(nb) {
2551 queue.push_back((nb, d + 1));
2552 }
2553 }
2554 }
2555 }
2556 let mut entities: Vec<Entity> = Vec::with_capacity(visited.len());
2557 for &nid in &visited {
2558 if let Some(e) = self.entity_by_name_id(nid) {
2559 entities.push(e);
2560 }
2561 }
2562 let relations: Vec<Relation> = self
2563 .relations
2564 .iter()
2565 .filter(|r| visited.contains(&r.from) && visited.contains(&r.to))
2566 .map(|r| self.relation_to_output(r))
2567 .collect();
2568 Ok(KnowledgeGraphOut { entities, relations })
2569 }
2570
2571 pub fn batch_get_entities(&self, names: &[String]) -> Vec<Option<Entity>> {
2573 names.iter().map(|n| self.get_entity(n)).collect()
2574 }
2575
2576 #[allow(clippy::too_many_arguments)]
2579 fn dfs_all_paths(
2580 adj: &AHashMap<StrId, Vec<StrId>>,
2581 current: StrId,
2582 target: StrId,
2583 max_depth: usize,
2584 max_paths: usize,
2585 visited: &mut AHashSet<StrId>,
2586 current_path: &mut Vec<StrId>,
2587 all_paths: &mut Vec<Vec<StrId>>,
2588 ) {
2589 if all_paths.len() >= max_paths {
2590 return;
2591 }
2592 if current == target && current_path.len() > 1 {
2593 all_paths.push(current_path.clone());
2594 return;
2595 }
2596 if current_path.len() > max_depth {
2597 return;
2598 }
2599 if let Some(neighbors) = adj.get(¤t) {
2600 for &nb in neighbors {
2601 if visited.insert(nb) {
2602 current_path.push(nb);
2603 Self::dfs_all_paths(
2604 adj, nb, target, max_depth, max_paths, visited, current_path, all_paths,
2605 );
2606 current_path.pop();
2607 visited.remove(&nb);
2608 }
2609 }
2610 }
2611 }
2612
2613 pub fn find_all_paths(
2617 &self,
2618 from: &str,
2619 to: &str,
2620 max_depth: usize,
2621 max_paths: usize,
2622 ) -> Result<Vec<Vec<String>>> {
2623 let from_id = self
2624 .interner
2625 .get_optional(from)
2626 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
2627 let to_id = self
2628 .interner
2629 .get_optional(to)
2630 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
2631 if self.lookup_live_slot(from).is_none() {
2633 return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
2634 }
2635 if self.lookup_live_slot(to).is_none() {
2636 return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
2637 }
2638 if from_id == to_id {
2639 return Ok(vec![vec![from.to_string()]]);
2640 }
2641 let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::with_capacity(self.adjacency.len());
2643 for (&node, edges) in &self.adjacency {
2644 let nbrs: Vec<StrId> = edges.iter().map(|(to, _)| *to).collect();
2645 adj.insert(node, nbrs);
2646 }
2647 let mut all_paths: Vec<Vec<StrId>> = Vec::new();
2648 let mut current_path = Vec::new();
2649 let mut visited: AHashSet<StrId> = AHashSet::new();
2650 visited.insert(from_id);
2651 current_path.push(from_id);
2652 Self::dfs_all_paths(
2653 &adj,
2654 from_id,
2655 to_id,
2656 max_depth,
2657 max_paths,
2658 &mut visited,
2659 &mut current_path,
2660 &mut all_paths,
2661 );
2662 if all_paths.is_empty() {
2663 return Err(MCSError::MemoryError(format!(
2664 "No path found between '{from}' and '{to}'"
2665 )));
2666 }
2667 let result: Vec<Vec<String>> = all_paths
2668 .into_iter()
2669 .map(|path| {
2670 path.into_iter()
2671 .map(|id| self.interner.lookup(id).to_string())
2672 .collect()
2673 })
2674 .collect();
2675 Ok(result)
2676 }
2677
2678 pub fn snapshot(&self) -> ReadSnapshot {
2683 ReadSnapshot {
2684 interner: self.interner.clone(),
2685 entity_slots: Arc::from_iter(self.entity_slots.iter().cloned()),
2686 free_slots: self.free_slots.clone(),
2687 name_table: self.name_table.clone(),
2688 relations: Arc::from_iter(self.relations.iter().cloned()),
2689 adjacency: self.adjacency.clone(),
2690 search: self.search.clone(),
2691 }
2692 }
2693
2694 pub fn flush(&mut self) -> Result<()> {
2698 self.store.flush().map_err(MCSError::IoError)
2699 }
2700
2701 pub fn sync(&mut self) -> Result<()> {
2704 self.store.sync().map_err(MCSError::IoError)
2705 }
2706
2707 pub fn flush_and_sync(&mut self) -> Result<()> {
2709 self.store.flush_and_sync().map_err(MCSError::IoError)
2710 }
2711}
2712
2713
2714
/// Shared handle to a [`KnowledgeGraph`]: writers lock the graph, readers
/// use a published [`ReadSnapshot`], and a background thread fsyncs the
/// store after writes are published.
pub struct GraphHandle {
    /// The mutable graph; locked for the lifetime of each `WriteGuard`.
    inner: Arc<parking_lot::Mutex<KnowledgeGraph>>,
    /// Most recently published snapshot, swapped in by `WriteGuard::publish`.
    snapshot: ArcSwap<ReadSnapshot>,
    /// Cached JSON rendering of the graph; reset to `None` on every publish
    /// and refilled by `read_graph_cached`.
    read_cache: ArcSwap<Option<Arc<str>>>,
    /// "Sync pending" flag and condition variable used to wake the
    /// background sync thread.
    sync_notify: Arc<(StdMutex<bool>, Condvar)>,
    /// Set when the handle is dropped so the background sync thread exits.
    stop_sync: Arc<AtomicBool>,
}
2745
/// Exclusive write access to the graph, handed out by `GraphHandle::write`.
///
/// Dereferences to [`KnowledgeGraph`]. If `publish` was never called, it is
/// called automatically when the guard is dropped.
pub struct WriteGuard<'a> {
    /// Lock on the graph, held for the lifetime of this guard.
    guard: parking_lot::MutexGuard<'a, KnowledgeGraph>,
    /// Snapshot cell of the owning handle; replaced on publish.
    snapshot: &'a ArcSwap<ReadSnapshot>,
    /// JSON cache of the owning handle; cleared on publish.
    read_cache: &'a ArcSwap<Option<Arc<str>>>,
    /// "Sync pending" flag and condition variable of the owning handle.
    sync_notify: &'a (StdMutex<bool>, Condvar),
    /// Whether `publish` has run at least once on this guard.
    did_publish: bool,
}
2754
2755impl WriteGuard<'_> {
2756 pub fn publish(&mut self) {
2760 if let Err(e) = self.guard.flush() {
2761 tracing::error!("WAL flush failed: {e}");
2762 }
2763 let snap = Arc::new(self.guard.snapshot());
2764 self.snapshot.store(snap);
2765 self.read_cache.store(Arc::new(None));
2766 self.did_publish = true;
2767 let (lock, cvar) = self.sync_notify;
2769 let mut pending = lock.lock().unwrap_or_else(|e| e.into_inner());
2770 *pending = true;
2771 cvar.notify_one();
2772 }
2773
2774 pub fn graph(&mut self) -> &mut KnowledgeGraph {
2776 &mut self.guard
2777 }
2778}
2779
2780impl std::ops::Deref for WriteGuard<'_> {
2781 type Target = KnowledgeGraph;
2782 fn deref(&self) -> &KnowledgeGraph {
2783 &self.guard
2784 }
2785}
2786
2787impl std::ops::DerefMut for WriteGuard<'_> {
2788 fn deref_mut(&mut self) -> &mut KnowledgeGraph {
2789 &mut self.guard
2790 }
2791}
2792
2793impl Drop for WriteGuard<'_> {
2794 fn drop(&mut self) {
2795 if !self.did_publish {
2796 self.publish();
2797 }
2798 }
2799}
2800
2801impl Drop for GraphHandle {
2802 fn drop(&mut self) {
2803 self.stop_sync.store(true, Ordering::Relaxed);
2804 let (lock, cvar) = &*self.sync_notify;
2807 let mut pending = lock.lock().unwrap_or_else(|e| e.into_inner());
2808 *pending = true;
2809 cvar.notify_one();
2810 }
2811}
2812
2813impl GraphHandle {
2814 pub fn new(path: &Path) -> std::io::Result<Self> {
2818 let kg = KnowledgeGraph::new(path)?;
2819 let snapshot = Arc::new(kg.snapshot());
2820 let sync_file = Arc::clone(&kg.store.sync_file);
2822 let inner = Arc::new(parking_lot::Mutex::new(kg));
2823
2824 let sync_notify: Arc<(StdMutex<bool>, Condvar)> =
2825 Arc::new((StdMutex::new(false), Condvar::new()));
2826 let notify = Arc::clone(&sync_notify);
2827 let stop_sync = Arc::new(AtomicBool::new(false));
2828
2829 let sync_stop = Arc::clone(&stop_sync);
2833 std::thread::Builder::new()
2834 .name("mcp-memory-sync".into())
2835 .spawn(move || {
2836 let (lock, cvar) = &*notify;
2837 loop {
2838 let mut guard = cvar
2841 .wait_timeout_while(
2842 lock.lock().unwrap_or_else(|e| e.into_inner()),
2843 std::time::Duration::from_secs(1),
2844 |p| !*p,
2845 )
2846 .unwrap_or_else(|e| e.into_inner())
2847 .0;
2848
2849 let should_sync = *guard;
2850 *guard = false;
2851 drop(guard);
2854
2855 if should_sync {
2856 if let Err(e) = sync_file.sync_data() {
2857 tracing::error!("WAL fsync failed: {e}");
2858 }
2859 }
2860
2861 if sync_stop.load(Ordering::Relaxed) {
2862 if let Err(e) = sync_file.sync_data() {
2864 tracing::error!("WAL final fsync failed: {e}");
2865 }
2866 break;
2867 }
2868 }
2869 })
2870 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
2871
2872 Ok(Self {
2873 inner,
2874 snapshot: ArcSwap::new(snapshot),
2875 read_cache: ArcSwap::new(Arc::new(None)),
2876 sync_notify,
2877 stop_sync,
2878 })
2879 }
2880
2881 pub fn read_graph_cached(&self) -> Arc<str> {
2884 if let Some(cached) = self.read_cache.load().as_ref() {
2885 return cached.clone();
2886 }
2887 let graph = self.read();
2888 let json: Arc<str> = Arc::from(graph.read_graph_json().into_boxed_str());
2889 self.read_cache.store(Arc::new(Some(json.clone())));
2890 json
2891 }
2892
2893 pub fn read(&self) -> ReadSnapshot {
2895 (**self.snapshot.load()).clone()
2896 }
2897
2898 pub fn write(&self) -> WriteGuard<'_> {
2901 WriteGuard {
2902 guard: self.inner.lock(),
2903 snapshot: &self.snapshot,
2904 read_cache: &self.read_cache,
2905 sync_notify: &self.sync_notify,
2906 did_publish: false,
2907 }
2908 }
2909}
2910
2911