1use std::collections::VecDeque;
2
3use ahash::{AHashMap, AHashSet};
4use std::path::Path;
5
6use serde::ser::{Serialize, SerializeSeq, SerializeStruct, Serializer};
7
8use crate::errors::{MCSError, Result};
9use crate::intern::{StrId, StringInterner};
10use crate::types::{Entity, Relation, KnowledgeGraphOut};
11use crate::search::SearchIndex;
12use crate::store::{self as store_enc, BinaryStore, RecordKind};
13
/// State tag stored in `StoredEntity::state` for an active (live) entity slot.
const ENTITY_SLOT_LIVE: u8 = 0x01;
/// Number of independent shards in `ShardedNameTable`. Shard selection masks
/// the low hash bits, so this must stay a power of two.
const NAME_TABLE_SHARDS: usize = 1 << 2;
16
/// Hints the CPU to load the cache line containing `addr` into every cache
/// level (`_MM_HINT_T0`).
///
/// # Safety
/// A prefetch is only a hint: it never dereferences `addr` and does not fault,
/// so any address value is acceptable. The function is `unsafe` only because
/// the underlying intrinsic is declared `unsafe`.
#[cfg(target_arch = "x86_64")]
#[inline(always)]
unsafe fn prefetch_addr(addr: *const u8) {
    // SAFETY: `_mm_prefetch` is a pure cache hint and SSE is part of the
    // x86_64 baseline. The intrinsic is declared over `*const i8`, hence the
    // pointer cast (the original passed `*const u8`, which does not type-check).
    unsafe {
        std::arch::x86_64::_mm_prefetch::<{ std::arch::x86_64::_MM_HINT_T0 }>(addr.cast::<i8>());
    }
}

/// No-op fallback for targets without the x86_64 prefetch intrinsic.
///
/// # Safety
/// Always safe to call; it is `unsafe` only to match the x86_64 signature so
/// call sites compile identically on every target.
#[cfg(not(target_arch = "x86_64"))]
#[inline(always)]
const unsafe fn prefetch_addr(_addr: *const u8) {}
31
/// Fsyncs the directory that contains `path`, making a preceding `rename`
/// into that directory durable.
///
/// A path with no (or an empty) parent component syncs the current directory
/// `"."`. A missing directory, or a filesystem that rejects directory fsync
/// with `InvalidInput`, is treated as success.
///
/// Directory fsync is a POSIX concept. On non-Unix targets (e.g. Windows)
/// opening and flushing a directory handle is not supported and fails with an
/// access error, which previously made every compaction report failure after
/// the rename had already succeeded. There the call is a no-op.
///
/// # Errors
/// Any other I/O error from opening or syncing the directory.
fn sync_parent_dir(path: &Path) -> std::io::Result<()> {
    if cfg!(not(unix)) {
        return Ok(());
    }

    let dir = path
        .parent()
        .filter(|p| !p.as_os_str().is_empty())
        .unwrap_or_else(|| Path::new("."));

    let handle = match std::fs::File::open(dir) {
        Ok(f) => f,
        // The directory vanished; there is nothing left to make durable.
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
        Err(e) => return Err(e),
    };
    match handle.sync_all() {
        // Some filesystems do not support fsync on directories.
        Err(e) if e.kind() == std::io::ErrorKind::InvalidInput => Ok(()),
        other => other,
    }
}
51
52#[cfg_attr(feature = "cache_align", repr(align(64)))]
61struct StoredEntity {
62 state: u8,
63 name: StrId,
64 entity_type: StrId,
65 observations: Vec<StrId>,
66}
67
68impl StoredEntity {
69 const fn is_live(&self) -> bool {
70 self.state == ENTITY_SLOT_LIVE
71 }
72}
73
74#[cfg_attr(feature = "cache_align", repr(align(16)))]
78struct StoredRelation {
79 from: StrId,
80 to: StrId,
81 relation_type: StrId,
82}
83
84pub struct GraphView<'a> {
98 kg: &'a KnowledgeGraph,
99 entities: Vec<&'a StoredEntity>,
100 relations: Vec<&'a StoredRelation>,
101}
102
103impl GraphView<'_> {
104 pub fn to_owned_out(&self) -> KnowledgeGraphOut {
108 KnowledgeGraphOut {
109 entities: self.entities.iter().map(|e| self.kg.entity_to_output(e)).collect(),
110 relations: self.relations.iter().map(|r| self.kg.relation_to_output(r)).collect(),
111 }
112 }
113}
114
115impl Serialize for GraphView<'_> {
116 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
117 let mut st = s.serialize_struct("KnowledgeGraphOut", 2)?;
118 st.serialize_field("entities", &EntityListRef { kg: self.kg, items: &self.entities })?;
119 st.serialize_field("relations", &RelationListRef { kg: self.kg, items: &self.relations })?;
120 st.end()
121 }
122}
123
124struct EntityListRef<'a> { kg: &'a KnowledgeGraph, items: &'a [&'a StoredEntity] }
125impl Serialize for EntityListRef<'_> {
126 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
127 let mut seq = s.serialize_seq(Some(self.items.len()))?;
128 for &e in self.items {
129 seq.serialize_element(&EntityRef { kg: self.kg, e })?;
130 }
131 seq.end()
132 }
133}
134
135struct RelationListRef<'a> { kg: &'a KnowledgeGraph, items: &'a [&'a StoredRelation] }
136impl Serialize for RelationListRef<'_> {
137 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
138 let mut seq = s.serialize_seq(Some(self.items.len()))?;
139 for &r in self.items {
140 seq.serialize_element(&RelationRef { kg: self.kg, r })?;
141 }
142 seq.end()
143 }
144}
145
146struct EntityRef<'a> { kg: &'a KnowledgeGraph, e: &'a StoredEntity }
147impl Serialize for EntityRef<'_> {
148 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
149 let mut st = s.serialize_struct("Entity", 3)?;
150 st.serialize_field("name", self.kg.interner.lookup(self.e.name))?;
151 st.serialize_field("entityType", self.kg.interner.lookup(self.e.entity_type))?;
152 st.serialize_field("observations", &ObsRef { kg: self.kg, obs: &self.e.observations })?;
153 st.end()
154 }
155}
156
157struct ObsRef<'a> { kg: &'a KnowledgeGraph, obs: &'a [StrId] }
158impl Serialize for ObsRef<'_> {
159 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
160 let mut seq = s.serialize_seq(Some(self.obs.len()))?;
161 for &o in self.obs {
162 seq.serialize_element(self.kg.interner.lookup(o))?;
163 }
164 seq.end()
165 }
166}
167
168struct RelationRef<'a> { kg: &'a KnowledgeGraph, r: &'a StoredRelation }
169impl Serialize for RelationRef<'_> {
170 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
171 let mut st = s.serialize_struct("Relation", 3)?;
172 st.serialize_field("from", self.kg.interner.lookup(self.r.from))?;
173 st.serialize_field("to", self.kg.interner.lookup(self.r.to))?;
174 st.serialize_field("relationType", self.kg.interner.lookup(self.r.relation_type))?;
175 st.end()
176 }
177}
178
/// Edge-direction selector, parsed from an optional string parameter.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum Direction {
    /// Selected by the string `"out"`.
    Out,
    /// Selected by the string `"in"`.
    In,
    /// The fallback for any other string, including `None`.
    Both,
}

impl Direction {
    /// Maps `Some("out")` to `Out` and `Some("in")` to `In`. Every other input,
    /// including `None` and differently-cased spellings, yields `Both`.
    pub fn parse(s: Option<&str>) -> Self {
        match s.unwrap_or_default() {
            "out" => Self::Out,
            "in" => Self::In,
            _ => Self::Both,
        }
    }
}
200
/// Makes a string safe to embed as a double-quoted label.
///
/// Each `"` becomes `'`, and each `\n` or `\r` becomes a single space. Every
/// other character, including non-ASCII, is kept unchanged.
fn sanitize_label(s: &str) -> String {
    s.chars()
        .map(|ch| match ch {
            '"' => '\'',
            '\n' | '\r' => ' ',
            other => other,
        })
        .collect()
}
213
/// Control byte for an unoccupied slot. Occupied slots hold a stamp from
/// `h2`, which never sets the high bit, so `ctrl & 0x80 != 0` means "empty".
const EMPTY_SLOT: u8 = 0xFF;

