1use std::collections::VecDeque;
2use std::path::Path;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::{Arc, Condvar, Mutex as StdMutex};
5
6use ahash::{AHashMap, AHashSet};
7use arc_swap::ArcSwap;
8
9use serde::ser::{Serialize, SerializeSeq, SerializeStruct, Serializer};
10
11use crate::errors::{MCSError, Result};
12use crate::intern::{StrId, StringInterner};
13use crate::types::{Entity, Relation, KnowledgeGraphOut};
14use crate::search::SearchIndex;
15use crate::store::{self as store_enc, BinaryStore, RecordKind};
16
/// `StoredEntity::state` value meaning the slot holds a live (not deleted) entity.
const ENTITY_SLOT_LIVE: u8 = 1;
/// Number of shards in `ShardedNameTable`. Must stay a power of two: shard
/// selection masks the hash with `NAME_TABLE_SHARDS - 1`.
const NAME_TABLE_SHARDS: usize = 4;
19
/// Hints the CPU to pull the cache line containing `addr` into all cache levels
/// (`_MM_HINT_T0`).
///
/// # Safety
/// A prefetch is only a hint: it never dereferences `addr` and cannot fault, so any
/// address is acceptable. Callers in this file nevertheless pass in-bounds pointers.
#[cfg(target_arch = "x86_64")]
#[inline(always)]
unsafe fn prefetch_addr(addr: *const u8) {
    use std::arch::x86_64::{_MM_HINT_T0, _mm_prefetch};
    // SAFETY: SSE (required by `_mm_prefetch`) is part of the x86_64 baseline, and the
    // intrinsic takes `*const i8`, so the byte pointer is cast rather than passed as-is.
    unsafe { _mm_prefetch::<{ _MM_HINT_T0 }>(addr.cast::<i8>()) };
}

/// No-op fallback: no prefetch intrinsic is wired up for non-x86_64 targets.
///
/// # Safety
/// Does nothing with `_addr`; kept `unsafe` so call sites are identical on all targets.
#[cfg(not(target_arch = "x86_64"))]
#[inline(always)]
const unsafe fn prefetch_addr(_addr: *const u8) {}
34
/// Fsyncs the directory containing `path` so a newly created/renamed entry is durable.
///
/// A path with no (or an empty) parent resolves to the current directory `.`.
/// Two failures are deliberately tolerated and reported as success:
/// * the directory cannot be opened because it does not exist (`NotFound`);
/// * the platform/filesystem rejects fsync on a directory handle (`InvalidInput`).
///
/// Any other I/O error is returned unchanged.
fn sync_parent_dir(path: &Path) -> std::io::Result<()> {
    use std::io::ErrorKind;

    let dir = match path.parent() {
        Some(parent) if !parent.as_os_str().is_empty() => parent,
        _ => Path::new("."),
    };

    let handle = match std::fs::File::open(dir) {
        Ok(handle) => handle,
        Err(err) if err.kind() == ErrorKind::NotFound => return Ok(()),
        Err(err) => return Err(err),
    };

    handle.sync_all().or_else(|err| {
        if err.kind() == ErrorKind::InvalidInput {
            Ok(())
        } else {
            Err(err)
        }
    })
}
54
55#[derive(Clone)]
64#[cfg_attr(feature = "cache_align", repr(align(64)))]
65pub(crate) struct StoredEntity {
66 state: u8,
67 pub(crate) name: StrId,
68 pub(crate) entity_type: StrId,
69 pub(crate) observations: Vec<StrId>,
70}
71
72impl StoredEntity {
73 pub(crate) const fn is_live(&self) -> bool {
74 self.state == ENTITY_SLOT_LIVE
75 }
76}
77
78#[derive(Clone)]
82#[cfg_attr(feature = "cache_align", repr(align(16)))]
83pub(crate) struct StoredRelation {
84 pub(crate) from: StrId,
85 pub(crate) to: StrId,
86 pub(crate) relation_type: StrId,
87}
88
89pub struct GraphView<'a> {
103 kg: &'a KnowledgeGraph,
104 entities: Vec<&'a StoredEntity>,
105 relations: Vec<&'a StoredRelation>,
106}
107
108impl GraphView<'_> {
109 pub fn to_owned_out(&self) -> KnowledgeGraphOut {
113 KnowledgeGraphOut {
114 entities: self.entities.iter().map(|e| self.kg.entity_to_output(e)).collect(),
115 relations: self.relations.iter().map(|r| self.kg.relation_to_output(r)).collect(),
116 }
117 }
118}
119
120impl Serialize for GraphView<'_> {
121 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
122 let mut st = s.serialize_struct("KnowledgeGraphOut", 2)?;
123 st.serialize_field("entities", &EntityListRef { kg: self.kg, items: &self.entities })?;
124 st.serialize_field("relations", &RelationListRef { kg: self.kg, items: &self.relations })?;
125 st.end()
126 }
127}
128
129struct EntityListRef<'a> { kg: &'a KnowledgeGraph, items: &'a [&'a StoredEntity] }
130impl Serialize for EntityListRef<'_> {
131 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
132 let mut seq = s.serialize_seq(Some(self.items.len()))?;
133 for &e in self.items {
134 seq.serialize_element(&EntityRef { kg: self.kg, e })?;
135 }
136 seq.end()
137 }
138}
139
140struct RelationListRef<'a> { kg: &'a KnowledgeGraph, items: &'a [&'a StoredRelation] }
141impl Serialize for RelationListRef<'_> {
142 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
143 let mut seq = s.serialize_seq(Some(self.items.len()))?;
144 for &r in self.items {
145 seq.serialize_element(&RelationRef { kg: self.kg, r })?;
146 }
147 seq.end()
148 }
149}
150
151struct EntityRef<'a> { kg: &'a KnowledgeGraph, e: &'a StoredEntity }
152impl Serialize for EntityRef<'_> {
153 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
154 let mut st = s.serialize_struct("Entity", 3)?;
155 st.serialize_field("name", self.kg.interner.lookup(self.e.name))?;
156 st.serialize_field("entityType", self.kg.interner.lookup(self.e.entity_type))?;
157 st.serialize_field("observations", &ObsRef { kg: self.kg, obs: &self.e.observations })?;
158 st.end()
159 }
160}
161
162struct ObsRef<'a> { kg: &'a KnowledgeGraph, obs: &'a [StrId] }
163impl Serialize for ObsRef<'_> {
164 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
165 let mut seq = s.serialize_seq(Some(self.obs.len()))?;
166 for &o in self.obs {
167 seq.serialize_element(self.kg.interner.lookup(o))?;
168 }
169 seq.end()
170 }
171}
172
173struct RelationRef<'a> { kg: &'a KnowledgeGraph, r: &'a StoredRelation }
174impl Serialize for RelationRef<'_> {
175 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
176 let mut st = s.serialize_struct("Relation", 3)?;
177 st.serialize_field("from", self.kg.interner.lookup(self.r.from))?;
178 st.serialize_field("to", self.kg.interner.lookup(self.r.to))?;
179 st.serialize_field("relationType", self.kg.interner.lookup(self.r.relation_type))?;
180 st.end()
181 }
182}
183
/// Which relations a traversal follows relative to the node being expanded.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum Direction {
    /// Follow relations whose `from` is the current node (reach their `to`).
    Out,
    /// Follow relations whose `to` is the current node (reach their `from`).
    In,
    /// Follow relations in either direction.
    Both,
}

impl Direction {
    /// Parses an optional keyword: exactly `"out"` or `"in"` select those directions;
    /// any other string, or `None`, yields [`Direction::Both`].
    pub fn parse(s: Option<&str>) -> Self {
        s.map_or(Self::Both, |keyword| match keyword {
            "out" => Self::Out,
            "in" => Self::In,
            _ => Self::Both,
        })
    }
}
205
/// Makes a string safe to embed in a double-quoted export label: double quotes become
/// single quotes, and CR/LF become spaces. Every other character passes through.
fn sanitize_label(s: &str) -> String {
    s.chars()
        .map(|ch| match ch {
            '"' => '\'',
            '\r' | '\n' => ' ',
            other => other,
        })
        .collect()
}
218
/// Control byte of an unoccupied name-table slot. Its high bit is set, which never
/// happens for an `h2` tag, so `ctrl & 0x80 != 0` means "empty".
const EMPTY_SLOT: u8 = 0xFF;

/// 7-bit tag kept in the control byte: the low 7 bits of the hash.
#[inline(always)]
const fn h2(hash: u64) -> u8 {
    (hash as u8) & 0x7F
}

/// Home probe index: the hash with its low 7 (tag) bits shifted out, masked to the table.
#[inline(always)]
const fn h1(hash: u64, mask: usize) -> usize {
    mask & (hash >> 7) as usize
}
239
240#[derive(Clone)]
241struct NameTableShard {
242 ctrl: Vec<u8>, names: Vec<StrId>,
244 slots: Vec<u32>,
245 mask: usize,
246 count: usize,
247}
248
impl NameTableShard {
    /// Creates an empty shard with `capacity` rounded up to a power of two (minimum 16).
    fn new(capacity: usize) -> Self {
        let cap = capacity.next_power_of_two().max(16);
        Self {
            ctrl: vec![EMPTY_SLOT; cap],
            names: vec![StrId::EMPTY; cap],
            slots: vec![u32::MAX; cap],
            mask: cap - 1,
            count: 0,
        }
    }

    /// Returns the entity slot stored for `name`, or `None`.
    ///
    /// Probes linearly from `h1(hash)`; an empty control byte ends the probe run
    /// (the entry cannot be further along). The loop is bounded by the table length.
    /// NOTE(review): `hash` is assumed to equal `interner.get_hash(name)`, since
    /// `grow`/`remove` rehash with that function — confirm at call sites.
    #[inline(always)]
    fn lookup(&self, hash: u64, name: StrId) -> Option<u32> {
        let stamp = h2(hash);
        let mask = self.mask;
        let mut idx = h1(hash, mask);
        let ctrl = self.ctrl.as_ptr();
        let names = self.names.as_ptr();
        let slots = self.slots.as_ptr();
        let len = self.ctrl.len();

        for _ in 0..len {
            // Warm the cache a few probes ahead of the current position.
            let prefetch_idx = idx.wrapping_add(4) & mask;
            // SAFETY: `mask == len - 1` for the shared array length, so
            // `prefetch_idx <= mask` is in bounds of `ctrl`.
            unsafe { prefetch_addr(ctrl.add(prefetch_idx)) };

            // SAFETY: `idx` is always masked by `mask == len - 1`, and `ctrl`, `names`
            // and `slots` all have length `len`, so every read below is in bounds.
            unsafe {
                let c = *ctrl.add(idx);
                if c & 0x80 != 0 {
                    // Empty slot: end of the probe run, the name is absent.
                    return None;
                }
                if c == stamp && *names.add(idx) == name {
                    return Some(*slots.add(idx));
                }
            }
            idx = (idx + 1) & mask;
        }
        None
    }

    /// Stores `name -> slot`, doubling the table first when more than 3/4 full.
    ///
    /// Does not look for an existing entry for `name`; presumably callers only insert
    /// absent names — TODO confirm. The growth check guarantees an empty slot exists,
    /// so the probe loop terminates.
    fn insert(&mut self, interner: &StringInterner, hash: u64, name: StrId, slot: u32) {
        if self.count * 4 > self.ctrl.len() * 3 {
            self.grow(interner);
        }
        let stamp = h2(hash);
        let mask = self.mask;
        let mut idx = h1(hash, mask);
        loop {
            // SAFETY: `idx` is masked by `mask == len - 1` and all three arrays have
            // length `len`, so the unchecked accesses are in bounds.
            unsafe {
                if *self.ctrl.get_unchecked(idx) & 0x80 != 0 {
                    *self.ctrl.get_unchecked_mut(idx) = stamp;
                    *self.names.get_unchecked_mut(idx) = name;
                    *self.slots.get_unchecked_mut(idx) = slot;
                    self.count += 1;
                    return;
                }
            }
            idx = (idx + 1) & mask;
        }
    }

    /// Deletes the entry for `name`, if present.
    ///
    /// After clearing the matching slot, every following occupied slot in the same
    /// probe run is taken out and re-inserted from its home index (rehashed via
    /// `interner.get_hash`). This keeps lookups correct, because they stop at the first
    /// empty control byte.
    fn remove(&mut self, interner: &StringInterner, hash: u64, name: StrId) {
        let stamp = h2(hash);
        let mask = self.mask;
        let mut idx = h1(hash, mask);
        let len = self.ctrl.len();
        for _ in 0..len {
            if self.ctrl[idx] & 0x80 != 0 {
                // Reached an empty slot without a match: nothing to remove.
                return;
            }
            if self.ctrl[idx] == stamp && self.names[idx] == name {
                self.ctrl[idx] = EMPTY_SLOT;
                self.names[idx] = StrId::EMPTY;
                self.slots[idx] = u32::MAX;
                self.count -= 1;

                // Re-seat the rest of the cluster so no entry is stranded behind the hole.
                let mut next = (idx + 1) & mask;
                while self.ctrl[next] & 0x80 == 0 {
                    let nn = self.names[next];
                    let ns = self.slots[next];
                    let nh = interner.get_hash(nn);
                    self.ctrl[next] = EMPTY_SLOT;
                    self.names[next] = StrId::EMPTY;
                    self.slots[next] = u32::MAX;
                    self.count -= 1;

                    let nstamp = h2(nh);
                    let mut re_idx = h1(nh, mask);
                    while self.ctrl[re_idx] & 0x80 == 0 {
                        re_idx = (re_idx + 1) & mask;
                    }
                    self.ctrl[re_idx] = nstamp;
                    self.names[re_idx] = nn;
                    self.slots[re_idx] = ns;
                    self.count += 1;

                    next = (next + 1) & mask;
                }
                return;
            }
            idx = (idx + 1) & mask;
        }
    }

