1use std::collections::VecDeque;
2
3use ahash::{AHashMap, AHashSet};
4use std::path::Path;
5
6use serde::ser::{Serialize, SerializeSeq, SerializeStruct, Serializer};
7
8use crate::errors::{MCSError, Result};
9use crate::intern::{StrId, StringInterner};
10use crate::types::{Entity, Relation, KnowledgeGraphOut};
11use crate::search::SearchIndex;
12use crate::store::{self as store_enc, BinaryStore, RecordKind};
13
/// State tag held in `StoredEntity::state` for a live entity;
/// `StoredEntity::is_live` compares against this value.
const ENTITY_SLOT_LIVE: u8 = 1;
/// Number of shards in `ShardedNameTable`. Shard selection masks the hash
/// with `NAME_TABLE_SHARDS - 1`, so this value must stay a power of two.
const NAME_TABLE_SHARDS: usize = 4;
16
/// Issues a best-effort cache prefetch hint for the line containing `addr`.
///
/// This is purely a performance hint: no memory is read or written through
/// `addr` by this function.
///
/// # Safety
///
/// The function is kept `unsafe` to preserve its existing contract. Callers
/// should pass a pointer derived from a live allocation (as the name-table
/// probe loop does).
#[cfg(target_arch = "x86_64")]
#[inline(always)]
unsafe fn prefetch_addr(addr: *const u8) {
    use std::arch::x86_64::{_mm_prefetch, _MM_HINT_T0};
    // `_mm_prefetch` is declared over `*const i8`, so the byte pointer is cast
    // to whatever pointee type the intrinsic expects.
    // SAFETY: the intrinsic only hints the cache hierarchy; it does not
    // dereference the pointer from the program's point of view.
    unsafe { _mm_prefetch::<{ _MM_HINT_T0 }>(addr.cast()) };
}

/// No-op stand-in for targets without the x86_64 prefetch intrinsic.
///
/// # Safety
///
/// Nothing is accessed; the function is `unsafe` only to mirror the x86_64
/// signature.
#[cfg(not(target_arch = "x86_64"))]
#[inline(always)]
const unsafe fn prefetch_addr(_addr: *const u8) {}
31
32#[cfg_attr(feature = "cache_align", repr(align(64)))]
41struct StoredEntity {
42 state: u8,
43 name: StrId,
44 entity_type: StrId,
45 observations: Vec<StrId>,
46}
47
48impl StoredEntity {
49 const fn is_live(&self) -> bool {
50 self.state == ENTITY_SLOT_LIVE
51 }
52}
53
/// In-memory form of one relation; all three parts are interned `StrId`s.
#[cfg_attr(feature = "cache_align", repr(align(16)))]
struct StoredRelation {
    /// Interned name of the relation's source endpoint.
    from: StrId,
    /// Interned name of the relation's target endpoint.
    to: StrId,
    /// Interned relation type label.
    relation_type: StrId,
}
63
64pub struct GraphView<'a> {
78 kg: &'a KnowledgeGraph,
79 entities: Vec<&'a StoredEntity>,
80 relations: Vec<&'a StoredRelation>,
81}
82
83impl GraphView<'_> {
84 pub fn to_owned_out(&self) -> KnowledgeGraphOut {
88 KnowledgeGraphOut {
89 entities: self.entities.iter().map(|e| self.kg.entity_to_output(e)).collect(),
90 relations: self.relations.iter().map(|r| self.kg.relation_to_output(r)).collect(),
91 }
92 }
93}
94
95impl Serialize for GraphView<'_> {
96 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
97 let mut st = s.serialize_struct("KnowledgeGraphOut", 2)?;
98 st.serialize_field("entities", &EntityListRef { kg: self.kg, items: &self.entities })?;
99 st.serialize_field("relations", &RelationListRef { kg: self.kg, items: &self.relations })?;
100 st.end()
101 }
102}
103
104struct EntityListRef<'a> { kg: &'a KnowledgeGraph, items: &'a [&'a StoredEntity] }
105impl Serialize for EntityListRef<'_> {
106 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
107 let mut seq = s.serialize_seq(Some(self.items.len()))?;
108 for &e in self.items {
109 seq.serialize_element(&EntityRef { kg: self.kg, e })?;
110 }
111 seq.end()
112 }
113}
114
115struct RelationListRef<'a> { kg: &'a KnowledgeGraph, items: &'a [&'a StoredRelation] }
116impl Serialize for RelationListRef<'_> {
117 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
118 let mut seq = s.serialize_seq(Some(self.items.len()))?;
119 for &r in self.items {
120 seq.serialize_element(&RelationRef { kg: self.kg, r })?;
121 }
122 seq.end()
123 }
124}
125
126struct EntityRef<'a> { kg: &'a KnowledgeGraph, e: &'a StoredEntity }
127impl Serialize for EntityRef<'_> {
128 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
129 let mut st = s.serialize_struct("Entity", 3)?;
130 st.serialize_field("name", self.kg.interner.lookup(self.e.name))?;
131 st.serialize_field("entityType", self.kg.interner.lookup(self.e.entity_type))?;
132 st.serialize_field("observations", &ObsRef { kg: self.kg, obs: &self.e.observations })?;
133 st.end()
134 }
135}
136
137struct ObsRef<'a> { kg: &'a KnowledgeGraph, obs: &'a [StrId] }
138impl Serialize for ObsRef<'_> {
139 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
140 let mut seq = s.serialize_seq(Some(self.obs.len()))?;
141 for &o in self.obs {
142 seq.serialize_element(self.kg.interner.lookup(o))?;
143 }
144 seq.end()
145 }
146}
147
148struct RelationRef<'a> { kg: &'a KnowledgeGraph, r: &'a StoredRelation }
149impl Serialize for RelationRef<'_> {
150 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
151 let mut st = s.serialize_struct("Relation", 3)?;
152 st.serialize_field("from", self.kg.interner.lookup(self.r.from))?;
153 st.serialize_field("to", self.kg.interner.lookup(self.r.to))?;
154 st.serialize_field("relationType", self.kg.interner.lookup(self.r.relation_type))?;
155 st.end()
156 }
157}
158
/// Edge direction selector parsed from an optional string argument.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum Direction {
    Out,
    In,
    Both,
}