/// 7-bit tag stored in an occupied slot's control byte.
///
/// The tag is taken from the top 7 bits of the hash. The low bits are already
/// consumed by shard selection (`hash & (NAME_TABLE_SHARDS - 1)`), and `h1`
/// uses bits 7 and up. Deriving the tag from the low bits (as before) meant
/// every tag in a given shard shared its two lowest bits, so only 32 distinct
/// tags were possible per shard.
#[inline(always)]
const fn h2(hash: u64) -> u8 {
    // `>> 57` leaves exactly 7 bits, so the result is always < 0x80.
    (hash >> 57) as u8
}

/// Home slot index for `hash` in a table whose capacity is `mask + 1`.
///
/// The low 7 bits are skipped before masking.
#[inline(always)]
const fn h1(hash: u64, mask: usize) -> usize {
    ((hash >> 7) as usize) & mask
}
234
/// One shard of the name -> entity-slot map: an open-addressing hash table
/// with linear probing over three parallel arrays that share one index space.
///
/// Capacity is always a power of two (at least 16), so `& mask` keeps every
/// probe index in bounds. No tombstones are used: `remove` re-packs the probe
/// cluster that follows a deleted entry.
struct NameTableShard {
    // ctrl[i]: EMPTY_SLOT (high bit set) when slot i is free, otherwise the
    // stamp `h2(hash)` of the stored name. names[i]: the stored name id.
    ctrl: Vec<u8>, names: Vec<StrId>,
    // slots[i]: entity slot index the name maps to (u32::MAX when empty).
    slots: Vec<u32>,
    // Always ctrl.len() - 1; ctrl, names and slots all have that length + 1.
    mask: usize,
    // Number of occupied entries.
    count: usize,
}

impl NameTableShard {
    /// Creates an empty shard with `capacity` rounded up to a power of two,
    /// minimum 16 slots.
    fn new(capacity: usize) -> Self {
        let cap = capacity.next_power_of_two().max(16);
        Self {
            ctrl: vec![EMPTY_SLOT; cap],
            names: vec![StrId::EMPTY; cap],
            slots: vec![u32::MAX; cap],
            mask: cap - 1,
            count: 0,
        }
    }

    /// Returns the entity slot recorded for `name`, or `None` if absent.
    ///
    /// `hash` must be the hash the name was inserted with; the visible callers
    /// pass `interner.get_hash(name)`. Probing is linear from `h1(hash)` and
    /// stops at the first empty control byte. The loop is additionally bounded
    /// by the table length.
    #[inline(always)]
    fn lookup(&self, hash: u64, name: StrId) -> Option<u32> {
        let stamp = h2(hash);
        let mask = self.mask;
        let mut idx = h1(hash, mask);
        let ctrl = self.ctrl.as_ptr();
        let names = self.names.as_ptr();
        let slots = self.slots.as_ptr();
        let len = self.ctrl.len();

        for _ in 0..len {
            // Hint the control byte four probe positions ahead.
            let prefetch_idx = idx.wrapping_add(4) & mask;
            // SAFETY: `prefetch_idx` is reduced with `mask == len - 1`, so the
            // pointer stays inside `ctrl`; a prefetch never dereferences anyway.
            unsafe { prefetch_addr(ctrl.add(prefetch_idx)) };

            // SAFETY: `idx` is always reduced with `& mask`, and `ctrl`, `names`
            // and `slots` all have length `mask + 1`, so each `add(idx)` read is
            // in bounds of its allocation.
            unsafe {
                let c = *ctrl.add(idx);
                // High bit set => empty slot => the name is not in the table.
                if c & 0x80 != 0 {
                    return None;
                }
                // Compare the cheap stamp first, then the full name id.
                if c == stamp && *names.add(idx) == name {
                    return Some(*slots.add(idx));
                }
            }
            idx = (idx + 1) & mask;
        }
        None
    }

    /// Inserts `name -> slot`.
    ///
    /// Does NOT check for an existing entry for `name`: callers must ensure
    /// the name is absent, or a second entry is created. The table grows first
    /// when more than 3/4 full, so a free slot always exists and the probe loop
    /// terminates.
    fn insert(&mut self, interner: &StringInterner, hash: u64, name: StrId, slot: u32) {
        if self.count * 4 > self.ctrl.len() * 3 {
            self.grow(interner);
        }
        let stamp = h2(hash);
        // Read after a possible `grow`, which changes the mask.
        let mask = self.mask;
        let mut idx = h1(hash, mask);
        loop {
            // SAFETY: `idx` is reduced with `& mask` and all three arrays have
            // length `mask + 1`, so the unchecked accesses are in bounds.
            unsafe {
                if *self.ctrl.get_unchecked(idx) & 0x80 != 0 {
                    *self.ctrl.get_unchecked_mut(idx) = stamp;
                    *self.names.get_unchecked_mut(idx) = name;
                    *self.slots.get_unchecked_mut(idx) = slot;
                    self.count += 1;
                    return;
                }
            }
            idx = (idx + 1) & mask;
        }
    }

    /// Removes the entry for `name` if present.
    ///
    /// After clearing the slot, every entry in the following run of occupied
    /// slots (up to the next empty slot) is taken out and re-inserted from its
    /// home position. Lookups that previously probed through the cleared slot
    /// therefore still reach their entries, without tombstones. Hashes for the
    /// moved entries are recomputed with `interner.get_hash`.
    fn remove(&mut self, interner: &StringInterner, hash: u64, name: StrId) {
        let stamp = h2(hash);
        let mask = self.mask;
        let mut idx = h1(hash, mask);
        let len = self.ctrl.len();
        for _ in 0..len {
            if self.ctrl[idx] & 0x80 != 0 {
                return;
            }
            if self.ctrl[idx] == stamp && self.names[idx] == name {
                self.ctrl[idx] = EMPTY_SLOT;
                self.names[idx] = StrId::EMPTY;
                self.slots[idx] = u32::MAX;
                self.count -= 1;

                // Re-pack the cluster that follows the freed slot.
                let mut next = (idx + 1) & mask;
                while self.ctrl[next] & 0x80 == 0 {
                    let nn = self.names[next];
                    let ns = self.slots[next];
                    let nh = interner.get_hash(nn);
                    self.ctrl[next] = EMPTY_SLOT;
                    self.names[next] = StrId::EMPTY;
                    self.slots[next] = u32::MAX;
                    self.count -= 1;

                    // Re-insert from the home slot into the first free position;
                    // this may be the slot just vacated or an earlier freed one.
                    let nstamp = h2(nh);
                    let mut re_idx = h1(nh, mask);
                    while self.ctrl[re_idx] & 0x80 == 0 {
                        re_idx = (re_idx + 1) & mask;
                    }
                    self.ctrl[re_idx] = nstamp;
                    self.names[re_idx] = nn;
                    self.slots[re_idx] = ns;
                    self.count += 1;

                    next = (next + 1) & mask;
                }
                return;
            }
            idx = (idx + 1) & mask;
        }
    }

