1use std::collections::VecDeque;
2use std::sync::Arc;
3
4use ahash::{AHashMap, AHashSet};
5use arc_swap::ArcSwap;
6use parking_lot::Mutex;
7use std::path::Path;
8
9use serde::ser::{Serialize, SerializeSeq, SerializeStruct, Serializer};
10
11use crate::errors::{MCSError, Result};
12use crate::intern::{StrId, StringInterner};
13use crate::types::{Entity, Relation, KnowledgeGraphOut};
14use crate::search::SearchIndex;
15use crate::store::{self as store_enc, BinaryStore, RecordKind};
16
/// `StoredEntity::state` value marking a slot as holding a live entity.
const ENTITY_SLOT_LIVE: u8 = 1;
/// Number of shards in `ShardedNameTable`. `ShardedNameTable::shard` selects a
/// shard by masking the hash with `NAME_TABLE_SHARDS - 1`, so this must be a
/// power of two.
const NAME_TABLE_SHARDS: usize = 4;
19
/// Hints the CPU to pull the cache line containing `addr` into all cache levels.
///
/// Used by `NameTableShard::lookup` to warm upcoming probe positions.
///
/// # Safety
///
/// `_mm_prefetch` is only a hint and does not read through the pointer in a way
/// that can fault. Callers in this file still pass in-bounds pointers; the
/// function is `unsafe` because the underlying intrinsic is.
#[cfg(target_arch = "x86_64")]
#[inline(always)]
unsafe fn prefetch_addr(addr: *const u8) {
    // The intrinsic is declared over `*const i8`; the pointee type is irrelevant
    // for a prefetch, so a plain pointer cast is sufficient.
    // SAFETY: PREFETCHh never faults, and SSE (which provides it) is part of the
    // x86_64 baseline, so no target-feature check is needed.
    unsafe {
        std::arch::x86_64::_mm_prefetch::<{ std::arch::x86_64::_MM_HINT_T0 }>(addr.cast::<i8>());
    }
}
30
/// No-op fallback for targets other than x86_64.
///
/// # Safety
///
/// Does nothing with `_addr`. It is declared `unsafe` presumably so that the same
/// `unsafe { prefetch_addr(..) }` call site compiles on every target.
#[cfg(not(target_arch = "x86_64"))]
#[inline(always)]
const unsafe fn prefetch_addr(_addr: *const u8) {}
34
/// Flushes the directory that contains `path` to stable storage.
///
/// A path with no parent component (e.g. `"file.bin"`) is resolved against the
/// current directory (`"."`).
///
/// Two conditions are treated as success:
/// - the directory does not exist (`NotFound` on open);
/// - the platform rejects `fsync` on a directory handle (`InvalidInput`).
///
/// # Errors
///
/// Any other I/O error from opening or syncing the directory is returned.
fn sync_parent_dir(path: &Path) -> std::io::Result<()> {
    let dir = match path.parent() {
        Some(parent) if !parent.as_os_str().is_empty() => parent,
        _ => Path::new("."),
    };

    let handle = match std::fs::File::open(dir) {
        Ok(handle) => handle,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(()),
        Err(err) => return Err(err),
    };

    handle.sync_all().or_else(|err| {
        if err.kind() == std::io::ErrorKind::InvalidInput {
            Ok(())
        } else {
            Err(err)
        }
    })
}
54
/// An entity as kept in the graph's slot vector, with every string interned.
///
/// With the `cache_align` feature each value is aligned to 64 bytes.
#[derive(Clone)]
#[cfg_attr(feature = "cache_align", repr(align(64)))]
pub(crate) struct StoredEntity {
    /// Slot state; `ENTITY_SLOT_LIVE` marks a live entity (the only value
    /// assigned by the code visible in this file).
    state: u8,
    /// Interned entity name; this is the key used in the name table.
    pub(crate) name: StrId,
    /// Interned entity type.
    pub(crate) entity_type: StrId,
    /// Interned observations in insertion order.
    pub(crate) observations: Vec<StrId>,
}
71
72impl StoredEntity {
73 pub(crate) const fn is_live(&self) -> bool {
74 self.state == ENTITY_SLOT_LIVE
75 }
76}
77
/// A directed, typed relation `from -[relation_type]-> to`, with interned strings.
///
/// With the `cache_align` feature each value is aligned to 16 bytes.
#[derive(Clone)]
#[cfg_attr(feature = "cache_align", repr(align(16)))]
pub(crate) struct StoredRelation {
    /// Interned name of the source entity.
    pub(crate) from: StrId,
    /// Interned name of the target entity.
    pub(crate) to: StrId,
    /// Interned relation type.
    pub(crate) relation_type: StrId,
}
88
/// A borrowed view over a subset of a [`KnowledgeGraph`].
///
/// Holds references to stored entities and relations plus the graph whose
/// interner resolves their ids. It can be serialized directly (strings are
/// looked up during serialization) or converted with `to_owned_out`.
pub struct GraphView<'a> {
    /// Graph that owns the referenced items and their interner.
    kg: &'a KnowledgeGraph,
    /// Entities in the view, in the order they will be emitted.
    entities: Vec<&'a StoredEntity>,
    /// Relations in the view, in the order they will be emitted.
    relations: Vec<&'a StoredRelation>,
}
107
108impl GraphView<'_> {
109 pub fn to_owned_out(&self) -> KnowledgeGraphOut {
113 KnowledgeGraphOut {
114 entities: self.entities.iter().map(|e| self.kg.entity_to_output(e)).collect(),
115 relations: self.relations.iter().map(|r| self.kg.relation_to_output(r)).collect(),
116 }
117 }
118}
119
120impl Serialize for GraphView<'_> {
121 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
122 let mut st = s.serialize_struct("KnowledgeGraphOut", 2)?;
123 st.serialize_field("entities", &EntityListRef { kg: self.kg, items: &self.entities })?;
124 st.serialize_field("relations", &RelationListRef { kg: self.kg, items: &self.relations })?;
125 st.end()
126 }
127}
128
129struct EntityListRef<'a> { kg: &'a KnowledgeGraph, items: &'a [&'a StoredEntity] }
130impl Serialize for EntityListRef<'_> {
131 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
132 let mut seq = s.serialize_seq(Some(self.items.len()))?;
133 for &e in self.items {
134 seq.serialize_element(&EntityRef { kg: self.kg, e })?;
135 }
136 seq.end()
137 }
138}
139
140struct RelationListRef<'a> { kg: &'a KnowledgeGraph, items: &'a [&'a StoredRelation] }
141impl Serialize for RelationListRef<'_> {
142 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
143 let mut seq = s.serialize_seq(Some(self.items.len()))?;
144 for &r in self.items {
145 seq.serialize_element(&RelationRef { kg: self.kg, r })?;
146 }
147 seq.end()
148 }
149}
150
151struct EntityRef<'a> { kg: &'a KnowledgeGraph, e: &'a StoredEntity }
152impl Serialize for EntityRef<'_> {
153 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
154 let mut st = s.serialize_struct("Entity", 3)?;
155 st.serialize_field("name", self.kg.interner.lookup(self.e.name))?;
156 st.serialize_field("entityType", self.kg.interner.lookup(self.e.entity_type))?;
157 st.serialize_field("observations", &ObsRef { kg: self.kg, obs: &self.e.observations })?;
158 st.end()
159 }
160}
161
162struct ObsRef<'a> { kg: &'a KnowledgeGraph, obs: &'a [StrId] }
163impl Serialize for ObsRef<'_> {
164 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
165 let mut seq = s.serialize_seq(Some(self.obs.len()))?;
166 for &o in self.obs {
167 seq.serialize_element(self.kg.interner.lookup(o))?;
168 }
169 seq.end()
170 }
171}
172
173struct RelationRef<'a> { kg: &'a KnowledgeGraph, r: &'a StoredRelation }
174impl Serialize for RelationRef<'_> {
175 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
176 let mut st = s.serialize_struct("Relation", 3)?;
177 st.serialize_field("from", self.kg.interner.lookup(self.r.from))?;
178 st.serialize_field("to", self.kg.interner.lookup(self.r.to))?;
179 st.serialize_field("relationType", self.kg.interner.lookup(self.r.relation_type))?;
180 st.end()
181 }
182}
183
/// Which edges a traversal follows relative to the current node.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum Direction {
    /// Follow relations from `from` to `to`.
    Out,
    /// Follow relations backwards, from `to` to `from`.
    In,
    /// Follow relations in either direction.
    Both,
}

impl Direction {
    /// Parses an optional direction string.
    ///
    /// Only the exact (case-sensitive) strings `"out"` and `"in"` are recognised.
    /// `None` and any other string select [`Direction::Both`].
    pub fn parse(s: Option<&str>) -> Self {
        match s.unwrap_or_default() {
            "out" => Self::Out,
            "in" => Self::In,
            _ => Self::Both,
        }
    }
}
205
/// Makes `s` safe to embed inside a double-quoted diagram label.
///
/// Double quotes become single quotes, and CR/LF become spaces. All other
/// characters pass through unchanged.
fn sanitize_label(s: &str) -> String {
    s.chars()
        .map(|ch| match ch {
            '"' => '\'',
            '\n' | '\r' => ' ',
            other => other,
        })
        .collect()
}
218
/// Control byte of an unoccupied `NameTableShard` slot.
///
/// Occupied slots store a 7-bit `h2` stamp (high bit clear), so testing
/// `ctrl & 0x80` is enough to tell empty from occupied.
const EMPTY_SLOT: u8 = 0xFF;
229
/// Control-byte stamp for `hash`: its low 7 bits.
///
/// The high bit is always clear, which keeps stamps distinct from `EMPTY_SLOT`.
#[inline(always)]
const fn h2(hash: u64) -> u8 {
    const STAMP_BITS: u64 = 0x7F;
    (hash & STAMP_BITS) as u8
}
234
/// Home probe index for `hash` in a table whose size is `mask + 1`.
///
/// Uses the bits above the 7 consumed by `h2`, shifting before narrowing to
/// `usize`.
#[inline(always)]
const fn h1(hash: u64, mask: usize) -> usize {
    let above_stamp = hash >> 7;
    (above_stamp as usize) & mask
}
239
/// One open-addressing hash table, using linear probing, that maps interned
/// names to entity slot indices.
///
/// Stored as three parallel arrays of equal power-of-two length (set up in
/// `new` and `grow`).
#[derive(Clone)]
struct NameTableShard {
    /// Per-slot control byte: `EMPTY_SLOT` (high bit set) or the entry's 7-bit `h2` stamp.
    ctrl: Vec<u8>,
    /// Interned name held in each slot (`StrId::EMPTY` when empty).
    names: Vec<StrId>,
    /// Entity slot index held in each slot (`u32::MAX` when empty).
    slots: Vec<u32>,
    /// `ctrl.len() - 1`; probe indices are wrapped with `& mask`.
    mask: usize,
    /// Number of occupied slots.
    count: usize,
}
248
impl NameTableShard {
    /// Creates an empty shard with at least `capacity` slots.
    ///
    /// The slot count is rounded up to a power of two (minimum 16), so that
    /// `mask = cap - 1` can wrap probe indices with a bitwise AND.
    fn new(capacity: usize) -> Self {
        let cap = capacity.next_power_of_two().max(16);
        Self {
            ctrl: vec![EMPTY_SLOT; cap],
            names: vec![StrId::EMPTY; cap],
            slots: vec![u32::MAX; cap],
            mask: cap - 1,
            count: 0,
        }
    }