    /// Doubles the capacity and re-inserts every occupied entry, recomputing each hash
    /// with `interner.get_hash`. `count` is unchanged because the entries are the same.
    fn grow(&mut self, interner: &StringInterner) {
        let new_cap = self.ctrl.len() * 2;
        let new_mask = new_cap - 1;
        let mut new_ctrl = vec![EMPTY_SLOT; new_cap];
        let mut new_names = vec![StrId::EMPTY; new_cap];
        let mut new_slots = vec![u32::MAX; new_cap];

        for i in 0..self.ctrl.len() {
            if self.ctrl[i] & 0x80 == 0 {
                let name = self.names[i];
                let hash = interner.get_hash(name);
                let stamp = h2(hash);
                let mut idx = h1(hash, new_mask);
                while new_ctrl[idx] & 0x80 == 0 {
                    idx = (idx + 1) & new_mask;
                }
                new_ctrl[idx] = stamp;
                new_names[idx] = name;
                new_slots[idx] = self.slots[i];
            }
        }

        self.ctrl = new_ctrl;
        self.names = new_names;
        self.slots = new_slots;
        self.mask = new_mask;
    }
}
391
392#[derive(Clone)]
393struct ShardedNameTable {
394 shards: [NameTableShard; NAME_TABLE_SHARDS],
395}
396
397impl ShardedNameTable {
398 fn new(capacity_per_shard: usize) -> Self {
399 Self {
400 shards: [
401 NameTableShard::new(capacity_per_shard),
402 NameTableShard::new(capacity_per_shard),
403 NameTableShard::new(capacity_per_shard),
404 NameTableShard::new(capacity_per_shard),
405 ],
406 }
407 }
408
409 #[inline(always)]
410 const fn shard(hash: u64) -> usize {
411 (hash as usize) & (NAME_TABLE_SHARDS - 1)
412 }
413
414 #[inline(always)]
415 fn lookup(&self, hash: u64, name: StrId) -> Option<u32> {
416 self.shards[Self::shard(hash)].lookup(hash, name)
417 }
418
419 #[inline(always)]
420 fn insert(&mut self, interner: &StringInterner, hash: u64, name: StrId, slot: u32) {
421 self.shards[Self::shard(hash)].insert(interner, hash, name, slot);
422 }
423
424 #[inline(always)]
425 fn remove(&mut self, interner: &StringInterner, hash: u64, name: StrId) {
426 self.shards[Self::shard(hash)].remove(interner, hash, name);
427 }
428}
429
430pub struct KnowledgeGraph {
434 interner: StringInterner,
435 entity_slots: Vec<Option<StoredEntity>>,
436 free_slots: Vec<u32>,
439 name_table: ShardedNameTable,
440 relations: Vec<StoredRelation>,
441 adjacency: AHashMap<StrId, Vec<(StrId, StrId)>>,
445 search: SearchIndex,
446 store: BinaryStore,
447}
448
449#[derive(Clone)]
454pub struct ReadSnapshot {
455 pub(crate) interner: StringInterner,
456 pub(crate) entity_slots: Arc<[Option<StoredEntity>]>,
457 #[allow(dead_code)]
458 free_slots: Vec<u32>,
459 name_table: ShardedNameTable,
460 pub(crate) relations: Arc<[StoredRelation]>,
461 adjacency: AHashMap<StrId, Vec<(StrId, StrId)>>,
462 search: SearchIndex,
463}
464
/// Appends `s` to `buf` as a quoted JSON string literal.
///
/// `"` and `\` are backslash-escaped; `\n`, `\r` and `\t` use their short escapes; any
/// other control character (as classified by `char::is_control`) becomes `\uXXXX` with
/// lowercase hex. All remaining characters are copied verbatim.
pub(crate) fn push_json_str(buf: &mut String, s: &str) {
    use std::fmt::Write;

    buf.push('"');
    for ch in s.chars() {
        let short_escape = match ch {
            '"' => Some("\\\""),
            '\\' => Some("\\\\"),
            '\n' => Some("\\n"),
            '\r' => Some("\\r"),
            '\t' => Some("\\t"),
            _ => None,
        };
        if let Some(seq) = short_escape {
            buf.push_str(seq);
        } else if ch.is_control() {
            write!(buf, "\\u{:04x}", u32::from(ch)).expect("writing to a String cannot fail");
        } else {
            buf.push(ch);
        }
    }
    buf.push('"');
}
485
486pub struct ReadGraphView<'a> {
489 snap: &'a ReadSnapshot,
490 entities: Vec<&'a StoredEntity>,
491 relations: Vec<&'a StoredRelation>,
492}
493
494impl Serialize for ReadGraphView<'_> {
495 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
496 let mut st = s.serialize_struct("KnowledgeGraphOut", 2)?;
497 st.serialize_field("entities", &ReadEntityListRef { snap: self.snap, items: &self.entities })?;
498 st.serialize_field("relations", &ReadRelationListRef { snap: self.snap, items: &self.relations })?;
499 st.end()
500 }
501}
502
503struct ReadEntityListRef<'a> { snap: &'a ReadSnapshot, items: &'a [&'a StoredEntity] }
504impl Serialize for ReadEntityListRef<'_> {
505 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
506 let mut seq = s.serialize_seq(Some(self.items.len()))?;
507 for &e in self.items {
508 seq.serialize_element(&ReadEntityRef { snap: self.snap, e })?;
509 }
510 seq.end()
511 }
512}
513
514struct ReadRelationListRef<'a> { snap: &'a ReadSnapshot, items: &'a [&'a StoredRelation] }
515impl Serialize for ReadRelationListRef<'_> {
516 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
517 let mut seq = s.serialize_seq(Some(self.items.len()))?;
518 for &r in self.items {
519 seq.serialize_element(&ReadRelationRef { snap: self.snap, r })?;
520 }
521 seq.end()
522 }
523}
524
525struct ReadEntityRef<'a> { snap: &'a ReadSnapshot, e: &'a StoredEntity }
526impl Serialize for ReadEntityRef<'_> {
527 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
528 let mut st = s.serialize_struct("Entity", 3)?;
529 st.serialize_field("name", self.snap.interner.lookup(self.e.name))?;
530 st.serialize_field("entityType", self.snap.interner.lookup(self.e.entity_type))?;
531 st.serialize_field("observations", &ReadObsRef { snap: self.snap, obs: &self.e.observations })?;
532 st.end()
533 }
534}
535
536struct ReadObsRef<'a> { snap: &'a ReadSnapshot, obs: &'a [StrId] }
537impl Serialize for ReadObsRef<'_> {
538 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
539 let mut seq = s.serialize_seq(Some(self.obs.len()))?;
540 for &o in self.obs {
541 seq.serialize_element(self.snap.interner.lookup(o))?;
542 }
543 seq.end()
544 }
545}
546
547struct ReadRelationRef<'a> { snap: &'a ReadSnapshot, r: &'a StoredRelation }
548impl Serialize for ReadRelationRef<'_> {
549 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
550 let mut st = s.serialize_struct("Relation", 3)?;
551 st.serialize_field("from", self.snap.interner.lookup(self.r.from))?;
552 st.serialize_field("to", self.snap.interner.lookup(self.r.to))?;
553 st.serialize_field("relationType", self.snap.interner.lookup(self.r.relation_type))?;
554 st.end()
555 }
556}
557
558impl ReadSnapshot {
560
561 pub fn read_graph_json(&self) -> String {
564 let cap = self.entity_slots.len() * 64 + self.relations.len() * 60 + 128;
566 let mut buf = String::with_capacity(cap);
567
568 buf.push_str(r#"{"entities":["#);
570 let mut first = true;
571 for slot in self.entity_slots.iter() {
572 let Some(e) = slot.as_ref().filter(|e| e.is_live()) else { continue };
573 if first { first = false } else { buf.push(',') }
574 buf.push('{');
575 buf.push_str(r#""name":"#);
577 push_json_str(&mut buf, self.interner.lookup(e.name));
578 buf.push(',');
579 buf.push_str(r#""entityType":"#);
581 push_json_str(&mut buf, self.interner.lookup(e.entity_type));
582 buf.push(',');
583 buf.push_str(r#""observations":["#);
585 for (oi, o) in e.observations.iter().enumerate() {
586 if oi > 0 { buf.push(',') }
587 push_json_str(&mut buf, self.interner.lookup(*o));
588 }
589 buf.push_str("]}");
590 }
591
592 buf.push_str(r#"],"relations":["#);
594 first = true;
595 for r in self.relations.iter() {
596 if first { first = false } else { buf.push(',') }
597 buf.push('{');
598 buf.push_str(r#""from":"#);
599 push_json_str(&mut buf, self.interner.lookup(r.from));
600 buf.push(',');
601 buf.push_str(r#""to":"#);
602 push_json_str(&mut buf, self.interner.lookup(r.to));
603 buf.push(',');
604 buf.push_str(r#""relationType":"#);
605 push_json_str(&mut buf, self.interner.lookup(r.relation_type));
606 buf.push('}');
607 }
608 buf.push_str("]}");
609
610 buf
611 }
612
613 pub fn read_graph_view(&self) -> ReadGraphView<'_> {
616 let entities: Vec<&StoredEntity> = self
617 .entity_slots
618 .iter()
619 .filter_map(|s| s.as_ref().filter(|e| e.is_live()))
620 .collect();
621 let relations: Vec<&StoredRelation> = self.relations.iter().collect();
622 ReadGraphView { snap: self, entities, relations }
623 }
624
625 fn lookup_live_slot(&self, name: &str) -> Option<u32> {
626 let name_id = self.interner.get_optional(name)?;
627 let hash = self.interner.get_hash(name_id);
628 let slot = self.name_table.lookup(hash, name_id)?;
629 self.entity_slots
630 .get(slot as usize)
631 .and_then(|s| s.as_ref())
632 .filter(|e| e.is_live())?;
633 Some(slot)
634 }
635
636 fn entity_by_name_id(&self, name_id: StrId) -> Option<Entity> {
637 let hash = self.interner.get_hash(name_id);
638 let slot = self.name_table.lookup(hash, name_id)?;
639 let e = self.entity_slots.get(slot as usize)?.as_ref()?;
640 Some(self.entity_to_output(e))
641 }
642
643 pub(crate) fn entity_to_output(&self, e: &StoredEntity) -> Entity {
644 Entity {
645 name: self.interner.lookup(e.name).to_string(),
646 entity_type: self.interner.lookup(e.entity_type).to_string(),
647 observations: e
648 .observations
649 .iter()
650 .map(|o| self.interner.lookup(*o).to_string())
651 .collect(),
652 }
653 }
654
655 pub(crate) fn relation_to_output(&self, r: &StoredRelation) -> Relation {
656 Relation {
657 from: self.interner.lookup(r.from).to_string(),
658 to: self.interner.lookup(r.to).to_string(),
659 relation_type: self.interner.lookup(r.relation_type).to_string(),
660 }
661 }
662
663 pub fn open_nodes(&self, names: &[String]) -> KnowledgeGraphOut {
665 let name_ids: std::collections::HashSet<StrId> = names
666 .iter()
667 .filter_map(|n| self.interner.get_optional(n))
668 .collect();
669 let entities: Vec<Entity> = self
670 .entity_slots
671 .iter()
672 .filter_map(|s| {
673 let e = s.as_ref()?;
674 if e.is_live() && name_ids.contains(&e.name) {
675 Some(self.entity_to_output(e))
676 } else {
677 None
678 }
679 })
680 .collect();
681 let matched: std::collections::HashSet<StrId> = entities.iter()
682 .filter_map(|e| self.interner.get_optional(&e.name))
683 .collect();
684 let relations: Vec<Relation> = self
685 .relations
686 .iter()
687 .filter(|r| matched.contains(&r.from) || matched.contains(&r.to))
688 .map(|r| self.relation_to_output(r))
689 .collect();
690 KnowledgeGraphOut { entities, relations }
691 }
692
693 pub fn read_graph(&self) -> KnowledgeGraphOut {
695 let entities: Vec<Entity> = self
696 .entity_slots
697 .iter()
698 .filter_map(|s| s.as_ref().filter(|e| e.is_live()))
699 .map(|e| self.entity_to_output(e))
700 .collect();
701 let relations: Vec<Relation> = self
702 .relations
703 .iter()
704 .map(|r| self.relation_to_output(r))
705 .collect();
706 KnowledgeGraphOut { entities, relations }
707 }
708
709 pub fn get_entity(&self, name: &str) -> Option<Entity> {
711 self.lookup_live_slot(name)?;
712 let name_id = self.interner.get_optional(name)?;
713 self.entity_by_name_id(name_id)
714 }
715
716 pub fn neighbors(
718 &self,
719 name: &str,
720 direction: Direction,
721 rtype: Option<&str>,
722 depth: u32,
723 ) -> Result<KnowledgeGraphOut> {
724 self.lookup_live_slot(name)
725 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
726 let start = self.interner.get_optional(name).unwrap();
727
728 let rtype_id = match rtype {
729 Some(r) => match self.interner.get_optional(r) {
730 Some(id) => Some(id),
731 None => {
732 let entities = self.entity_by_name_id(start).into_iter().collect();
733 return Ok(KnowledgeGraphOut { entities, relations: Vec::new() });
734 }
735 },
736 None => None,
737 };
738
739 let mut visited: AHashSet<StrId> = AHashSet::new();
740 visited.insert(start);
741
742 let type_ok = |r: &StoredRelation, rt: Option<StrId>| rt.is_none_or(|rt_id| r.relation_type == rt_id);
743
744 if depth == 1 {
745 for r in self.relations.iter().filter(|r| type_ok(r, rtype_id)) {
746 match direction {
747 Direction::Out => {
748 if r.from == start { visited.insert(r.to); }
749 }
750 Direction::In => {
751 if r.to == start { visited.insert(r.from); }
752 }
753 Direction::Both => {
754 if r.from == start { visited.insert(r.to); }
755 else if r.to == start { visited.insert(r.from); }
756 }
757 }
758 }
759 } else if depth >= 2 {
760 let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
761 match direction {
762 Direction::Both => {
763 for (&node, edges) in &self.adjacency {
764 for &(nb, rt) in edges {
765 if rtype_id.is_none_or(|rt_id| rt == rt_id) {
766 adj.entry(node).or_default().push(nb);
767 }
768 }
769 }
770 }
771 Direction::Out | Direction::In => {
772 for r in self.relations.iter().filter(|r| type_ok(r, rtype_id)) {
773 match direction {
774 Direction::Out => adj.entry(r.from).or_default().push(r.to),
775 Direction::In => adj.entry(r.to).or_default().push(r.from),
776 _ => unreachable!(),
777 }
778 }
779 }
780 }
781 let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
782 queue.push_back((start, 0));
783 while let Some((node, d)) = queue.pop_front() {
784 if d >= depth { continue; }
785 if let Some(nbrs) = adj.get(&node) {
786 for &nb in nbrs {
787 if visited.insert(nb) {
788 queue.push_back((nb, d + 1));
789 }
790 }
791 }
792 }
793 }
794
795 let mut entities = Vec::with_capacity(visited.len());
796 for &nid in &visited {
797 if let Some(e) = self.entity_by_name_id(nid) {
798 entities.push(e);
799 }
800 }
801 let relations: Vec<Relation> = self
802 .relations
803 .iter()
804 .filter(|r| type_ok(r, rtype_id) && visited.contains(&r.from) && visited.contains(&r.to))
805 .map(|r| self.relation_to_output(r))
806 .collect();
807 Ok(KnowledgeGraphOut { entities, relations })
808 }
809
810 pub fn describe_entity(&self, name: &str) -> Result<serde_json::Value> {
812 let name_id = self
813 .interner
814 .get_optional(name)
815 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
816 let entity = self
817 .entity_by_name_id(name_id)
818 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
819
820 let mut incident: Vec<Relation> = Vec::new();
821 let mut neighbor_seen: AHashSet<StrId> = AHashSet::new();
822 let mut neighbors: Vec<&str> = Vec::new();
823 for r in self.relations.iter() {
824 if r.from == name_id || r.to == name_id {
825 incident.push(self.relation_to_output(r));
826 let other = if r.from == name_id { r.to } else { r.from };
827 if other != name_id && neighbor_seen.insert(other) {
828 neighbors.push(self.interner.lookup(other));
829 }
830 }
831 }
832
833 Ok(serde_json::json!({
834 "entity": entity,
835 "relations": incident,
836 "neighbors": neighbors,
837 "degree": incident.len(),
838 }))
839 }
840
841 pub fn find_path(&self, from: &str, to: &str) -> Result<Vec<String>> {
843 let from_id = self
844 .interner
845 .get_optional(from)
846 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
847 let to_id = self
848 .interner
849 .get_optional(to)
850 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
851 if self.lookup_live_slot(from).is_none() {
852 return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
853 }
854 if self.lookup_live_slot(to).is_none() {
855 return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
856 }
857
858 let mut visited: AHashSet<StrId> = AHashSet::new();
860 let mut parent: AHashMap<StrId, StrId> = AHashMap::new();
861 let mut queue: VecDeque<StrId> = VecDeque::new();
862
863 visited.insert(from_id);
864 queue.push_back(from_id);
865
866 while let Some(current) = queue.pop_front() {
867 if current == to_id { break; }
868 if let Some(neighbors) = self.adjacency.get(¤t) {
869 for &(neighbor, _) in neighbors {
870 if visited.insert(neighbor) {
871 parent.insert(neighbor, current);
872 queue.push_back(neighbor);
873 }
874 }
875 }
876 }
877
878 if !visited.contains(&to_id) {
879 return Err(MCSError::MemoryError(format!(
880 "No path found between '{from}' and '{to}'"
881 )));
882 }
883
884 let mut path = Vec::new();
885 let mut cur = to_id;
886 path.push(self.interner.lookup(cur).to_string());
887 while let Some(&p) = parent.get(&cur) {
888 path.push(self.interner.lookup(p).to_string());
889 cur = p;
890 }
891 path.reverse();
892 Ok(path)
893 }
894
895 pub fn extract_subgraph(&self, names: &[String], depth: u32) -> Result<KnowledgeGraphOut> {
897 if names.is_empty() {
898 return Ok(KnowledgeGraphOut { entities: Vec::new(), relations: Vec::new() });
899 }
900 let mut visited: AHashSet<StrId> = AHashSet::new();
901 let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
902 for name in names {
903 if let Some(id) = self.interner.get_optional(name)
904 && visited.insert(id)
905 {
906 queue.push_back((id, 0));
907 }
908 }
909 let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
910 for (&node, edges) in &self.adjacency {
911 let nbrs: Vec<StrId> = edges.iter().map(|(to, _)| *to).collect();
912 adj.insert(node, nbrs);
913 }
914 while let Some((node, d)) = queue.pop_front() {
915 if d >= depth { continue; }
916 if let Some(nbrs) = adj.get(&node) {
917 for &nb in nbrs {
918 if visited.insert(nb) {
919 queue.push_back((nb, d + 1));
920 }
921 }
922 }
923 }
924 let mut entities: Vec<Entity> = Vec::with_capacity(visited.len());
925 for &nid in &visited {
926 if let Some(e) = self.entity_by_name_id(nid) {
927 entities.push(e);
928 }
929 }
930 let relations: Vec<Relation> = self
931 .relations
932 .iter()
933 .filter(|r| visited.contains(&r.from) && visited.contains(&r.to))
934 .map(|r| self.relation_to_output(r))
935 .collect();
936 Ok(KnowledgeGraphOut { entities, relations })
937 }
938
939 pub fn batch_get_entities(&self, names: &[String]) -> Vec<Option<Entity>> {
941 names.iter().map(|n| self.get_entity(n)).collect()
942 }
943
944 pub fn graph_stats(&self) -> serde_json::Value {
946 let entity_count = self
947 .entity_slots
948 .iter()
949 .filter(|s| s.as_ref().is_some_and(|e| e.is_live()))
950 .count();
951 let relation_count = self.relations.len();
952 let type_counts = self.entity_type_counts();
953 let relation_type_counts = self.relation_type_counts();
954 serde_json::json!({
955 "entities": entity_count,
956 "relations": relation_count,
957 "entityTypes": type_counts,
958 "relationTypes": relation_type_counts,
959 })
960 }
961
962 pub fn search_relations(&self, from: Option<&str>, to: Option<&str>, rtype: Option<&str>) -> Vec<Relation> {
964 let from_id = from.and_then(|n| self.interner.get_optional(n));
965 let to_id = to.and_then(|n| self.interner.get_optional(n));
966 let rtype_id = rtype.and_then(|n| self.interner.get_optional(n));
967 self.relations
968 .iter()
969 .filter(|r| {
970 from_id.is_none_or(|id| r.from == id)
971 && to_id.is_none_or(|id| r.to == id)
972 && rtype_id.is_none_or(|id| r.relation_type == id)
973 })
974 .map(|r| self.relation_to_output(r))
975 .collect()
976 }
977
978 pub fn entity_type_counts(&self) -> Vec<(String, usize)> {
980 let mut counts: AHashMap<StrId, usize> = AHashMap::new();
981 for slot in self.entity_slots.iter() {
982 if let Some(e) = slot.as_ref().filter(|e| e.is_live()) {
983 *counts.entry(e.entity_type).or_default() += 1;
984 }
985 }
986 let mut result: Vec<(String, usize)> = counts
987 .into_iter()
988 .map(|(id, c)| (self.interner.lookup(id).to_string(), c))
989 .collect();
990 result.sort_by(|a, b| a.0.cmp(&b.0));
991 result
992 }
993
994 pub fn relation_type_counts(&self) -> Vec<(String, usize)> {
996 let mut counts: AHashMap<StrId, usize> = AHashMap::new();
997 for r in self.relations.iter() {
998 *counts.entry(r.relation_type).or_default() += 1;
999 }
1000 let mut result: Vec<(String, usize)> = counts
1001 .into_iter()
1002 .map(|(id, c)| (self.interner.lookup(id).to_string(), c))
1003 .collect();
1004 result.sort_by(|a, b| a.0.cmp(&b.0));
1005 result
1006 }
1007
1008 pub fn export(&self, format: &str) -> Result<String> {
1010 match format {
1011 "json" => serde_json::to_string(&self.read_graph()).map_err(MCSError::JsonError),
1012 "mermaid" => Ok(self.export_mermaid()),
1013 "dot" => Ok(self.export_dot()),
1014 other => Err(MCSError::InvalidParams(format!(
1015 "Unknown export format '{other}' (expected json|mermaid|dot)"
1016 ))),
1017 }
1018 }
1019
1020 fn export_mermaid(&self) -> String {
1021 let mut out = String::with_capacity(4096);
1022 out.push_str("graph LR\n");
1023 for r in self.relations.iter() {
1024 let from = sanitize_label(self.interner.lookup(r.from));
1025 let to = sanitize_label(self.interner.lookup(r.to));
1026 let rt = sanitize_label(self.interner.lookup(r.relation_type));
1027 out.push_str(&format!(" {} -- \"{}\" --> {}\n", from, rt, to));
1028 }
1029 out
1030 }
1031
1032 fn export_dot(&self) -> String {
1033 let mut out = String::with_capacity(4096);
1034 out.push_str("digraph KG {\n");
1035 out.push_str(" rankdir=LR;\n");
1036 for slot in self.entity_slots.iter() {
1037 if let Some(e) = slot.as_ref().filter(|e| e.is_live()) {
1038 let name = sanitize_label(self.interner.lookup(e.name));
1039 let etype = sanitize_label(self.interner.lookup(e.entity_type));
1040 out.push_str(&format!(" \"{}\" [label=\"{}\n({})\"];\n", name, name, etype));
1041 }
1042 }
1043 for r in self.relations.iter() {
1044 let from = sanitize_label(self.interner.lookup(r.from));
1045 let to = sanitize_label(self.interner.lookup(r.to));
1046 let rt = sanitize_label(self.interner.lookup(r.relation_type));
1047 out.push_str(&format!(" \"{}\" -> \"{}\" [label=\"{}\"];\n", from, to, rt));
1048 }
1049 out.push_str("}\n");
1050 out
1051 }
1052
1053 pub fn find_all_paths(
1055 &self,
1056 from: &str,
1057 to: &str,
1058 max_depth: usize,
1059 max_paths: usize,
1060 ) -> Result<Vec<Vec<String>>> {
1061 let from_id = self
1062 .interner
1063 .get_optional(from)
1064 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
1065 let to_id = self
1066 .interner
1067 .get_optional(to)
1068 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
1069 if self.lookup_live_slot(from).is_none() {
1070 return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
1071 }
1072 if self.lookup_live_slot(to).is_none() {
1073 return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
1074 }
1075 if from_id == to_id {
1076 return Ok(vec![vec![from.to_string()]]);
1077 }
1078 let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::with_capacity(self.adjacency.len());
1079 for (&node, edges) in &self.adjacency {
1080 let nbrs: Vec<StrId> = edges.iter().map(|(to, _)| *to).collect();
1081 adj.insert(node, nbrs);
1082 }
1083 let mut all_paths: Vec<Vec<StrId>> = Vec::new();
1084 let mut current_path = Vec::new();
1085 let mut visited: AHashSet<StrId> = AHashSet::new();
1086 visited.insert(from_id);
1087 current_path.push(from_id);
1088 Self::dfs_all_paths(
1089 &adj,
1090 from_id,
1091 to_id,
1092 max_depth,
1093 max_paths,
1094 &mut visited,
1095 &mut current_path,
1096 &mut all_paths,
1097 );
1098 if all_paths.is_empty() {
1099 return Err(MCSError::MemoryError(format!(
1100 "No path found between '{from}' and '{to}'"
1101 )));
1102 }
1103 let result: Vec<Vec<String>> = all_paths
1104 .into_iter()
1105 .map(|path| {
1106 path.into_iter()
1107 .map(|id| self.interner.lookup(id).to_string())
1108 .collect()
1109 })
1110 .collect();
1111 Ok(result)
1112 }
1113
1114 fn dfs_all_paths(
1115 adj: &AHashMap<StrId, Vec<StrId>>,
1116 current: StrId,
1117 target: StrId,
1118 max_depth: usize,
1119 max_paths: usize,
1120 visited: &mut AHashSet<StrId>,
1121 current_path: &mut Vec<StrId>,
1122 all_paths: &mut Vec<Vec<StrId>>,
1123 ) {
1124 if all_paths.len() >= max_paths { return; }
1125 if current == target && current_path.len() > 1 {
1126 all_paths.push(current_path.clone());
1127 return;
1128 }
1129 if current_path.len() > max_depth { return; }
1130 if let Some(neighbors) = adj.get(¤t) {
1131 for &nb in neighbors {
1132 if !visited.contains(&nb) {
1133 visited.insert(nb);
1134 current_path.push(nb);
1135 Self::dfs_all_paths(adj, nb, target, max_depth, max_paths, visited, current_path, all_paths);
1136 current_path.pop();
1137 visited.remove(&nb);
1138 }
1139 }
1140 }
1141 }
1142
1143 pub fn search_entities(&self, query: &str) -> Result<Vec<Entity>> {
1145 let token = query.to_lowercase();
1146 let matching = self.search.search(&token, &self.interner);
1147 Ok(matching
1148 .iter()
1149 .filter_map(|idx| {
1150 self.entity_slots
1151 .get(*idx as usize)?
1152 .as_ref()
1153 .filter(|e| e.is_live())
1154 .map(|e| self.entity_to_output(e))
1155 })
1156 .collect())
1157 }
1158}
1159
1160impl KnowledgeGraph {
1161 pub fn new(path: &Path) -> std::io::Result<Self> {
1162 Self::open(path, None)
1163 }
1164
/// Opens the log at `path` and rebuilds the in-memory graph by replaying
/// every stored record.
///
/// `sync_slot` is forwarded unchanged to `BinaryStore::new_with_slot`;
/// `compact` uses it to hand the existing store's slot to the reopened one.
///
/// Records between `TxnBegin` and `TxnCommit` are buffered and applied only
/// when the commit marker is replayed. Records outside a transaction are
/// applied as they are read. After replay, empty slots become the free list
/// and the adjacency index is rebuilt from the surviving relations.
///
/// # Errors
/// Returns any I/O error from opening the store or from `replay`.
fn open(
    path: &Path,
    sync_slot: Option<Arc<arc_swap::ArcSwap<std::fs::File>>>,
) -> std::io::Result<Self> {
    let store = BinaryStore::new_with_slot(path, sync_slot)?;

    let mut interner = StringInterner::with_capacity(65536, 1024);
    let mut entity_slots: Vec<Option<StoredEntity>> = Vec::with_capacity(256);
    let mut name_table = ShardedNameTable::new(64);
    let mut relations: Vec<StoredRelation> = Vec::with_capacity(64);
    let mut search = SearchIndex::new();

    // `Some` while inside a transaction: holds that transaction's records
    // until its commit marker is replayed.
    let mut pending: Option<Vec<(RecordKind, Vec<u8>)>> = None;
    // NOTE(review): a `TxnBegin` with no matching `TxnCommit` leaves `pending`
    // set, so every later record (including non-transactional ones) is
    // buffered and then dropped at the next `TxnBegin` or at end of replay.
    // Confirm that `BinaryStore` truncates an unterminated transaction on open.
    store.replay(|kind, data| {
        match kind {
            // Starting a transaction discards any uncommitted buffer left by
            // an earlier, unterminated one.
            RecordKind::TxnBegin => pending = Some(Vec::new()),
            RecordKind::TxnCommit => {
                if let Some(buffered) = pending.take() {
                    for (k, d) in &buffered {
                        Self::apply_record(
                            *k, d, &mut interner, &mut entity_slots, &mut search,
                            &mut name_table, &mut relations,
                        );
                    }
                }
            }
            // Data records: buffer inside a transaction, apply immediately otherwise.
            other => match pending.as_mut() {
                Some(buffered) => buffered.push((other, data.to_vec())),
                None => Self::apply_record(
                    other, data, &mut interner, &mut entity_slots, &mut search,
                    &mut name_table, &mut relations,
                ),
            },
        }
    })?;

    // Slots emptied by replayed deletions become the initial free list.
    let free_slots: Vec<u32> = entity_slots
        .iter()
        .enumerate()
        .filter(|(_, s)| s.is_none())
        .map(|(i, _)| i as u32)
        .collect();

    // Undirected adjacency: each relation is recorded under both endpoints.
    let mut adjacency: AHashMap<StrId, Vec<(StrId, StrId)>> = AHashMap::new();
    for rel in &relations {
        adjacency.entry(rel.from).or_default().push((rel.to, rel.relation_type));
        adjacency.entry(rel.to).or_default().push((rel.from, rel.relation_type));
    }

    Ok(Self {
        interner,
        entity_slots,
        free_slots,
        name_table,
        relations,
        adjacency,
        search,
        store,
    })
}
1233
1234 #[allow(clippy::too_many_arguments)]
1242 fn apply_record(
1243 kind: RecordKind,
1244 data: &[u8],
1245 interner: &mut StringInterner,
1246 entity_slots: &mut Vec<Option<StoredEntity>>,
1247 search: &mut SearchIndex,
1248 name_table: &mut ShardedNameTable,
1249 relations: &mut Vec<StoredRelation>,
1250 ) {
1251 match kind {
1252 RecordKind::CreateEntity => {
1253 if let Some((name, etype, obs)) = store_enc::decode_create_entity(data) {
1254 Self::replay_create_entity(
1255 interner, entity_slots, search, name_table, name, etype, &obs,
1256 );
1257 }
1258 }
1259 RecordKind::CreateRelation => {
1260 if let Some((from, to, rtype)) = store_enc::decode_create_relation(data) {
1261 let from_id = interner.intern(from);
1262 let to_id = interner.intern(to);
1263 let type_id = interner.intern(rtype);
1264 relations.push(StoredRelation {
1265 from: from_id,
1266 to: to_id,
1267 relation_type: type_id,
1268 });
1269 }
1270 }
1271 RecordKind::AddObservations => {
1272 if let Some((name, obs)) = store_enc::decode_add_observations(data) {
1273 Self::replay_add_observations(
1274 interner, entity_slots, search, name_table, name, &obs,
1275 );
1276 }
1277 }
1278 RecordKind::DeleteEntity => {
1279 if let Some(name) = store_enc::decode_delete_entity(data) {
1280 Self::replay_delete_entity(
1281 interner, entity_slots, relations, search, name_table, name,
1282 );
1283 }
1284 }
1285 RecordKind::DeleteObservations => {
1286 if let Some((name, obs)) = store_enc::decode_delete_observations(data) {
1287 Self::replay_delete_observations(
1288 interner, entity_slots, search, name_table, name, &obs,
1289 );
1290 }
1291 }
1292 RecordKind::DeleteRelation => {
1293 if let Some((from, to, rtype)) = store_enc::decode_delete_relation(data) {
1294 let from_id = interner.intern(from);
1295 let to_id = interner.intern(to);
1296 let type_id = interner.intern(rtype);
1297 relations.retain(|r| {
1298 !(r.from == from_id && r.to == to_id && r.relation_type == type_id)
1299 });
1300 }
1301 }
1302 RecordKind::TxnBegin | RecordKind::TxnCommit => {}
1303 }
1304 }
1305
1306 #[allow(clippy::ptr_arg)]
1307 fn replay_create_entity(
1308 interner: &mut StringInterner,
1309 entities: &mut Vec<Option<StoredEntity>>,
1310 search: &mut SearchIndex,
1311 name_table: &mut ShardedNameTable,
1312 name: &str,
1313 etype: &str,
1314 observations: &[&str],
1315 ) {
1316 let name_id = interner.intern(name);
1317 let type_id = interner.intern(etype);
1318 let obs_ids: Vec<StrId> = observations.iter().map(|o| interner.intern(o)).collect();
1319 let slot = entities.len() as u32;
1320 entities.push(Some(StoredEntity {
1321 state: ENTITY_SLOT_LIVE,
1322 name: name_id,
1323 entity_type: type_id,
1324 observations: obs_ids.clone(),
1325 }));
1326 let hash = interner.get_hash(name_id);
1327 name_table.insert(&*interner, hash, name_id, slot);
1328 search.index_entity(interner, slot, name_id, type_id, &obs_ids);
1329 }
1330
1331 fn replay_add_observations(
1332 interner: &mut StringInterner,
1333 entities: &mut [Option<StoredEntity>],
1334 search: &mut SearchIndex,
1335 name_table: &mut ShardedNameTable,
1336 name: &str,
1337 observations: &[&str],
1338 ) {
1339 let name_id = interner.intern(name);
1340 let hash = interner.get_hash(name_id);
1341 if let Some(slot) = name_table.lookup(hash, name_id)
1342 && let Some(Some(entity)) = entities.get_mut(slot as usize)
1343 {
1344 for &o in observations {
1345 let oid = interner.intern(o);
1346 if !entity.observations.contains(&oid) {
1347 entity.observations.push(oid);
1348 }
1349 }
1350 search.remove_entity(slot);
1351 search.index_entity(
1352 interner,
1353 slot,
1354 entity.name,
1355 entity.entity_type,
1356 &entity.observations,
1357 );
1358 }
1359 }
1360
1361 fn replay_delete_entity(
1362 interner: &mut StringInterner,
1363 entities: &mut [Option<StoredEntity>],
1364 rels: &mut Vec<StoredRelation>,
1365 search: &mut SearchIndex,
1366 name_table: &mut ShardedNameTable,
1367 name: &str,
1368 ) {
1369 let name_id = interner.intern(name);
1370 let hash = interner.get_hash(name_id);
1371 if let Some(slot) = name_table.lookup(hash, name_id)
1372 && let Some(Some(_)) = entities.get(slot as usize)
1373 {
1374 entities[slot as usize] = None;
1375 search.remove_entity(slot);
1376 name_table.remove(&*interner, hash, name_id);
1377 }
1378 rels.retain(|r| r.from != name_id && r.to != name_id);
1379 }
1380
1381 fn replay_delete_observations(
1382 interner: &mut StringInterner,
1383 entities: &mut [Option<StoredEntity>],
1384 search: &mut SearchIndex,
1385 name_table: &mut ShardedNameTable,
1386 name: &str,
1387 observations: &[&str],
1388 ) {
1389 let name_id = interner.intern(name);
1390 let hash = interner.get_hash(name_id);
1391 if let Some(slot) = name_table.lookup(hash, name_id)
1392 && let Some(Some(entity)) = entities.get_mut(slot as usize)
1393 {
1394 let remove_ids: Vec<StrId> = observations.iter().map(|o| interner.intern(o)).collect();
1395 entity.observations.retain(|o| !remove_ids.contains(o));
1396 search.remove_entity(slot);
1397 search.index_entity(
1398 interner,
1399 slot,
1400 entity.name,
1401 entity.entity_type,
1402 &entity.observations,
1403 );
1404 }
1405 }
1406
/// Returns the string interner backing this graph; entity names, entity
/// types, observations and relation types are all stored as its [`StrId`]s.
pub const fn interner(&self) -> &StringInterner {
    &self.interner
}
1414
1415 pub fn get_entity(&self, name: &str) -> Option<Entity> {
1417 let name_id = self.interner.get_optional(name)?;
1418 let hash = self.interner.get_hash(name_id);
1419 let slot = self.name_table.lookup(hash, name_id)?;
1420 let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
1421 if !stored.is_live() {
1422 return None;
1423 }
1424 Some(self.entity_to_output(stored))
1425 }
1426
1427 pub fn graph_stats(&self) -> serde_json::Value {
1429 let live_entities = self
1430 .entity_slots
1431 .iter()
1432 .filter(|s| s.as_ref().is_some_and(|e| e.is_live()))
1433 .count();
1434 let total_relations = self.relations.len();
1435 let index_entries = self.search.len();
1436 let total_obs: usize = self
1437 .entity_slots
1438 .iter()
1439 .filter_map(|s| s.as_ref())
1440 .filter(|e| e.is_live())
1441 .map(|e| e.observations.len())
1442 .sum();
1443
1444 serde_json::json!({
1445 "entities": live_entities,
1446 "relations": total_relations,
1447 "totalObservations": total_obs,
1448 "searchIndexEntries": index_entries,
1449 "internedStrings": self.interner.len(),
1450 "internedBytes": self.interner.total_bytes(),
1451 })
1452 }
1453
1454 pub fn search_relations(&self, from: Option<&str>, to: Option<&str>, rtype: Option<&str>) -> Vec<Relation> {
1458 let from_id = match from {
1459 Some(f) => match self.interner.get_optional(f) {
1460 Some(id) => Some(id),
1461 None => return Vec::new(),
1462 },
1463 None => None,
1464 };
1465 let to_id = match to {
1466 Some(t) => match self.interner.get_optional(t) {
1467 Some(id) => Some(id),
1468 None => return Vec::new(),
1469 },
1470 None => None,
1471 };
1472 let rtype_id = match rtype {
1473 Some(r) => match self.interner.get_optional(r) {
1474 Some(id) => Some(id),
1475 None => return Vec::new(),
1476 },
1477 None => None,
1478 };
1479
1480 self.relations
1481 .iter()
1482 .filter(|r| {
1483 from_id.is_none_or(|f| r.from == f)
1484 && to_id.is_none_or(|t| r.to == t)
1485 && rtype_id.is_none_or(|rt| r.relation_type == rt)
1486 })
1487 .map(|r| Relation {
1488 from: self.interner.lookup(r.from).to_string(),
1489 to: self.interner.lookup(r.to).to_string(),
1490 relation_type: self.interner.lookup(r.relation_type).to_string(),
1491 })
1492 .collect()
1493 }
1494
1495 pub fn find_path(&self, from: &str, to: &str) -> Result<Vec<String>> {
1498 let from_id = self.interner.get_optional(from)
1499 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
1500 let to_id = self.interner.get_optional(to)
1501 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
1502 let hash_from = self.interner.get_hash(from_id);
1503 let hash_to = self.interner.get_hash(to_id);
1504
1505 if self.name_table.lookup(hash_from, from_id).is_none() {
1506 return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
1507 }
1508 if self.name_table.lookup(hash_to, to_id).is_none() {
1509 return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
1510 }
1511 if from_id == to_id {
1512 return Ok(vec![from.to_string()]);
1513 }
1514
1515 let mut visited: AHashSet<StrId> = AHashSet::new();
1517 let mut parent: AHashMap<StrId, StrId> = AHashMap::new();
1518 let mut queue: VecDeque<StrId> = VecDeque::new();
1519
1520 visited.insert(from_id);
1521 queue.push_back(from_id);
1522
1523 while let Some(current) = queue.pop_front() {
1524 if current == to_id {
1525 break;
1526 }
1527
1528 if let Some(neighbors) = self.adjacency.get(¤t) {
1529 for &(neighbor, _) in neighbors {
1530 if visited.insert(neighbor) {
1531 parent.insert(neighbor, current);
1532 queue.push_back(neighbor);
1533 }
1534 }
1535 }
1536 }
1537
1538 if !parent.contains_key(&to_id) && from_id != to_id {
1539 return Err(MCSError::MemoryError(format!(
1540 "No path found between '{from}' and '{to}'"
1541 )));
1542 }
1543
1544 let mut path: Vec<String> = Vec::new();
1546 let mut cur = to_id;
1547 loop {
1548 path.push(self.interner.lookup(cur).to_string());
1549 if cur == from_id {
1550 break;
1551 }
1552 cur = *parent.get(&cur).ok_or_else(|| {
1553 MCSError::MemoryError("Path reconstruction failed".into())
1554 })?;
1555 }
1556 path.reverse();
1557 Ok(path)
1558 }
1559
/// Rewrites the log so it holds only the current state, then reloads the
/// graph from the rewritten file.
///
/// Every live entity and every relation is written as a fresh
/// `CreateEntity` / `CreateRelation` record into a sibling file (the store
/// path with its extension replaced by `tmp`). That file is flushed and
/// synced, renamed over the store path, and the parent directory is synced.
/// Finally the graph is reopened from the store path, handing the current
/// store's `sync_slot` to the new store.
///
/// # Errors
/// Returns [`MCSError::IoError`] if any filesystem or store operation fails.
pub fn compact(&mut self) -> Result<()> {
    let mut create_entities: Vec<Entity> = Vec::new();
    let mut create_relations: Vec<Relation> = Vec::new();

    // Snapshot the current state as owned strings before touching the disk.
    for slot in &self.entity_slots {
        if let Some(stored) = slot.as_ref().filter(|e| e.is_live()) {
            create_entities.push(self.entity_to_output(stored));
        }
    }
    for rel in &self.relations {
        create_relations.push(Relation {
            from: self.interner.lookup(rel.from).to_string(),
            to: self.interner.lookup(rel.to).to_string(),
            relation_type: self.interner.lookup(rel.relation_type).to_string(),
        });
    }

    let tmp_path = self.store.path().with_extension("tmp");
    // Remove whatever is already at the tmp path (e.g. a leftover from an
    // earlier attempt); a missing file is not an error.
    if let Err(e) = std::fs::remove_file(&tmp_path)
        && e.kind() != std::io::ErrorKind::NotFound
    {
        return Err(MCSError::IoError(e));
    }
    let mut tmp_store = BinaryStore::new(&tmp_path).map_err(MCSError::IoError)?;
    for entity in &create_entities {
        let mut buf = Vec::new();
        store_enc::encode_create_entity(&mut buf, &entity.name, &entity.entity_type, &entity.observations)
            .map_err(MCSError::IoError)?;
        tmp_store.write_record(RecordKind::CreateEntity, &buf).map_err(MCSError::IoError)?;
    }
    for relation in &create_relations {
        let mut buf = Vec::new();
        store_enc::encode_create_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
            .map_err(MCSError::IoError)?;
        tmp_store.write_record(RecordKind::CreateRelation, &buf).map_err(MCSError::IoError)?;
    }
    tmp_store.flush_and_sync().map_err(MCSError::IoError)?;
    // Release the tmp store (presumably closing its file) before the rename.
    drop(tmp_store);

    std::fs::rename(&tmp_path, self.store.path()).map_err(MCSError::IoError)?;
    // Sync the containing directory so the rename itself is made durable.
    sync_parent_dir(self.store.path()).map_err(MCSError::IoError)?;

    // NOTE(review): if the reopen below fails, `*self` is left unchanged and
    // its store was opened on the file that has just been replaced; confirm
    // whether writes made after such a failure can be lost.
    let path = self.store.path().clone();
    let sync_slot = Arc::clone(&self.store.sync_slot);
    *self = KnowledgeGraph::open(&path, Some(sync_slot)).map_err(MCSError::IoError)?;

    Ok(())
}
1630
1631 pub fn create_entities(&mut self, entities: &[Entity]) -> Result<Vec<Entity>> {
1634 for entity in entities {
1636 if entity.name.is_empty() {
1637 return Err(MCSError::InvalidParams(
1638 "Entity name must not be empty".into(),
1639 ));
1640 }
1641 }
1642 let mut created = Vec::new();
1643 for entity in entities {
1644 let existing = self.interner.get_optional(&entity.name)
1646 .and_then(|id| {
1647 let hash = self.interner.get_hash(id);
1648 self.name_table.lookup(hash, id)
1649 });
1650 if existing.is_some() {
1651 continue;
1652 }
1653 let mut buf = Vec::new();
1655 store_enc::encode_create_entity(&mut buf, &entity.name, &entity.entity_type, &entity.observations)
1656 .map_err(MCSError::IoError)?;
1657 self.store.write_record(RecordKind::CreateEntity, &buf)
1658 .map_err(MCSError::IoError)?;
1659
1660 let name_id = self.interner.intern(&entity.name);
1661 let hash = self.interner.get_hash(name_id);
1662 let type_id = self.interner.intern(&entity.entity_type);
1663 let obs_ids: Vec<StrId> = entity
1664 .observations
1665 .iter()
1666 .map(|o| self.interner.intern(o))
1667 .collect();
1668 let reused = self.free_slots.pop();
1671 let slot = reused.unwrap_or(self.entity_slots.len() as u32);
1672 self.search
1673 .index_entity(&mut self.interner, slot, name_id, type_id, &obs_ids);
1674 let stored = Some(StoredEntity {
1675 state: ENTITY_SLOT_LIVE,
1676 name: name_id,
1677 entity_type: type_id,
1678 observations: obs_ids,
1679 });
1680 match reused {
1681 Some(s) => self.entity_slots[s as usize] = stored,
1682 None => self.entity_slots.push(stored),
1683 }
1684 self.name_table.insert(&self.interner, hash, name_id, slot);
1685 created.push(Entity {
1686 name: entity.name.clone(),
1687 entity_type: entity.entity_type.clone(),
1688 observations: entity.observations.clone(),
1689 });
1690 }
1691 Ok(created)
1692 }
1693
1694 pub fn create_relations(&mut self, relations: &[Relation]) -> Result<Vec<Relation>> {
1695 for relation in relations {
1697 if relation.from.is_empty() || relation.to.is_empty() {
1698 return Err(MCSError::InvalidParams(
1699 "Relation endpoints must not be empty".into(),
1700 ));
1701 }
1702 }
1703 let mut created = Vec::new();
1704 let mut rel_set: AHashSet<(StrId, StrId, StrId)> = AHashSet::new();
1706 for rel in &self.relations {
1707 rel_set.insert((rel.from, rel.to, rel.relation_type));
1708 }
1709 for relation in relations {
1710 let from_id = self.interner.intern(&relation.from);
1711 let to_id = self.interner.intern(&relation.to);
1712 let type_id = self.interner.intern(&relation.relation_type);
1713 if !rel_set.insert((from_id, to_id, type_id)) {
1714 continue;
1715 }
1716 let mut buf = Vec::new();
1718 store_enc::encode_create_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
1719 .map_err(MCSError::IoError)?;
1720 self.store.write_record(RecordKind::CreateRelation, &buf)
1721 .map_err(MCSError::IoError)?;
1722
1723 self.relations.push(StoredRelation {
1724 from: from_id,
1725 to: to_id,
1726 relation_type: type_id,
1727 });
1728 self.adjacency.entry(from_id).or_default().push((to_id, type_id));
1729 self.adjacency.entry(to_id).or_default().push((from_id, type_id));
1730 created.push(Relation {
1731 from: relation.from.clone(),
1732 to: relation.to.clone(),
1733 relation_type: relation.relation_type.clone(),
1734 });
1735 }
1736 Ok(created)
1737 }
1738
1739 pub fn add_observations(&mut self, entity_name: &str, contents: &[String]) -> Result<Vec<String>> {
1740 let name_id = self.interner.get_optional(entity_name)
1741 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1742 let hash = self.interner.get_hash(name_id);
1743 let slot = self
1744 .name_table
1745 .lookup(hash, name_id)
1746 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1747 let existing: AHashSet<StrId> = self
1750 .entity_slots
1751 .get(slot as usize)
1752 .and_then(|e| e.as_ref())
1753 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?
1754 .observations
1755 .iter()
1756 .copied()
1757 .collect();
1758
1759 let mut added = Vec::new();
1762 let mut interned_added = Vec::new();
1763 let mut seen: AHashSet<StrId> = AHashSet::new();
1764 for content in contents {
1765 let cid = self.interner.intern(content);
1766 if existing.contains(&cid) || !seen.insert(cid) {
1767 continue;
1768 }
1769 interned_added.push(cid);
1770 added.push(content.clone());
1771 }
1772 if added.is_empty() {
1773 return Ok(added);
1774 }
1775
1776 let mut buf = Vec::new();
1779 store_enc::encode_add_observations(&mut buf, entity_name, &added)
1780 .map_err(MCSError::IoError)?;
1781 self.store.write_record(RecordKind::AddObservations, &buf)
1782 .map_err(MCSError::IoError)?;
1783
1784 let stored = self
1786 .entity_slots
1787 .get_mut(slot as usize)
1788 .and_then(|e| e.as_mut())
1789 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1790 stored.observations.extend_from_slice(&interned_added);
1791
1792 self.search
1795 .index_additional(&mut self.interner, slot, &interned_added);
1796 Ok(added)
1797 }
1798
1799 pub fn delete_entities(&mut self, entity_names: &[String]) -> Result<()> {
1800 let mut deleted_names = Vec::new();
1801 for name in entity_names {
1802 let name_id_opt = self.interner.get_optional(name);
1803 if let Some(name_id) = name_id_opt {
1804 let hash = self.interner.get_hash(name_id);
1805 if let Some(slot) = self.name_table.lookup(hash, name_id)
1806 && let Some(Some(_)) = self.entity_slots.get(slot as usize)
1807 {
1808 let mut buf = Vec::new();
1810 store_enc::encode_delete_entity(&mut buf, name)
1811 .map_err(MCSError::IoError)?;
1812 self.store.write_record(RecordKind::DeleteEntity, &buf)
1813 .map_err(MCSError::IoError)?;
1814
1815 self.entity_slots[slot as usize] = None;
1816 self.free_slots.push(slot);
1817 self.search.remove_entity(slot);
1818 self.name_table.remove(&self.interner, hash, name_id);
1819 deleted_names.push(name.clone());
1820 }
1821 }
1822 }
1823 if !deleted_names.is_empty() {
1824 let deleted_ids: AHashSet<StrId> = deleted_names.iter()
1826 .map(|n| self.interner.intern(n))
1827 .collect();
1828 self.relations
1829 .retain(|r| !deleted_ids.contains(&r.from) && !deleted_ids.contains(&r.to));
1830 for id in &deleted_ids {
1832 self.adjacency.remove(id);
1833 for list in self.adjacency.values_mut() {
1835 list.retain(|(to, _)| !deleted_ids.contains(to));
1836 }
1837 }
1838 }
1839 Ok(())
1840 }
1841
1842 pub fn delete_observations(&mut self, entity_name: &str, observations: &[String]) -> Result<()> {
1843 let name_id = self.interner.get_optional(entity_name)
1844 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1845 let hash = self.interner.get_hash(name_id);
1846 let slot = self
1847 .name_table
1848 .lookup(hash, name_id)
1849 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1850 self.entity_slots
1852 .get(slot as usize)
1853 .and_then(|e| e.as_ref())
1854 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1855 let remove_ids: AHashSet<StrId> = observations.iter().map(|o| self.interner.intern(o)).collect();
1856
1857 let mut buf = Vec::new();
1859 store_enc::encode_delete_observations(&mut buf, entity_name, observations)
1860 .map_err(MCSError::IoError)?;
1861 self.store.write_record(RecordKind::DeleteObservations, &buf)
1862 .map_err(MCSError::IoError)?;
1863
1864 let stored = self
1866 .entity_slots
1867 .get_mut(slot as usize)
1868 .and_then(|e| e.as_mut())
1869 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1870 stored.observations.retain(|o| !remove_ids.contains(o));
1871 self.search.remove_entity(slot);
1872 self.search
1873 .index_entity(&mut self.interner, slot, stored.name, stored.entity_type, &stored.observations);
1874 Ok(())
1875 }
1876
1877 pub fn delete_relations(&mut self, relations: &[Relation]) -> Result<()> {
1878 let rels: AHashSet<(StrId, StrId, StrId)> = relations
1880 .iter()
1881 .map(|r| {
1882 (
1883 self.interner.intern(&r.from),
1884 self.interner.intern(&r.to),
1885 self.interner.intern(&r.relation_type),
1886 )
1887 })
1888 .collect();
1889 for relation in relations {
1892 let mut buf = Vec::new();
1893 store_enc::encode_delete_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
1894 .map_err(MCSError::IoError)?;
1895 self.store.write_record(RecordKind::DeleteRelation, &buf)
1896 .map_err(MCSError::IoError)?;
1897 }
1898 self.relations
1899 .retain(|r| !rels.contains(&(r.from, r.to, r.relation_type)));
1900 for (f, t, rt) in &rels {
1902 if let Some(edges) = self.adjacency.get_mut(f) {
1903 edges.retain(|(to, rtype)| to != t || rtype != rt);
1904 if edges.is_empty() {
1905 self.adjacency.remove(f);
1906 }
1907 }
1908 if let Some(edges) = self.adjacency.get_mut(t) {
1909 edges.retain(|(to, rtype)| to != f || rtype != rt);
1910 if edges.is_empty() {
1911 self.adjacency.remove(t);
1912 }
1913 }
1914 }
1915 Ok(())
1916 }
1917
/// Returns an owned copy of the whole graph: every live entity and every
/// stored relation, built from [`Self::read_graph_view`].
pub fn read_graph(&self) -> KnowledgeGraphOut {
    self.read_graph_view().to_owned_out()
}
1921
1922 pub fn read_graph_view(&self) -> GraphView<'_> {
1926 let entities: Vec<&StoredEntity> = self
1927 .entity_slots
1928 .iter()
1929 .filter_map(|s| s.as_ref().filter(|e| e.is_live()))
1930 .collect();
1931 let relations: Vec<&StoredRelation> = self.relations.iter().collect();
1932 GraphView { kg: self, entities, relations }
1933 }
1934
/// Searches for `query` with no entity-type filter, no offset and no limit,
/// returning owned results.
pub fn search_nodes(&self, query: &str) -> KnowledgeGraphOut {
    self.search_nodes_filtered(query, None, 0, usize::MAX)
}
1940
/// Returns an owned copy of [`Self::open_nodes_view`] for `names`.
pub fn open_nodes(&self, names: &[String]) -> KnowledgeGraphOut {
    self.open_nodes_view(names).to_owned_out()
}
1944
1945 pub fn open_nodes_view(&self, names: &[String]) -> GraphView<'_> {
1947 let name_ids: AHashSet<StrId> = names.iter()
1948 .filter_map(|n| self.interner.get_optional(n))
1949 .collect();
1950 let entities: Vec<&StoredEntity> = self
1951 .entity_slots
1952 .iter()
1953 .filter_map(|s| {
1954 s.as_ref()
1955 .filter(|stored| stored.is_live() && name_ids.contains(&stored.name))
1956 })
1957 .collect();
1958 let matched_names: AHashSet<StrId> = entities.iter().map(|e| e.name).collect();
1959 let relations: Vec<&StoredRelation> = self
1960 .relations
1961 .iter()
1962 .filter(|r| matched_names.contains(&r.from) || matched_names.contains(&r.to))
1963 .collect();
1964 GraphView { kg: self, entities, relations }
1965 }
1966
1967 fn entity_to_output(&self, stored: &StoredEntity) -> Entity {
1972 Entity {
1973 name: self.interner.lookup(stored.name).to_string(),
1974 entity_type: self.interner.lookup(stored.entity_type).to_string(),
1975 observations: stored
1976 .observations
1977 .iter()
1978 .map(|o| self.interner.lookup(*o).to_string())
1979 .collect(),
1980 }
1981 }
1982
1983 #[inline]
1984 fn relation_to_output(&self, r: &StoredRelation) -> Relation {
1985 Relation {
1986 from: self.interner.lookup(r.from).to_string(),
1987 to: self.interner.lookup(r.to).to_string(),
1988 relation_type: self.interner.lookup(r.relation_type).to_string(),
1989 }
1990 }
1991
1992 fn lookup_live_slot(&self, name: &str) -> Option<u32> {
1994 let name_id = self.interner.get_optional(name)?;
1995 let hash = self.interner.get_hash(name_id);
1996 let slot = self.name_table.lookup(hash, name_id)?;
1997 let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
1998 stored.is_live().then_some(slot)
1999 }
2000
2001 fn entity_by_name_id(&self, name_id: StrId) -> Option<Entity> {
2003 let hash = self.interner.get_hash(name_id);
2004 let slot = self.name_table.lookup(hash, name_id)?;
2005 let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
2006 stored.is_live().then(|| self.entity_to_output(stored))
2007 }
2008
2009 pub fn entity_type_counts(&self) -> Vec<(String, usize)> {
2013 let mut counts: AHashMap<StrId, usize> = AHashMap::new();
2014 for st in self
2015 .entity_slots
2016 .iter()
2017 .filter_map(|s| s.as_ref())
2018 .filter(|e| e.is_live())
2019 {
2020 *counts.entry(st.entity_type).or_insert(0) += 1;
2021 }
2022 self.rank_counts(counts)
2023 }
2024
2025 pub fn relation_type_counts(&self) -> Vec<(String, usize)> {
2027 let mut counts: AHashMap<StrId, usize> = AHashMap::new();
2028 for r in &self.relations {
2029 *counts.entry(r.relation_type).or_insert(0) += 1;
2030 }
2031 self.rank_counts(counts)
2032 }
2033
2034 fn rank_counts(&self, counts: AHashMap<StrId, usize>) -> Vec<(String, usize)> {
2035 let mut out: Vec<(String, usize)> = counts
2036 .into_iter()
2037 .map(|(id, c)| (self.interner.lookup(id).to_string(), c))
2038 .collect();
2039 out.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
2040 out
2041 }
2042
/// Returns an owned copy of [`Self::search_nodes_view`]: ranked search
/// results for `query`, optionally restricted to `entity_type`, with
/// `offset`/`limit` pagination.
pub fn search_nodes_filtered(
    &self,
    query: &str,
    entity_type: Option<&str>,
    offset: usize,
    limit: usize,
) -> KnowledgeGraphOut {
    self.search_nodes_view(query, entity_type, offset, limit).to_owned_out()
}
2055
2056 pub fn search_nodes_view(
2058 &self,
2059 query: &str,
2060 entity_type: Option<&str>,
2061 offset: usize,
2062 limit: usize,
2063 ) -> GraphView<'_> {
2064 let type_id = match entity_type {
2065 Some(t) => match self.interner.get_optional(t) {
2066 Some(id) => Some(id),
2067 None => return GraphView { kg: self, entities: Vec::new(), relations: Vec::new() },
2068 },
2069 None => None,
2070 };
2071
2072 let ranked = self.search.search_ranked(query, &self.interner);
2073 let mut selected: AHashSet<StrId> = AHashSet::new();
2074 let mut entities: Vec<&StoredEntity> = Vec::new();
2075 let mut skipped = 0usize;
2076 for (slot, _score) in ranked {
2077 let Some(st) = self
2078 .entity_slots
2079 .get(slot as usize)
2080 .and_then(|s| s.as_ref())
2081 .filter(|e| e.is_live())
2082 else {
2083 continue;
2084 };
2085 if type_id.is_some_and(|tid| st.entity_type != tid) {
2086 continue;
2087 }
2088 if skipped < offset {
2089 skipped += 1;
2090 continue;
2091 }
2092 if entities.len() >= limit {
2093 break;
2094 }
2095 selected.insert(st.name);
2096 entities.push(st);
2097 }
2098
2099 let relations: Vec<&StoredRelation> = self
2100 .relations
2101 .iter()
2102 .filter(|r| selected.contains(&r.from) || selected.contains(&r.to))
2103 .collect();
2104 GraphView { kg: self, entities, relations }
2105 }
2106
/// Returns an owned copy of [`Self::read_graph_filtered_view`]: live
/// entities optionally restricted to `entity_type`, with `offset`/`limit`
/// pagination.
pub fn read_graph_filtered(
    &self,
    entity_type: Option<&str>,
    offset: usize,
    limit: usize,
) -> KnowledgeGraphOut {
    self.read_graph_filtered_view(entity_type, offset, limit).to_owned_out()
}
2118
2119 pub fn read_graph_filtered_view(
2121 &self,
2122 entity_type: Option<&str>,
2123 offset: usize,
2124 limit: usize,
2125 ) -> GraphView<'_> {
2126 let type_id = match entity_type {
2127 Some(t) => match self.interner.get_optional(t) {
2128 Some(id) => Some(id),
2129 None => return GraphView { kg: self, entities: Vec::new(), relations: Vec::new() },
2130 },
2131 None => None,
2132 };
2133
2134 let mut selected: AHashSet<StrId> = AHashSet::new();
2135 let mut entities: Vec<&StoredEntity> = Vec::new();
2136 let mut skipped = 0usize;
2137 for st in self
2138 .entity_slots
2139 .iter()
2140 .filter_map(|s| s.as_ref())
2141 .filter(|e| e.is_live())
2142 {
2143 if type_id.is_some_and(|tid| st.entity_type != tid) {
2144 continue;
2145 }
2146 if skipped < offset {
2147 skipped += 1;
2148 continue;
2149 }
2150 if entities.len() >= limit {
2151 break;
2152 }
2153 selected.insert(st.name);
2154 entities.push(st);
2155 }
2156
2157 let relations: Vec<&StoredRelation> = self
2158 .relations
2159 .iter()
2160 .filter(|r| selected.contains(&r.from) && selected.contains(&r.to))
2161 .collect();
2162 GraphView { kg: self, entities, relations }
2163 }
2164
2165 pub fn neighbors(
2173 &self,
2174 name: &str,
2175 direction: Direction,
2176 rtype: Option<&str>,
2177 depth: u32,
2178 ) -> Result<KnowledgeGraphOut> {
2179 self.lookup_live_slot(name)
2180 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
2181 let start = self.interner.get_optional(name).unwrap();
2183
2184 let rtype_id = match rtype {
2186 Some(r) => match self.interner.get_optional(r) {
2187 Some(id) => Some(id),
2188 None => {
2189 let entities = self.entity_by_name_id(start).into_iter().collect();
2190 return Ok(KnowledgeGraphOut { entities, relations: Vec::new() });
2191 }
2192 },
2193 None => None,
2194 };
2195
2196 let mut visited: AHashSet<StrId> = AHashSet::new();
2197 visited.insert(start);
2198
2199 let type_ok = |r: &StoredRelation| rtype_id.is_none_or(|rt| r.relation_type == rt);
2200
2201 if depth == 1 {
2202 for r in self.relations.iter().filter(|r| type_ok(r)) {
2203 match direction {
2204 Direction::Out => {
2205 if r.from == start {
2206 visited.insert(r.to);
2207 }
2208 }
2209 Direction::In => {
2210 if r.to == start {
2211 visited.insert(r.from);
2212 }
2213 }
2214 Direction::Both => {
2215 if r.from == start {
2216 visited.insert(r.to);
2217 } else if r.to == start {
2218 visited.insert(r.from);
2219 }
2220 }
2221 }
2222 }
2223 } else if depth >= 2 {
2224 let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
2228 match direction {
2229 Direction::Both => {
2230 for (&node, edges) in &self.adjacency {
2231 for &(nb, rt) in edges {
2232 if rtype_id.is_none_or(|rt_id| rt == rt_id) {
2233 adj.entry(node).or_default().push(nb);
2234 }
2235 }
2236 }
2237 }
2238 Direction::Out | Direction::In => {
2239 for r in self.relations.iter().filter(|r| type_ok(r)) {
2240 match direction {
2241 Direction::Out => adj.entry(r.from).or_default().push(r.to),
2242 Direction::In => adj.entry(r.to).or_default().push(r.from),
2243 _ => unreachable!(),
2244 }
2245 }
2246 }
2247 }
2248 let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
2249 queue.push_back((start, 0));
2250 while let Some((node, d)) = queue.pop_front() {
2251 if d >= depth {
2252 continue;
2253 }
2254 if let Some(nbrs) = adj.get(&node) {
2255 for &nb in nbrs {
2256 if visited.insert(nb) {
2257 queue.push_back((nb, d + 1));
2258 }
2259 }
2260 }
2261 }
2262 }
2263
2264 let mut entities = Vec::with_capacity(visited.len());
2265 for &nid in &visited {
2266 if let Some(e) = self.entity_by_name_id(nid) {
2267 entities.push(e);
2268 }
2269 }
2270 let relations = self
2271 .relations
2272 .iter()
2273 .filter(|r| type_ok(r) && visited.contains(&r.from) && visited.contains(&r.to))
2274 .map(|r| self.relation_to_output(r))
2275 .collect();
2276 Ok(KnowledgeGraphOut { entities, relations })
2277 }
2278
2279 pub fn describe_entity(&self, name: &str) -> Result<serde_json::Value> {
2283 let name_id = self
2284 .interner
2285 .get_optional(name)
2286 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
2287 let entity = self
2288 .entity_by_name_id(name_id)
2289 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
2290
2291 let mut incident: Vec<Relation> = Vec::new();
2292 let mut neighbor_seen: AHashSet<StrId> = AHashSet::new();
2293 let mut neighbors: Vec<&str> = Vec::new();
2294 for r in &self.relations {
2295 if r.from == name_id || r.to == name_id {
2296 incident.push(self.relation_to_output(r));
2297 let other = if r.from == name_id { r.to } else { r.from };
2298 if other != name_id && neighbor_seen.insert(other) {
2299 neighbors.push(self.interner.lookup(other));
2300 }
2301 }
2302 }
2303
2304 Ok(serde_json::json!({
2305 "entity": entity,
2306 "relations": incident,
2307 "neighbors": neighbors,
2308 "degree": incident.len(),
2309 }))
2310 }
2311
2312 pub fn upsert_entities(&mut self, entities: &[Entity]) -> Result<Vec<serde_json::Value>> {
2317 for e in entities {
2318 if e.name.is_empty() {
2319 return Err(MCSError::InvalidParams(
2320 "Entity name must not be empty".into(),
2321 ));
2322 }
2323 }
2324 let mut out = Vec::with_capacity(entities.len());
2325 for e in entities {
2326 if self.lookup_live_slot(&e.name).is_some() {
2327 let added = self.add_observations(&e.name, &e.observations)?;
2328 out.push(serde_json::json!({
2329 "name": e.name,
2330 "created": false,
2331 "addedObservations": added,
2332 }));
2333 } else {
2334 let created = self.create_entities(std::slice::from_ref(e))?;
2335 out.push(serde_json::json!({
2336 "name": e.name,
2337 "created": !created.is_empty(),
2338 "addedObservations": e.observations,
2339 }));
2340 }
2341 }
2342 Ok(out)
2343 }
2344
2345 pub fn export(&self, format: &str) -> Result<String> {
2347 match format {
2348 "json" => serde_json::to_string(&self.read_graph()).map_err(MCSError::JsonError),
2349 "mermaid" => Ok(self.export_mermaid()),
2350 "dot" => Ok(self.export_dot()),
2351 other => Err(MCSError::InvalidParams(format!(
2352 "Unknown export format '{other}' (expected json|mermaid|dot)"
2353 ))),
2354 }
2355 }
2356
2357 fn diagram_node_ids(&self) -> (AHashMap<StrId, usize>, Vec<(usize, StrId)>) {
2359 let mut ids: AHashMap<StrId, usize> = AHashMap::new();
2360 let mut order: Vec<(usize, StrId)> = Vec::new();
2361 for st in self
2362 .entity_slots
2363 .iter()
2364 .filter_map(|s| s.as_ref())
2365 .filter(|e| e.is_live())
2366 {
2367 let n = ids.len();
2368 ids.insert(st.name, n);
2369 order.push((n, st.name));
2370 }
2371 (ids, order)
2372 }
2373
2374 fn export_mermaid(&self) -> String {
2375 let (ids, order) = self.diagram_node_ids();
2376 let mut s = String::with_capacity(64 + order.len() * 32 + self.relations.len() * 32);
2377 s.push_str("graph LR\n");
2378 for (n, name_id) in &order {
2379 let label = sanitize_label(self.interner.lookup(*name_id));
2380 s.push_str(&format!(" n{n}[\"{label}\"]\n"));
2381 }
2382 for r in &self.relations {
2383 if let (Some(&a), Some(&b)) = (ids.get(&r.from), ids.get(&r.to)) {
2384 let rel = sanitize_label(self.interner.lookup(r.relation_type));
2385 s.push_str(&format!(" n{a} -->|{rel}| n{b}\n"));
2386 }
2387 }
2388 s
2389 }
2390
2391 fn export_dot(&self) -> String {
2392 let (ids, order) = self.diagram_node_ids();
2393 let mut s = String::with_capacity(64 + order.len() * 32 + self.relations.len() * 32);
2394 s.push_str("digraph G {\n");
2395 for (n, name_id) in &order {
2396 let label = sanitize_label(self.interner.lookup(*name_id));
2397 s.push_str(&format!(" n{n} [label=\"{label}\"];\n"));
2398 }
2399 for r in &self.relations {
2400 if let (Some(&a), Some(&b)) = (ids.get(&r.from), ids.get(&r.to)) {
2401 let rel = sanitize_label(self.interner.lookup(r.relation_type));
2402 s.push_str(&format!(" n{a} -> n{b} [label=\"{rel}\"];\n"));
2403 }
2404 }
2405 s.push_str("}\n");
2406 s
2407 }
2408
    /// Merge entity `source` into entity `target`, then delete `source`.
    ///
    /// Observations:
    /// - Observations of `source` that `target` does not already have are appended to `target`.
    ///
    /// Relations:
    /// - Every relation touching `source` is rewritten so that `source` is replaced by `target`.
    /// - A rewrite that would become a self-loop is dropped.
    /// - A rewrite that duplicates an existing relation (or another rewrite) is also dropped.
    ///
    /// All resulting records are written to the store between `TxnBegin` and `TxnCommit`
    /// markers. They are then applied to the in-memory state.
    ///
    /// Returns a JSON summary with these keys:
    /// - `source`, `target`: the two entity names.
    /// - `movedObservations`: the number of observations `source` had.
    /// - `addedObservations`: how many of those were new to `target`.
    /// - `redirectedRelations`: how many relations were redirected.
    ///
    /// # Errors
    /// - `InvalidParams` if `source == target` or either entity has no live slot.
    /// - `IoError` if encoding or writing a store record fails. Nothing is applied in
    ///   memory in that case.
    pub fn merge_entities(&mut self, source: &str, target: &str) -> Result<serde_json::Value> {
        if source == target {
            return Err(MCSError::InvalidParams(
                "Source and target must be different entities".into(),
            ));
        }
        self.lookup_live_slot(source).ok_or_else(|| {
            MCSError::InvalidParams(format!("Source entity '{source}' not found"))
        })?;
        let target_slot = self.lookup_live_slot(target).ok_or_else(|| {
            MCSError::InvalidParams(format!("Target entity '{target}' not found"))
        })?;

        // Both entities were just confirmed live, so these lookups are expected to succeed.
        let source_entity = self.get_entity(source).unwrap();
        let moved_obs_count = source_entity.observations.len();
        let source_id = self.interner.get_optional(source).unwrap();
        let target_id = self.interner.get_optional(target).unwrap();

        // Observation ids the target already has; only observations missing here are copied.
        let target_existing: AHashSet<StrId> = self.entity_slots[target_slot as usize]
            .as_ref()
            .unwrap()
            .observations
            .iter()
            .copied()
            .collect();
        // `obs_seen` additionally drops repeats within the source's own observation list.
        let mut obs_seen: AHashSet<StrId> = AHashSet::new();
        let mut obs_to_add: Vec<String> = Vec::new();
        for o in &source_entity.observations {
            if let Some(oid) = self.interner.get_optional(o)
                && !target_existing.contains(&oid)
                && obs_seen.insert(oid)
            {
                obs_to_add.push(o.clone());
            }
        }

        // Every (from, to, type) triple that exists now. A redirected relation matching
        // one of these would be a duplicate, so it is skipped.
        let existing_rels: AHashSet<(StrId, StrId, StrId)> =
            self.relations.iter().map(|r| (r.from, r.to, r.relation_type)).collect();
        // Guards against two source relations collapsing onto the same redirected triple.
        let mut rel_seen: AHashSet<(StrId, StrId, StrId)> = AHashSet::new();
        let mut redirect: Vec<Relation> = Vec::new();
        for r in &self.relations {
            if r.from != source_id && r.to != source_id {
                continue;
            }
            let new_from = if r.from == source_id { target_id } else { r.from };
            let new_to = if r.to == source_id { target_id } else { r.to };
            // Relations between source and target, and source self-loops, would turn
            // into target self-loops; they are dropped instead of redirected.
            if new_from == new_to {
                continue; }
            let key = (new_from, new_to, r.relation_type);
            if existing_rels.contains(&key) || !rel_seen.insert(key) {
                continue;
            }
            redirect.push(Relation {
                from: self.interner.lookup(new_from).to_string(),
                to: self.interner.lookup(new_to).to_string(),
                relation_type: self.interner.lookup(r.relation_type).to_string(),
            });
        }

        let added_count = obs_to_add.len();
        let redirected = redirect.len() as u32;

        // Encode every record before writing any of them. An encoding error therefore
        // returns before the store or the in-memory state is touched.
        let mut records: Vec<(RecordKind, Vec<u8>)> = Vec::new();
        if !obs_to_add.is_empty() {
            let mut buf = Vec::new();
            store_enc::encode_add_observations(&mut buf, target, &obs_to_add)
                .map_err(MCSError::IoError)?;
            records.push((RecordKind::AddObservations, buf));
        }
        for r in &redirect {
            let mut buf = Vec::new();
            store_enc::encode_create_relation(&mut buf, &r.from, &r.to, &r.relation_type)
                .map_err(MCSError::IoError)?;
            records.push((RecordKind::CreateRelation, buf));
        }
        // The source is deleted last, after its observations and relations have been
        // copied to the target.
        let mut del_buf = Vec::new();
        store_enc::encode_delete_entity(&mut del_buf, source).map_err(MCSError::IoError)?;
        records.push((RecordKind::DeleteEntity, del_buf));

        // Frame the merge as one transaction in the store. NOTE(review): presumably replay
        // discards a TxnBegin without a matching TxnCommit; confirm against the store's
        // replay logic. A write error here returns before anything is applied in memory.
        self.store.write_record(RecordKind::TxnBegin, &[]).map_err(MCSError::IoError)?;
        for (kind, data) in &records {
            self.store.write_record(*kind, data).map_err(MCSError::IoError)?;
        }
        self.store.write_record(RecordKind::TxnCommit, &[]).map_err(MCSError::IoError)?;

        // Replay the same records against the in-memory state.
        for (kind, data) in &records {
            Self::apply_record(
                *kind, data, &mut self.interner, &mut self.entity_slots, &mut self.search,
                &mut self.name_table, &mut self.relations,
            );
        }

        // `apply_record` is not given the adjacency index, so patch it by hand:
        // - Drop the source's own list and every edge pointing at it.
        // - Register each redirected relation under both endpoints.
        self.adjacency.remove(&source_id);
        for list in self.adjacency.values_mut() {
            list.retain(|(to, _)| *to != source_id);
        }
        for r in &redirect {
            // These names came from `interner.lookup` above, so they are already interned.
            let from_id = self.interner.get_optional(&r.from).unwrap();
            let to_id = self.interner.get_optional(&r.to).unwrap();
            let type_id = self.interner.get_optional(&r.relation_type).unwrap();
            self.adjacency.entry(from_id).or_default().push((to_id, type_id));
            self.adjacency.entry(to_id).or_default().push((from_id, type_id));
        }

        Ok(serde_json::json!({
            "source": source,
            "target": target,
            "movedObservations": moved_obs_count,
            "addedObservations": added_count,
            "redirectedRelations": redirected,
        }))
    }
2549
2550 pub fn extract_subgraph(&self, names: &[String], depth: u32) -> Result<KnowledgeGraphOut> {
2554 if names.is_empty() {
2555 return Ok(KnowledgeGraphOut {
2556 entities: Vec::new(),
2557 relations: Vec::new(),
2558 });
2559 }
2560 let mut visited: AHashSet<StrId> = AHashSet::new();
2562 let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
2563 for name in names {
2564 if let Some(id) = self.interner.get_optional(name)
2565 && visited.insert(id)
2566 {
2567 queue.push_back((id, 0));
2568 }
2569 }
2570 let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
2572 for (&node, edges) in &self.adjacency {
2573 let nb: Vec<StrId> = edges.iter().map(|(to, _)| *to).collect();
2574 adj.insert(node, nb);
2575 }
2576 while let Some((node, d)) = queue.pop_front() {
2577 if d >= depth {
2578 continue;
2579 }
2580 if let Some(nbrs) = adj.get(&node) {
2581 for &nb in nbrs {
2582 if visited.insert(nb) {
2583 queue.push_back((nb, d + 1));
2584 }
2585 }
2586 }
2587 }
2588 let mut entities: Vec<Entity> = Vec::with_capacity(visited.len());
2589 for &nid in &visited {
2590 if let Some(e) = self.entity_by_name_id(nid) {
2591 entities.push(e);
2592 }
2593 }
2594 let relations: Vec<Relation> = self
2595 .relations
2596 .iter()
2597 .filter(|r| visited.contains(&r.from) && visited.contains(&r.to))
2598 .map(|r| self.relation_to_output(r))
2599 .collect();
2600 Ok(KnowledgeGraphOut { entities, relations })
2601 }
2602
2603 pub fn batch_get_entities(&self, names: &[String]) -> Vec<Option<Entity>> {
2605 names.iter().map(|n| self.get_entity(n)).collect()
2606 }
2607
2608 #[allow(clippy::too_many_arguments)]
2611 fn dfs_all_paths(
2612 adj: &AHashMap<StrId, Vec<StrId>>,
2613 current: StrId,
2614 target: StrId,
2615 max_depth: usize,
2616 max_paths: usize,
2617 visited: &mut AHashSet<StrId>,
2618 current_path: &mut Vec<StrId>,
2619 all_paths: &mut Vec<Vec<StrId>>,
2620 ) {
2621 if all_paths.len() >= max_paths {
2622 return;
2623 }
2624 if current == target && current_path.len() > 1 {
2625 all_paths.push(current_path.clone());
2626 return;
2627 }
2628 if current_path.len() > max_depth {
2629 return;
2630 }
2631 if let Some(neighbors) = adj.get(¤t) {
2632 for &nb in neighbors {
2633 if visited.insert(nb) {
2634 current_path.push(nb);
2635 Self::dfs_all_paths(
2636 adj, nb, target, max_depth, max_paths, visited, current_path, all_paths,
2637 );
2638 current_path.pop();
2639 visited.remove(&nb);
2640 }
2641 }
2642 }
2643 }
2644
2645 pub fn find_all_paths(
2649 &self,
2650 from: &str,
2651 to: &str,
2652 max_depth: usize,
2653 max_paths: usize,
2654 ) -> Result<Vec<Vec<String>>> {
2655 let from_id = self
2656 .interner
2657 .get_optional(from)
2658 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
2659 let to_id = self
2660 .interner
2661 .get_optional(to)
2662 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
2663 if self.lookup_live_slot(from).is_none() {
2665 return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
2666 }
2667 if self.lookup_live_slot(to).is_none() {
2668 return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
2669 }
2670 if from_id == to_id {
2671 return Ok(vec![vec![from.to_string()]]);
2672 }
2673 let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::with_capacity(self.adjacency.len());
2675 for (&node, edges) in &self.adjacency {
2676 let nbrs: Vec<StrId> = edges.iter().map(|(to, _)| *to).collect();
2677 adj.insert(node, nbrs);
2678 }
2679 let mut all_paths: Vec<Vec<StrId>> = Vec::new();
2680 let mut current_path = Vec::new();
2681 let mut visited: AHashSet<StrId> = AHashSet::new();
2682 visited.insert(from_id);
2683 current_path.push(from_id);
2684 Self::dfs_all_paths(
2685 &adj,
2686 from_id,
2687 to_id,
2688 max_depth,
2689 max_paths,
2690 &mut visited,
2691 &mut current_path,
2692 &mut all_paths,
2693 );
2694 if all_paths.is_empty() {
2695 return Err(MCSError::MemoryError(format!(
2696 "No path found between '{from}' and '{to}'"
2697 )));
2698 }
2699 let result: Vec<Vec<String>> = all_paths
2700 .into_iter()
2701 .map(|path| {
2702 path.into_iter()
2703 .map(|id| self.interner.lookup(id).to_string())
2704 .collect()
2705 })
2706 .collect();
2707 Ok(result)
2708 }
2709
2710 pub fn snapshot(&self) -> ReadSnapshot {
2715 ReadSnapshot {
2716 interner: self.interner.clone(),
2717 entity_slots: Arc::from_iter(self.entity_slots.iter().cloned()),
2718 free_slots: self.free_slots.clone(),
2719 name_table: self.name_table.clone(),
2720 relations: Arc::from_iter(self.relations.iter().cloned()),
2721 adjacency: self.adjacency.clone(),
2722 search: self.search.clone(),
2723 }
2724 }
2725
2726 pub fn flush(&mut self) -> Result<()> {
2730 self.store.flush().map_err(MCSError::IoError)
2731 }
2732
2733 pub fn sync(&mut self) -> Result<()> {
2736 self.store.sync().map_err(MCSError::IoError)
2737 }
2738
2739 pub fn flush_and_sync(&mut self) -> Result<()> {
2741 self.store.flush_and_sync().map_err(MCSError::IoError)
2742 }
2743}
2744
2745
2746
/// Shared handle to a [`KnowledgeGraph`] that separates writers from readers.
///
/// - Writers serialize on `inner` through [`GraphHandle::write`].
/// - Readers get the most recently published [`ReadSnapshot`] through
///   [`GraphHandle::read`] without taking that lock.
/// - [`GraphHandle::new`] also spawns a background thread that syncs the WAL when woken
///   through `sync_notify`.
pub struct GraphHandle {
    /// The mutable graph; a writer holds this lock for the lifetime of its [`WriteGuard`].
    inner: Arc<parking_lot::Mutex<KnowledgeGraph>>,
    /// The snapshot readers see; replaced on every publish.
    snapshot: ArcSwap<ReadSnapshot>,
    /// Serialized `read_graph` JSON for readers, or `None` when not computed yet.
    /// Reset to `None` on every publish.
    read_cache: ArcSwap<Option<Arc<str>>>,
    /// "Sync pending" flag plus the condvar that wakes the background sync thread.
    sync_notify: Arc<(StdMutex<bool>, Condvar)>,
    /// Set on drop; tells the background sync thread to do a final sync and exit.
    stop_sync: Arc<AtomicBool>,
}
2777
/// Exclusive write access to the graph, obtained from [`GraphHandle::write`].
///
/// Dereferences to [`KnowledgeGraph`]. `publish` makes the current state visible to
/// readers. `Drop` calls `publish` unless `did_publish` is set.
pub struct WriteGuard<'a> {
    /// Lock on the shared graph, held until this guard is dropped.
    guard: parking_lot::MutexGuard<'a, KnowledgeGraph>,
    /// The owning handle's reader snapshot slot.
    snapshot: &'a ArcSwap<ReadSnapshot>,
    /// The owning handle's cached `read_graph` JSON; cleared on publish.
    read_cache: &'a ArcSwap<Option<Arc<str>>>,
    /// Pending flag and condvar used to wake the background sync thread.
    sync_notify: &'a (StdMutex<bool>, Condvar),
    /// Set by `publish`; while true, `Drop` skips its automatic publish.
    did_publish: bool,
}
2786
2787impl WriteGuard<'_> {
2788 pub fn publish(&mut self) {
2792 if let Err(e) = self.guard.flush() {
2793 tracing::error!("WAL flush failed: {e}");
2794 }
2795 let snap = Arc::new(self.guard.snapshot());
2796 self.snapshot.store(snap);
2797 self.read_cache.store(Arc::new(None));
2798 self.did_publish = true;
2799 let (lock, cvar) = self.sync_notify;
2801 let mut pending = lock.lock().unwrap_or_else(|e| e.into_inner());
2802 *pending = true;
2803 cvar.notify_one();
2804 }
2805
2806 pub fn graph(&mut self) -> &mut KnowledgeGraph {
2808 &mut self.guard
2809 }
2810}
2811
2812impl std::ops::Deref for WriteGuard<'_> {
2813 type Target = KnowledgeGraph;
2814 fn deref(&self) -> &KnowledgeGraph {
2815 &self.guard
2816 }
2817}
2818
2819impl std::ops::DerefMut for WriteGuard<'_> {
2820 fn deref_mut(&mut self) -> &mut KnowledgeGraph {
2821 &mut self.guard
2822 }
2823}
2824
2825impl Drop for WriteGuard<'_> {
2826 fn drop(&mut self) {
2827 if !self.did_publish {
2828 self.publish();
2829 }
2830 }
2831}
2832
2833impl Drop for GraphHandle {
2834 fn drop(&mut self) {
2835 self.stop_sync.store(true, Ordering::Relaxed);
2836 let (lock, cvar) = &*self.sync_notify;
2839 let mut pending = lock.lock().unwrap_or_else(|e| e.into_inner());
2840 *pending = true;
2841 cvar.notify_one();
2842 }
2843}
2844
2845impl GraphHandle {
2846 pub fn new(path: &Path) -> std::io::Result<Self> {
2850 let kg = KnowledgeGraph::new(path)?;
2851 let snapshot = Arc::new(kg.snapshot());
2852 let sync_slot = Arc::clone(&kg.store.sync_slot);
2856 let inner = Arc::new(parking_lot::Mutex::new(kg));
2857
2858 let sync_notify: Arc<(StdMutex<bool>, Condvar)> =
2859 Arc::new((StdMutex::new(false), Condvar::new()));
2860 let notify = Arc::clone(&sync_notify);
2861 let stop_sync = Arc::new(AtomicBool::new(false));
2862
2863 let sync_stop = Arc::clone(&stop_sync);
2867 std::thread::Builder::new()
2868 .name("mcp-memory-sync".into())
2869 .spawn(move || {
2870 let (lock, cvar) = &*notify;
2871 loop {
2872 let mut guard = cvar
2875 .wait_timeout_while(
2876 lock.lock().unwrap_or_else(|e| e.into_inner()),
2877 std::time::Duration::from_secs(1),
2878 |p| !*p,
2879 )
2880 .unwrap_or_else(|e| e.into_inner())
2881 .0;
2882
2883 let should_sync = *guard;
2884 *guard = false;
2885 drop(guard);
2888
2889 if should_sync {
2890 if let Err(e) = sync_slot.load().sync_data() {
2893 tracing::error!("WAL fsync failed: {e}");
2894 }
2895 }
2896
2897 if sync_stop.load(Ordering::Relaxed) {
2898 if let Err(e) = sync_slot.load().sync_data() {
2900 tracing::error!("WAL final fsync failed: {e}");
2901 }
2902 break;
2903 }
2904 }
2905 })
2906 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
2907
2908 Ok(Self {
2909 inner,
2910 snapshot: ArcSwap::new(snapshot),
2911 read_cache: ArcSwap::new(Arc::new(None)),
2912 sync_notify,
2913 stop_sync,
2914 })
2915 }
2916
2917 pub fn read_graph_cached(&self) -> Arc<str> {
2920 if let Some(cached) = self.read_cache.load().as_ref() {
2921 return cached.clone();
2922 }
2923 let graph = self.read();
2924 let json: Arc<str> = Arc::from(graph.read_graph_json().into_boxed_str());
2925 self.read_cache.store(Arc::new(Some(json.clone())));
2926 json
2927 }
2928
2929 pub fn read(&self) -> ReadSnapshot {
2931 (**self.snapshot.load()).clone()
2932 }
2933
2934 pub fn write(&self) -> WriteGuard<'_> {
2937 WriteGuard {
2938 guard: self.inner.lock(),
2939 snapshot: &self.snapshot,
2940 read_cache: &self.read_cache,
2941 sync_notify: &self.sync_notify,
2942 did_publish: false,
2943 }
2944 }
2945}
2946
2947