    /// Doubles the capacity and re-inserts every occupied entry, recomputing
    /// each hash through `interner`. `count` is unchanged.
    fn grow(&mut self, interner: &StringInterner) {
        let new_cap = self.ctrl.len() * 2;
        let new_mask = new_cap - 1;
        let mut new_ctrl = vec![EMPTY_SLOT; new_cap];
        let mut new_names = vec![StrId::EMPTY; new_cap];
        let mut new_slots = vec![u32::MAX; new_cap];

        for i in 0..self.ctrl.len() {
            if self.ctrl[i] & 0x80 == 0 {
                let name = self.names[i];
                let hash = interner.get_hash(name);
                let stamp = h2(hash);
                let mut idx = h1(hash, new_mask);
                while new_ctrl[idx] & 0x80 == 0 {
                    idx = (idx + 1) & new_mask;
                }
                new_ctrl[idx] = stamp;
                new_names[idx] = name;
                new_slots[idx] = self.slots[i];
            }
        }

        self.ctrl = new_ctrl;
        self.names = new_names;
        self.slots = new_slots;
        self.mask = new_mask;
    }
}
385
386struct ShardedNameTable {
387 shards: [NameTableShard; NAME_TABLE_SHARDS],
388}
389
390impl ShardedNameTable {
391 fn new(capacity_per_shard: usize) -> Self {
392 Self {
393 shards: [
394 NameTableShard::new(capacity_per_shard),
395 NameTableShard::new(capacity_per_shard),
396 NameTableShard::new(capacity_per_shard),
397 NameTableShard::new(capacity_per_shard),
398 ],
399 }
400 }
401
402 #[inline(always)]
403 const fn shard(hash: u64) -> usize {
404 (hash as usize) & (NAME_TABLE_SHARDS - 1)
405 }
406
407 #[inline(always)]
408 fn lookup(&self, hash: u64, name: StrId) -> Option<u32> {
409 self.shards[Self::shard(hash)].lookup(hash, name)
410 }
411
412 #[inline(always)]
413 fn insert(&mut self, interner: &StringInterner, hash: u64, name: StrId, slot: u32) {
414 self.shards[Self::shard(hash)].insert(interner, hash, name, slot);
415 }
416
417 #[inline(always)]
418 fn remove(&mut self, interner: &StringInterner, hash: u64, name: StrId) {
419 self.shards[Self::shard(hash)].remove(interner, hash, name);
420 }
421}
422
423pub struct KnowledgeGraph {
427 interner: StringInterner,
428 entity_slots: Vec<Option<StoredEntity>>,
429 free_slots: Vec<u32>,
432 name_table: ShardedNameTable,
433 relations: Vec<StoredRelation>,
434 search: SearchIndex,
435 store: BinaryStore,
436}
437
438impl KnowledgeGraph {
439 pub fn new(path: &Path) -> std::io::Result<Self> {
440 let store = BinaryStore::new(path)?;
441
442 let mut interner = StringInterner::with_capacity(65536, 1024);
444 let mut entity_slots: Vec<Option<StoredEntity>> = Vec::with_capacity(256);
445 let mut name_table = ShardedNameTable::new(64);
446 let mut relations: Vec<StoredRelation> = Vec::with_capacity(64);
447 let mut search = SearchIndex::new();
448
449 let mut pending: Option<Vec<(RecordKind, Vec<u8>)>> = None;
454 store.replay(|kind, data| {
455 match kind {
456 RecordKind::TxnBegin => pending = Some(Vec::new()),
457 RecordKind::TxnCommit => {
458 if let Some(buffered) = pending.take() {
459 for (k, d) in &buffered {
460 Self::apply_record(
461 *k, d, &mut interner, &mut entity_slots, &mut search,
462 &mut name_table, &mut relations,
463 );
464 }
465 }
466 }
467 other => match pending.as_mut() {
468 Some(buffered) => buffered.push((other, data.to_vec())),
469 None => Self::apply_record(
470 other, data, &mut interner, &mut entity_slots, &mut search,
471 &mut name_table, &mut relations,
472 ),
473 },
474 }
475 })?;
476
477 let free_slots: Vec<u32> = entity_slots
479 .iter()
480 .enumerate()
481 .filter(|(_, s)| s.is_none())
482 .map(|(i, _)| i as u32)
483 .collect();
484
485 Ok(Self {
486 interner,
487 entity_slots,
488 free_slots,
489 name_table,
490 relations,
491 search,
492 store,
493 })
494 }
495
496 #[allow(clippy::too_many_arguments)]
504 fn apply_record(
505 kind: RecordKind,
506 data: &[u8],
507 interner: &mut StringInterner,
508 entity_slots: &mut Vec<Option<StoredEntity>>,
509 search: &mut SearchIndex,
510 name_table: &mut ShardedNameTable,
511 relations: &mut Vec<StoredRelation>,
512 ) {
513 match kind {
514 RecordKind::CreateEntity => {
515 if let Some((name, etype, obs)) = store_enc::decode_create_entity(data) {
516 Self::replay_create_entity(
517 interner, entity_slots, search, name_table, name, etype, &obs,
518 );
519 }
520 }
521 RecordKind::CreateRelation => {
522 if let Some((from, to, rtype)) = store_enc::decode_create_relation(data) {
523 let from_id = interner.intern(from);
524 let to_id = interner.intern(to);
525 let type_id = interner.intern(rtype);
526 relations.push(StoredRelation {
527 from: from_id,
528 to: to_id,
529 relation_type: type_id,
530 });
531 }
532 }
533 RecordKind::AddObservations => {
534 if let Some((name, obs)) = store_enc::decode_add_observations(data) {
535 Self::replay_add_observations(
536 interner, entity_slots, search, name_table, name, &obs,
537 );
538 }
539 }
540 RecordKind::DeleteEntity => {
541 if let Some(name) = store_enc::decode_delete_entity(data) {
542 Self::replay_delete_entity(
543 interner, entity_slots, relations, search, name_table, name,
544 );
545 }
546 }
547 RecordKind::DeleteObservations => {
548 if let Some((name, obs)) = store_enc::decode_delete_observations(data) {
549 Self::replay_delete_observations(
550 interner, entity_slots, search, name_table, name, &obs,
551 );
552 }
553 }
554 RecordKind::DeleteRelation => {
555 if let Some((from, to, rtype)) = store_enc::decode_delete_relation(data) {
556 let from_id = interner.intern(from);
557 let to_id = interner.intern(to);
558 let type_id = interner.intern(rtype);
559 relations.retain(|r| {
560 !(r.from == from_id && r.to == to_id && r.relation_type == type_id)
561 });
562 }
563 }
564 RecordKind::TxnBegin | RecordKind::TxnCommit => {}
565 }
566 }
567
568 #[allow(clippy::ptr_arg)]
569 fn replay_create_entity(
570 interner: &mut StringInterner,
571 entities: &mut Vec<Option<StoredEntity>>,
572 search: &mut SearchIndex,
573 name_table: &mut ShardedNameTable,
574 name: &str,
575 etype: &str,
576 observations: &[&str],
577 ) {
578 let name_id = interner.intern(name);
579 let type_id = interner.intern(etype);
580 let obs_ids: Vec<StrId> = observations.iter().map(|o| interner.intern(o)).collect();
581 let slot = entities.len() as u32;
582 entities.push(Some(StoredEntity {
583 state: ENTITY_SLOT_LIVE,
584 name: name_id,
585 entity_type: type_id,
586 observations: obs_ids.clone(),
587 }));
588 let hash = interner.get_hash(name_id);
589 name_table.insert(&*interner, hash, name_id, slot);
590 search.index_entity(interner, slot, name_id, type_id, &obs_ids);
591 }
592
593 fn replay_add_observations(
594 interner: &mut StringInterner,
595 entities: &mut [Option<StoredEntity>],
596 search: &mut SearchIndex,
597 name_table: &mut ShardedNameTable,
598 name: &str,
599 observations: &[&str],
600 ) {
601 let name_id = interner.intern(name);
602 let hash = interner.get_hash(name_id);
603 if let Some(slot) = name_table.lookup(hash, name_id)
604 && let Some(Some(entity)) = entities.get_mut(slot as usize)
605 {
606 for &o in observations {
607 let oid = interner.intern(o);
608 if !entity.observations.contains(&oid) {
609 entity.observations.push(oid);
610 }
611 }
612 search.remove_entity(slot);
613 search.index_entity(
614 interner,
615 slot,
616 entity.name,
617 entity.entity_type,
618 &entity.observations,
619 );
620 }
621 }
622
623 fn replay_delete_entity(
624 interner: &mut StringInterner,
625 entities: &mut [Option<StoredEntity>],
626 rels: &mut Vec<StoredRelation>,
627 search: &mut SearchIndex,
628 name_table: &mut ShardedNameTable,
629 name: &str,
630 ) {
631 let name_id = interner.intern(name);
632 let hash = interner.get_hash(name_id);
633 if let Some(slot) = name_table.lookup(hash, name_id)
634 && let Some(Some(_)) = entities.get(slot as usize)
635 {
636 entities[slot as usize] = None;
637 search.remove_entity(slot);
638 name_table.remove(&*interner, hash, name_id);
639 }
640 rels.retain(|r| r.from != name_id && r.to != name_id);
641 }
642
643 fn replay_delete_observations(
644 interner: &mut StringInterner,
645 entities: &mut [Option<StoredEntity>],
646 search: &mut SearchIndex,
647 name_table: &mut ShardedNameTable,
648 name: &str,
649 observations: &[&str],
650 ) {
651 let name_id = interner.intern(name);
652 let hash = interner.get_hash(name_id);
653 if let Some(slot) = name_table.lookup(hash, name_id)
654 && let Some(Some(entity)) = entities.get_mut(slot as usize)
655 {
656 let remove_ids: Vec<StrId> = observations.iter().map(|o| interner.intern(o)).collect();
657 entity.observations.retain(|o| !remove_ids.contains(o));
658 search.remove_entity(slot);
659 search.index_entity(
660 interner,
661 slot,
662 entity.name,
663 entity.entity_type,
664 &entity.observations,
665 );
666 }
667 }
668
669 pub const fn interner(&self) -> &StringInterner {
674 &self.interner
675 }
676
677 pub fn get_entity(&self, name: &str) -> Option<Entity> {
679 let name_id = self.interner.get_optional(name)?;
680 let hash = self.interner.get_hash(name_id);
681 let slot = self.name_table.lookup(hash, name_id)?;
682 let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
683 if !stored.is_live() {
684 return None;
685 }
686 Some(self.entity_to_output(stored))
687 }
688
689 pub fn graph_stats(&self) -> serde_json::Value {
691 let live_entities = self
692 .entity_slots
693 .iter()
694 .filter(|s| s.as_ref().is_some_and(|e| e.is_live()))
695 .count();
696 let total_relations = self.relations.len();
697 let index_entries = self.search.len();
698 let total_obs: usize = self
699 .entity_slots
700 .iter()
701 .filter_map(|s| s.as_ref())
702 .filter(|e| e.is_live())
703 .map(|e| e.observations.len())
704 .sum();
705
706 serde_json::json!({
707 "entities": live_entities,
708 "relations": total_relations,
709 "totalObservations": total_obs,
710 "searchIndexEntries": index_entries,
711 "internedStrings": self.interner.len(),
712 "internedBytes": self.interner.total_bytes(),
713 })
714 }
715
716 pub fn search_relations(&self, from: Option<&str>, to: Option<&str>, rtype: Option<&str>) -> Vec<Relation> {
720 let from_id = match from {
721 Some(f) => match self.interner.get_optional(f) {
722 Some(id) => Some(id),
723 None => return Vec::new(),
724 },
725 None => None,
726 };
727 let to_id = match to {
728 Some(t) => match self.interner.get_optional(t) {
729 Some(id) => Some(id),
730 None => return Vec::new(),
731 },
732 None => None,
733 };
734 let rtype_id = match rtype {
735 Some(r) => match self.interner.get_optional(r) {
736 Some(id) => Some(id),
737 None => return Vec::new(),
738 },
739 None => None,
740 };
741
742 self.relations
743 .iter()
744 .filter(|r| {
745 from_id.is_none_or(|f| r.from == f)
746 && to_id.is_none_or(|t| r.to == t)
747 && rtype_id.is_none_or(|rt| r.relation_type == rt)
748 })
749 .map(|r| Relation {
750 from: self.interner.lookup(r.from).to_string(),
751 to: self.interner.lookup(r.to).to_string(),
752 relation_type: self.interner.lookup(r.relation_type).to_string(),
753 })
754 .collect()
755 }
756
757 pub fn find_path(&self, from: &str, to: &str) -> Result<Vec<String>> {
760 let from_id = self.interner.get_optional(from)
761 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
762 let to_id = self.interner.get_optional(to)
763 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
764 let hash_from = self.interner.get_hash(from_id);
765 let hash_to = self.interner.get_hash(to_id);
766
767 if self.name_table.lookup(hash_from, from_id).is_none() {
768 return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
769 }
770 if self.name_table.lookup(hash_to, to_id).is_none() {
771 return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
772 }
773 if from_id == to_id {
774 return Ok(vec![from.to_string()]);
775 }
776
777 let mut adj: AHashMap<StrId, Vec<(StrId, StrId)>> = AHashMap::new();
779 for rel in &self.relations {
780 adj.entry(rel.from).or_default().push((rel.to, rel.relation_type));
781 adj.entry(rel.to).or_default().push((rel.from, rel.relation_type));
782 }
783
784 let mut visited: AHashSet<StrId> = AHashSet::new();
786 let mut parent: AHashMap<StrId, StrId> = AHashMap::new();
787 let mut queue: VecDeque<StrId> = VecDeque::new();
788
789 visited.insert(from_id);
790 queue.push_back(from_id);
791
792 while let Some(current) = queue.pop_front() {
793 if current == to_id {
794 break;
795 }
796
797 if let Some(neighbors) = adj.get(¤t) {
798 for &(neighbor, _) in neighbors {
799 if visited.insert(neighbor) {
800 parent.insert(neighbor, current);
801 queue.push_back(neighbor);
802 }
803 }
804 }
805 }
806
807 if !parent.contains_key(&to_id) && from_id != to_id {
808 return Err(MCSError::MemoryError(format!(
809 "No path found between '{from}' and '{to}'"
810 )));
811 }
812
813 let mut path: Vec<String> = Vec::new();
815 let mut cur = to_id;
816 loop {
817 path.push(self.interner.lookup(cur).to_string());
818 if cur == from_id {
819 break;
820 }
821 cur = *parent.get(&cur).ok_or_else(|| {
822 MCSError::MemoryError("Path reconstruction failed".into())
823 })?;
824 }
825 path.reverse();
826 Ok(path)
827 }
828
829 pub fn compact(&mut self) -> Result<()> {
834 let mut create_entities: Vec<Entity> = Vec::new();
836 let mut create_relations: Vec<Relation> = Vec::new();
837
838 for slot in &self.entity_slots {
839 if let Some(stored) = slot.as_ref().filter(|e| e.is_live()) {
840 create_entities.push(self.entity_to_output(stored));
841 }
842 }
843 for rel in &self.relations {
844 create_relations.push(Relation {
845 from: self.interner.lookup(rel.from).to_string(),
846 to: self.interner.lookup(rel.to).to_string(),
847 relation_type: self.interner.lookup(rel.relation_type).to_string(),
848 });
849 }
850
851 let tmp_path = self.store.path().with_extension("tmp");
858 if let Err(e) = std::fs::remove_file(&tmp_path)
859 && e.kind() != std::io::ErrorKind::NotFound
860 {
861 return Err(MCSError::IoError(e));
862 }
863 let mut tmp_store = BinaryStore::new(&tmp_path).map_err(MCSError::IoError)?;
864 for entity in &create_entities {
865 let mut buf = Vec::new();
866 store_enc::encode_create_entity(&mut buf, &entity.name, &entity.entity_type, &entity.observations)
867 .map_err(MCSError::IoError)?;
868 tmp_store.write_record(RecordKind::CreateEntity, &buf).map_err(MCSError::IoError)?;
869 }
870 for relation in &create_relations {
871 let mut buf = Vec::new();
872 store_enc::encode_create_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
873 .map_err(MCSError::IoError)?;
874 tmp_store.write_record(RecordKind::CreateRelation, &buf).map_err(MCSError::IoError)?;
875 }
876 tmp_store.flush_and_sync().map_err(MCSError::IoError)?;
877 drop(tmp_store);
878
879 std::fs::rename(&tmp_path, self.store.path()).map_err(MCSError::IoError)?;
884 sync_parent_dir(self.store.path()).map_err(MCSError::IoError)?;
885
886 let path = self.store.path().clone();
892 *self = KnowledgeGraph::new(&path).map_err(MCSError::IoError)?;
893
894 Ok(())
895 }
896
897 pub fn create_entities(&mut self, entities: &[Entity]) -> Result<Vec<Entity>> {
900 for entity in entities {
902 if entity.name.is_empty() {
903 return Err(MCSError::InvalidParams(
904 "Entity name must not be empty".into(),
905 ));
906 }
907 }
908 let mut created = Vec::new();
909 for entity in entities {
910 let existing = self.interner.get_optional(&entity.name)
912 .and_then(|id| {
913 let hash = self.interner.get_hash(id);
914 self.name_table.lookup(hash, id)
915 });
916 if existing.is_some() {
917 continue;
918 }
919 let mut buf = Vec::new();
921 store_enc::encode_create_entity(&mut buf, &entity.name, &entity.entity_type, &entity.observations)
922 .map_err(MCSError::IoError)?;
923 self.store.write_record(RecordKind::CreateEntity, &buf)
924 .map_err(MCSError::IoError)?;
925
926 let name_id = self.interner.intern(&entity.name);
927 let hash = self.interner.get_hash(name_id);
928 let type_id = self.interner.intern(&entity.entity_type);
929 let obs_ids: Vec<StrId> = entity
930 .observations
931 .iter()
932 .map(|o| self.interner.intern(o))
933 .collect();
934 let reused = self.free_slots.pop();
937 let slot = reused.unwrap_or(self.entity_slots.len() as u32);
938 self.search
939 .index_entity(&mut self.interner, slot, name_id, type_id, &obs_ids);
940 let stored = Some(StoredEntity {
941 state: ENTITY_SLOT_LIVE,
942 name: name_id,
943 entity_type: type_id,
944 observations: obs_ids,
945 });
946 match reused {
947 Some(s) => self.entity_slots[s as usize] = stored,
948 None => self.entity_slots.push(stored),
949 }
950 self.name_table.insert(&self.interner, hash, name_id, slot);
951 created.push(Entity {
952 name: entity.name.clone(),
953 entity_type: entity.entity_type.clone(),
954 observations: entity.observations.clone(),
955 });
956 }
957 Ok(created)
958 }
959
960 pub fn create_relations(&mut self, relations: &[Relation]) -> Result<Vec<Relation>> {
961 for relation in relations {
963 if relation.from.is_empty() || relation.to.is_empty() {
964 return Err(MCSError::InvalidParams(
965 "Relation endpoints must not be empty".into(),
966 ));
967 }
968 }
969 let mut created = Vec::new();
970 let mut rel_set: AHashSet<(StrId, StrId, StrId)> = AHashSet::new();
972 for rel in &self.relations {
973 rel_set.insert((rel.from, rel.to, rel.relation_type));
974 }
975 for relation in relations {
976 let from_id = self.interner.intern(&relation.from);
977 let to_id = self.interner.intern(&relation.to);
978 let type_id = self.interner.intern(&relation.relation_type);
979 if !rel_set.insert((from_id, to_id, type_id)) {
980 continue;
981 }
982 let mut buf = Vec::new();
984 store_enc::encode_create_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
985 .map_err(MCSError::IoError)?;
986 self.store.write_record(RecordKind::CreateRelation, &buf)
987 .map_err(MCSError::IoError)?;
988
989 self.relations.push(StoredRelation {
990 from: from_id,
991 to: to_id,
992 relation_type: type_id,
993 });
994 created.push(Relation {
995 from: relation.from.clone(),
996 to: relation.to.clone(),
997 relation_type: relation.relation_type.clone(),
998 });
999 }
1000 Ok(created)
1001 }
1002
1003 pub fn add_observations(&mut self, entity_name: &str, contents: &[String]) -> Result<Vec<String>> {
1004 let name_id = self.interner.get_optional(entity_name)
1005 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1006 let hash = self.interner.get_hash(name_id);
1007 let slot = self
1008 .name_table
1009 .lookup(hash, name_id)
1010 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1011 let existing: AHashSet<StrId> = self
1014 .entity_slots
1015 .get(slot as usize)
1016 .and_then(|e| e.as_ref())
1017 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?
1018 .observations
1019 .iter()
1020 .copied()
1021 .collect();
1022
1023 let mut added = Vec::new();
1026 let mut interned_added = Vec::new();
1027 let mut seen: AHashSet<StrId> = AHashSet::new();
1028 for content in contents {
1029 let cid = self.interner.intern(content);
1030 if existing.contains(&cid) || !seen.insert(cid) {
1031 continue;
1032 }
1033 interned_added.push(cid);
1034 added.push(content.clone());
1035 }
1036 if added.is_empty() {
1037 return Ok(added);
1038 }
1039
1040 let mut buf = Vec::new();
1043 store_enc::encode_add_observations(&mut buf, entity_name, &added)
1044 .map_err(MCSError::IoError)?;
1045 self.store.write_record(RecordKind::AddObservations, &buf)
1046 .map_err(MCSError::IoError)?;
1047
1048 let stored = self
1050 .entity_slots
1051 .get_mut(slot as usize)
1052 .and_then(|e| e.as_mut())
1053 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1054 stored.observations.extend_from_slice(&interned_added);
1055
1056 self.search
1059 .index_additional(&mut self.interner, slot, &interned_added);
1060 Ok(added)
1061 }
1062
1063 pub fn delete_entities(&mut self, entity_names: &[String]) -> Result<()> {
1064 let mut deleted_names = Vec::new();
1065 for name in entity_names {
1066 let name_id_opt = self.interner.get_optional(name);
1067 if let Some(name_id) = name_id_opt {
1068 let hash = self.interner.get_hash(name_id);
1069 if let Some(slot) = self.name_table.lookup(hash, name_id)
1070 && let Some(Some(_)) = self.entity_slots.get(slot as usize)
1071 {
1072 let mut buf = Vec::new();
1074 store_enc::encode_delete_entity(&mut buf, name)
1075 .map_err(MCSError::IoError)?;
1076 self.store.write_record(RecordKind::DeleteEntity, &buf)
1077 .map_err(MCSError::IoError)?;
1078
1079 self.entity_slots[slot as usize] = None;
1080 self.free_slots.push(slot);
1081 self.search.remove_entity(slot);
1082 self.name_table.remove(&self.interner, hash, name_id);
1083 deleted_names.push(name.clone());
1084 }
1085 }
1086 }
1087 if !deleted_names.is_empty() {
1088 let deleted_ids: AHashSet<StrId> = deleted_names.iter()
1090 .map(|n| self.interner.intern(n))
1091 .collect();
1092 self.relations
1093 .retain(|r| !deleted_ids.contains(&r.from) && !deleted_ids.contains(&r.to));
1094 }
1095 Ok(())
1096 }
1097
1098 pub fn delete_observations(&mut self, entity_name: &str, observations: &[String]) -> Result<()> {
1099 let name_id = self.interner.get_optional(entity_name)
1100 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1101 let hash = self.interner.get_hash(name_id);
1102 let slot = self
1103 .name_table
1104 .lookup(hash, name_id)
1105 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1106 self.entity_slots
1108 .get(slot as usize)
1109 .and_then(|e| e.as_ref())
1110 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1111 let remove_ids: AHashSet<StrId> = observations.iter().map(|o| self.interner.intern(o)).collect();
1112
1113 let mut buf = Vec::new();
1115 store_enc::encode_delete_observations(&mut buf, entity_name, observations)
1116 .map_err(MCSError::IoError)?;
1117 self.store.write_record(RecordKind::DeleteObservations, &buf)
1118 .map_err(MCSError::IoError)?;
1119
1120 let stored = self
1122 .entity_slots
1123 .get_mut(slot as usize)
1124 .and_then(|e| e.as_mut())
1125 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1126 stored.observations.retain(|o| !remove_ids.contains(o));
1127 self.search.remove_entity(slot);
1128 self.search
1129 .index_entity(&mut self.interner, slot, stored.name, stored.entity_type, &stored.observations);
1130 Ok(())
1131 }
1132
1133 pub fn delete_relations(&mut self, relations: &[Relation]) -> Result<()> {
1134 let rels: AHashSet<(StrId, StrId, StrId)> = relations
1136 .iter()
1137 .map(|r| {
1138 (
1139 self.interner.intern(&r.from),
1140 self.interner.intern(&r.to),
1141 self.interner.intern(&r.relation_type),
1142 )
1143 })
1144 .collect();
1145 for relation in relations {
1148 let mut buf = Vec::new();
1149 store_enc::encode_delete_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
1150 .map_err(MCSError::IoError)?;
1151 self.store.write_record(RecordKind::DeleteRelation, &buf)
1152 .map_err(MCSError::IoError)?;
1153 }
1154 self.relations
1155 .retain(|r| !rels.contains(&(r.from, r.to, r.relation_type)));
1156 Ok(())
1157 }
1158
1159 pub fn read_graph(&self) -> KnowledgeGraphOut {
1160 self.read_graph_view().to_owned_out()
1161 }
1162
1163 pub fn read_graph_view(&self) -> GraphView<'_> {
1167 let entities: Vec<&StoredEntity> = self
1168 .entity_slots
1169 .iter()
1170 .filter_map(|s| s.as_ref().filter(|e| e.is_live()))
1171 .collect();
1172 let relations: Vec<&StoredRelation> = self.relations.iter().collect();
1173 GraphView { kg: self, entities, relations }
1174 }
1175
1176 pub fn search_nodes(&self, query: &str) -> KnowledgeGraphOut {
1179 self.search_nodes_filtered(query, None, 0, usize::MAX)
1180 }
1181
1182 pub fn open_nodes(&self, names: &[String]) -> KnowledgeGraphOut {
1183 self.open_nodes_view(names).to_owned_out()
1184 }
1185
1186 pub fn open_nodes_view(&self, names: &[String]) -> GraphView<'_> {
1188 let name_ids: AHashSet<StrId> = names.iter()
1189 .filter_map(|n| self.interner.get_optional(n))
1190 .collect();
1191 let entities: Vec<&StoredEntity> = self
1192 .entity_slots
1193 .iter()
1194 .filter_map(|s| {
1195 s.as_ref()
1196 .filter(|stored| stored.is_live() && name_ids.contains(&stored.name))
1197 })
1198 .collect();
1199 let matched_names: AHashSet<StrId> = entities.iter().map(|e| e.name).collect();
1200 let relations: Vec<&StoredRelation> = self
1201 .relations
1202 .iter()
1203 .filter(|r| matched_names.contains(&r.from) || matched_names.contains(&r.to))
1204 .collect();
1205 GraphView { kg: self, entities, relations }
1206 }
1207
1208 fn entity_to_output(&self, stored: &StoredEntity) -> Entity {
1213 Entity {
1214 name: self.interner.lookup(stored.name).to_string(),
1215 entity_type: self.interner.lookup(stored.entity_type).to_string(),
1216 observations: stored
1217 .observations
1218 .iter()
1219 .map(|o| self.interner.lookup(*o).to_string())
1220 .collect(),
1221 }
1222 }
1223
1224 #[inline]
1225 fn relation_to_output(&self, r: &StoredRelation) -> Relation {
1226 Relation {
1227 from: self.interner.lookup(r.from).to_string(),
1228 to: self.interner.lookup(r.to).to_string(),
1229 relation_type: self.interner.lookup(r.relation_type).to_string(),
1230 }
1231 }
1232
1233 fn lookup_live_slot(&self, name: &str) -> Option<u32> {
1235 let name_id = self.interner.get_optional(name)?;
1236 let hash = self.interner.get_hash(name_id);
1237 let slot = self.name_table.lookup(hash, name_id)?;
1238 let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
1239 stored.is_live().then_some(slot)
1240 }
1241
1242 fn entity_by_name_id(&self, name_id: StrId) -> Option<Entity> {
1244 let hash = self.interner.get_hash(name_id);
1245 let slot = self.name_table.lookup(hash, name_id)?;
1246 let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
1247 stored.is_live().then(|| self.entity_to_output(stored))
1248 }
1249
1250 pub fn entity_type_counts(&self) -> Vec<(String, usize)> {
1254 let mut counts: AHashMap<StrId, usize> = AHashMap::new();
1255 for st in self
1256 .entity_slots
1257 .iter()
1258 .filter_map(|s| s.as_ref())
1259 .filter(|e| e.is_live())
1260 {
1261 *counts.entry(st.entity_type).or_insert(0) += 1;
1262 }
1263 self.rank_counts(counts)
1264 }
1265
1266 pub fn relation_type_counts(&self) -> Vec<(String, usize)> {
1268 let mut counts: AHashMap<StrId, usize> = AHashMap::new();
1269 for r in &self.relations {
1270 *counts.entry(r.relation_type).or_insert(0) += 1;
1271 }
1272 self.rank_counts(counts)
1273 }
1274
1275 fn rank_counts(&self, counts: AHashMap<StrId, usize>) -> Vec<(String, usize)> {
1276 let mut out: Vec<(String, usize)> = counts
1277 .into_iter()
1278 .map(|(id, c)| (self.interner.lookup(id).to_string(), c))
1279 .collect();
1280 out.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
1281 out
1282 }
1283
1284 pub fn search_nodes_filtered(
1288 &self,
1289 query: &str,
1290 entity_type: Option<&str>,
1291 offset: usize,
1292 limit: usize,
1293 ) -> KnowledgeGraphOut {
1294 self.search_nodes_view(query, entity_type, offset, limit).to_owned_out()
1295 }
1296
1297 pub fn search_nodes_view(
1299 &self,
1300 query: &str,
1301 entity_type: Option<&str>,
1302 offset: usize,
1303 limit: usize,
1304 ) -> GraphView<'_> {
1305 let type_id = match entity_type {
1306 Some(t) => match self.interner.get_optional(t) {
1307 Some(id) => Some(id),
1308 None => return GraphView { kg: self, entities: Vec::new(), relations: Vec::new() },
1309 },
1310 None => None,
1311 };
1312
1313 let ranked = self.search.search_ranked(query, &self.interner);
1314 let mut selected: AHashSet<StrId> = AHashSet::new();
1315 let mut entities: Vec<&StoredEntity> = Vec::new();
1316 let mut skipped = 0usize;
1317 for (slot, _score) in ranked {
1318 let Some(st) = self
1319 .entity_slots
1320 .get(slot as usize)
1321 .and_then(|s| s.as_ref())
1322 .filter(|e| e.is_live())
1323 else {
1324 continue;
1325 };
1326 if type_id.is_some_and(|tid| st.entity_type != tid) {
1327 continue;
1328 }
1329 if skipped < offset {
1330 skipped += 1;
1331 continue;
1332 }
1333 if entities.len() >= limit {
1334 break;
1335 }
1336 selected.insert(st.name);
1337 entities.push(st);
1338 }
1339
1340 let relations: Vec<&StoredRelation> = self
1341 .relations
1342 .iter()
1343 .filter(|r| selected.contains(&r.from) || selected.contains(&r.to))
1344 .collect();
1345 GraphView { kg: self, entities, relations }
1346 }
1347
1348 pub fn read_graph_filtered(
1352 &self,
1353 entity_type: Option<&str>,
1354 offset: usize,
1355 limit: usize,
1356 ) -> KnowledgeGraphOut {
1357 self.read_graph_filtered_view(entity_type, offset, limit).to_owned_out()
1358 }
1359
1360 pub fn read_graph_filtered_view(
1362 &self,
1363 entity_type: Option<&str>,
1364 offset: usize,
1365 limit: usize,
1366 ) -> GraphView<'_> {
1367 let type_id = match entity_type {
1368 Some(t) => match self.interner.get_optional(t) {
1369 Some(id) => Some(id),
1370 None => return GraphView { kg: self, entities: Vec::new(), relations: Vec::new() },
1371 },
1372 None => None,
1373 };
1374
1375 let mut selected: AHashSet<StrId> = AHashSet::new();
1376 let mut entities: Vec<&StoredEntity> = Vec::new();
1377 let mut skipped = 0usize;
1378 for st in self
1379 .entity_slots
1380 .iter()
1381 .filter_map(|s| s.as_ref())
1382 .filter(|e| e.is_live())
1383 {
1384 if type_id.is_some_and(|tid| st.entity_type != tid) {
1385 continue;
1386 }
1387 if skipped < offset {
1388 skipped += 1;
1389 continue;
1390 }
1391 if entities.len() >= limit {
1392 break;
1393 }
1394 selected.insert(st.name);
1395 entities.push(st);
1396 }
1397
1398 let relations: Vec<&StoredRelation> = self
1399 .relations
1400 .iter()
1401 .filter(|r| selected.contains(&r.from) && selected.contains(&r.to))
1402 .collect();
1403 GraphView { kg: self, entities, relations }
1404 }
1405
1406 pub fn neighbors(
1414 &self,
1415 name: &str,
1416 direction: Direction,
1417 rtype: Option<&str>,
1418 depth: u32,
1419 ) -> Result<KnowledgeGraphOut> {
1420 self.lookup_live_slot(name)
1421 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
1422 let start = self.interner.get_optional(name).unwrap();
1424
1425 let rtype_id = match rtype {
1427 Some(r) => match self.interner.get_optional(r) {
1428 Some(id) => Some(id),
1429 None => {
1430 let entities = self.entity_by_name_id(start).into_iter().collect();
1431 return Ok(KnowledgeGraphOut { entities, relations: Vec::new() });
1432 }
1433 },
1434 None => None,
1435 };
1436
1437 let mut visited: AHashSet<StrId> = AHashSet::new();
1438 visited.insert(start);
1439
1440 let type_ok = |r: &StoredRelation| rtype_id.is_none_or(|rt| r.relation_type == rt);
1441
1442 if depth == 1 {
1443 for r in self.relations.iter().filter(|r| type_ok(r)) {
1444 match direction {
1445 Direction::Out => {
1446 if r.from == start {
1447 visited.insert(r.to);
1448 }
1449 }
1450 Direction::In => {
1451 if r.to == start {
1452 visited.insert(r.from);
1453 }
1454 }
1455 Direction::Both => {
1456 if r.from == start {
1457 visited.insert(r.to);
1458 } else if r.to == start {
1459 visited.insert(r.from);
1460 }
1461 }
1462 }
1463 }
1464 } else if depth >= 2 {
1465 let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
1467 for r in self.relations.iter().filter(|r| type_ok(r)) {
1468 match direction {
1469 Direction::Out => adj.entry(r.from).or_default().push(r.to),
1470 Direction::In => adj.entry(r.to).or_default().push(r.from),
1471 Direction::Both => {
1472 adj.entry(r.from).or_default().push(r.to);
1473 adj.entry(r.to).or_default().push(r.from);
1474 }
1475 }
1476 }
1477 let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
1478 queue.push_back((start, 0));
1479 while let Some((node, d)) = queue.pop_front() {
1480 if d >= depth {
1481 continue;
1482 }
1483 if let Some(nbrs) = adj.get(&node) {
1484 for &nb in nbrs {
1485 if visited.insert(nb) {
1486 queue.push_back((nb, d + 1));
1487 }
1488 }
1489 }
1490 }
1491 }
1492
1493 let mut entities = Vec::with_capacity(visited.len());
1494 for &nid in &visited {
1495 if let Some(e) = self.entity_by_name_id(nid) {
1496 entities.push(e);
1497 }
1498 }
1499 let relations = self
1500 .relations
1501 .iter()
1502 .filter(|r| type_ok(r) && visited.contains(&r.from) && visited.contains(&r.to))
1503 .map(|r| self.relation_to_output(r))
1504 .collect();
1505 Ok(KnowledgeGraphOut { entities, relations })
1506 }
1507
1508 pub fn describe_entity(&self, name: &str) -> Result<serde_json::Value> {
1512 let name_id = self
1513 .interner
1514 .get_optional(name)
1515 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
1516 let entity = self
1517 .entity_by_name_id(name_id)
1518 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
1519
1520 let mut incident: Vec<Relation> = Vec::new();
1521 let mut neighbor_seen: AHashSet<StrId> = AHashSet::new();
1522 let mut neighbors: Vec<&str> = Vec::new();
1523 for r in &self.relations {
1524 if r.from == name_id || r.to == name_id {
1525 incident.push(self.relation_to_output(r));
1526 let other = if r.from == name_id { r.to } else { r.from };
1527 if other != name_id && neighbor_seen.insert(other) {
1528 neighbors.push(self.interner.lookup(other));
1529 }
1530 }
1531 }
1532
1533 Ok(serde_json::json!({
1534 "entity": entity,
1535 "relations": incident,
1536 "neighbors": neighbors,
1537 "degree": incident.len(),
1538 }))
1539 }
1540
1541 pub fn upsert_entities(&mut self, entities: &[Entity]) -> Result<Vec<serde_json::Value>> {
1546 for e in entities {
1547 if e.name.is_empty() {
1548 return Err(MCSError::InvalidParams(
1549 "Entity name must not be empty".into(),
1550 ));
1551 }
1552 }
1553 let mut out = Vec::with_capacity(entities.len());
1554 for e in entities {
1555 if self.lookup_live_slot(&e.name).is_some() {
1556 let added = self.add_observations(&e.name, &e.observations)?;
1557 out.push(serde_json::json!({
1558 "name": e.name,
1559 "created": false,
1560 "addedObservations": added,
1561 }));
1562 } else {
1563 let created = self.create_entities(std::slice::from_ref(e))?;
1564 out.push(serde_json::json!({
1565 "name": e.name,
1566 "created": !created.is_empty(),
1567 "addedObservations": e.observations,
1568 }));
1569 }
1570 }
1571 Ok(out)
1572 }
1573
1574 pub fn export(&self, format: &str) -> Result<String> {
1576 match format {
1577 "json" => serde_json::to_string(&self.read_graph()).map_err(MCSError::JsonError),
1578 "mermaid" => Ok(self.export_mermaid()),
1579 "dot" => Ok(self.export_dot()),
1580 other => Err(MCSError::InvalidParams(format!(
1581 "Unknown export format '{other}' (expected json|mermaid|dot)"
1582 ))),
1583 }
1584 }
1585
1586 fn diagram_node_ids(&self) -> (AHashMap<StrId, usize>, Vec<(usize, StrId)>) {
1588 let mut ids: AHashMap<StrId, usize> = AHashMap::new();
1589 let mut order: Vec<(usize, StrId)> = Vec::new();
1590 for st in self
1591 .entity_slots
1592 .iter()
1593 .filter_map(|s| s.as_ref())
1594 .filter(|e| e.is_live())
1595 {
1596 let n = ids.len();
1597 ids.insert(st.name, n);
1598 order.push((n, st.name));
1599 }
1600 (ids, order)
1601 }
1602
1603 fn export_mermaid(&self) -> String {
1604 let (ids, order) = self.diagram_node_ids();
1605 let mut s = String::with_capacity(64 + order.len() * 32 + self.relations.len() * 32);
1606 s.push_str("graph LR\n");
1607 for (n, name_id) in &order {
1608 let label = sanitize_label(self.interner.lookup(*name_id));
1609 s.push_str(&format!(" n{n}[\"{label}\"]\n"));
1610 }
1611 for r in &self.relations {
1612 if let (Some(&a), Some(&b)) = (ids.get(&r.from), ids.get(&r.to)) {
1613 let rel = sanitize_label(self.interner.lookup(r.relation_type));
1614 s.push_str(&format!(" n{a} -->|{rel}| n{b}\n"));
1615 }
1616 }
1617 s
1618 }
1619
1620 fn export_dot(&self) -> String {
1621 let (ids, order) = self.diagram_node_ids();
1622 let mut s = String::with_capacity(64 + order.len() * 32 + self.relations.len() * 32);
1623 s.push_str("digraph G {\n");
1624 for (n, name_id) in &order {
1625 let label = sanitize_label(self.interner.lookup(*name_id));
1626 s.push_str(&format!(" n{n} [label=\"{label}\"];\n"));
1627 }
1628 for r in &self.relations {
1629 if let (Some(&a), Some(&b)) = (ids.get(&r.from), ids.get(&r.to)) {
1630 let rel = sanitize_label(self.interner.lookup(r.relation_type));
1631 s.push_str(&format!(" n{a} -> n{b} [label=\"{rel}\"];\n"));
1632 }
1633 }
1634 s.push_str("}\n");
1635 s
1636 }
1637
    /// Merges entity `source` into entity `target` and deletes `source`.
    ///
    /// * Observations of `source` that `target` does not already have are added
    ///   to `target`, each distinct observation once.
    /// * Relations touching `source` are re-created with `source` replaced by
    ///   `target`. Ones that would become self-loops on `target` are skipped, as
    ///   are ones that already exist or were already produced by this merge.
    /// * A `DeleteEntity` record for `source` is emitted last.
    ///
    /// All records are written to the store between `TxnBegin` and `TxnCommit`,
    /// and only after the commit record is written are they applied to the
    /// in-memory state via `apply_record`, in the same order.
    /// NOTE(review): presumably applying the `DeleteEntity` record also drops the
    /// original relations that referenced `source`; confirm in `apply_record`.
    ///
    /// Returns a JSON summary with these keys:
    /// * `source`, `target`
    /// * `movedObservations`: how many observations `source` had.
    /// * `addedObservations`: how many of them were new to `target`.
    /// * `redirectedRelations`: how many relations were re-created.
    ///
    /// # Errors
    /// * `MCSError::InvalidParams` if `source == target` or either entity is not live.
    /// * `MCSError::IoError` if encoding or writing a record fails. In-memory
    ///   state is not modified in that case.
    pub fn merge_entities(&mut self, source: &str, target: &str) -> Result<serde_json::Value> {
        if source == target {
            return Err(MCSError::InvalidParams(
                "Source and target must be different entities".into(),
            ));
        }
        self.lookup_live_slot(source).ok_or_else(|| {
            MCSError::InvalidParams(format!("Source entity '{source}' not found"))
        })?;
        let target_slot = self.lookup_live_slot(target).ok_or_else(|| {
            MCSError::InvalidParams(format!("Target entity '{target}' not found"))
        })?;

        // Both entities were just confirmed live, so these lookups are expected to succeed.
        let source_entity = self.get_entity(source).unwrap();
        let moved_obs_count = source_entity.observations.len();
        let source_id = self.interner.get_optional(source).unwrap();
        let target_id = self.interner.get_optional(target).unwrap();

        // `target_slot` came from `lookup_live_slot`, so the slot is populated.
        let target_existing: AHashSet<StrId> = self.entity_slots[target_slot as usize]
            .as_ref()
            .unwrap()
            .observations
            .iter()
            .copied()
            .collect();
        // Observations to copy: not already on `target`, and each distinct one only once.
        let mut obs_seen: AHashSet<StrId> = AHashSet::new();
        let mut obs_to_add: Vec<String> = Vec::new();
        for o in &source_entity.observations {
            if let Some(oid) = self.interner.get_optional(o)
                && !target_existing.contains(&oid)
                && obs_seen.insert(oid)
            {
                obs_to_add.push(o.clone());
            }
        }

        // Keys of every relation that exists now; redirected duplicates of these are skipped.
        let existing_rels: AHashSet<(StrId, StrId, StrId)> =
            self.relations.iter().map(|r| (r.from, r.to, r.relation_type)).collect();
        let mut rel_seen: AHashSet<(StrId, StrId, StrId)> = AHashSet::new();
        let mut redirect: Vec<Relation> = Vec::new();
        for r in &self.relations {
            if r.from != source_id && r.to != source_id {
                continue;
            }
            // Rewrite whichever endpoint(s) refer to `source`.
            let new_from = if r.from == source_id { target_id } else { r.from };
            let new_to = if r.to == source_id { target_id } else { r.to };
            if new_from == new_to {
                // A source<->target relation or a self-loop on source would collapse
                // into a self-loop on target; drop it.
                continue;
            }
            let key = (new_from, new_to, r.relation_type);
            if existing_rels.contains(&key) || !rel_seen.insert(key) {
                continue;
            }
            redirect.push(Relation {
                from: self.interner.lookup(new_from).to_string(),
                to: self.interner.lookup(new_to).to_string(),
                relation_type: self.interner.lookup(r.relation_type).to_string(),
            });
        }

        let added_count = obs_to_add.len();
        let redirected = redirect.len() as u32;

        // Encode every record up front so that an encoding error aborts before anything
        // is written.
        let mut records: Vec<(RecordKind, Vec<u8>)> = Vec::new();
        if !obs_to_add.is_empty() {
            let mut buf = Vec::new();
            store_enc::encode_add_observations(&mut buf, target, &obs_to_add)
                .map_err(MCSError::IoError)?;
            records.push((RecordKind::AddObservations, buf));
        }
        for r in &redirect {
            let mut buf = Vec::new();
            store_enc::encode_create_relation(&mut buf, &r.from, &r.to, &r.relation_type)
                .map_err(MCSError::IoError)?;
            records.push((RecordKind::CreateRelation, buf));
        }
        let mut del_buf = Vec::new();
        store_enc::encode_delete_entity(&mut del_buf, source).map_err(MCSError::IoError)?;
        records.push((RecordKind::DeleteEntity, del_buf));

        // Persist the whole merge as one transaction bracketed by TxnBegin/TxnCommit.
        self.store.write_record(RecordKind::TxnBegin, &[]).map_err(MCSError::IoError)?;
        for (kind, data) in &records {
            self.store.write_record(*kind, data).map_err(MCSError::IoError)?;
        }
        self.store.write_record(RecordKind::TxnCommit, &[]).map_err(MCSError::IoError)?;

        // Apply the same records to the in-memory state, in log order.
        for (kind, data) in &records {
            Self::apply_record(
                *kind, data, &mut self.interner, &mut self.entity_slots, &mut self.search,
                &mut self.name_table, &mut self.relations,
            );
        }

        Ok(serde_json::json!({
            "source": source,
            "target": target,
            "movedObservations": moved_obs_count,
            "addedObservations": added_count,
            "redirectedRelations": redirected,
        }))
    }