    /// Returns the entity slot stored for `name`, or `None` if it is absent.
    ///
    /// `hash` must be the value that was used when the name was inserted;
    /// callers in this file pass `interner.get_hash(name)`. Probing starts at
    /// `h1(hash)` and stops at the first empty control byte.
    #[inline(always)]
    fn lookup(&self, hash: u64, name: StrId) -> Option<u32> {
        let stamp = h2(hash);
        let mask = self.mask;
        let mut idx = h1(hash, mask);
        let ctrl = self.ctrl.as_ptr();
        let names = self.names.as_ptr();
        let slots = self.slots.as_ptr();
        let len = self.ctrl.len();

        // Bounded by the table size, so even a full table cannot loop forever.
        for _ in 0..len {
            // Warm the control byte a few probes ahead; this is only a hint.
            let prefetch_idx = idx.wrapping_add(4) & mask;
            // SAFETY: `prefetch_idx` is masked with `mask == ctrl.len() - 1`, so
            // the pointer stays inside `ctrl`.
            unsafe { prefetch_addr(ctrl.add(prefetch_idx)) };

            // SAFETY: `idx` is always masked with `mask`, and `ctrl`, `names`
            // and `slots` all have length `mask + 1` (see `new` / `grow`).
            unsafe {
                let c = *ctrl.add(idx);
                // High bit set => empty slot => end of the probe chain.
                if c & 0x80 != 0 {
                    return None;
                }
                // Cheap stamp comparison first, then the exact id check.
                if c == stamp && *names.add(idx) == name {
                    return Some(*slots.add(idx));
                }
            }
            idx = (idx + 1) & mask;
        }
        None
    }

    /// Inserts `name -> slot`. If more than 75% of slots are occupied, the table
    /// is grown first.
    ///
    /// It does not look for an existing entry. Inserting a name that is already
    /// present adds a second entry, and `lookup` keeps returning whichever
    /// entry comes first in the probe sequence.
    fn insert(&mut self, interner: &StringInterner, hash: u64, name: StrId, slot: u32) {
        if self.count * 4 > self.ctrl.len() * 3 {
            self.grow(interner);
        }
        let stamp = h2(hash);
        let mask = self.mask;
        let mut idx = h1(hash, mask);
        // Terminates: the load check above always leaves at least one empty slot.
        loop {
            // SAFETY: `idx` is masked with `mask`, and all three vectors have
            // length `mask + 1`.
            unsafe {
                if *self.ctrl.get_unchecked(idx) & 0x80 != 0 {
                    *self.ctrl.get_unchecked_mut(idx) = stamp;
                    *self.names.get_unchecked_mut(idx) = name;
                    *self.slots.get_unchecked_mut(idx) = slot;
                    self.count += 1;
                    return;
                }
            }
            idx = (idx + 1) & mask;
        }
    }

    /// Removes the entry for `name`, if present.
    ///
    /// No tombstones are used. After the matching slot is cleared, every
    /// following occupied slot in the same cluster is taken out and re-inserted
    /// from its home index. This keeps later lookups from stopping at a
    /// premature empty slot. The hashes of displaced entries are recomputed
    /// with `interner.get_hash`, which assumes callers always derive `hash`
    /// that way.
    fn remove(&mut self, interner: &StringInterner, hash: u64, name: StrId) {
        let stamp = h2(hash);
        let mask = self.mask;
        let mut idx = h1(hash, mask);
        let len = self.ctrl.len();
        for _ in 0..len {
            // Reached an empty slot: `name` is not in the table.
            if self.ctrl[idx] & 0x80 != 0 {
                return;
            }
            if self.ctrl[idx] == stamp && self.names[idx] == name {
                self.ctrl[idx] = EMPTY_SLOT;
                self.names[idx] = StrId::EMPTY;
                self.slots[idx] = u32::MAX;
                self.count -= 1;

                // Re-seat the rest of the cluster so no entry is stranded behind the hole.
                let mut next = (idx + 1) & mask;
                while self.ctrl[next] & 0x80 == 0 {
                    let nn = self.names[next];
                    let ns = self.slots[next];
                    let nh = interner.get_hash(nn);
                    // Take the entry out first; its own slot is then free, so
                    // the re-insert probe below is guaranteed to terminate.
                    self.ctrl[next] = EMPTY_SLOT;
                    self.names[next] = StrId::EMPTY;
                    self.slots[next] = u32::MAX;
                    self.count -= 1;

                    let nstamp = h2(nh);
                    let mut re_idx = h1(nh, mask);
                    while self.ctrl[re_idx] & 0x80 == 0 {
                        re_idx = (re_idx + 1) & mask;
                    }
                    self.ctrl[re_idx] = nstamp;
                    self.names[re_idx] = nn;
                    self.slots[re_idx] = ns;
                    self.count += 1;

                    next = (next + 1) & mask;
                }
                return;
            }
            idx = (idx + 1) & mask;
        }
    }