impl Direction {
    /// Maps `"out"` and `"in"` (exact, case-sensitive) to their variants.
    /// Any other string, or `None`, yields [`Direction::Both`].
    pub fn parse(s: Option<&str>) -> Self {
        s.map_or(Self::Both, |raw| match raw {
            "out" => Self::Out,
            "in" => Self::In,
            _ => Self::Both,
        })
    }
}
180
/// Returns a copy of `s` with double quotes replaced by single quotes and
/// every `\n` / `\r` replaced by a space; all other characters are kept.
fn sanitize_label(s: &str) -> String {
    s.chars()
        .map(|c| match c {
            '"' => '\'',
            '\n' | '\r' => ' ',
            other => other,
        })
        .collect()
}
193
/// Control byte for an unoccupied name-table bucket. Its high bit is set,
/// which is what the probe loops test (`ctrl & 0x80`); occupied buckets hold
/// a 7-bit stamp produced by `h2`, so their high bit is always clear.
const EMPTY_SLOT: u8 = 0xFF;
204
/// Extracts the 7-bit control stamp (the low seven bits of `hash`).
#[inline(always)]
const fn h2(hash: u64) -> u8 {
    // Truncating to u8 keeps the low byte; the mask then drops its top bit.
    (hash as u8) & 0x7F
}
209
/// Computes the starting bucket: the hash bits above the 7-bit stamp,
/// reduced with `mask`.
#[inline(always)]
const fn h1(hash: u64, mask: usize) -> usize {
    let above_stamp = (hash >> 7) as usize;
    above_stamp & mask
}
214
215struct NameTableShard {
216 ctrl: Vec<u8>, names: Vec<StrId>,
218 slots: Vec<u32>,
219 mask: usize,
220 count: usize,
221}
222
223impl NameTableShard {
224 fn new(capacity: usize) -> Self {
225 let cap = capacity.next_power_of_two().max(16);
226 Self {
227 ctrl: vec![EMPTY_SLOT; cap],
228 names: vec![StrId::EMPTY; cap],
229 slots: vec![u32::MAX; cap],
230 mask: cap - 1,
231 count: 0,
232 }
233 }
234
235 #[inline(always)]
236 fn lookup(&self, hash: u64, name: StrId) -> Option<u32> {
237 let stamp = h2(hash);
238 let mask = self.mask;
239 let mut idx = h1(hash, mask);
240 let ctrl = self.ctrl.as_ptr();
241 let names = self.names.as_ptr();
242 let slots = self.slots.as_ptr();
243 let len = self.ctrl.len();
244
245 for _ in 0..len {
246 let prefetch_idx = idx.wrapping_add(4) & mask;
248 unsafe { prefetch_addr(ctrl.add(prefetch_idx)) };
249
250 unsafe {
252 let c = *ctrl.add(idx);
253 if c & 0x80 != 0 {
255 return None;
256 }
257 if c == stamp && *names.add(idx) == name {
259 return Some(*slots.add(idx));
260 }
261 }
262 idx = (idx + 1) & mask;
263 }
264 None
265 }
266
267 fn insert(&mut self, interner: &StringInterner, hash: u64, name: StrId, slot: u32) {
268 if self.count * 4 > self.ctrl.len() * 3 {
269 self.grow(interner);
270 }
271 let stamp = h2(hash);
272 let mask = self.mask;
273 let mut idx = h1(hash, mask);
274 loop {
275 unsafe {
277 if *self.ctrl.get_unchecked(idx) & 0x80 != 0 {
278 *self.ctrl.get_unchecked_mut(idx) = stamp;
279 *self.names.get_unchecked_mut(idx) = name;
280 *self.slots.get_unchecked_mut(idx) = slot;
281 self.count += 1;
282 return;
283 }
284 }
285 idx = (idx + 1) & mask;
286 }
287 }
288
289 fn remove(&mut self, interner: &StringInterner, hash: u64, name: StrId) {
290 let stamp = h2(hash);
291 let mask = self.mask;
292 let mut idx = h1(hash, mask);
293 let len = self.ctrl.len();
294 for _ in 0..len {
295 if self.ctrl[idx] & 0x80 != 0 {
296 return;
297 }
298 if self.ctrl[idx] == stamp && self.names[idx] == name {
299 self.ctrl[idx] = EMPTY_SLOT;
301 self.names[idx] = StrId::EMPTY;
302 self.slots[idx] = u32::MAX;
303 self.count -= 1;
304
305 let mut next = (idx + 1) & mask;
306 while self.ctrl[next] & 0x80 == 0 {
307 let nn = self.names[next];
308 let ns = self.slots[next];
309 let nh = interner.get_hash(nn);
312 self.ctrl[next] = EMPTY_SLOT;
313 self.names[next] = StrId::EMPTY;
314 self.slots[next] = u32::MAX;
315 self.count -= 1;
316
317 let nstamp = h2(nh);
319 let mut re_idx = h1(nh, mask);
320 while self.ctrl[re_idx] & 0x80 == 0 {
321 re_idx = (re_idx + 1) & mask;
322 }
323 self.ctrl[re_idx] = nstamp;
324 self.names[re_idx] = nn;
325 self.slots[re_idx] = ns;
326 self.count += 1;
327
328 next = (next + 1) & mask;
329 }
330 return;
331 }
332 idx = (idx + 1) & mask;
333 }
334 }
335
336 fn grow(&mut self, interner: &StringInterner) {
337 let new_cap = self.ctrl.len() * 2;
338 let new_mask = new_cap - 1;
339 let mut new_ctrl = vec![EMPTY_SLOT; new_cap];
340 let mut new_names = vec![StrId::EMPTY; new_cap];
341 let mut new_slots = vec![u32::MAX; new_cap];
342
343 for i in 0..self.ctrl.len() {
344 if self.ctrl[i] & 0x80 == 0 {
345 let name = self.names[i];
347 let hash = interner.get_hash(name);
348 let stamp = h2(hash);
349 let mut idx = h1(hash, new_mask);
350 while new_ctrl[idx] & 0x80 == 0 {
351 idx = (idx + 1) & new_mask;
352 }
353 new_ctrl[idx] = stamp;
354 new_names[idx] = name;
355 new_slots[idx] = self.slots[i];
356 }
357 }
358
359 self.ctrl = new_ctrl;
360 self.names = new_names;
361 self.slots = new_slots;
362 self.mask = new_mask;
363 }
364}
365
366struct ShardedNameTable {
367 shards: [NameTableShard; NAME_TABLE_SHARDS],
368}
369
370impl ShardedNameTable {
371 fn new(capacity_per_shard: usize) -> Self {
372 Self {
373 shards: [
374 NameTableShard::new(capacity_per_shard),
375 NameTableShard::new(capacity_per_shard),
376 NameTableShard::new(capacity_per_shard),
377 NameTableShard::new(capacity_per_shard),
378 ],
379 }
380 }
381
382 #[inline(always)]
383 const fn shard(hash: u64) -> usize {
384 (hash as usize) & (NAME_TABLE_SHARDS - 1)
385 }
386
387 #[inline(always)]
388 fn lookup(&self, hash: u64, name: StrId) -> Option<u32> {
389 self.shards[Self::shard(hash)].lookup(hash, name)
390 }
391
392 #[inline(always)]
393 fn insert(&mut self, interner: &StringInterner, hash: u64, name: StrId, slot: u32) {
394 self.shards[Self::shard(hash)].insert(interner, hash, name, slot);
395 }
396
397 #[inline(always)]
398 fn remove(&mut self, interner: &StringInterner, hash: u64, name: StrId) {
399 self.shards[Self::shard(hash)].remove(interner, hash, name);
400 }
401}
402
/// In-memory knowledge graph backed by a `BinaryStore` record log.
pub struct KnowledgeGraph {
    /// Interner that owns every name, type and observation string.
    interner: StringInterner,
    /// Entity storage addressed by slot index; `None` marks a vacated slot.
    entity_slots: Vec<Option<StoredEntity>>,
    /// Indices of vacated slots, reused by `create_entities`.
    free_slots: Vec<u32>,
    /// Maps an interned entity name to its slot index.
    name_table: ShardedNameTable,
    /// All relations, in insertion order.
    relations: Vec<StoredRelation>,
    /// Search index over entities, keyed by slot index.
    search: SearchIndex,
    /// Record log that mutations are written to and `new` replays from.
    store: BinaryStore,
}
417
418impl KnowledgeGraph {
419 pub fn new(path: &Path) -> std::io::Result<Self> {
420 let store = BinaryStore::new(path)?;
421
422 let mut interner = StringInterner::with_capacity(65536, 1024);
424 let mut entity_slots: Vec<Option<StoredEntity>> = Vec::with_capacity(256);
425 let mut name_table = ShardedNameTable::new(64);
426 let mut relations: Vec<StoredRelation> = Vec::with_capacity(64);
427 let mut search = SearchIndex::new();
428
429 store.replay(|kind, data| {
430 match kind {
431 RecordKind::CreateEntity => {
432 if let Some((name, etype, obs)) = store_enc::decode_create_entity(data) {
433 Self::replay_create_entity(
434 &mut interner, &mut entity_slots, &mut search, &mut name_table, name, etype, &obs,
435 );
436 }
437 }
438 RecordKind::CreateRelation => {
439 if let Some((from, to, rtype)) = store_enc::decode_create_relation(data) {
440 let from_id = interner.intern(from);
441 let to_id = interner.intern(to);
442 let type_id = interner.intern(rtype);
443 relations.push(StoredRelation {
444 from: from_id,
445 to: to_id,
446 relation_type: type_id,
447 });
448 }
449 }
450 RecordKind::AddObservations => {
451 if let Some((name, obs)) = store_enc::decode_add_observations(data) {
452 Self::replay_add_observations(
453 &mut interner, &mut entity_slots, &mut search, &mut name_table, name, &obs,
454 );
455 }
456 }
457 RecordKind::DeleteEntity => {
458 if let Some(name) = store_enc::decode_delete_entity(data) {
459 Self::replay_delete_entity(
460 &mut interner, &mut entity_slots, &mut relations, &mut search, &mut name_table, name,
461 );
462 }
463 }
464 RecordKind::DeleteObservations => {
465 if let Some((name, obs)) = store_enc::decode_delete_observations(data) {
466 Self::replay_delete_observations(
467 &mut interner, &mut entity_slots, &mut search, &mut name_table, name, &obs,
468 );
469 }
470 }
471 RecordKind::DeleteRelation => {
472 if let Some((from, to, rtype)) = store_enc::decode_delete_relation(data) {
473 let from_id = interner.intern(from);
474 let to_id = interner.intern(to);
475 let type_id = interner.intern(rtype);
476 relations.retain(|r| {
477 !(r.from == from_id && r.to == to_id && r.relation_type == type_id)
478 });
479 }
480 }
481 }
482 })?;
483
484 let free_slots: Vec<u32> = entity_slots
486 .iter()
487 .enumerate()
488 .filter(|(_, s)| s.is_none())
489 .map(|(i, _)| i as u32)
490 .collect();
491
492 Ok(Self {
493 interner,
494 entity_slots,
495 free_slots,
496 name_table,
497 relations,
498 search,
499 store,
500 })
501 }
502
503 #[allow(clippy::ptr_arg)]
508 fn replay_create_entity(
509 interner: &mut StringInterner,
510 entities: &mut Vec<Option<StoredEntity>>,
511 search: &mut SearchIndex,
512 name_table: &mut ShardedNameTable,
513 name: &str,
514 etype: &str,
515 observations: &[&str],
516 ) {
517 let name_id = interner.intern(name);
518 let type_id = interner.intern(etype);
519 let obs_ids: Vec<StrId> = observations.iter().map(|o| interner.intern(o)).collect();
520 let slot = entities.len() as u32;
521 entities.push(Some(StoredEntity {
522 state: ENTITY_SLOT_LIVE,
523 name: name_id,
524 entity_type: type_id,
525 observations: obs_ids.clone(),
526 }));
527 let hash = interner.get_hash(name_id);
528 name_table.insert(&*interner, hash, name_id, slot);
529 search.index_entity(interner, slot, name_id, type_id, &obs_ids);
530 }
531
532 fn replay_add_observations(
533 interner: &mut StringInterner,
534 entities: &mut [Option<StoredEntity>],
535 search: &mut SearchIndex,
536 name_table: &mut ShardedNameTable,
537 name: &str,
538 observations: &[&str],
539 ) {
540 let name_id = interner.intern(name);
541 let hash = interner.get_hash(name_id);
542 if let Some(slot) = name_table.lookup(hash, name_id)
543 && let Some(Some(entity)) = entities.get_mut(slot as usize)
544 {
545 for &o in observations {
546 let oid = interner.intern(o);
547 if !entity.observations.contains(&oid) {
548 entity.observations.push(oid);
549 }
550 }
551 search.remove_entity(slot);
552 search.index_entity(
553 interner,
554 slot,
555 entity.name,
556 entity.entity_type,
557 &entity.observations,
558 );
559 }
560 }
561
562 fn replay_delete_entity(
563 interner: &mut StringInterner,
564 entities: &mut [Option<StoredEntity>],
565 rels: &mut Vec<StoredRelation>,
566 search: &mut SearchIndex,
567 name_table: &mut ShardedNameTable,
568 name: &str,
569 ) {
570 let name_id = interner.intern(name);
571 let hash = interner.get_hash(name_id);
572 if let Some(slot) = name_table.lookup(hash, name_id)
573 && let Some(Some(_)) = entities.get(slot as usize)
574 {
575 entities[slot as usize] = None;
576 search.remove_entity(slot);
577 name_table.remove(&*interner, hash, name_id);
578 }
579 rels.retain(|r| r.from != name_id && r.to != name_id);
580 }
581
582 fn replay_delete_observations(
583 interner: &mut StringInterner,
584 entities: &mut [Option<StoredEntity>],
585 search: &mut SearchIndex,
586 name_table: &mut ShardedNameTable,
587 name: &str,
588 observations: &[&str],
589 ) {
590 let name_id = interner.intern(name);
591 let hash = interner.get_hash(name_id);
592 if let Some(slot) = name_table.lookup(hash, name_id)
593 && let Some(Some(entity)) = entities.get_mut(slot as usize)
594 {
595 let remove_ids: Vec<StrId> = observations.iter().map(|o| interner.intern(o)).collect();
596 entity.observations.retain(|o| !remove_ids.contains(o));
597 search.remove_entity(slot);
598 search.index_entity(
599 interner,
600 slot,
601 entity.name,
602 entity.entity_type,
603 &entity.observations,
604 );
605 }
606 }
607
    /// Returns a shared reference to the graph's string interner.
    pub const fn interner(&self) -> &StringInterner {
        &self.interner
    }
615
616 pub fn get_entity(&self, name: &str) -> Option<Entity> {
618 let name_id = self.interner.get_optional(name)?;
619 let hash = self.interner.get_hash(name_id);
620 let slot = self.name_table.lookup(hash, name_id)?;
621 let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
622 if !stored.is_live() {
623 return None;
624 }
625 Some(self.entity_to_output(stored))
626 }
627
628 pub fn graph_stats(&self) -> serde_json::Value {
630 let live_entities = self
631 .entity_slots
632 .iter()
633 .filter(|s| s.as_ref().is_some_and(|e| e.is_live()))
634 .count();
635 let total_relations = self.relations.len();
636 let index_entries = self.search.len();
637 let total_obs: usize = self
638 .entity_slots
639 .iter()
640 .filter_map(|s| s.as_ref())
641 .filter(|e| e.is_live())
642 .map(|e| e.observations.len())
643 .sum();
644
645 serde_json::json!({
646 "entities": live_entities,
647 "relations": total_relations,
648 "totalObservations": total_obs,
649 "searchIndexEntries": index_entries,
650 "internedStrings": self.interner.len(),
651 "internedBytes": self.interner.total_bytes(),
652 })
653 }
654
655 pub fn search_relations(&self, from: Option<&str>, to: Option<&str>, rtype: Option<&str>) -> Vec<Relation> {
659 let from_id = match from {
660 Some(f) => match self.interner.get_optional(f) {
661 Some(id) => Some(id),
662 None => return Vec::new(),
663 },
664 None => None,
665 };
666 let to_id = match to {
667 Some(t) => match self.interner.get_optional(t) {
668 Some(id) => Some(id),
669 None => return Vec::new(),
670 },
671 None => None,
672 };
673 let rtype_id = match rtype {
674 Some(r) => match self.interner.get_optional(r) {
675 Some(id) => Some(id),
676 None => return Vec::new(),
677 },
678 None => None,
679 };
680
681 self.relations
682 .iter()
683 .filter(|r| {
684 from_id.is_none_or(|f| r.from == f)
685 && to_id.is_none_or(|t| r.to == t)
686 && rtype_id.is_none_or(|rt| r.relation_type == rt)
687 })
688 .map(|r| Relation {
689 from: self.interner.lookup(r.from).to_string(),
690 to: self.interner.lookup(r.to).to_string(),
691 relation_type: self.interner.lookup(r.relation_type).to_string(),
692 })
693 .collect()
694 }
695
696 pub fn find_path(&self, from: &str, to: &str) -> Result<Vec<String>> {
699 let from_id = self.interner.get_optional(from)
700 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
701 let to_id = self.interner.get_optional(to)
702 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
703 let hash_from = self.interner.get_hash(from_id);
704 let hash_to = self.interner.get_hash(to_id);
705
706 if self.name_table.lookup(hash_from, from_id).is_none() {
707 return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
708 }
709 if self.name_table.lookup(hash_to, to_id).is_none() {
710 return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
711 }
712 if from_id == to_id {
713 return Ok(vec![from.to_string()]);
714 }
715
716 let mut adj: AHashMap<StrId, Vec<(StrId, StrId)>> = AHashMap::new();
718 for rel in &self.relations {
719 adj.entry(rel.from).or_default().push((rel.to, rel.relation_type));
720 adj.entry(rel.to).or_default().push((rel.from, rel.relation_type));
721 }
722
723 let mut visited: AHashSet<StrId> = AHashSet::new();
725 let mut parent: AHashMap<StrId, StrId> = AHashMap::new();
726 let mut queue: VecDeque<StrId> = VecDeque::new();
727
728 visited.insert(from_id);
729 queue.push_back(from_id);
730
731 while let Some(current) = queue.pop_front() {
732 if current == to_id {
733 break;
734 }
735
736 if let Some(neighbors) = adj.get(¤t) {
737 for &(neighbor, _) in neighbors {
738 if visited.insert(neighbor) {
739 parent.insert(neighbor, current);
740 queue.push_back(neighbor);
741 }
742 }
743 }
744 }
745
746 if !parent.contains_key(&to_id) && from_id != to_id {
747 return Err(MCSError::MemoryError(format!(
748 "No path found between '{from}' and '{to}'"
749 )));
750 }
751
752 let mut path: Vec<String> = Vec::new();
754 let mut cur = to_id;
755 loop {
756 path.push(self.interner.lookup(cur).to_string());
757 if cur == from_id {
758 break;
759 }
760 cur = *parent.get(&cur).ok_or_else(|| {
761 MCSError::MemoryError("Path reconstruction failed".into())
762 })?;
763 }
764 path.reverse();
765 Ok(path)
766 }
767
768 pub fn compact(&mut self) -> Result<()> {
773 let mut create_entities: Vec<Entity> = Vec::new();
775 let mut create_relations: Vec<Relation> = Vec::new();
776
777 for slot in &self.entity_slots {
778 if let Some(stored) = slot.as_ref().filter(|e| e.is_live()) {
779 create_entities.push(self.entity_to_output(stored));
780 }
781 }
782 for rel in &self.relations {
783 create_relations.push(Relation {
784 from: self.interner.lookup(rel.from).to_string(),
785 to: self.interner.lookup(rel.to).to_string(),
786 relation_type: self.interner.lookup(rel.relation_type).to_string(),
787 });
788 }
789
790 let tmp_path = self.store.path().with_extension("tmp");
792 let mut tmp_store = BinaryStore::new(&tmp_path).map_err(MCSError::IoError)?;
793 for entity in &create_entities {
794 let mut buf = Vec::new();
795 store_enc::encode_create_entity(&mut buf, &entity.name, &entity.entity_type, &entity.observations)
796 .map_err(MCSError::IoError)?;
797 tmp_store.write_record(RecordKind::CreateEntity, &buf).map_err(MCSError::IoError)?;
798 }
799 for relation in &create_relations {
800 let mut buf = Vec::new();
801 store_enc::encode_create_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
802 .map_err(MCSError::IoError)?;
803 tmp_store.write_record(RecordKind::CreateRelation, &buf).map_err(MCSError::IoError)?;
804 }
805 tmp_store.flush_and_sync().map_err(MCSError::IoError)?;
806 drop(tmp_store);
807
808 std::fs::rename(&tmp_path, self.store.path()).map_err(MCSError::IoError)?;
810
811 let path = self.store.path().clone();
817 *self = KnowledgeGraph::new(&path).map_err(MCSError::IoError)?;
818
819 Ok(())
820 }
821
822 pub fn create_entities(&mut self, entities: &[Entity]) -> Result<Vec<Entity>> {
825 for entity in entities {
827 if entity.name.is_empty() {
828 return Err(MCSError::InvalidParams(
829 "Entity name must not be empty".into(),
830 ));
831 }
832 }
833 let mut created = Vec::new();
834 for entity in entities {
835 let existing = self.interner.get_optional(&entity.name)
837 .and_then(|id| {
838 let hash = self.interner.get_hash(id);
839 self.name_table.lookup(hash, id)
840 });
841 if existing.is_some() {
842 continue;
843 }
844 let mut buf = Vec::new();
846 store_enc::encode_create_entity(&mut buf, &entity.name, &entity.entity_type, &entity.observations)
847 .map_err(MCSError::IoError)?;
848 self.store.write_record(RecordKind::CreateEntity, &buf)
849 .map_err(MCSError::IoError)?;
850
851 let name_id = self.interner.intern(&entity.name);
852 let hash = self.interner.get_hash(name_id);
853 let type_id = self.interner.intern(&entity.entity_type);
854 let obs_ids: Vec<StrId> = entity
855 .observations
856 .iter()
857 .map(|o| self.interner.intern(o))
858 .collect();
859 let reused = self.free_slots.pop();
862 let slot = reused.unwrap_or(self.entity_slots.len() as u32);
863 self.search
864 .index_entity(&mut self.interner, slot, name_id, type_id, &obs_ids);
865 let stored = Some(StoredEntity {
866 state: ENTITY_SLOT_LIVE,
867 name: name_id,
868 entity_type: type_id,
869 observations: obs_ids,
870 });
871 match reused {
872 Some(s) => self.entity_slots[s as usize] = stored,
873 None => self.entity_slots.push(stored),
874 }
875 self.name_table.insert(&self.interner, hash, name_id, slot);
876 created.push(Entity {
877 name: entity.name.clone(),
878 entity_type: entity.entity_type.clone(),
879 observations: entity.observations.clone(),
880 });
881 }
882 Ok(created)
883 }
884
885 pub fn create_relations(&mut self, relations: &[Relation]) -> Result<Vec<Relation>> {
886 for relation in relations {
888 if relation.from.is_empty() || relation.to.is_empty() {
889 return Err(MCSError::InvalidParams(
890 "Relation endpoints must not be empty".into(),
891 ));
892 }
893 }
894 let mut created = Vec::new();
895 let mut rel_set: AHashSet<(StrId, StrId, StrId)> = AHashSet::new();
897 for rel in &self.relations {
898 rel_set.insert((rel.from, rel.to, rel.relation_type));
899 }
900 for relation in relations {
901 let from_id = self.interner.intern(&relation.from);
902 let to_id = self.interner.intern(&relation.to);
903 let type_id = self.interner.intern(&relation.relation_type);
904 if !rel_set.insert((from_id, to_id, type_id)) {
905 continue;
906 }
907 let mut buf = Vec::new();
909 store_enc::encode_create_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
910 .map_err(MCSError::IoError)?;
911 self.store.write_record(RecordKind::CreateRelation, &buf)
912 .map_err(MCSError::IoError)?;
913
914 self.relations.push(StoredRelation {
915 from: from_id,
916 to: to_id,
917 relation_type: type_id,
918 });
919 created.push(Relation {
920 from: relation.from.clone(),
921 to: relation.to.clone(),
922 relation_type: relation.relation_type.clone(),
923 });
924 }
925 Ok(created)
926 }
927
928 pub fn add_observations(&mut self, entity_name: &str, contents: &[String]) -> Result<Vec<String>> {
929 let name_id = self.interner.get_optional(entity_name)
930 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
931 let hash = self.interner.get_hash(name_id);
932 let slot = self
933 .name_table
934 .lookup(hash, name_id)
935 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
936 let stored = self
937 .entity_slots
938 .get_mut(slot as usize)
939 .and_then(|e| e.as_mut())
940 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
941
942 let existing: AHashSet<StrId> = stored.observations.iter().copied().collect();
944 let mut added = Vec::new();
945 let mut interned_added = Vec::new();
946 for content in contents {
947 let cid = self.interner.intern(content);
948 if existing.contains(&cid) {
949 continue;
950 }
951 stored.observations.push(cid);
952 interned_added.push(cid);
953 added.push(content.clone());
954 }
955 if !added.is_empty() {
956 let mut buf = Vec::new();
958 store_enc::encode_add_observations(&mut buf, entity_name, &added)
959 .map_err(MCSError::IoError)?;
960 self.store.write_record(RecordKind::AddObservations, &buf)
961 .map_err(MCSError::IoError)?;
962
963 self.search
966 .index_additional(&mut self.interner, slot, &interned_added);
967 }
968 Ok(added)
969 }
970
971 pub fn delete_entities(&mut self, entity_names: &[String]) -> Result<()> {
972 let mut deleted_names = Vec::new();
973 for name in entity_names {
974 let name_id_opt = self.interner.get_optional(name);
975 if let Some(name_id) = name_id_opt {
976 let hash = self.interner.get_hash(name_id);
977 if let Some(slot) = self.name_table.lookup(hash, name_id)
978 && let Some(Some(_)) = self.entity_slots.get(slot as usize)
979 {
980 let mut buf = Vec::new();
982 store_enc::encode_delete_entity(&mut buf, name)
983 .map_err(MCSError::IoError)?;
984 self.store.write_record(RecordKind::DeleteEntity, &buf)
985 .map_err(MCSError::IoError)?;
986
987 self.entity_slots[slot as usize] = None;
988 self.free_slots.push(slot);
989 self.search.remove_entity(slot);
990 self.name_table.remove(&self.interner, hash, name_id);
991 deleted_names.push(name.clone());
992 }
993 }
994 }
995 if !deleted_names.is_empty() {
996 let deleted_ids: AHashSet<StrId> = deleted_names.iter()
998 .map(|n| self.interner.intern(n))
999 .collect();
1000 self.relations
1001 .retain(|r| !deleted_ids.contains(&r.from) && !deleted_ids.contains(&r.to));
1002 }
1003 Ok(())
1004 }
1005
1006 pub fn delete_observations(&mut self, entity_name: &str, observations: &[String]) -> Result<()> {
1007 let name_id = self.interner.get_optional(entity_name)
1008 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1009 let hash = self.interner.get_hash(name_id);
1010 let slot = self
1011 .name_table
1012 .lookup(hash, name_id)
1013 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1014 let stored = self
1015 .entity_slots
1016 .get_mut(slot as usize)
1017 .and_then(|e| e.as_mut())
1018 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1019 let remove_ids: AHashSet<StrId> = observations.iter().map(|o| self.interner.intern(o)).collect();
1020 stored.observations.retain(|o| !remove_ids.contains(o));
1021 let mut buf = Vec::new();
1023 store_enc::encode_delete_observations(&mut buf, entity_name, observations)
1024 .map_err(MCSError::IoError)?;
1025 self.store.write_record(RecordKind::DeleteObservations, &buf)
1026 .map_err(MCSError::IoError)?;
1027
1028 self.search.remove_entity(slot);
1029 self.search
1030 .index_entity(&mut self.interner, slot, stored.name, stored.entity_type, &stored.observations);
1031 Ok(())
1032 }
1033
1034 pub fn delete_relations(&mut self, relations: &[Relation]) -> Result<()> {
1035 let rels: AHashSet<(StrId, StrId, StrId)> = relations
1037 .iter()
1038 .map(|r| {
1039 (
1040 self.interner.intern(&r.from),
1041 self.interner.intern(&r.to),
1042 self.interner.intern(&r.relation_type),
1043 )
1044 })
1045 .collect();
1046 self.relations
1047 .retain(|r| !rels.contains(&(r.from, r.to, r.relation_type)));
1048 for relation in relations {
1049 let mut buf = Vec::new();
1050 store_enc::encode_delete_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
1051 .map_err(MCSError::IoError)?;
1052 self.store.write_record(RecordKind::DeleteRelation, &buf)
1053 .map_err(MCSError::IoError)?;
1054 }
1055 Ok(())
1056 }
1057
1058 pub fn read_graph(&self) -> KnowledgeGraphOut {
1059 self.read_graph_view().to_owned_out()
1060 }
1061
1062 pub fn read_graph_view(&self) -> GraphView<'_> {
1066 let entities: Vec<&StoredEntity> = self
1067 .entity_slots
1068 .iter()
1069 .filter_map(|s| s.as_ref().filter(|e| e.is_live()))
1070 .collect();
1071 let relations: Vec<&StoredRelation> = self.relations.iter().collect();
1072 GraphView { kg: self, entities, relations }
1073 }
1074
1075 pub fn search_nodes(&self, query: &str) -> KnowledgeGraphOut {
1078 self.search_nodes_filtered(query, None, 0, usize::MAX)
1079 }
1080
1081 pub fn open_nodes(&self, names: &[String]) -> KnowledgeGraphOut {
1082 self.open_nodes_view(names).to_owned_out()
1083 }
1084
1085 pub fn open_nodes_view(&self, names: &[String]) -> GraphView<'_> {
1087 let name_ids: AHashSet<StrId> = names.iter()
1088 .filter_map(|n| self.interner.get_optional(n))
1089 .collect();
1090 let entities: Vec<&StoredEntity> = self
1091 .entity_slots
1092 .iter()
1093 .filter_map(|s| {
1094 s.as_ref()
1095 .filter(|stored| stored.is_live() && name_ids.contains(&stored.name))
1096 })
1097 .collect();
1098 let matched_names: AHashSet<StrId> = entities.iter().map(|e| e.name).collect();
1099 let relations: Vec<&StoredRelation> = self
1100 .relations
1101 .iter()
1102 .filter(|r| matched_names.contains(&r.from) || matched_names.contains(&r.to))
1103 .collect();
1104 GraphView { kg: self, entities, relations }
1105 }
1106
1107 fn entity_to_output(&self, stored: &StoredEntity) -> Entity {
1112 Entity {
1113 name: self.interner.lookup(stored.name).to_string(),
1114 entity_type: self.interner.lookup(stored.entity_type).to_string(),
1115 observations: stored
1116 .observations
1117 .iter()
1118 .map(|o| self.interner.lookup(*o).to_string())
1119 .collect(),
1120 }
1121 }
1122
1123 #[inline]
1124 fn relation_to_output(&self, r: &StoredRelation) -> Relation {
1125 Relation {
1126 from: self.interner.lookup(r.from).to_string(),
1127 to: self.interner.lookup(r.to).to_string(),
1128 relation_type: self.interner.lookup(r.relation_type).to_string(),
1129 }
1130 }
1131
1132 fn lookup_live_slot(&self, name: &str) -> Option<u32> {
1134 let name_id = self.interner.get_optional(name)?;
1135 let hash = self.interner.get_hash(name_id);
1136 let slot = self.name_table.lookup(hash, name_id)?;
1137 let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
1138 stored.is_live().then_some(slot)
1139 }
1140
1141 fn entity_by_name_id(&self, name_id: StrId) -> Option<Entity> {
1143 let hash = self.interner.get_hash(name_id);
1144 let slot = self.name_table.lookup(hash, name_id)?;
1145 let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
1146 stored.is_live().then(|| self.entity_to_output(stored))
1147 }
1148
1149 pub fn entity_type_counts(&self) -> Vec<(String, usize)> {
1153 let mut counts: AHashMap<StrId, usize> = AHashMap::new();
1154 for st in self
1155 .entity_slots
1156 .iter()
1157 .filter_map(|s| s.as_ref())
1158 .filter(|e| e.is_live())
1159 {
1160 *counts.entry(st.entity_type).or_insert(0) += 1;
1161 }
1162 self.rank_counts(counts)
1163 }
1164
1165 pub fn relation_type_counts(&self) -> Vec<(String, usize)> {
1167 let mut counts: AHashMap<StrId, usize> = AHashMap::new();
1168 for r in &self.relations {
1169 *counts.entry(r.relation_type).or_insert(0) += 1;
1170 }
1171 self.rank_counts(counts)
1172 }
1173
1174 fn rank_counts(&self, counts: AHashMap<StrId, usize>) -> Vec<(String, usize)> {
1175 let mut out: Vec<(String, usize)> = counts
1176 .into_iter()
1177 .map(|(id, c)| (self.interner.lookup(id).to_string(), c))
1178 .collect();
1179 out.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
1180 out
1181 }
1182
1183 pub fn search_nodes_filtered(
1187 &self,
1188 query: &str,
1189 entity_type: Option<&str>,
1190 offset: usize,
1191 limit: usize,
1192 ) -> KnowledgeGraphOut {
1193 self.search_nodes_view(query, entity_type, offset, limit).to_owned_out()
1194 }
1195
1196 pub fn search_nodes_view(
1198 &self,
1199 query: &str,
1200 entity_type: Option<&str>,
1201 offset: usize,
1202 limit: usize,
1203 ) -> GraphView<'_> {
1204 let type_id = match entity_type {
1205 Some(t) => match self.interner.get_optional(t) {
1206 Some(id) => Some(id),
1207 None => return GraphView { kg: self, entities: Vec::new(), relations: Vec::new() },
1208 },
1209 None => None,
1210 };
1211
1212 let ranked = self.search.search_ranked(query, &self.interner);
1213 let mut selected: AHashSet<StrId> = AHashSet::new();
1214 let mut entities: Vec<&StoredEntity> = Vec::new();
1215 let mut skipped = 0usize;
1216 for (slot, _score) in ranked {
1217 let Some(st) = self
1218 .entity_slots
1219 .get(slot as usize)
1220 .and_then(|s| s.as_ref())
1221 .filter(|e| e.is_live())
1222 else {
1223 continue;
1224 };
1225 if type_id.is_some_and(|tid| st.entity_type != tid) {
1226 continue;
1227 }
1228 if skipped < offset {
1229 skipped += 1;
1230 continue;
1231 }
1232 if entities.len() >= limit {
1233 break;
1234 }
1235 selected.insert(st.name);
1236 entities.push(st);
1237 }
1238
1239 let relations: Vec<&StoredRelation> = self
1240 .relations
1241 .iter()
1242 .filter(|r| selected.contains(&r.from) || selected.contains(&r.to))
1243 .collect();
1244 GraphView { kg: self, entities, relations }
1245 }
1246
1247 pub fn read_graph_filtered(
1251 &self,
1252 entity_type: Option<&str>,
1253 offset: usize,
1254 limit: usize,
1255 ) -> KnowledgeGraphOut {
1256 self.read_graph_filtered_view(entity_type, offset, limit).to_owned_out()
1257 }
1258
1259 pub fn read_graph_filtered_view(
1261 &self,
1262 entity_type: Option<&str>,
1263 offset: usize,
1264 limit: usize,
1265 ) -> GraphView<'_> {
1266 let type_id = match entity_type {
1267 Some(t) => match self.interner.get_optional(t) {
1268 Some(id) => Some(id),
1269 None => return GraphView { kg: self, entities: Vec::new(), relations: Vec::new() },
1270 },
1271 None => None,
1272 };
1273
1274 let mut selected: AHashSet<StrId> = AHashSet::new();
1275 let mut entities: Vec<&StoredEntity> = Vec::new();
1276 let mut skipped = 0usize;
1277 for st in self
1278 .entity_slots
1279 .iter()
1280 .filter_map(|s| s.as_ref())
1281 .filter(|e| e.is_live())
1282 {
1283 if type_id.is_some_and(|tid| st.entity_type != tid) {
1284 continue;
1285 }
1286 if skipped < offset {
1287 skipped += 1;
1288 continue;
1289 }
1290 if entities.len() >= limit {
1291 break;
1292 }
1293 selected.insert(st.name);
1294 entities.push(st);
1295 }
1296
1297 let relations: Vec<&StoredRelation> = self
1298 .relations
1299 .iter()
1300 .filter(|r| selected.contains(&r.from) && selected.contains(&r.to))
1301 .collect();
1302 GraphView { kg: self, entities, relations }
1303 }
1304
1305 pub fn neighbors(
1313 &self,
1314 name: &str,
1315 direction: Direction,
1316 rtype: Option<&str>,
1317 depth: u32,
1318 ) -> Result<KnowledgeGraphOut> {
1319 self.lookup_live_slot(name)
1320 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
1321 let start = self.interner.get_optional(name).unwrap();
1323
1324 let rtype_id = match rtype {
1326 Some(r) => match self.interner.get_optional(r) {
1327 Some(id) => Some(id),
1328 None => {
1329 let entities = self.entity_by_name_id(start).into_iter().collect();
1330 return Ok(KnowledgeGraphOut { entities, relations: Vec::new() });
1331 }
1332 },
1333 None => None,
1334 };
1335
1336 let mut visited: AHashSet<StrId> = AHashSet::new();
1337 visited.insert(start);
1338
1339 let type_ok = |r: &StoredRelation| rtype_id.is_none_or(|rt| r.relation_type == rt);
1340
1341 if depth == 1 {
1342 for r in self.relations.iter().filter(|r| type_ok(r)) {
1343 match direction {
1344 Direction::Out => {
1345 if r.from == start {
1346 visited.insert(r.to);
1347 }
1348 }
1349 Direction::In => {
1350 if r.to == start {
1351 visited.insert(r.from);
1352 }
1353 }
1354 Direction::Both => {
1355 if r.from == start {
1356 visited.insert(r.to);
1357 } else if r.to == start {
1358 visited.insert(r.from);
1359 }
1360 }
1361 }
1362 }
1363 } else if depth >= 2 {
1364 let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
1366 for r in self.relations.iter().filter(|r| type_ok(r)) {
1367 match direction {
1368 Direction::Out => adj.entry(r.from).or_default().push(r.to),
1369 Direction::In => adj.entry(r.to).or_default().push(r.from),
1370 Direction::Both => {
1371 adj.entry(r.from).or_default().push(r.to);
1372 adj.entry(r.to).or_default().push(r.from);
1373 }
1374 }
1375 }
1376 let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
1377 queue.push_back((start, 0));
1378 while let Some((node, d)) = queue.pop_front() {
1379 if d >= depth {
1380 continue;
1381 }
1382 if let Some(nbrs) = adj.get(&node) {
1383 for &nb in nbrs {
1384 if visited.insert(nb) {
1385 queue.push_back((nb, d + 1));
1386 }
1387 }
1388 }
1389 }
1390 }
1391
1392 let mut entities = Vec::with_capacity(visited.len());
1393 for &nid in &visited {
1394 if let Some(e) = self.entity_by_name_id(nid) {
1395 entities.push(e);
1396 }
1397 }
1398 let relations = self
1399 .relations
1400 .iter()
1401 .filter(|r| type_ok(r) && visited.contains(&r.from) && visited.contains(&r.to))
1402 .map(|r| self.relation_to_output(r))
1403 .collect();
1404 Ok(KnowledgeGraphOut { entities, relations })
1405 }
1406
1407 pub fn describe_entity(&self, name: &str) -> Result<serde_json::Value> {
1411 let name_id = self
1412 .interner
1413 .get_optional(name)
1414 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
1415 let entity = self
1416 .entity_by_name_id(name_id)
1417 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
1418
1419 let mut incident: Vec<Relation> = Vec::new();
1420 let mut neighbor_seen: AHashSet<StrId> = AHashSet::new();
1421 let mut neighbors: Vec<&str> = Vec::new();
1422 for r in &self.relations {
1423 if r.from == name_id || r.to == name_id {
1424 incident.push(self.relation_to_output(r));
1425 let other = if r.from == name_id { r.to } else { r.from };
1426 if other != name_id && neighbor_seen.insert(other) {
1427 neighbors.push(self.interner.lookup(other));
1428 }
1429 }
1430 }
1431
1432 Ok(serde_json::json!({
1433 "entity": entity,
1434 "relations": incident,
1435 "neighbors": neighbors,
1436 "degree": incident.len(),
1437 }))
1438 }
1439
1440 pub fn upsert_entities(&mut self, entities: &[Entity]) -> Result<Vec<serde_json::Value>> {
1445 for e in entities {
1446 if e.name.is_empty() {
1447 return Err(MCSError::InvalidParams(
1448 "Entity name must not be empty".into(),
1449 ));
1450 }
1451 }
1452 let mut out = Vec::with_capacity(entities.len());
1453 for e in entities {
1454 if self.lookup_live_slot(&e.name).is_some() {
1455 let added = self.add_observations(&e.name, &e.observations)?;
1456 out.push(serde_json::json!({
1457 "name": e.name,
1458 "created": false,
1459 "addedObservations": added,
1460 }));
1461 } else {
1462 let created = self.create_entities(std::slice::from_ref(e))?;
1463 out.push(serde_json::json!({
1464 "name": e.name,
1465 "created": !created.is_empty(),
1466 "addedObservations": e.observations,
1467 }));
1468 }
1469 }
1470 Ok(out)
1471 }
1472
1473 pub fn export(&self, format: &str) -> Result<String> {
1475 match format {
1476 "json" => serde_json::to_string(&self.read_graph()).map_err(MCSError::JsonError),
1477 "mermaid" => Ok(self.export_mermaid()),
1478 "dot" => Ok(self.export_dot()),
1479 other => Err(MCSError::InvalidParams(format!(
1480 "Unknown export format '{other}' (expected json|mermaid|dot)"
1481 ))),
1482 }
1483 }
1484
1485 fn diagram_node_ids(&self) -> (AHashMap<StrId, usize>, Vec<(usize, StrId)>) {
1487 let mut ids: AHashMap<StrId, usize> = AHashMap::new();
1488 let mut order: Vec<(usize, StrId)> = Vec::new();
1489 for st in self
1490 .entity_slots
1491 .iter()
1492 .filter_map(|s| s.as_ref())
1493 .filter(|e| e.is_live())
1494 {
1495 let n = ids.len();
1496 ids.insert(st.name, n);
1497 order.push((n, st.name));
1498 }
1499 (ids, order)
1500 }
1501
1502 fn export_mermaid(&self) -> String {
1503 let (ids, order) = self.diagram_node_ids();
1504 let mut s = String::with_capacity(64 + order.len() * 32 + self.relations.len() * 32);
1505 s.push_str("graph LR\n");
1506 for (n, name_id) in &order {
1507 let label = sanitize_label(self.interner.lookup(*name_id));
1508 s.push_str(&format!(" n{n}[\"{label}\"]\n"));
1509 }
1510 for r in &self.relations {
1511 if let (Some(&a), Some(&b)) = (ids.get(&r.from), ids.get(&r.to)) {
1512 let rel = sanitize_label(self.interner.lookup(r.relation_type));
1513 s.push_str(&format!(" n{a} -->|{rel}| n{b}\n"));
1514 }
1515 }
1516 s
1517 }
1518
1519 fn export_dot(&self) -> String {
1520 let (ids, order) = self.diagram_node_ids();
1521 let mut s = String::with_capacity(64 + order.len() * 32 + self.relations.len() * 32);
1522 s.push_str("digraph G {\n");
1523 for (n, name_id) in &order {
1524 let label = sanitize_label(self.interner.lookup(*name_id));
1525 s.push_str(&format!(" n{n} [label=\"{label}\"];\n"));
1526 }
1527 for r in &self.relations {
1528 if let (Some(&a), Some(&b)) = (ids.get(&r.from), ids.get(&r.to)) {
1529 let rel = sanitize_label(self.interner.lookup(r.relation_type));
1530 s.push_str(&format!(" n{a} -> n{b} [label=\"{rel}\"];\n"));
1531 }
1532 }
1533 s.push_str("}\n");
1534 s
1535 }
1536
1537 pub fn merge_entities(&mut self, source: &str, target: &str) -> Result<serde_json::Value> {
1545 if source == target {
1546 return Err(MCSError::InvalidParams(
1547 "Source and target must be different entities".into(),
1548 ));
1549 }
1550 self.lookup_live_slot(source).ok_or_else(|| {
1551 MCSError::InvalidParams(format!("Source entity '{source}' not found"))
1552 })?;
1553 self.lookup_live_slot(target).ok_or_else(|| {
1554 MCSError::InvalidParams(format!("Target entity '{target}' not found"))
1555 })?;
1556
1557 let source_entity = self.get_entity(source).unwrap();
1558 let moved_obs_count = source_entity.observations.len();
1559
1560 let added_count = if !source_entity.observations.is_empty() {
1562 self.add_observations(target, &source_entity.observations)?.len()
1563 } else {
1564 0
1565 };
1566
1567 let source_id = self.interner.get_optional(source).unwrap();
1569 let source_rels: Vec<Relation> = self
1570 .relations
1571 .iter()
1572 .filter(|r| r.from == source_id || r.to == source_id)
1573 .filter_map(|r| {
1574 let new_from = if r.from == source_id {
1575 target
1576 } else {
1577 self.interner.lookup(r.from)
1578 };
1579 let new_to = if r.to == source_id {
1580 target
1581 } else {
1582 self.interner.lookup(r.to)
1583 };
1584 if new_from == new_to {
1586 None
1587 } else {
1588 Some(Relation {
1589 from: new_from.to_string(),
1590 to: new_to.to_string(),
1591 relation_type: self.interner.lookup(r.relation_type).to_string(),
1592 })
1593 }
1594 })
1595 .collect();
1596
1597 let redirected = self.create_relations(&source_rels)?.len() as u32;
1598
1599 self.delete_entities(&[source.to_string()])?;
1601
1602 Ok(serde_json::json!({
1603 "source": source,
1604 "target": target,
1605 "movedObservations": moved_obs_count,
1606 "addedObservations": added_count,
1607 "redirectedRelations": redirected,
1608 }))
1609 }
1610
1611 pub fn extract_subgraph(&self, names: &[String], depth: u32) -> Result<KnowledgeGraphOut> {
1615 if names.is_empty() {
1616 return Ok(KnowledgeGraphOut {
1617 entities: Vec::new(),
1618 relations: Vec::new(),
1619 });
1620 }
1621 let mut visited: AHashSet<StrId> = AHashSet::new();
1623 let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
1624 for name in names {
1625 if let Some(id) = self.interner.get_optional(name) {
1626 if visited.insert(id) {
1627 queue.push_back((id, 0));
1628 }
1629 }
1630 }
1631 let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
1633 for r in &self.relations {
1634 adj.entry(r.from).or_default().push(r.to);
1635 adj.entry(r.to).or_default().push(r.from);
1636 }
1637 while let Some((node, d)) = queue.pop_front() {
1638 if d >= depth {
1639 continue;
1640 }
1641 if let Some(nbrs) = adj.get(&node) {
1642 for &nb in nbrs {
1643 if visited.insert(nb) {
1644 queue.push_back((nb, d + 1));
1645 }
1646 }
1647 }
1648 }
1649 let mut entities: Vec<Entity> = Vec::with_capacity(visited.len());
1650 for &nid in &visited {
1651 if let Some(e) = self.entity_by_name_id(nid) {
1652 entities.push(e);
1653 }
1654 }
1655 let relations: Vec<Relation> = self
1656 .relations
1657 .iter()
1658 .filter(|r| visited.contains(&r.from) && visited.contains(&r.to))
1659 .map(|r| self.relation_to_output(r))
1660 .collect();
1661 Ok(KnowledgeGraphOut { entities, relations })
1662 }
1663
1664 pub fn batch_get_entities(&self, names: &[String]) -> Vec<Option<Entity>> {
1666 names.iter().map(|n| self.get_entity(n)).collect()
1667 }
1668
1669 fn dfs_all_paths(
1672 adj: &AHashMap<StrId, Vec<StrId>>,
1673 current: StrId,
1674 target: StrId,
1675 max_depth: usize,
1676 max_paths: usize,
1677 visited: &mut AHashSet<StrId>,
1678 current_path: &mut Vec<StrId>,
1679 all_paths: &mut Vec<Vec<StrId>>,
1680 ) {
1681 if all_paths.len() >= max_paths {
1682 return;
1683 }
1684 if current == target && current_path.len() > 1 {
1685 all_paths.push(current_path.clone());
1686 return;
1687 }
1688 if current_path.len() > max_depth {
1689 return;
1690 }
1691 if let Some(neighbors) = adj.get(¤t) {
1692 for &nb in neighbors {
1693 if visited.insert(nb) {
1694 current_path.push(nb);
1695 Self::dfs_all_paths(
1696 adj, nb, target, max_depth, max_paths, visited, current_path, all_paths,
1697 );
1698 current_path.pop();
1699 visited.remove(&nb);
1700 }
1701 }
1702 }
1703 }
1704
1705 pub fn find_all_paths(
1709 &self,
1710 from: &str,
1711 to: &str,
1712 max_depth: usize,
1713 max_paths: usize,
1714 ) -> Result<Vec<Vec<String>>> {
1715 let from_id = self
1716 .interner
1717 .get_optional(from)
1718 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
1719 let to_id = self
1720 .interner
1721 .get_optional(to)
1722 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
1723 if self.lookup_live_slot(from).is_none() {
1725 return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
1726 }
1727 if self.lookup_live_slot(to).is_none() {
1728 return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
1729 }
1730 if from_id == to_id {
1731 return Ok(vec![vec![from.to_string()]]);
1732 }
1733 let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
1735 for r in &self.relations {
1736 adj.entry(r.from).or_default().push(r.to);
1737 adj.entry(r.to).or_default().push(r.from);
1738 }
1739 let mut all_paths: Vec<Vec<StrId>> = Vec::new();
1740 let mut current_path = Vec::new();
1741 let mut visited: AHashSet<StrId> = AHashSet::new();
1742 visited.insert(from_id);
1743 current_path.push(from_id);
1744 Self::dfs_all_paths(
1745 &adj,
1746 from_id,
1747 to_id,
1748 max_depth,
1749 max_paths,
1750 &mut visited,
1751 &mut current_path,
1752 &mut all_paths,
1753 );
1754 if all_paths.is_empty() {
1755 return Err(MCSError::MemoryError(format!(
1756 "No path found between '{from}' and '{to}'"
1757 )));
1758 }
1759 let result: Vec<Vec<String>> = all_paths
1760 .into_iter()
1761 .map(|path| {
1762 path.into_iter()
1763 .map(|id| self.interner.lookup(id).to_string())
1764 .collect()
1765 })
1766 .collect();
1767 Ok(result)
1768 }
1769
1770 pub fn flush_and_sync(&mut self) -> Result<()> {
1774 self.store.flush_and_sync().map_err(MCSError::IoError)
1775 }
1776}