1758
1759 pub fn extract_subgraph(&self, names: &[String], depth: u32) -> Result<KnowledgeGraphOut> {
1763 if names.is_empty() {
1764 return Ok(KnowledgeGraphOut {
1765 entities: Vec::new(),
1766 relations: Vec::new(),
1767 });
1768 }
1769 let mut visited: AHashSet<StrId> = AHashSet::new();
1771 let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
1772 for name in names {
1773 if let Some(id) = self.interner.get_optional(name)
1774 && visited.insert(id)
1775 {
1776 queue.push_back((id, 0));
1777 }
1778 }
1779 let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
1781 for r in &self.relations {
1782 adj.entry(r.from).or_default().push(r.to);
1783 adj.entry(r.to).or_default().push(r.from);
1784 }
1785 while let Some((node, d)) = queue.pop_front() {
1786 if d >= depth {
1787 continue;
1788 }
1789 if let Some(nbrs) = adj.get(&node) {
1790 for &nb in nbrs {
1791 if visited.insert(nb) {
1792 queue.push_back((nb, d + 1));
1793 }
1794 }
1795 }
1796 }
1797 let mut entities: Vec<Entity> = Vec::with_capacity(visited.len());
1798 for &nid in &visited {
1799 if let Some(e) = self.entity_by_name_id(nid) {
1800 entities.push(e);
1801 }
1802 }
1803 let relations: Vec<Relation> = self
1804 .relations
1805 .iter()
1806 .filter(|r| visited.contains(&r.from) && visited.contains(&r.to))
1807 .map(|r| self.relation_to_output(r))
1808 .collect();
1809 Ok(KnowledgeGraphOut { entities, relations })
1810 }
1811
1812 pub fn batch_get_entities(&self, names: &[String]) -> Vec<Option<Entity>> {
1814 names.iter().map(|n| self.get_entity(n)).collect()
1815 }
1816
1817 #[allow(clippy::too_many_arguments)]
1820 fn dfs_all_paths(
1821 adj: &AHashMap<StrId, Vec<StrId>>,
1822 current: StrId,
1823 target: StrId,
1824 max_depth: usize,
1825 max_paths: usize,
1826 visited: &mut AHashSet<StrId>,
1827 current_path: &mut Vec<StrId>,
1828 all_paths: &mut Vec<Vec<StrId>>,
1829 ) {
1830 if all_paths.len() >= max_paths {
1831 return;
1832 }
1833 if current == target && current_path.len() > 1 {
1834 all_paths.push(current_path.clone());
1835 return;
1836 }
1837 if current_path.len() > max_depth {
1838 return;
1839 }
1840 if let Some(neighbors) = adj.get(¤t) {
1841 for &nb in neighbors {
1842 if visited.insert(nb) {
1843 current_path.push(nb);
1844 Self::dfs_all_paths(
1845 adj, nb, target, max_depth, max_paths, visited, current_path, all_paths,
1846 );
1847 current_path.pop();
1848 visited.remove(&nb);
1849 }
1850 }
1851 }
1852 }
1853
1854 pub fn find_all_paths(
1858 &self,
1859 from: &str,
1860 to: &str,
1861 max_depth: usize,
1862 max_paths: usize,
1863 ) -> Result<Vec<Vec<String>>> {
1864 let from_id = self
1865 .interner
1866 .get_optional(from)
1867 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
1868 let to_id = self
1869 .interner
1870 .get_optional(to)
1871 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
1872 if self.lookup_live_slot(from).is_none() {
1874 return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
1875 }
1876 if self.lookup_live_slot(to).is_none() {
1877 return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
1878 }
1879 if from_id == to_id {
1880 return Ok(vec![vec![from.to_string()]]);
1881 }
1882 let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
1884 for r in &self.relations {
1885 adj.entry(r.from).or_default().push(r.to);
1886 adj.entry(r.to).or_default().push(r.from);
1887 }
1888 let mut all_paths: Vec<Vec<StrId>> = Vec::new();
1889 let mut current_path = Vec::new();
1890 let mut visited: AHashSet<StrId> = AHashSet::new();
1891 visited.insert(from_id);
1892 current_path.push(from_id);
1893 Self::dfs_all_paths(
1894 &adj,
1895 from_id,
1896 to_id,
1897 max_depth,
1898 max_paths,
1899 &mut visited,
1900 &mut current_path,
1901 &mut all_paths,
1902 );
1903 if all_paths.is_empty() {
1904 return Err(MCSError::MemoryError(format!(
1905 "No path found between '{from}' and '{to}'"
1906 )));
1907 }
1908 let result: Vec<Vec<String>> = all_paths
1909 .into_iter()
1910 .map(|path| {
1911 path.into_iter()
1912 .map(|id| self.interner.lookup(id).to_string())
1913 .collect()
1914 })
1915 .collect();
1916 Ok(result)
1917 }
1918
1919 pub fn flush_and_sync(&mut self) -> Result<()> {
1923 self.store.flush_and_sync().map_err(MCSError::IoError)
1924 }
1925}