    /// Doubles the slot count and re-inserts every occupied entry.
    ///
    /// Hashes are recomputed with `interner.get_hash`; `count` is unchanged.
    fn grow(&mut self, interner: &StringInterner) {
        let new_cap = self.ctrl.len() * 2;
        let new_mask = new_cap - 1;
        let mut new_ctrl = vec![EMPTY_SLOT; new_cap];
        let mut new_names = vec![StrId::EMPTY; new_cap];
        let mut new_slots = vec![u32::MAX; new_cap];

        for i in 0..self.ctrl.len() {
            if self.ctrl[i] & 0x80 == 0 {
                let name = self.names[i];
                let hash = interner.get_hash(name);
                let stamp = h2(hash);
                let mut idx = h1(hash, new_mask);
                while new_ctrl[idx] & 0x80 == 0 {
                    idx = (idx + 1) & new_mask;
                }
                new_ctrl[idx] = stamp;
                new_names[idx] = name;
                new_slots[idx] = self.slots[i];
            }
        }

        self.ctrl = new_ctrl;
        self.names = new_names;
        self.slots = new_slots;
        self.mask = new_mask;
    }
}
391
/// Name -> entity-slot map split into `NAME_TABLE_SHARDS` independent
/// `NameTableShard`s. The shard is chosen from the name's hash.
#[derive(Clone)]
struct ShardedNameTable {
    shards: [NameTableShard; NAME_TABLE_SHARDS],
}
396
397impl ShardedNameTable {
398 fn new(capacity_per_shard: usize) -> Self {
399 Self {
400 shards: [
401 NameTableShard::new(capacity_per_shard),
402 NameTableShard::new(capacity_per_shard),
403 NameTableShard::new(capacity_per_shard),
404 NameTableShard::new(capacity_per_shard),
405 ],
406 }
407 }
408
409 #[inline(always)]
410 const fn shard(hash: u64) -> usize {
411 (hash as usize) & (NAME_TABLE_SHARDS - 1)
412 }
413
414 #[inline(always)]
415 fn lookup(&self, hash: u64, name: StrId) -> Option<u32> {
416 self.shards[Self::shard(hash)].lookup(hash, name)
417 }
418
419 #[inline(always)]
420 fn insert(&mut self, interner: &StringInterner, hash: u64, name: StrId, slot: u32) {
421 self.shards[Self::shard(hash)].insert(interner, hash, name, slot);
422 }
423
424 #[inline(always)]
425 fn remove(&mut self, interner: &StringInterner, hash: u64, name: StrId) {
426 self.shards[Self::shard(hash)].remove(interner, hash, name);
427 }
428}
429
/// The mutable, persistent knowledge graph.
///
/// All strings (names, types, observations) are interned. Entities live in
/// `entity_slots` and are located by name through `name_table`. In-memory
/// state is rebuilt by replaying `store` in [`KnowledgeGraph::new`].
pub struct KnowledgeGraph {
    /// Owns every string referenced by a `StrId` in this graph.
    interner: StringInterner,
    /// Entity storage indexed by slot; `None` marks an empty (deleted) slot.
    entity_slots: Vec<Option<StoredEntity>>,
    /// Indices of `None` entries in `entity_slots` (computed after replay);
    /// presumably reused when creating entities — confirm in the writer.
    free_slots: Vec<u32>,
    /// Interned name -> slot index.
    name_table: ShardedNameTable,
    /// All relations, in storage order.
    relations: Vec<StoredRelation>,
    /// Each relation listed under both endpoints as `(other_endpoint, relation_type)`.
    adjacency: AHashMap<StrId, Vec<(StrId, StrId)>>,
    /// Search index over entity name/type/observations, keyed by slot.
    search: SearchIndex,
    /// Persistent record store replayed at startup.
    store: BinaryStore,
}
448
/// A read-only copy of the graph state used to serve queries.
///
/// `entity_slots` and `relations` are shared through `Arc`, so cloning a
/// snapshot does not copy them. The remaining fields are cloned by value.
#[derive(Clone)]
pub struct ReadSnapshot {
    pub(crate) interner: StringInterner,
    pub(crate) entity_slots: Arc<[Option<StoredEntity>]>,
    // Kept alongside the other fields but not read by any query.
    #[allow(dead_code)]
    free_slots: Vec<u32>,
    name_table: ShardedNameTable,
    pub(crate) relations: Arc<[StoredRelation]>,
    /// Each relation listed under both endpoints as `(other_endpoint, relation_type)`
    /// (as built in `KnowledgeGraph::new`).
    adjacency: AHashMap<StrId, Vec<(StrId, StrId)>>,
    search: SearchIndex,
}
464
465impl ReadSnapshot {
467 fn lookup_live_slot(&self, name: &str) -> Option<u32> {
468 let name_id = self.interner.get_optional(name)?;
469 let hash = self.interner.get_hash(name_id);
470 let slot = self.name_table.lookup(hash, name_id)?;
471 self.entity_slots
472 .get(slot as usize)
473 .and_then(|s| s.as_ref())
474 .filter(|e| e.is_live())?;
475 Some(slot)
476 }
477
478 fn entity_by_name_id(&self, name_id: StrId) -> Option<Entity> {
479 let hash = self.interner.get_hash(name_id);
480 let slot = self.name_table.lookup(hash, name_id)?;
481 let e = self.entity_slots.get(slot as usize)?.as_ref()?;
482 Some(self.entity_to_output(e))
483 }
484
485 pub(crate) fn entity_to_output(&self, e: &StoredEntity) -> Entity {
486 Entity {
487 name: self.interner.lookup(e.name).to_string(),
488 entity_type: self.interner.lookup(e.entity_type).to_string(),
489 observations: e
490 .observations
491 .iter()
492 .map(|o| self.interner.lookup(*o).to_string())
493 .collect(),
494 }
495 }
496
497 pub(crate) fn relation_to_output(&self, r: &StoredRelation) -> Relation {
498 Relation {
499 from: self.interner.lookup(r.from).to_string(),
500 to: self.interner.lookup(r.to).to_string(),
501 relation_type: self.interner.lookup(r.relation_type).to_string(),
502 }
503 }
504
505 pub fn open_nodes(&self, names: &[String]) -> KnowledgeGraphOut {
507 let name_ids: std::collections::HashSet<StrId> = names
508 .iter()
509 .filter_map(|n| self.interner.get_optional(n))
510 .collect();
511 let entities: Vec<Entity> = self
512 .entity_slots
513 .iter()
514 .filter_map(|s| {
515 let e = s.as_ref()?;
516 if e.is_live() && name_ids.contains(&e.name) {
517 Some(self.entity_to_output(e))
518 } else {
519 None
520 }
521 })
522 .collect();
523 let matched: std::collections::HashSet<StrId> = entities.iter()
524 .filter_map(|e| self.interner.get_optional(&e.name))
525 .collect();
526 let relations: Vec<Relation> = self
527 .relations
528 .iter()
529 .filter(|r| matched.contains(&r.from) || matched.contains(&r.to))
530 .map(|r| self.relation_to_output(r))
531 .collect();
532 KnowledgeGraphOut { entities, relations }
533 }
534
535 pub fn read_graph(&self) -> KnowledgeGraphOut {
537 let entities: Vec<Entity> = self
538 .entity_slots
539 .iter()
540 .filter_map(|s| s.as_ref().filter(|e| e.is_live()))
541 .map(|e| self.entity_to_output(e))
542 .collect();
543 let relations: Vec<Relation> = self
544 .relations
545 .iter()
546 .map(|r| self.relation_to_output(r))
547 .collect();
548 KnowledgeGraphOut { entities, relations }
549 }
550
551 pub fn search_entities(&self, query: &str) -> Result<Vec<Entity>> {
553 let token = query.to_lowercase();
554 let matching = self.search.search(&token, &self.interner);
555 Ok(matching
556 .iter()
557 .filter_map(|idx| {
558 self.entity_slots
559 .get(*idx as usize)?
560 .as_ref()
561 .filter(|e| e.is_live())
562 .map(|e| self.entity_to_output(e))
563 })
564 .collect())
565 }
566
567 pub fn get_entity(&self, name: &str) -> Option<Entity> {
569 self.lookup_live_slot(name)?;
570 let name_id = self.interner.get_optional(name)?;
571 self.entity_by_name_id(name_id)
572 }
573
574 pub fn neighbors(
576 &self,
577 name: &str,
578 direction: Direction,
579 rtype: Option<&str>,
580 depth: u32,
581 ) -> Result<KnowledgeGraphOut> {
582 self.lookup_live_slot(name)
583 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
584 let start = self.interner.get_optional(name).unwrap();
585
586 let rtype_id = match rtype {
587 Some(r) => match self.interner.get_optional(r) {
588 Some(id) => Some(id),
589 None => {
590 let entities = self.entity_by_name_id(start).into_iter().collect();
591 return Ok(KnowledgeGraphOut { entities, relations: Vec::new() });
592 }
593 },
594 None => None,
595 };
596
597 let mut visited: AHashSet<StrId> = AHashSet::new();
598 visited.insert(start);
599
600 let type_ok = |r: &StoredRelation, rt: Option<StrId>| rt.is_none_or(|rt_id| r.relation_type == rt_id);
601
602 if depth == 1 {
603 for r in self.relations.iter().filter(|r| type_ok(r, rtype_id)) {
604 match direction {
605 Direction::Out => {
606 if r.from == start { visited.insert(r.to); }
607 }
608 Direction::In => {
609 if r.to == start { visited.insert(r.from); }
610 }
611 Direction::Both => {
612 if r.from == start { visited.insert(r.to); }
613 else if r.to == start { visited.insert(r.from); }
614 }
615 }
616 }
617 } else if depth >= 2 {
618 let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
619 match direction {
620 Direction::Both => {
621 for (&node, edges) in &self.adjacency {
622 for &(nb, rt) in edges {
623 if rtype_id.is_none_or(|rt_id| rt == rt_id) {
624 adj.entry(node).or_default().push(nb);
625 }
626 }
627 }
628 }
629 Direction::Out | Direction::In => {
630 for r in self.relations.iter().filter(|r| type_ok(r, rtype_id)) {
631 match direction {
632 Direction::Out => adj.entry(r.from).or_default().push(r.to),
633 Direction::In => adj.entry(r.to).or_default().push(r.from),
634 _ => unreachable!(),
635 }
636 }
637 }
638 }
639 let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
640 queue.push_back((start, 0));
641 while let Some((node, d)) = queue.pop_front() {
642 if d >= depth { continue; }
643 if let Some(nbrs) = adj.get(&node) {
644 for &nb in nbrs {
645 if visited.insert(nb) {
646 queue.push_back((nb, d + 1));
647 }
648 }
649 }
650 }
651 }
652
653 let mut entities = Vec::with_capacity(visited.len());
654 for &nid in &visited {
655 if let Some(e) = self.entity_by_name_id(nid) {
656 entities.push(e);
657 }
658 }
659 let relations: Vec<Relation> = self
660 .relations
661 .iter()
662 .filter(|r| type_ok(r, rtype_id) && visited.contains(&r.from) && visited.contains(&r.to))
663 .map(|r| self.relation_to_output(r))
664 .collect();
665 Ok(KnowledgeGraphOut { entities, relations })
666 }
667
668 pub fn describe_entity(&self, name: &str) -> Result<serde_json::Value> {
670 let name_id = self
671 .interner
672 .get_optional(name)
673 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
674 let entity = self
675 .entity_by_name_id(name_id)
676 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
677
678 let mut incident: Vec<Relation> = Vec::new();
679 let mut neighbor_seen: AHashSet<StrId> = AHashSet::new();
680 let mut neighbors: Vec<&str> = Vec::new();
681 for r in self.relations.iter() {
682 if r.from == name_id || r.to == name_id {
683 incident.push(self.relation_to_output(r));
684 let other = if r.from == name_id { r.to } else { r.from };
685 if other != name_id && neighbor_seen.insert(other) {
686 neighbors.push(self.interner.lookup(other));
687 }
688 }
689 }
690
691 Ok(serde_json::json!({
692 "entity": entity,
693 "relations": incident,
694 "neighbors": neighbors,
695 "degree": incident.len(),
696 }))
697 }
698
699 pub fn find_path(&self, from: &str, to: &str) -> Result<Vec<String>> {
701 let from_id = self
702 .interner
703 .get_optional(from)
704 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
705 let to_id = self
706 .interner
707 .get_optional(to)
708 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
709 if self.lookup_live_slot(from).is_none() {
710 return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
711 }
712 if self.lookup_live_slot(to).is_none() {
713 return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
714 }
715
716 let mut visited: AHashSet<StrId> = AHashSet::new();
718 let mut parent: AHashMap<StrId, StrId> = AHashMap::new();
719 let mut queue: VecDeque<StrId> = VecDeque::new();
720
721 visited.insert(from_id);
722 queue.push_back(from_id);
723
724 while let Some(current) = queue.pop_front() {
725 if current == to_id { break; }
726 if let Some(neighbors) = self.adjacency.get(¤t) {
727 for &(neighbor, _) in neighbors {
728 if visited.insert(neighbor) {
729 parent.insert(neighbor, current);
730 queue.push_back(neighbor);
731 }
732 }
733 }
734 }
735
736 if !visited.contains(&to_id) {
737 return Err(MCSError::MemoryError(format!(
738 "No path found between '{from}' and '{to}'"
739 )));
740 }
741
742 let mut path = Vec::new();
743 let mut cur = to_id;
744 path.push(self.interner.lookup(cur).to_string());
745 while let Some(&p) = parent.get(&cur) {
746 path.push(self.interner.lookup(p).to_string());
747 cur = p;
748 }
749 path.reverse();
750 Ok(path)
751 }
752
753 pub fn extract_subgraph(&self, names: &[String], depth: u32) -> Result<KnowledgeGraphOut> {
755 if names.is_empty() {
756 return Ok(KnowledgeGraphOut { entities: Vec::new(), relations: Vec::new() });
757 }
758 let mut visited: AHashSet<StrId> = AHashSet::new();
759 let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
760 for name in names {
761 if let Some(id) = self.interner.get_optional(name)
762 && visited.insert(id)
763 {
764 queue.push_back((id, 0));
765 }
766 }
767 let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
768 for (&node, edges) in &self.adjacency {
769 let nbrs: Vec<StrId> = edges.iter().map(|(to, _)| *to).collect();
770 adj.insert(node, nbrs);
771 }
772 while let Some((node, d)) = queue.pop_front() {
773 if d >= depth { continue; }
774 if let Some(nbrs) = adj.get(&node) {
775 for &nb in nbrs {
776 if visited.insert(nb) {
777 queue.push_back((nb, d + 1));
778 }
779 }
780 }
781 }
782 let mut entities: Vec<Entity> = Vec::with_capacity(visited.len());
783 for &nid in &visited {
784 if let Some(e) = self.entity_by_name_id(nid) {
785 entities.push(e);
786 }
787 }
788 let relations: Vec<Relation> = self
789 .relations
790 .iter()
791 .filter(|r| visited.contains(&r.from) && visited.contains(&r.to))
792 .map(|r| self.relation_to_output(r))
793 .collect();
794 Ok(KnowledgeGraphOut { entities, relations })
795 }
796
797 pub fn batch_get_entities(&self, names: &[String]) -> Vec<Option<Entity>> {
799 names.iter().map(|n| self.get_entity(n)).collect()
800 }
801
802 pub fn graph_stats(&self) -> serde_json::Value {
804 let entity_count = self
805 .entity_slots
806 .iter()
807 .filter(|s| s.as_ref().is_some_and(|e| e.is_live()))
808 .count();
809 let relation_count = self.relations.len();
810 let type_counts = self.entity_type_counts();
811 let relation_type_counts = self.relation_type_counts();
812 serde_json::json!({
813 "entities": entity_count,
814 "relations": relation_count,
815 "entityTypes": type_counts,
816 "relationTypes": relation_type_counts,
817 })
818 }
819
820 pub fn search_relations(&self, from: Option<&str>, to: Option<&str>, rtype: Option<&str>) -> Vec<Relation> {
822 let from_id = from.and_then(|n| self.interner.get_optional(n));
823 let to_id = to.and_then(|n| self.interner.get_optional(n));
824 let rtype_id = rtype.and_then(|n| self.interner.get_optional(n));
825 self.relations
826 .iter()
827 .filter(|r| {
828 from_id.is_none_or(|id| r.from == id)
829 && to_id.is_none_or(|id| r.to == id)
830 && rtype_id.is_none_or(|id| r.relation_type == id)
831 })
832 .map(|r| self.relation_to_output(r))
833 .collect()
834 }
835
836 pub fn entity_type_counts(&self) -> Vec<(String, usize)> {
838 let mut counts: AHashMap<StrId, usize> = AHashMap::new();
839 for slot in self.entity_slots.iter() {
840 if let Some(e) = slot.as_ref().filter(|e| e.is_live()) {
841 *counts.entry(e.entity_type).or_default() += 1;
842 }
843 }
844 let mut result: Vec<(String, usize)> = counts
845 .into_iter()
846 .map(|(id, c)| (self.interner.lookup(id).to_string(), c))
847 .collect();
848 result.sort_by(|a, b| a.0.cmp(&b.0));
849 result
850 }
851
852 pub fn relation_type_counts(&self) -> Vec<(String, usize)> {
854 let mut counts: AHashMap<StrId, usize> = AHashMap::new();
855 for r in self.relations.iter() {
856 *counts.entry(r.relation_type).or_default() += 1;
857 }
858 let mut result: Vec<(String, usize)> = counts
859 .into_iter()
860 .map(|(id, c)| (self.interner.lookup(id).to_string(), c))
861 .collect();
862 result.sort_by(|a, b| a.0.cmp(&b.0));
863 result
864 }
865
866 pub fn export(&self, format: &str) -> Result<String> {
868 match format {
869 "json" => serde_json::to_string(&self.read_graph()).map_err(MCSError::JsonError),
870 "mermaid" => Ok(self.export_mermaid()),
871 "dot" => Ok(self.export_dot()),
872 other => Err(MCSError::InvalidParams(format!(
873 "Unknown export format '{other}' (expected json|mermaid|dot)"
874 ))),
875 }
876 }
877
878 fn export_mermaid(&self) -> String {
879 let mut out = String::with_capacity(4096);
880 out.push_str("graph LR\n");
881 for r in self.relations.iter() {
882 let from = sanitize_label(self.interner.lookup(r.from));
883 let to = sanitize_label(self.interner.lookup(r.to));
884 let rt = sanitize_label(self.interner.lookup(r.relation_type));
885 out.push_str(&format!(" {} -- \"{}\" --> {}\n", from, rt, to));
886 }
887 out
888 }
889
890 fn export_dot(&self) -> String {
891 let mut out = String::with_capacity(4096);
892 out.push_str("digraph KG {\n");
893 out.push_str(" rankdir=LR;\n");
894 for slot in self.entity_slots.iter() {
895 if let Some(e) = slot.as_ref().filter(|e| e.is_live()) {
896 let name = sanitize_label(self.interner.lookup(e.name));
897 let etype = sanitize_label(self.interner.lookup(e.entity_type));
898 out.push_str(&format!(" \"{}\" [label=\"{}\n({})\"];\n", name, name, etype));
899 }
900 }
901 for r in self.relations.iter() {
902 let from = sanitize_label(self.interner.lookup(r.from));
903 let to = sanitize_label(self.interner.lookup(r.to));
904 let rt = sanitize_label(self.interner.lookup(r.relation_type));
905 out.push_str(&format!(" \"{}\" -> \"{}\" [label=\"{}\"];\n", from, to, rt));
906 }
907 out.push_str("}\n");
908 out
909 }
910
911 pub fn find_all_paths(
913 &self,
914 from: &str,
915 to: &str,
916 max_depth: usize,
917 max_paths: usize,
918 ) -> Result<Vec<Vec<String>>> {
919 let from_id = self
920 .interner
921 .get_optional(from)
922 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
923 let to_id = self
924 .interner
925 .get_optional(to)
926 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
927 if self.lookup_live_slot(from).is_none() {
928 return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
929 }
930 if self.lookup_live_slot(to).is_none() {
931 return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
932 }
933 if from_id == to_id {
934 return Ok(vec![vec![from.to_string()]]);
935 }
936 let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::with_capacity(self.adjacency.len());
937 for (&node, edges) in &self.adjacency {
938 let nbrs: Vec<StrId> = edges.iter().map(|(to, _)| *to).collect();
939 adj.insert(node, nbrs);
940 }
941 let mut all_paths: Vec<Vec<StrId>> = Vec::new();
942 let mut current_path = Vec::new();
943 let mut visited: AHashSet<StrId> = AHashSet::new();
944 visited.insert(from_id);
945 current_path.push(from_id);
946 Self::dfs_all_paths(
947 &adj,
948 from_id,
949 to_id,
950 max_depth,
951 max_paths,
952 &mut visited,
953 &mut current_path,
954 &mut all_paths,
955 );
956 if all_paths.is_empty() {
957 return Err(MCSError::MemoryError(format!(
958 "No path found between '{from}' and '{to}'"
959 )));
960 }
961 let result: Vec<Vec<String>> = all_paths
962 .into_iter()
963 .map(|path| {
964 path.into_iter()
965 .map(|id| self.interner.lookup(id).to_string())
966 .collect()
967 })
968 .collect();
969 Ok(result)
970 }
971
972 fn dfs_all_paths(
973 adj: &AHashMap<StrId, Vec<StrId>>,
974 current: StrId,
975 target: StrId,
976 max_depth: usize,
977 max_paths: usize,
978 visited: &mut AHashSet<StrId>,
979 current_path: &mut Vec<StrId>,
980 all_paths: &mut Vec<Vec<StrId>>,
981 ) {
982 if all_paths.len() >= max_paths { return; }
983 if current == target && current_path.len() > 1 {
984 all_paths.push(current_path.clone());
985 return;
986 }
987 if current_path.len() > max_depth { return; }
988 if let Some(neighbors) = adj.get(¤t) {
989 for &nb in neighbors {
990 if !visited.contains(&nb) {
991 visited.insert(nb);
992 current_path.push(nb);
993 Self::dfs_all_paths(adj, nb, target, max_depth, max_paths, visited, current_path, all_paths);
994 current_path.pop();
995 visited.remove(&nb);
996 }
997 }
998 }
999 }
1000}
1001
1002impl KnowledgeGraph {
1003 pub fn new(path: &Path) -> std::io::Result<Self> {
1004 let store = BinaryStore::new(path)?;
1005
1006 let mut interner = StringInterner::with_capacity(65536, 1024);
1008 let mut entity_slots: Vec<Option<StoredEntity>> = Vec::with_capacity(256);
1009 let mut name_table = ShardedNameTable::new(64);
1010 let mut relations: Vec<StoredRelation> = Vec::with_capacity(64);
1011 let mut search = SearchIndex::new();
1012
1013 let mut pending: Option<Vec<(RecordKind, Vec<u8>)>> = None;
1018 store.replay(|kind, data| {
1019 match kind {
1020 RecordKind::TxnBegin => pending = Some(Vec::new()),
1021 RecordKind::TxnCommit => {
1022 if let Some(buffered) = pending.take() {
1023 for (k, d) in &buffered {
1024 Self::apply_record(
1025 *k, d, &mut interner, &mut entity_slots, &mut search,
1026 &mut name_table, &mut relations,
1027 );
1028 }
1029 }
1030 }
1031 other => match pending.as_mut() {
1032 Some(buffered) => buffered.push((other, data.to_vec())),
1033 None => Self::apply_record(
1034 other, data, &mut interner, &mut entity_slots, &mut search,
1035 &mut name_table, &mut relations,
1036 ),
1037 },
1038 }
1039 })?;
1040
1041 let free_slots: Vec<u32> = entity_slots
1043 .iter()
1044 .enumerate()
1045 .filter(|(_, s)| s.is_none())
1046 .map(|(i, _)| i as u32)
1047 .collect();
1048
1049 let mut adjacency: AHashMap<StrId, Vec<(StrId, StrId)>> = AHashMap::new();
1050 for rel in &relations {
1051 adjacency.entry(rel.from).or_default().push((rel.to, rel.relation_type));
1052 adjacency.entry(rel.to).or_default().push((rel.from, rel.relation_type));
1053 }
1054
1055 Ok(Self {
1056 interner,
1057 entity_slots,
1058 free_slots,
1059 name_table,
1060 relations,
1061 adjacency,
1062 search,
1063 store,
1064 })
1065 }
1066
1067 #[allow(clippy::too_many_arguments)]
1075 fn apply_record(
1076 kind: RecordKind,
1077 data: &[u8],
1078 interner: &mut StringInterner,
1079 entity_slots: &mut Vec<Option<StoredEntity>>,
1080 search: &mut SearchIndex,
1081 name_table: &mut ShardedNameTable,
1082 relations: &mut Vec<StoredRelation>,
1083 ) {
1084 match kind {
1085 RecordKind::CreateEntity => {
1086 if let Some((name, etype, obs)) = store_enc::decode_create_entity(data) {
1087 Self::replay_create_entity(
1088 interner, entity_slots, search, name_table, name, etype, &obs,
1089 );
1090 }
1091 }
1092 RecordKind::CreateRelation => {
1093 if let Some((from, to, rtype)) = store_enc::decode_create_relation(data) {
1094 let from_id = interner.intern(from);
1095 let to_id = interner.intern(to);
1096 let type_id = interner.intern(rtype);
1097 relations.push(StoredRelation {
1098 from: from_id,
1099 to: to_id,
1100 relation_type: type_id,
1101 });
1102 }
1103 }
1104 RecordKind::AddObservations => {
1105 if let Some((name, obs)) = store_enc::decode_add_observations(data) {
1106 Self::replay_add_observations(
1107 interner, entity_slots, search, name_table, name, &obs,
1108 );
1109 }
1110 }
1111 RecordKind::DeleteEntity => {
1112 if let Some(name) = store_enc::decode_delete_entity(data) {
1113 Self::replay_delete_entity(
1114 interner, entity_slots, relations, search, name_table, name,
1115 );
1116 }
1117 }
1118 RecordKind::DeleteObservations => {
1119 if let Some((name, obs)) = store_enc::decode_delete_observations(data) {
1120 Self::replay_delete_observations(
1121 interner, entity_slots, search, name_table, name, &obs,
1122 );
1123 }
1124 }
1125 RecordKind::DeleteRelation => {
1126 if let Some((from, to, rtype)) = store_enc::decode_delete_relation(data) {
1127 let from_id = interner.intern(from);
1128 let to_id = interner.intern(to);
1129 let type_id = interner.intern(rtype);
1130 relations.retain(|r| {
1131 !(r.from == from_id && r.to == to_id && r.relation_type == type_id)
1132 });
1133 }
1134 }
1135 RecordKind::TxnBegin | RecordKind::TxnCommit => {}
1136 }
1137 }
1138
1139 #[allow(clippy::ptr_arg)]
1140 fn replay_create_entity(
1141 interner: &mut StringInterner,
1142 entities: &mut Vec<Option<StoredEntity>>,
1143 search: &mut SearchIndex,
1144 name_table: &mut ShardedNameTable,
1145 name: &str,
1146 etype: &str,
1147 observations: &[&str],
1148 ) {
1149 let name_id = interner.intern(name);
1150 let type_id = interner.intern(etype);
1151 let obs_ids: Vec<StrId> = observations.iter().map(|o| interner.intern(o)).collect();
1152 let slot = entities.len() as u32;
1153 entities.push(Some(StoredEntity {
1154 state: ENTITY_SLOT_LIVE,
1155 name: name_id,
1156 entity_type: type_id,
1157 observations: obs_ids.clone(),
1158 }));
1159 let hash = interner.get_hash(name_id);
1160 name_table.insert(&*interner, hash, name_id, slot);
1161 search.index_entity(interner, slot, name_id, type_id, &obs_ids);
1162 }
1163
1164 fn replay_add_observations(
1165 interner: &mut StringInterner,
1166 entities: &mut [Option<StoredEntity>],
1167 search: &mut SearchIndex,
1168 name_table: &mut ShardedNameTable,
1169 name: &str,
1170 observations: &[&str],
1171 ) {
1172 let name_id = interner.intern(name);
1173 let hash = interner.get_hash(name_id);
1174 if let Some(slot) = name_table.lookup(hash, name_id)
1175 && let Some(Some(entity)) = entities.get_mut(slot as usize)
1176 {
1177 for &o in observations {
1178 let oid = interner.intern(o);
1179 if !entity.observations.contains(&oid) {
1180 entity.observations.push(oid);
1181 }
1182 }
1183 search.remove_entity(slot);
1184 search.index_entity(
1185 interner,
1186 slot,
1187 entity.name,
1188 entity.entity_type,
1189 &entity.observations,
1190 );
1191 }
1192 }
1193
1194 fn replay_delete_entity(
1195 interner: &mut StringInterner,
1196 entities: &mut [Option<StoredEntity>],
1197 rels: &mut Vec<StoredRelation>,
1198 search: &mut SearchIndex,
1199 name_table: &mut ShardedNameTable,
1200 name: &str,
1201 ) {
1202 let name_id = interner.intern(name);
1203 let hash = interner.get_hash(name_id);
1204 if let Some(slot) = name_table.lookup(hash, name_id)
1205 && let Some(Some(_)) = entities.get(slot as usize)
1206 {
1207 entities[slot as usize] = None;
1208 search.remove_entity(slot);
1209 name_table.remove(&*interner, hash, name_id);
1210 }
1211 rels.retain(|r| r.from != name_id && r.to != name_id);
1212 }
1213
1214 fn replay_delete_observations(
1215 interner: &mut StringInterner,
1216 entities: &mut [Option<StoredEntity>],
1217 search: &mut SearchIndex,
1218 name_table: &mut ShardedNameTable,
1219 name: &str,
1220 observations: &[&str],
1221 ) {
1222 let name_id = interner.intern(name);
1223 let hash = interner.get_hash(name_id);
1224 if let Some(slot) = name_table.lookup(hash, name_id)
1225 && let Some(Some(entity)) = entities.get_mut(slot as usize)
1226 {
1227 let remove_ids: Vec<StrId> = observations.iter().map(|o| interner.intern(o)).collect();
1228 entity.observations.retain(|o| !remove_ids.contains(o));
1229 search.remove_entity(slot);
1230 search.index_entity(
1231 interner,
1232 slot,
1233 entity.name,
1234 entity.entity_type,
1235 &entity.observations,
1236 );
1237 }
1238 }
1239
1240 pub const fn interner(&self) -> &StringInterner {
1245 &self.interner
1246 }
1247
1248 pub fn get_entity(&self, name: &str) -> Option<Entity> {
1250 let name_id = self.interner.get_optional(name)?;
1251 let hash = self.interner.get_hash(name_id);
1252 let slot = self.name_table.lookup(hash, name_id)?;
1253 let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
1254 if !stored.is_live() {
1255 return None;
1256 }
1257 Some(self.entity_to_output(stored))
1258 }
1259
1260 pub fn graph_stats(&self) -> serde_json::Value {
1262 let live_entities = self
1263 .entity_slots
1264 .iter()
1265 .filter(|s| s.as_ref().is_some_and(|e| e.is_live()))
1266 .count();
1267 let total_relations = self.relations.len();
1268 let index_entries = self.search.len();
1269 let total_obs: usize = self
1270 .entity_slots
1271 .iter()
1272 .filter_map(|s| s.as_ref())
1273 .filter(|e| e.is_live())
1274 .map(|e| e.observations.len())
1275 .sum();
1276
1277 serde_json::json!({
1278 "entities": live_entities,
1279 "relations": total_relations,
1280 "totalObservations": total_obs,
1281 "searchIndexEntries": index_entries,
1282 "internedStrings": self.interner.len(),
1283 "internedBytes": self.interner.total_bytes(),
1284 })
1285 }
1286
1287 pub fn search_relations(&self, from: Option<&str>, to: Option<&str>, rtype: Option<&str>) -> Vec<Relation> {
1291 let from_id = match from {
1292 Some(f) => match self.interner.get_optional(f) {
1293 Some(id) => Some(id),
1294 None => return Vec::new(),
1295 },
1296 None => None,
1297 };
1298 let to_id = match to {
1299 Some(t) => match self.interner.get_optional(t) {
1300 Some(id) => Some(id),
1301 None => return Vec::new(),
1302 },
1303 None => None,
1304 };
1305 let rtype_id = match rtype {
1306 Some(r) => match self.interner.get_optional(r) {
1307 Some(id) => Some(id),
1308 None => return Vec::new(),
1309 },
1310 None => None,
1311 };
1312
1313 self.relations
1314 .iter()
1315 .filter(|r| {
1316 from_id.is_none_or(|f| r.from == f)
1317 && to_id.is_none_or(|t| r.to == t)
1318 && rtype_id.is_none_or(|rt| r.relation_type == rt)
1319 })
1320 .map(|r| Relation {
1321 from: self.interner.lookup(r.from).to_string(),
1322 to: self.interner.lookup(r.to).to_string(),
1323 relation_type: self.interner.lookup(r.relation_type).to_string(),
1324 })
1325 .collect()
1326 }
1327
1328 pub fn find_path(&self, from: &str, to: &str) -> Result<Vec<String>> {
1331 let from_id = self.interner.get_optional(from)
1332 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
1333 let to_id = self.interner.get_optional(to)
1334 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
1335 let hash_from = self.interner.get_hash(from_id);
1336 let hash_to = self.interner.get_hash(to_id);
1337
1338 if self.name_table.lookup(hash_from, from_id).is_none() {
1339 return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
1340 }
1341 if self.name_table.lookup(hash_to, to_id).is_none() {
1342 return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
1343 }
1344 if from_id == to_id {
1345 return Ok(vec![from.to_string()]);
1346 }
1347
1348 let mut visited: AHashSet<StrId> = AHashSet::new();
1350 let mut parent: AHashMap<StrId, StrId> = AHashMap::new();
1351 let mut queue: VecDeque<StrId> = VecDeque::new();
1352
1353 visited.insert(from_id);
1354 queue.push_back(from_id);
1355
1356 while let Some(current) = queue.pop_front() {
1357 if current == to_id {
1358 break;
1359 }
1360
1361 if let Some(neighbors) = self.adjacency.get(¤t) {
1362 for &(neighbor, _) in neighbors {
1363 if visited.insert(neighbor) {
1364 parent.insert(neighbor, current);
1365 queue.push_back(neighbor);
1366 }
1367 }
1368 }
1369 }
1370
1371 if !parent.contains_key(&to_id) && from_id != to_id {
1372 return Err(MCSError::MemoryError(format!(
1373 "No path found between '{from}' and '{to}'"
1374 )));
1375 }
1376
1377 let mut path: Vec<String> = Vec::new();
1379 let mut cur = to_id;
1380 loop {
1381 path.push(self.interner.lookup(cur).to_string());
1382 if cur == from_id {
1383 break;
1384 }
1385 cur = *parent.get(&cur).ok_or_else(|| {
1386 MCSError::MemoryError("Path reconstruction failed".into())
1387 })?;
1388 }
1389 path.reverse();
1390 Ok(path)
1391 }
1392
1393 pub fn compact(&mut self) -> Result<()> {
1398 let mut create_entities: Vec<Entity> = Vec::new();
1400 let mut create_relations: Vec<Relation> = Vec::new();
1401
1402 for slot in &self.entity_slots {
1403 if let Some(stored) = slot.as_ref().filter(|e| e.is_live()) {
1404 create_entities.push(self.entity_to_output(stored));
1405 }
1406 }
1407 for rel in &self.relations {
1408 create_relations.push(Relation {
1409 from: self.interner.lookup(rel.from).to_string(),
1410 to: self.interner.lookup(rel.to).to_string(),
1411 relation_type: self.interner.lookup(rel.relation_type).to_string(),
1412 });
1413 }
1414
1415 let tmp_path = self.store.path().with_extension("tmp");
1422 if let Err(e) = std::fs::remove_file(&tmp_path)
1423 && e.kind() != std::io::ErrorKind::NotFound
1424 {
1425 return Err(MCSError::IoError(e));
1426 }
1427 let mut tmp_store = BinaryStore::new(&tmp_path).map_err(MCSError::IoError)?;
1428 for entity in &create_entities {
1429 let mut buf = Vec::new();
1430 store_enc::encode_create_entity(&mut buf, &entity.name, &entity.entity_type, &entity.observations)
1431 .map_err(MCSError::IoError)?;
1432 tmp_store.write_record(RecordKind::CreateEntity, &buf).map_err(MCSError::IoError)?;
1433 }
1434 for relation in &create_relations {
1435 let mut buf = Vec::new();
1436 store_enc::encode_create_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
1437 .map_err(MCSError::IoError)?;
1438 tmp_store.write_record(RecordKind::CreateRelation, &buf).map_err(MCSError::IoError)?;
1439 }
1440 tmp_store.flush_and_sync().map_err(MCSError::IoError)?;
1441 drop(tmp_store);
1442
1443 std::fs::rename(&tmp_path, self.store.path()).map_err(MCSError::IoError)?;
1448 sync_parent_dir(self.store.path()).map_err(MCSError::IoError)?;
1449
1450 let path = self.store.path().clone();
1456 *self = KnowledgeGraph::new(&path).map_err(MCSError::IoError)?;
1457
1458 Ok(())
1459 }
1460
1461 pub fn create_entities(&mut self, entities: &[Entity]) -> Result<Vec<Entity>> {
1464 for entity in entities {
1466 if entity.name.is_empty() {
1467 return Err(MCSError::InvalidParams(
1468 "Entity name must not be empty".into(),
1469 ));
1470 }
1471 }
1472 let mut created = Vec::new();
1473 for entity in entities {
1474 let existing = self.interner.get_optional(&entity.name)
1476 .and_then(|id| {
1477 let hash = self.interner.get_hash(id);
1478 self.name_table.lookup(hash, id)
1479 });
1480 if existing.is_some() {
1481 continue;
1482 }
1483 let mut buf = Vec::new();
1485 store_enc::encode_create_entity(&mut buf, &entity.name, &entity.entity_type, &entity.observations)
1486 .map_err(MCSError::IoError)?;
1487 self.store.write_record(RecordKind::CreateEntity, &buf)
1488 .map_err(MCSError::IoError)?;
1489
1490 let name_id = self.interner.intern(&entity.name);
1491 let hash = self.interner.get_hash(name_id);
1492 let type_id = self.interner.intern(&entity.entity_type);
1493 let obs_ids: Vec<StrId> = entity
1494 .observations
1495 .iter()
1496 .map(|o| self.interner.intern(o))
1497 .collect();
1498 let reused = self.free_slots.pop();
1501 let slot = reused.unwrap_or(self.entity_slots.len() as u32);
1502 self.search
1503 .index_entity(&mut self.interner, slot, name_id, type_id, &obs_ids);
1504 let stored = Some(StoredEntity {
1505 state: ENTITY_SLOT_LIVE,
1506 name: name_id,
1507 entity_type: type_id,
1508 observations: obs_ids,
1509 });
1510 match reused {
1511 Some(s) => self.entity_slots[s as usize] = stored,
1512 None => self.entity_slots.push(stored),
1513 }
1514 self.name_table.insert(&self.interner, hash, name_id, slot);
1515 created.push(Entity {
1516 name: entity.name.clone(),
1517 entity_type: entity.entity_type.clone(),
1518 observations: entity.observations.clone(),
1519 });
1520 }
1521 Ok(created)
1522 }
1523
1524 pub fn create_relations(&mut self, relations: &[Relation]) -> Result<Vec<Relation>> {
1525 for relation in relations {
1527 if relation.from.is_empty() || relation.to.is_empty() {
1528 return Err(MCSError::InvalidParams(
1529 "Relation endpoints must not be empty".into(),
1530 ));
1531 }
1532 }
1533 let mut created = Vec::new();
1534 let mut rel_set: AHashSet<(StrId, StrId, StrId)> = AHashSet::new();
1536 for rel in &self.relations {
1537 rel_set.insert((rel.from, rel.to, rel.relation_type));
1538 }
1539 for relation in relations {
1540 let from_id = self.interner.intern(&relation.from);
1541 let to_id = self.interner.intern(&relation.to);
1542 let type_id = self.interner.intern(&relation.relation_type);
1543 if !rel_set.insert((from_id, to_id, type_id)) {
1544 continue;
1545 }
1546 let mut buf = Vec::new();
1548 store_enc::encode_create_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
1549 .map_err(MCSError::IoError)?;
1550 self.store.write_record(RecordKind::CreateRelation, &buf)
1551 .map_err(MCSError::IoError)?;
1552
1553 self.relations.push(StoredRelation {
1554 from: from_id,
1555 to: to_id,
1556 relation_type: type_id,
1557 });
1558 self.adjacency.entry(from_id).or_default().push((to_id, type_id));
1559 self.adjacency.entry(to_id).or_default().push((from_id, type_id));
1560 created.push(Relation {
1561 from: relation.from.clone(),
1562 to: relation.to.clone(),
1563 relation_type: relation.relation_type.clone(),
1564 });
1565 }
1566 Ok(created)
1567 }
1568
1569 pub fn add_observations(&mut self, entity_name: &str, contents: &[String]) -> Result<Vec<String>> {
1570 let name_id = self.interner.get_optional(entity_name)
1571 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1572 let hash = self.interner.get_hash(name_id);
1573 let slot = self
1574 .name_table
1575 .lookup(hash, name_id)
1576 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1577 let existing: AHashSet<StrId> = self
1580 .entity_slots
1581 .get(slot as usize)
1582 .and_then(|e| e.as_ref())
1583 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?
1584 .observations
1585 .iter()
1586 .copied()
1587 .collect();
1588
1589 let mut added = Vec::new();
1592 let mut interned_added = Vec::new();
1593 let mut seen: AHashSet<StrId> = AHashSet::new();
1594 for content in contents {
1595 let cid = self.interner.intern(content);
1596 if existing.contains(&cid) || !seen.insert(cid) {
1597 continue;
1598 }
1599 interned_added.push(cid);
1600 added.push(content.clone());
1601 }
1602 if added.is_empty() {
1603 return Ok(added);
1604 }
1605
1606 let mut buf = Vec::new();
1609 store_enc::encode_add_observations(&mut buf, entity_name, &added)
1610 .map_err(MCSError::IoError)?;
1611 self.store.write_record(RecordKind::AddObservations, &buf)
1612 .map_err(MCSError::IoError)?;
1613
1614 let stored = self
1616 .entity_slots
1617 .get_mut(slot as usize)
1618 .and_then(|e| e.as_mut())
1619 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1620 stored.observations.extend_from_slice(&interned_added);
1621
1622 self.search
1625 .index_additional(&mut self.interner, slot, &interned_added);
1626 Ok(added)
1627 }
1628
1629 pub fn delete_entities(&mut self, entity_names: &[String]) -> Result<()> {
1630 let mut deleted_names = Vec::new();
1631 for name in entity_names {
1632 let name_id_opt = self.interner.get_optional(name);
1633 if let Some(name_id) = name_id_opt {
1634 let hash = self.interner.get_hash(name_id);
1635 if let Some(slot) = self.name_table.lookup(hash, name_id)
1636 && let Some(Some(_)) = self.entity_slots.get(slot as usize)
1637 {
1638 let mut buf = Vec::new();
1640 store_enc::encode_delete_entity(&mut buf, name)
1641 .map_err(MCSError::IoError)?;
1642 self.store.write_record(RecordKind::DeleteEntity, &buf)
1643 .map_err(MCSError::IoError)?;
1644
1645 self.entity_slots[slot as usize] = None;
1646 self.free_slots.push(slot);
1647 self.search.remove_entity(slot);
1648 self.name_table.remove(&self.interner, hash, name_id);
1649 deleted_names.push(name.clone());
1650 }
1651 }
1652 }
1653 if !deleted_names.is_empty() {
1654 let deleted_ids: AHashSet<StrId> = deleted_names.iter()
1656 .map(|n| self.interner.intern(n))
1657 .collect();
1658 self.relations
1659 .retain(|r| !deleted_ids.contains(&r.from) && !deleted_ids.contains(&r.to));
1660 for id in &deleted_ids {
1662 self.adjacency.remove(id);
1663 for list in self.adjacency.values_mut() {
1665 list.retain(|(to, _)| !deleted_ids.contains(to));
1666 }
1667 }
1668 }
1669 Ok(())
1670 }
1671
1672 pub fn delete_observations(&mut self, entity_name: &str, observations: &[String]) -> Result<()> {
1673 let name_id = self.interner.get_optional(entity_name)
1674 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1675 let hash = self.interner.get_hash(name_id);
1676 let slot = self
1677 .name_table
1678 .lookup(hash, name_id)
1679 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1680 self.entity_slots
1682 .get(slot as usize)
1683 .and_then(|e| e.as_ref())
1684 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1685 let remove_ids: AHashSet<StrId> = observations.iter().map(|o| self.interner.intern(o)).collect();
1686
1687 let mut buf = Vec::new();
1689 store_enc::encode_delete_observations(&mut buf, entity_name, observations)
1690 .map_err(MCSError::IoError)?;
1691 self.store.write_record(RecordKind::DeleteObservations, &buf)
1692 .map_err(MCSError::IoError)?;
1693
1694 let stored = self
1696 .entity_slots
1697 .get_mut(slot as usize)
1698 .and_then(|e| e.as_mut())
1699 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1700 stored.observations.retain(|o| !remove_ids.contains(o));
1701 self.search.remove_entity(slot);
1702 self.search
1703 .index_entity(&mut self.interner, slot, stored.name, stored.entity_type, &stored.observations);
1704 Ok(())
1705 }
1706
1707 pub fn delete_relations(&mut self, relations: &[Relation]) -> Result<()> {
1708 let rels: AHashSet<(StrId, StrId, StrId)> = relations
1710 .iter()
1711 .map(|r| {
1712 (
1713 self.interner.intern(&r.from),
1714 self.interner.intern(&r.to),
1715 self.interner.intern(&r.relation_type),
1716 )
1717 })
1718 .collect();
1719 for relation in relations {
1722 let mut buf = Vec::new();
1723 store_enc::encode_delete_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
1724 .map_err(MCSError::IoError)?;
1725 self.store.write_record(RecordKind::DeleteRelation, &buf)
1726 .map_err(MCSError::IoError)?;
1727 }
1728 self.relations
1729 .retain(|r| !rels.contains(&(r.from, r.to, r.relation_type)));
1730 for (f, t, rt) in &rels {
1732 if let Some(edges) = self.adjacency.get_mut(f) {
1733 edges.retain(|(to, rtype)| to != t || rtype != rt);
1734 if edges.is_empty() {
1735 self.adjacency.remove(f);
1736 }
1737 }
1738 if let Some(edges) = self.adjacency.get_mut(t) {
1739 edges.retain(|(to, rtype)| to != f || rtype != rt);
1740 if edges.is_empty() {
1741 self.adjacency.remove(t);
1742 }
1743 }
1744 }
1745 Ok(())
1746 }
1747
1748 pub fn read_graph(&self) -> KnowledgeGraphOut {
1749 self.read_graph_view().to_owned_out()
1750 }
1751
1752 pub fn read_graph_view(&self) -> GraphView<'_> {
1756 let entities: Vec<&StoredEntity> = self
1757 .entity_slots
1758 .iter()
1759 .filter_map(|s| s.as_ref().filter(|e| e.is_live()))
1760 .collect();
1761 let relations: Vec<&StoredRelation> = self.relations.iter().collect();
1762 GraphView { kg: self, entities, relations }
1763 }
1764
1765 pub fn search_nodes(&self, query: &str) -> KnowledgeGraphOut {
1768 self.search_nodes_filtered(query, None, 0, usize::MAX)
1769 }
1770
1771 pub fn open_nodes(&self, names: &[String]) -> KnowledgeGraphOut {
1772 self.open_nodes_view(names).to_owned_out()
1773 }
1774
1775 pub fn open_nodes_view(&self, names: &[String]) -> GraphView<'_> {
1777 let name_ids: AHashSet<StrId> = names.iter()
1778 .filter_map(|n| self.interner.get_optional(n))
1779 .collect();
1780 let entities: Vec<&StoredEntity> = self
1781 .entity_slots
1782 .iter()
1783 .filter_map(|s| {
1784 s.as_ref()
1785 .filter(|stored| stored.is_live() && name_ids.contains(&stored.name))
1786 })
1787 .collect();
1788 let matched_names: AHashSet<StrId> = entities.iter().map(|e| e.name).collect();
1789 let relations: Vec<&StoredRelation> = self
1790 .relations
1791 .iter()
1792 .filter(|r| matched_names.contains(&r.from) || matched_names.contains(&r.to))
1793 .collect();
1794 GraphView { kg: self, entities, relations }
1795 }
1796
1797 fn entity_to_output(&self, stored: &StoredEntity) -> Entity {
1802 Entity {
1803 name: self.interner.lookup(stored.name).to_string(),
1804 entity_type: self.interner.lookup(stored.entity_type).to_string(),
1805 observations: stored
1806 .observations
1807 .iter()
1808 .map(|o| self.interner.lookup(*o).to_string())
1809 .collect(),
1810 }
1811 }
1812
1813 #[inline]
1814 fn relation_to_output(&self, r: &StoredRelation) -> Relation {
1815 Relation {
1816 from: self.interner.lookup(r.from).to_string(),
1817 to: self.interner.lookup(r.to).to_string(),
1818 relation_type: self.interner.lookup(r.relation_type).to_string(),
1819 }
1820 }
1821
1822 fn lookup_live_slot(&self, name: &str) -> Option<u32> {
1824 let name_id = self.interner.get_optional(name)?;
1825 let hash = self.interner.get_hash(name_id);
1826 let slot = self.name_table.lookup(hash, name_id)?;
1827 let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
1828 stored.is_live().then_some(slot)
1829 }
1830
1831 fn entity_by_name_id(&self, name_id: StrId) -> Option<Entity> {
1833 let hash = self.interner.get_hash(name_id);
1834 let slot = self.name_table.lookup(hash, name_id)?;
1835 let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
1836 stored.is_live().then(|| self.entity_to_output(stored))
1837 }
1838
1839 pub fn entity_type_counts(&self) -> Vec<(String, usize)> {
1843 let mut counts: AHashMap<StrId, usize> = AHashMap::new();
1844 for st in self
1845 .entity_slots
1846 .iter()
1847 .filter_map(|s| s.as_ref())
1848 .filter(|e| e.is_live())
1849 {
1850 *counts.entry(st.entity_type).or_insert(0) += 1;
1851 }
1852 self.rank_counts(counts)
1853 }
1854
1855 pub fn relation_type_counts(&self) -> Vec<(String, usize)> {
1857 let mut counts: AHashMap<StrId, usize> = AHashMap::new();
1858 for r in &self.relations {
1859 *counts.entry(r.relation_type).or_insert(0) += 1;
1860 }
1861 self.rank_counts(counts)
1862 }
1863
1864 fn rank_counts(&self, counts: AHashMap<StrId, usize>) -> Vec<(String, usize)> {
1865 let mut out: Vec<(String, usize)> = counts
1866 .into_iter()
1867 .map(|(id, c)| (self.interner.lookup(id).to_string(), c))
1868 .collect();
1869 out.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
1870 out
1871 }
1872
1873 pub fn search_nodes_filtered(
1877 &self,
1878 query: &str,
1879 entity_type: Option<&str>,
1880 offset: usize,
1881 limit: usize,
1882 ) -> KnowledgeGraphOut {
1883 self.search_nodes_view(query, entity_type, offset, limit).to_owned_out()
1884 }
1885
1886 pub fn search_nodes_view(
1888 &self,
1889 query: &str,
1890 entity_type: Option<&str>,
1891 offset: usize,
1892 limit: usize,
1893 ) -> GraphView<'_> {
1894 let type_id = match entity_type {
1895 Some(t) => match self.interner.get_optional(t) {
1896 Some(id) => Some(id),
1897 None => return GraphView { kg: self, entities: Vec::new(), relations: Vec::new() },
1898 },
1899 None => None,
1900 };
1901
1902 let ranked = self.search.search_ranked(query, &self.interner);
1903 let mut selected: AHashSet<StrId> = AHashSet::new();
1904 let mut entities: Vec<&StoredEntity> = Vec::new();
1905 let mut skipped = 0usize;
1906 for (slot, _score) in ranked {
1907 let Some(st) = self
1908 .entity_slots
1909 .get(slot as usize)
1910 .and_then(|s| s.as_ref())
1911 .filter(|e| e.is_live())
1912 else {
1913 continue;
1914 };
1915 if type_id.is_some_and(|tid| st.entity_type != tid) {
1916 continue;
1917 }
1918 if skipped < offset {
1919 skipped += 1;
1920 continue;
1921 }
1922 if entities.len() >= limit {
1923 break;
1924 }
1925 selected.insert(st.name);
1926 entities.push(st);
1927 }
1928
1929 let relations: Vec<&StoredRelation> = self
1930 .relations
1931 .iter()
1932 .filter(|r| selected.contains(&r.from) || selected.contains(&r.to))
1933 .collect();
1934 GraphView { kg: self, entities, relations }
1935 }
1936
1937 pub fn read_graph_filtered(
1941 &self,
1942 entity_type: Option<&str>,
1943 offset: usize,
1944 limit: usize,
1945 ) -> KnowledgeGraphOut {
1946 self.read_graph_filtered_view(entity_type, offset, limit).to_owned_out()
1947 }
1948
1949 pub fn read_graph_filtered_view(
1951 &self,
1952 entity_type: Option<&str>,
1953 offset: usize,
1954 limit: usize,
1955 ) -> GraphView<'_> {
1956 let type_id = match entity_type {
1957 Some(t) => match self.interner.get_optional(t) {
1958 Some(id) => Some(id),
1959 None => return GraphView { kg: self, entities: Vec::new(), relations: Vec::new() },
1960 },
1961 None => None,
1962 };
1963
1964 let mut selected: AHashSet<StrId> = AHashSet::new();
1965 let mut entities: Vec<&StoredEntity> = Vec::new();
1966 let mut skipped = 0usize;
1967 for st in self
1968 .entity_slots
1969 .iter()
1970 .filter_map(|s| s.as_ref())
1971 .filter(|e| e.is_live())
1972 {
1973 if type_id.is_some_and(|tid| st.entity_type != tid) {
1974 continue;
1975 }
1976 if skipped < offset {
1977 skipped += 1;
1978 continue;
1979 }
1980 if entities.len() >= limit {
1981 break;
1982 }
1983 selected.insert(st.name);
1984 entities.push(st);
1985 }
1986
1987 let relations: Vec<&StoredRelation> = self
1988 .relations
1989 .iter()
1990 .filter(|r| selected.contains(&r.from) && selected.contains(&r.to))
1991 .collect();
1992 GraphView { kg: self, entities, relations }
1993 }
1994
1995 pub fn neighbors(
2003 &self,
2004 name: &str,
2005 direction: Direction,
2006 rtype: Option<&str>,
2007 depth: u32,
2008 ) -> Result<KnowledgeGraphOut> {
2009 self.lookup_live_slot(name)
2010 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
2011 let start = self.interner.get_optional(name).unwrap();
2013
2014 let rtype_id = match rtype {
2016 Some(r) => match self.interner.get_optional(r) {
2017 Some(id) => Some(id),
2018 None => {
2019 let entities = self.entity_by_name_id(start).into_iter().collect();
2020 return Ok(KnowledgeGraphOut { entities, relations: Vec::new() });
2021 }
2022 },
2023 None => None,
2024 };
2025
2026 let mut visited: AHashSet<StrId> = AHashSet::new();
2027 visited.insert(start);
2028
2029 let type_ok = |r: &StoredRelation| rtype_id.is_none_or(|rt| r.relation_type == rt);
2030
2031 if depth == 1 {
2032 for r in self.relations.iter().filter(|r| type_ok(r)) {
2033 match direction {
2034 Direction::Out => {
2035 if r.from == start {
2036 visited.insert(r.to);
2037 }
2038 }
2039 Direction::In => {
2040 if r.to == start {
2041 visited.insert(r.from);
2042 }
2043 }
2044 Direction::Both => {
2045 if r.from == start {
2046 visited.insert(r.to);
2047 } else if r.to == start {
2048 visited.insert(r.from);
2049 }
2050 }
2051 }
2052 }
2053 } else if depth >= 2 {
2054 let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
2058 match direction {
2059 Direction::Both => {
2060 for (&node, edges) in &self.adjacency {
2061 for &(nb, rt) in edges {
2062 if rtype_id.is_none_or(|rt_id| rt == rt_id) {
2063 adj.entry(node).or_default().push(nb);
2064 }
2065 }
2066 }
2067 }
2068 Direction::Out | Direction::In => {
2069 for r in self.relations.iter().filter(|r| type_ok(r)) {
2070 match direction {
2071 Direction::Out => adj.entry(r.from).or_default().push(r.to),
2072 Direction::In => adj.entry(r.to).or_default().push(r.from),
2073 _ => unreachable!(),
2074 }
2075 }
2076 }
2077 }
2078 let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
2079 queue.push_back((start, 0));
2080 while let Some((node, d)) = queue.pop_front() {
2081 if d >= depth {
2082 continue;
2083 }
2084 if let Some(nbrs) = adj.get(&node) {
2085 for &nb in nbrs {
2086 if visited.insert(nb) {
2087 queue.push_back((nb, d + 1));
2088 }
2089 }
2090 }
2091 }
2092 }
2093
2094 let mut entities = Vec::with_capacity(visited.len());
2095 for &nid in &visited {
2096 if let Some(e) = self.entity_by_name_id(nid) {
2097 entities.push(e);
2098 }
2099 }
2100 let relations = self
2101 .relations
2102 .iter()
2103 .filter(|r| type_ok(r) && visited.contains(&r.from) && visited.contains(&r.to))
2104 .map(|r| self.relation_to_output(r))
2105 .collect();
2106 Ok(KnowledgeGraphOut { entities, relations })
2107 }
2108
2109 pub fn describe_entity(&self, name: &str) -> Result<serde_json::Value> {
2113 let name_id = self
2114 .interner
2115 .get_optional(name)
2116 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
2117 let entity = self
2118 .entity_by_name_id(name_id)
2119 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
2120
2121 let mut incident: Vec<Relation> = Vec::new();
2122 let mut neighbor_seen: AHashSet<StrId> = AHashSet::new();
2123 let mut neighbors: Vec<&str> = Vec::new();
2124 for r in &self.relations {
2125 if r.from == name_id || r.to == name_id {
2126 incident.push(self.relation_to_output(r));
2127 let other = if r.from == name_id { r.to } else { r.from };
2128 if other != name_id && neighbor_seen.insert(other) {
2129 neighbors.push(self.interner.lookup(other));
2130 }
2131 }
2132 }
2133
2134 Ok(serde_json::json!({
2135 "entity": entity,
2136 "relations": incident,
2137 "neighbors": neighbors,
2138 "degree": incident.len(),
2139 }))
2140 }
2141
2142 pub fn upsert_entities(&mut self, entities: &[Entity]) -> Result<Vec<serde_json::Value>> {
2147 for e in entities {
2148 if e.name.is_empty() {
2149 return Err(MCSError::InvalidParams(
2150 "Entity name must not be empty".into(),
2151 ));
2152 }
2153 }
2154 let mut out = Vec::with_capacity(entities.len());
2155 for e in entities {
2156 if self.lookup_live_slot(&e.name).is_some() {
2157 let added = self.add_observations(&e.name, &e.observations)?;
2158 out.push(serde_json::json!({
2159 "name": e.name,
2160 "created": false,
2161 "addedObservations": added,
2162 }));
2163 } else {
2164 let created = self.create_entities(std::slice::from_ref(e))?;
2165 out.push(serde_json::json!({
2166 "name": e.name,
2167 "created": !created.is_empty(),
2168 "addedObservations": e.observations,
2169 }));
2170 }
2171 }
2172 Ok(out)
2173 }
2174
2175 pub fn export(&self, format: &str) -> Result<String> {
2177 match format {
2178 "json" => serde_json::to_string(&self.read_graph()).map_err(MCSError::JsonError),
2179 "mermaid" => Ok(self.export_mermaid()),
2180 "dot" => Ok(self.export_dot()),
2181 other => Err(MCSError::InvalidParams(format!(
2182 "Unknown export format '{other}' (expected json|mermaid|dot)"
2183 ))),
2184 }
2185 }
2186
2187 fn diagram_node_ids(&self) -> (AHashMap<StrId, usize>, Vec<(usize, StrId)>) {
2189 let mut ids: AHashMap<StrId, usize> = AHashMap::new();
2190 let mut order: Vec<(usize, StrId)> = Vec::new();
2191 for st in self
2192 .entity_slots
2193 .iter()
2194 .filter_map(|s| s.as_ref())
2195 .filter(|e| e.is_live())
2196 {
2197 let n = ids.len();
2198 ids.insert(st.name, n);
2199 order.push((n, st.name));
2200 }
2201 (ids, order)
2202 }
2203
2204 fn export_mermaid(&self) -> String {
2205 let (ids, order) = self.diagram_node_ids();
2206 let mut s = String::with_capacity(64 + order.len() * 32 + self.relations.len() * 32);
2207 s.push_str("graph LR\n");
2208 for (n, name_id) in &order {
2209 let label = sanitize_label(self.interner.lookup(*name_id));
2210 s.push_str(&format!(" n{n}[\"{label}\"]\n"));
2211 }
2212 for r in &self.relations {
2213 if let (Some(&a), Some(&b)) = (ids.get(&r.from), ids.get(&r.to)) {
2214 let rel = sanitize_label(self.interner.lookup(r.relation_type));
2215 s.push_str(&format!(" n{a} -->|{rel}| n{b}\n"));
2216 }
2217 }
2218 s
2219 }
2220
2221 fn export_dot(&self) -> String {
2222 let (ids, order) = self.diagram_node_ids();
2223 let mut s = String::with_capacity(64 + order.len() * 32 + self.relations.len() * 32);
2224 s.push_str("digraph G {\n");
2225 for (n, name_id) in &order {
2226 let label = sanitize_label(self.interner.lookup(*name_id));
2227 s.push_str(&format!(" n{n} [label=\"{label}\"];\n"));
2228 }
2229 for r in &self.relations {
2230 if let (Some(&a), Some(&b)) = (ids.get(&r.from), ids.get(&r.to)) {
2231 let rel = sanitize_label(self.interner.lookup(r.relation_type));
2232 s.push_str(&format!(" n{a} -> n{b} [label=\"{rel}\"];\n"));
2233 }
2234 }
2235 s.push_str("}\n");
2236 s
2237 }
2238
    /// Merges entity `source` into entity `target`, then deletes `source`.
    ///
    /// Steps:
    /// 1. The two names must differ and both must refer to live entities.
    /// 2. Each observation of `source` that `target` does not already carry
    ///    (compared by interned id, duplicates within `source` collapsed) is
    ///    queued for addition to `target`.
    /// 3. Every stored relation with `source` as an endpoint is rewritten to use
    ///    `target` instead. A rewrite is dropped if it would become a self-loop,
    ///    if the resulting triple already exists, or if an earlier rewrite
    ///    already produced it.
    /// 4. The resulting AddObservations / CreateRelation / DeleteEntity records
    ///    are encoded first. They are then written to the store between
    ///    `TxnBegin` and `TxnCommit` markers, and only after that applied to the
    ///    in-memory graph.
    ///
    /// Returns a JSON object with these fields:
    /// * `source` and `target`.
    /// * `movedObservations`: the total observation count of `source`,
    ///   including ones `target` already had.
    /// * `addedObservations`: how many observations were actually added.
    /// * `redirectedRelations`: how many relations were re-created on `target`.
    ///
    /// # Errors
    ///
    /// * `MCSError::InvalidParams` if `source == target` or either entity is
    ///   not live.
    /// * `MCSError::IoError` if encoding or writing a store record fails. In
    ///   that case the in-memory graph has not been modified. NOTE(review):
    ///   whether a partially written transaction is discarded on replay depends
    ///   on the store — confirm.
    pub fn merge_entities(&mut self, source: &str, target: &str) -> Result<serde_json::Value> {
        if source == target {
            return Err(MCSError::InvalidParams(
                "Source and target must be different entities".into(),
            ));
        }
        // Only the existence check matters for `source`; its slot is not used.
        self.lookup_live_slot(source).ok_or_else(|| {
            MCSError::InvalidParams(format!("Source entity '{source}' not found"))
        })?;
        let target_slot = self.lookup_live_slot(target).ok_or_else(|| {
            MCSError::InvalidParams(format!("Target entity '{target}' not found"))
        })?;

        // NOTE(review): these unwraps assume that a live slot implies
        // `get_entity` returns `Some` and that both names are interned — confirm.
        let source_entity = self.get_entity(source).unwrap();
        let moved_obs_count = source_entity.observations.len();
        let source_id = self.interner.get_optional(source).unwrap();
        let target_id = self.interner.get_optional(target).unwrap();

        // Interned ids of the observations `target` already carries.
        let target_existing: AHashSet<StrId> = self.entity_slots[target_slot as usize]
            .as_ref()
            .unwrap()
            .observations
            .iter()
            .copied()
            .collect();
        // Observations to copy over. Skip ones `target` already has, and
        // collapse duplicates within `source` itself.
        let mut obs_seen: AHashSet<StrId> = AHashSet::new();
        let mut obs_to_add: Vec<String> = Vec::new();
        for o in &source_entity.observations {
            if let Some(oid) = self.interner.get_optional(o)
                && !target_existing.contains(&oid)
                && obs_seen.insert(oid)
            {
                obs_to_add.push(o.clone());
            }
        }

        // Every (from, to, type) triple currently stored. Used to avoid
        // re-creating a relation that already exists.
        let existing_rels: AHashSet<(StrId, StrId, StrId)> =
            self.relations.iter().map(|r| (r.from, r.to, r.relation_type)).collect();
        let mut rel_seen: AHashSet<(StrId, StrId, StrId)> = AHashSet::new();
        let mut redirect: Vec<Relation> = Vec::new();
        for r in &self.relations {
            // Only relations with `source` as an endpoint are rewritten.
            if r.from != source_id && r.to != source_id {
                continue;
            }
            let new_from = if r.from == source_id { target_id } else { r.from };
            let new_to = if r.to == source_id { target_id } else { r.to };
            // A relation between `source` and `target` (or a self-loop on
            // `source`) would become a self-loop on `target`; drop it.
            if new_from == new_to {
                continue;
            }
            let key = (new_from, new_to, r.relation_type);
            // Skip triples that already exist or that an earlier rewrite produced.
            if existing_rels.contains(&key) || !rel_seen.insert(key) {
                continue;
            }
            redirect.push(Relation {
                from: self.interner.lookup(new_from).to_string(),
                to: self.interner.lookup(new_to).to_string(),
                relation_type: self.interner.lookup(r.relation_type).to_string(),
            });
        }

        let added_count = obs_to_add.len();
        // NOTE(review): `as u32` silently truncates above u32::MAX relations.
        let redirected = redirect.len() as u32;

        // Encode every change up front, so an encoding failure returns before
        // anything reaches the store. Record order: observations, relations,
        // then the deletion of `source`.
        let mut records: Vec<(RecordKind, Vec<u8>)> = Vec::new();
        if !obs_to_add.is_empty() {
            let mut buf = Vec::new();
            store_enc::encode_add_observations(&mut buf, target, &obs_to_add)
                .map_err(MCSError::IoError)?;
            records.push((RecordKind::AddObservations, buf));
        }
        for r in &redirect {
            let mut buf = Vec::new();
            store_enc::encode_create_relation(&mut buf, &r.from, &r.to, &r.relation_type)
                .map_err(MCSError::IoError)?;
            records.push((RecordKind::CreateRelation, buf));
        }
        let mut del_buf = Vec::new();
        store_enc::encode_delete_entity(&mut del_buf, source).map_err(MCSError::IoError)?;
        records.push((RecordKind::DeleteEntity, del_buf));

        // Write the records between transaction markers. Any write error
        // returns here, before the in-memory graph is touched.
        self.store.write_record(RecordKind::TxnBegin, &[]).map_err(MCSError::IoError)?;
        for (kind, data) in &records {
            self.store.write_record(*kind, data).map_err(MCSError::IoError)?;
        }
        self.store.write_record(RecordKind::TxnCommit, &[]).map_err(MCSError::IoError)?;

        // Apply the same records, in the same order, to the in-memory state.
        // NOTE(review): presumably applying DeleteEntity also removes the
        // relations still attached to `source` — confirm in `apply_record`.
        for (kind, data) in &records {
            Self::apply_record(
                *kind, data, &mut self.interner, &mut self.entity_slots, &mut self.search,
                &mut self.name_table, &mut self.relations,
            );
        }

        Ok(serde_json::json!({
            "source": source,
            "target": target,
            "movedObservations": moved_obs_count,
            "addedObservations": added_count,
            "redirectedRelations": redirected,
        }))
    }
2359
2360 pub fn extract_subgraph(&self, names: &[String], depth: u32) -> Result<KnowledgeGraphOut> {
2364 if names.is_empty() {
2365 return Ok(KnowledgeGraphOut {
2366 entities: Vec::new(),
2367 relations: Vec::new(),
2368 });
2369 }
2370 let mut visited: AHashSet<StrId> = AHashSet::new();
2372 let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
2373 for name in names {
2374 if let Some(id) = self.interner.get_optional(name)
2375 && visited.insert(id)
2376 {
2377 queue.push_back((id, 0));
2378 }
2379 }
2380 let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
2382 for (&node, edges) in &self.adjacency {
2383 let nb: Vec<StrId> = edges.iter().map(|(to, _)| *to).collect();
2384 adj.insert(node, nb);
2385 }
2386 while let Some((node, d)) = queue.pop_front() {
2387 if d >= depth {
2388 continue;
2389 }
2390 if let Some(nbrs) = adj.get(&node) {
2391 for &nb in nbrs {
2392 if visited.insert(nb) {
2393 queue.push_back((nb, d + 1));
2394 }
2395 }
2396 }
2397 }
2398 let mut entities: Vec<Entity> = Vec::with_capacity(visited.len());
2399 for &nid in &visited {
2400 if let Some(e) = self.entity_by_name_id(nid) {
2401 entities.push(e);
2402 }
2403 }
2404 let relations: Vec<Relation> = self
2405 .relations
2406 .iter()
2407 .filter(|r| visited.contains(&r.from) && visited.contains(&r.to))
2408 .map(|r| self.relation_to_output(r))
2409 .collect();
2410 Ok(KnowledgeGraphOut { entities, relations })
2411 }
2412
2413 pub fn batch_get_entities(&self, names: &[String]) -> Vec<Option<Entity>> {
2415 names.iter().map(|n| self.get_entity(n)).collect()
2416 }
2417
2418 #[allow(clippy::too_many_arguments)]
2421 fn dfs_all_paths(
2422 adj: &AHashMap<StrId, Vec<StrId>>,
2423 current: StrId,
2424 target: StrId,
2425 max_depth: usize,
2426 max_paths: usize,
2427 visited: &mut AHashSet<StrId>,
2428 current_path: &mut Vec<StrId>,
2429 all_paths: &mut Vec<Vec<StrId>>,
2430 ) {
2431 if all_paths.len() >= max_paths {
2432 return;
2433 }
2434 if current == target && current_path.len() > 1 {
2435 all_paths.push(current_path.clone());
2436 return;
2437 }
2438 if current_path.len() > max_depth {
2439 return;
2440 }
2441 if let Some(neighbors) = adj.get(¤t) {
2442 for &nb in neighbors {
2443 if visited.insert(nb) {
2444 current_path.push(nb);
2445 Self::dfs_all_paths(
2446 adj, nb, target, max_depth, max_paths, visited, current_path, all_paths,
2447 );
2448 current_path.pop();
2449 visited.remove(&nb);
2450 }
2451 }
2452 }
2453 }
2454
2455 pub fn find_all_paths(
2459 &self,
2460 from: &str,
2461 to: &str,
2462 max_depth: usize,
2463 max_paths: usize,
2464 ) -> Result<Vec<Vec<String>>> {
2465 let from_id = self
2466 .interner
2467 .get_optional(from)
2468 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
2469 let to_id = self
2470 .interner
2471 .get_optional(to)
2472 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
2473 if self.lookup_live_slot(from).is_none() {
2475 return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
2476 }
2477 if self.lookup_live_slot(to).is_none() {
2478 return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
2479 }
2480 if from_id == to_id {
2481 return Ok(vec![vec![from.to_string()]]);
2482 }
2483 let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::with_capacity(self.adjacency.len());
2485 for (&node, edges) in &self.adjacency {
2486 let nbrs: Vec<StrId> = edges.iter().map(|(to, _)| *to).collect();
2487 adj.insert(node, nbrs);
2488 }
2489 let mut all_paths: Vec<Vec<StrId>> = Vec::new();
2490 let mut current_path = Vec::new();
2491 let mut visited: AHashSet<StrId> = AHashSet::new();
2492 visited.insert(from_id);
2493 current_path.push(from_id);
2494 Self::dfs_all_paths(
2495 &adj,
2496 from_id,
2497 to_id,
2498 max_depth,
2499 max_paths,
2500 &mut visited,
2501 &mut current_path,
2502 &mut all_paths,
2503 );
2504 if all_paths.is_empty() {
2505 return Err(MCSError::MemoryError(format!(
2506 "No path found between '{from}' and '{to}'"
2507 )));
2508 }
2509 let result: Vec<Vec<String>> = all_paths
2510 .into_iter()
2511 .map(|path| {
2512 path.into_iter()
2513 .map(|id| self.interner.lookup(id).to_string())
2514 .collect()
2515 })
2516 .collect();
2517 Ok(result)
2518 }
2519
2520 pub fn snapshot(&self) -> ReadSnapshot {
2525 ReadSnapshot {
2526 interner: self.interner.clone(),
2527 entity_slots: Arc::from_iter(self.entity_slots.iter().cloned()),
2528 free_slots: self.free_slots.clone(),
2529 name_table: self.name_table.clone(),
2530 relations: Arc::from_iter(self.relations.iter().cloned()),
2531 adjacency: self.adjacency.clone(),
2532 search: self.search.clone(),
2533 }
2534 }
2535
2536 pub fn flush_and_sync(&mut self) -> Result<()> {
2540 self.store.flush_and_sync().map_err(MCSError::IoError)
2541 }
2542}
2543
2544
2545
/// Shared entry point to a [`KnowledgeGraph`].
///
/// Writers serialize through a mutex (see `GraphHandle::write`). Readers load
/// the most recently published [`ReadSnapshot`] without taking that mutex
/// (see `GraphHandle::read`).
pub struct GraphHandle {
    // The live, mutable graph; locked by `GraphHandle::write`.
    inner: Arc<Mutex<KnowledgeGraph>>,
    // Most recently published read-only view; replaced by `WriteGuard::publish`.
    snapshot: ArcSwap<ReadSnapshot>,
}
2560
/// Exclusive write access to the graph, returned by `GraphHandle::write`.
///
/// Holds the graph mutex for its whole lifetime and dereferences to
/// [`KnowledgeGraph`]. When dropped it publishes a new snapshot unless
/// `did_publish` is set.
pub struct WriteGuard<'a> {
    // Held mutex guard; released when the `WriteGuard` is dropped.
    guard: parking_lot::MutexGuard<'a, KnowledgeGraph>,
    // The owning handle's snapshot slot that `publish` stores into.
    snapshot: &'a ArcSwap<ReadSnapshot>,
    // Set by `publish`; the `Drop` impl skips publishing while it is true.
    did_publish: bool,
}
2567
2568impl WriteGuard<'_> {
2569 pub fn publish(&mut self) {
2571 let snap = Arc::new(self.guard.snapshot());
2572 self.snapshot.store(snap);
2573 self.did_publish = true;
2574 }
2575
2576 pub fn graph(&mut self) -> &mut KnowledgeGraph {
2578 &mut self.guard
2579 }
2580}
2581
2582impl std::ops::Deref for WriteGuard<'_> {
2583 type Target = KnowledgeGraph;
2584 fn deref(&self) -> &KnowledgeGraph {
2585 &self.guard
2586 }
2587}
2588
2589impl std::ops::DerefMut for WriteGuard<'_> {
2590 fn deref_mut(&mut self) -> &mut KnowledgeGraph {
2591 &mut self.guard
2592 }
2593}
2594
2595impl Drop for WriteGuard<'_> {
2596 fn drop(&mut self) {
2597 if !self.did_publish {
2598 self.publish();
2599 }
2600 }
2601}
2602
2603impl GraphHandle {
2604 pub fn new(path: &Path) -> std::io::Result<Self> {
2606 let kg = KnowledgeGraph::new(path)?;
2607 let snapshot = Arc::new(kg.snapshot());
2608 Ok(Self {
2609 inner: Arc::new(Mutex::new(kg)),
2610 snapshot: ArcSwap::new(snapshot),
2611 })
2612 }
2613
2614 pub fn read(&self) -> ReadSnapshot {
2616 (**self.snapshot.load()).clone()
2617 }
2618
2619 pub fn write(&self) -> WriteGuard<'_> {
2622 WriteGuard {
2623 guard: self.inner.lock(),
2624 snapshot: &self.snapshot,
2625 did_publish: false,
2626 }
2627 }
2628}
2629
2630