1use std::collections::VecDeque;
2use std::path::Path;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::{Arc, Condvar, Mutex as StdMutex};
5
6use ahash::{AHashMap, AHashSet};
7use arc_swap::ArcSwap;
8
9use serde::ser::{Serialize, SerializeSeq, SerializeStruct, Serializer};
10
11use crate::config::Durability;
12use crate::errors::{MCSError, Result};
13use crate::intern::{StrId, StringInterner};
14use crate::types::{Entity, Relation, KnowledgeGraphOut};
15use crate::search::SearchIndex;
16use crate::store::{self as store_enc, BinaryStore, RecordKind};
17
/// Number of shards in the sharded name table.
///
/// Shard selection masks the hash with `NAME_TABLE_SHARDS - 1`, so the value must remain a
/// power of two.
const NAME_TABLE_SHARDS: usize = 1 << 2;
19
/// Issues a software prefetch hint (`_MM_HINT_T0`) for the cache line at `addr`.
///
/// # Safety
///
/// The function is `unsafe` because the underlying intrinsic is declared `unsafe`. The visible
/// caller only passes pointers computed inside a live allocation.
#[cfg(target_arch = "x86_64")]
#[inline(always)]
unsafe fn prefetch_addr(addr: *const u8) {
    use std::arch::x86_64::{_mm_prefetch, _MM_HINT_T0};

    // The intrinsic is declared with a `*const i8` parameter, so the byte pointer has to be cast;
    // passing `*const u8` directly is a type error.
    // SAFETY: the caller upholds this function's contract; the pointer is only used as a hint
    // operand and is never dereferenced by Rust code here.
    unsafe { _mm_prefetch::<{ _MM_HINT_T0 }>(addr.cast::<i8>()) };
}

/// No-op stand-in for targets other than x86_64, so callers need no `cfg` of their own.
///
/// # Safety
///
/// Always sound to call; the `unsafe` qualifier only mirrors the x86_64 variant's signature.
#[cfg(not(target_arch = "x86_64"))]
#[inline(always)]
const unsafe fn prefetch_addr(_addr: *const u8) {}
34
/// Calls `sync_all` on the directory that contains `path`.
///
/// A path without a (non-empty) parent component is treated as living in `"."`.
///
/// Two failures are tolerated and reported as success:
/// * the directory cannot be opened because it does not exist (`NotFound`);
/// * `sync_all` on the directory handle reports `InvalidInput`.
///
/// # Errors
///
/// Any other I/O error from opening or syncing the directory is returned unchanged.
fn sync_parent_dir(path: &Path) -> std::io::Result<()> {
    use std::io::ErrorKind;

    let parent = match path.parent() {
        Some(p) if !p.as_os_str().is_empty() => p,
        _ => Path::new("."),
    };

    let handle = match std::fs::File::open(parent) {
        Ok(handle) => handle,
        Err(err) if err.kind() == ErrorKind::NotFound => return Ok(()),
        Err(err) => return Err(err),
    };

    match handle.sync_all() {
        Err(err) if err.kind() != ErrorKind::InvalidInput => Err(err),
        _ => Ok(()),
    }
}
54
55#[derive(Clone)]
64#[cfg_attr(feature = "cache_align", repr(align(64)))]
65pub(crate) struct StoredEntity {
66 pub(crate) name: StrId,
67 pub(crate) entity_type: StrId,
68 pub(crate) observations: Vec<StrId>,
69}
70
71#[derive(Clone)]
75#[cfg_attr(feature = "cache_align", repr(align(16)))]
76pub(crate) struct StoredRelation {
77 pub(crate) from: StrId,
78 pub(crate) to: StrId,
79 pub(crate) relation_type: StrId,
80}
81
82pub struct GraphView<'a> {
96 kg: &'a KnowledgeGraph,
97 entities: Vec<&'a StoredEntity>,
98 relations: Vec<&'a StoredRelation>,
99}
100
101impl GraphView<'_> {
102 pub fn to_owned_out(&self) -> KnowledgeGraphOut {
106 KnowledgeGraphOut {
107 entities: self.entities.iter().map(|e| self.kg.entity_to_output(e)).collect(),
108 relations: self.relations.iter().map(|r| self.kg.relation_to_output(r)).collect(),
109 }
110 }
111}
112
113impl Serialize for GraphView<'_> {
114 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
115 let mut st = s.serialize_struct("KnowledgeGraphOut", 2)?;
116 st.serialize_field("entities", &EntityListRef { kg: self.kg, items: &self.entities })?;
117 st.serialize_field("relations", &RelationListRef { kg: self.kg, items: &self.relations })?;
118 st.end()
119 }
120}
121
122struct EntityListRef<'a> { kg: &'a KnowledgeGraph, items: &'a [&'a StoredEntity] }
123impl Serialize for EntityListRef<'_> {
124 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
125 let mut seq = s.serialize_seq(Some(self.items.len()))?;
126 for &e in self.items {
127 seq.serialize_element(&EntityRef { kg: self.kg, e })?;
128 }
129 seq.end()
130 }
131}
132
133struct RelationListRef<'a> { kg: &'a KnowledgeGraph, items: &'a [&'a StoredRelation] }
134impl Serialize for RelationListRef<'_> {
135 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
136 let mut seq = s.serialize_seq(Some(self.items.len()))?;
137 for &r in self.items {
138 seq.serialize_element(&RelationRef { kg: self.kg, r })?;
139 }
140 seq.end()
141 }
142}
143
144struct EntityRef<'a> { kg: &'a KnowledgeGraph, e: &'a StoredEntity }
145impl Serialize for EntityRef<'_> {
146 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
147 let mut st = s.serialize_struct("Entity", 3)?;
148 st.serialize_field("name", self.kg.interner.lookup(self.e.name))?;
149 st.serialize_field("entityType", self.kg.interner.lookup(self.e.entity_type))?;
150 st.serialize_field("observations", &ObsRef { kg: self.kg, obs: &self.e.observations })?;
151 st.end()
152 }
153}
154
155struct ObsRef<'a> { kg: &'a KnowledgeGraph, obs: &'a [StrId] }
156impl Serialize for ObsRef<'_> {
157 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
158 let mut seq = s.serialize_seq(Some(self.obs.len()))?;
159 for &o in self.obs {
160 seq.serialize_element(self.kg.interner.lookup(o))?;
161 }
162 seq.end()
163 }
164}
165
166struct RelationRef<'a> { kg: &'a KnowledgeGraph, r: &'a StoredRelation }
167impl Serialize for RelationRef<'_> {
168 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
169 let mut st = s.serialize_struct("Relation", 3)?;
170 st.serialize_field("from", self.kg.interner.lookup(self.r.from))?;
171 st.serialize_field("to", self.kg.interner.lookup(self.r.to))?;
172 st.serialize_field("relationType", self.kg.interner.lookup(self.r.relation_type))?;
173 st.end()
174 }
175}
176
/// Which relation endpoints are followed when walking outward from an entity.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum Direction {
    /// Follow relations whose `from` is the current entity.
    Out,
    /// Follow relations whose `to` is the current entity.
    In,
    /// Follow relations in either direction.
    Both,
}

impl Direction {
    /// Parses an optional direction keyword.
    ///
    /// Exactly `"out"` and `"in"` select [`Direction::Out`] and [`Direction::In`]; anything
    /// else, including `None`, yields [`Direction::Both`]. Matching is case-sensitive.
    pub fn parse(s: Option<&str>) -> Self {
        if s == Some("out") {
            Direction::Out
        } else if s == Some("in") {
            Direction::In
        } else {
            Direction::Both
        }
    }
}
198
/// Makes a string safe to embed in a double-quoted, single-line label.
///
/// Double quotes become single quotes and `\n` / `\r` become spaces; every other character is
/// copied through unchanged.
fn sanitize_label(s: &str) -> String {
    s.chars()
        .map(|ch| match ch {
            '"' => '\'',
            '\n' | '\r' => ' ',
            other => other,
        })
        .collect()
}
211
/// Control byte marking an unused slot. Its high bit is set, which is what the probing code
/// tests (`ctrl & 0x80 != 0`); occupied slots hold a 7-bit stamp instead.
const EMPTY_SLOT: u8 = u8::MAX;
222
/// Returns the 7-bit stamp stored in a slot's control byte: the low seven bits of `hash`.
#[inline(always)]
const fn h2(hash: u64) -> u8 {
    // Truncating to `u8` keeps the low eight bits; the mask then clears the top one.
    (hash as u8) & 0x7F
}
227
/// Returns the initial probe index: the bits of `hash` above the 7-bit stamp, reduced by `mask`.
#[inline(always)]
const fn h1(hash: u64, mask: usize) -> usize {
    let upper_bits = (hash >> 7) as usize;
    upper_bits & mask
}
232
233#[derive(Clone)]
234struct NameTableShard {
235 ctrl: Vec<u8>, names: Vec<StrId>,
237 slots: Vec<u32>,
238 mask: usize,
239 count: usize,
240}
241
242impl NameTableShard {
243 fn new(capacity: usize) -> Self {
244 let cap = capacity.next_power_of_two().max(16);
245 Self {
246 ctrl: vec![EMPTY_SLOT; cap],
247 names: vec![StrId::EMPTY; cap],
248 slots: vec![u32::MAX; cap],
249 mask: cap - 1,
250 count: 0,
251 }
252 }
253
254 #[inline(always)]
255 fn lookup(&self, hash: u64, name: StrId) -> Option<u32> {
256 let stamp = h2(hash);
257 let mask = self.mask;
258 let mut idx = h1(hash, mask);
259 let ctrl = self.ctrl.as_ptr();
260 let names = self.names.as_ptr();
261 let slots = self.slots.as_ptr();
262 let len = self.ctrl.len();
263
264 for _ in 0..len {
265 let prefetch_idx = idx.wrapping_add(4) & mask;
267 unsafe { prefetch_addr(ctrl.add(prefetch_idx)) };
268
269 unsafe {
271 let c = *ctrl.add(idx);
272 if c & 0x80 != 0 {
274 return None;
275 }
276 if c == stamp && *names.add(idx) == name {
278 return Some(*slots.add(idx));
279 }
280 }
281 idx = (idx + 1) & mask;
282 }
283 None
284 }
285
286 fn insert(&mut self, interner: &StringInterner, hash: u64, name: StrId, slot: u32) {
287 if self.count * 4 > self.ctrl.len() * 3 {
288 self.grow(interner);
289 }
290 let stamp = h2(hash);
291 let mask = self.mask;
292 let mut idx = h1(hash, mask);
293 loop {
294 unsafe {
296 if *self.ctrl.get_unchecked(idx) & 0x80 != 0 {
297 *self.ctrl.get_unchecked_mut(idx) = stamp;
298 *self.names.get_unchecked_mut(idx) = name;
299 *self.slots.get_unchecked_mut(idx) = slot;
300 self.count += 1;
301 return;
302 }
303 }
304 idx = (idx + 1) & mask;
305 }
306 }
307
308 fn remove(&mut self, interner: &StringInterner, hash: u64, name: StrId) {
309 let stamp = h2(hash);
310 let mask = self.mask;
311 let mut idx = h1(hash, mask);
312 let len = self.ctrl.len();
313 for _ in 0..len {
314 if self.ctrl[idx] & 0x80 != 0 {
315 return;
316 }
317 if self.ctrl[idx] == stamp && self.names[idx] == name {
318 self.ctrl[idx] = EMPTY_SLOT;
320 self.names[idx] = StrId::EMPTY;
321 self.slots[idx] = u32::MAX;
322 self.count -= 1;
323
324 let mut next = (idx + 1) & mask;
325 while self.ctrl[next] & 0x80 == 0 {
326 let nn = self.names[next];
327 let ns = self.slots[next];
328 let nh = interner.get_hash(nn);
331 self.ctrl[next] = EMPTY_SLOT;
332 self.names[next] = StrId::EMPTY;
333 self.slots[next] = u32::MAX;
334 self.count -= 1;
335
336 let nstamp = h2(nh);
338 let mut re_idx = h1(nh, mask);
339 while self.ctrl[re_idx] & 0x80 == 0 {
340 re_idx = (re_idx + 1) & mask;
341 }
342 self.ctrl[re_idx] = nstamp;
343 self.names[re_idx] = nn;
344 self.slots[re_idx] = ns;
345 self.count += 1;
346
347 next = (next + 1) & mask;
348 }
349 return;
350 }
351 idx = (idx + 1) & mask;
352 }
353 }
354
355 fn grow(&mut self, interner: &StringInterner) {
356 let new_cap = self.ctrl.len() * 2;
357 let new_mask = new_cap - 1;
358 let mut new_ctrl = vec![EMPTY_SLOT; new_cap];
359 let mut new_names = vec![StrId::EMPTY; new_cap];
360 let mut new_slots = vec![u32::MAX; new_cap];
361
362 for i in 0..self.ctrl.len() {
363 if self.ctrl[i] & 0x80 == 0 {
364 let name = self.names[i];
366 let hash = interner.get_hash(name);
367 let stamp = h2(hash);
368 let mut idx = h1(hash, new_mask);
369 while new_ctrl[idx] & 0x80 == 0 {
370 idx = (idx + 1) & new_mask;
371 }
372 new_ctrl[idx] = stamp;
373 new_names[idx] = name;
374 new_slots[idx] = self.slots[i];
375 }
376 }
377
378 self.ctrl = new_ctrl;
379 self.names = new_names;
380 self.slots = new_slots;
381 self.mask = new_mask;
382 }
383}
384
385#[derive(Clone)]
386struct ShardedNameTable {
387 shards: [NameTableShard; NAME_TABLE_SHARDS],
388}
389
390impl ShardedNameTable {
391 fn new(capacity_per_shard: usize) -> Self {
392 Self {
393 shards: [
394 NameTableShard::new(capacity_per_shard),
395 NameTableShard::new(capacity_per_shard),
396 NameTableShard::new(capacity_per_shard),
397 NameTableShard::new(capacity_per_shard),
398 ],
399 }
400 }
401
402 #[inline(always)]
403 const fn shard(hash: u64) -> usize {
404 (hash as usize) & (NAME_TABLE_SHARDS - 1)
405 }
406
407 #[inline(always)]
408 fn lookup(&self, hash: u64, name: StrId) -> Option<u32> {
409 self.shards[Self::shard(hash)].lookup(hash, name)
410 }
411
412 #[inline(always)]
413 fn insert(&mut self, interner: &StringInterner, hash: u64, name: StrId, slot: u32) {
414 self.shards[Self::shard(hash)].insert(interner, hash, name, slot);
415 }
416
417 #[inline(always)]
418 fn remove(&mut self, interner: &StringInterner, hash: u64, name: StrId) {
419 self.shards[Self::shard(hash)].remove(interner, hash, name);
420 }
421}
422
423pub struct KnowledgeGraph {
427 interner: StringInterner,
428 entity_slots: Vec<Option<StoredEntity>>,
429 free_slots: Vec<u32>,
432 name_table: ShardedNameTable,
433 relations: Vec<StoredRelation>,
434 adjacency: AHashMap<StrId, Vec<(StrId, StrId)>>,
438 search: SearchIndex,
439 store: BinaryStore,
440}
441
442#[derive(Clone)]
447pub struct ReadSnapshot {
448 pub(crate) interner: Arc<StringInterner>,
449 pub(crate) entity_slots: Arc<[Option<StoredEntity>]>,
450 name_table: Arc<ShardedNameTable>,
451 pub(crate) relations: Arc<[StoredRelation]>,
452 adjacency: Arc<AHashMap<StrId, Vec<(StrId, StrId)>>>,
453 search: Arc<SearchIndex>,
454}
455
/// Appends `s` to `buf` as a double-quoted JSON string literal.
///
/// `"`, `\`, newline, carriage return and tab use their short escapes; any other character for
/// which `char::is_control` is true is written as `\uXXXX`; everything else is copied verbatim.
pub(crate) fn push_json_str(buf: &mut String, s: &str) {
    use std::fmt::Write as _;

    buf.push('"');
    for ch in s.chars() {
        let short_escape = match ch {
            '"' => Some("\\\""),
            '\\' => Some("\\\\"),
            '\n' => Some("\\n"),
            '\r' => Some("\\r"),
            '\t' => Some("\\t"),
            _ => None,
        };
        if let Some(escape) = short_escape {
            buf.push_str(escape);
        } else if ch.is_control() {
            // Writing into a `String` cannot fail.
            write!(buf, "\\u{:04x}", ch as u32).unwrap();
        } else {
            buf.push(ch);
        }
    }
    buf.push('"');
}
476
477pub struct ReadGraphView<'a> {
480 snap: &'a ReadSnapshot,
481 entities: Vec<&'a StoredEntity>,
482 relations: Vec<&'a StoredRelation>,
483}
484
485impl Serialize for ReadGraphView<'_> {
486 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
487 let mut st = s.serialize_struct("KnowledgeGraphOut", 2)?;
488 st.serialize_field("entities", &ReadEntityListRef { snap: self.snap, items: &self.entities })?;
489 st.serialize_field("relations", &ReadRelationListRef { snap: self.snap, items: &self.relations })?;
490 st.end()
491 }
492}
493
494struct ReadEntityListRef<'a> { snap: &'a ReadSnapshot, items: &'a [&'a StoredEntity] }
495impl Serialize for ReadEntityListRef<'_> {
496 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
497 let mut seq = s.serialize_seq(Some(self.items.len()))?;
498 for &e in self.items {
499 seq.serialize_element(&ReadEntityRef { snap: self.snap, e })?;
500 }
501 seq.end()
502 }
503}
504
505struct ReadRelationListRef<'a> { snap: &'a ReadSnapshot, items: &'a [&'a StoredRelation] }
506impl Serialize for ReadRelationListRef<'_> {
507 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
508 let mut seq = s.serialize_seq(Some(self.items.len()))?;
509 for &r in self.items {
510 seq.serialize_element(&ReadRelationRef { snap: self.snap, r })?;
511 }
512 seq.end()
513 }
514}
515
516struct ReadEntityRef<'a> { snap: &'a ReadSnapshot, e: &'a StoredEntity }
517impl Serialize for ReadEntityRef<'_> {
518 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
519 let mut st = s.serialize_struct("Entity", 3)?;
520 st.serialize_field("name", self.snap.interner.lookup(self.e.name))?;
521 st.serialize_field("entityType", self.snap.interner.lookup(self.e.entity_type))?;
522 st.serialize_field("observations", &ReadObsRef { snap: self.snap, obs: &self.e.observations })?;
523 st.end()
524 }
525}
526
527struct ReadObsRef<'a> { snap: &'a ReadSnapshot, obs: &'a [StrId] }
528impl Serialize for ReadObsRef<'_> {
529 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
530 let mut seq = s.serialize_seq(Some(self.obs.len()))?;
531 for &o in self.obs {
532 seq.serialize_element(self.snap.interner.lookup(o))?;
533 }
534 seq.end()
535 }
536}
537
538struct ReadRelationRef<'a> { snap: &'a ReadSnapshot, r: &'a StoredRelation }
539impl Serialize for ReadRelationRef<'_> {
540 fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
541 let mut st = s.serialize_struct("Relation", 3)?;
542 st.serialize_field("from", self.snap.interner.lookup(self.r.from))?;
543 st.serialize_field("to", self.snap.interner.lookup(self.r.to))?;
544 st.serialize_field("relationType", self.snap.interner.lookup(self.r.relation_type))?;
545 st.end()
546 }
547}
548
549impl ReadSnapshot {
551
552 pub fn read_graph_json(&self) -> String {
555 let live = self.entity_slots.iter().filter(|s| s.is_some()).count();
557 let cap = live * 64 + self.relations.len() * 60 + 128;
558 let mut buf = String::with_capacity(cap);
559
560 buf.push_str(r#"{"entities":["#);
562 let mut first = true;
563 for slot in self.entity_slots.iter() {
564 let Some(e) = slot.as_ref() else { continue };
565 if first { first = false } else { buf.push(',') }
566 buf.push('{');
567 buf.push_str(r#""name":"#);
569 push_json_str(&mut buf, self.interner.lookup(e.name));
570 buf.push(',');
571 buf.push_str(r#""entityType":"#);
573 push_json_str(&mut buf, self.interner.lookup(e.entity_type));
574 buf.push(',');
575 buf.push_str(r#""observations":["#);
577 for (oi, o) in e.observations.iter().enumerate() {
578 if oi > 0 { buf.push(',') }
579 push_json_str(&mut buf, self.interner.lookup(*o));
580 }
581 buf.push_str("]}");
582 }
583
584 buf.push_str(r#"],"relations":["#);
586 first = true;
587 for r in self.relations.iter() {
588 if first { first = false } else { buf.push(',') }
589 buf.push('{');
590 buf.push_str(r#""from":"#);
591 push_json_str(&mut buf, self.interner.lookup(r.from));
592 buf.push(',');
593 buf.push_str(r#""to":"#);
594 push_json_str(&mut buf, self.interner.lookup(r.to));
595 buf.push(',');
596 buf.push_str(r#""relationType":"#);
597 push_json_str(&mut buf, self.interner.lookup(r.relation_type));
598 buf.push('}');
599 }
600 buf.push_str("]}");
601
602 buf
603 }
604
605 pub fn read_graph_view(&self) -> ReadGraphView<'_> {
608 let entities: Vec<&StoredEntity> = self
609 .entity_slots
610 .iter()
611 .filter_map(|s| s.as_ref())
612 .collect();
613 let relations: Vec<&StoredRelation> = self.relations.iter().collect();
614 ReadGraphView { snap: self, entities, relations }
615 }
616
617 fn lookup_live_slot(&self, name: &str) -> Option<u32> {
618 let name_id = self.interner.get_optional(name)?;
619 let hash = self.interner.get_hash(name_id);
620 let slot = self.name_table.lookup(hash, name_id)?;
621 self.entity_slots
622 .get(slot as usize)
623 .and_then(|s| s.as_ref())
624 ?;
625 Some(slot)
626 }
627
628 fn entity_by_name_id(&self, name_id: StrId) -> Option<Entity> {
629 let hash = self.interner.get_hash(name_id);
630 let slot = self.name_table.lookup(hash, name_id)?;
631 let e = self.entity_slots.get(slot as usize)?.as_ref()?;
632 Some(self.entity_to_output(e))
633 }
634
635 pub(crate) fn entity_to_output(&self, e: &StoredEntity) -> Entity {
636 Entity {
637 name: self.interner.lookup(e.name).to_string(),
638 entity_type: self.interner.lookup(e.entity_type).to_string(),
639 observations: e
640 .observations
641 .iter()
642 .map(|o| self.interner.lookup(*o).to_string())
643 .collect(),
644 }
645 }
646
647 pub(crate) fn relation_to_output(&self, r: &StoredRelation) -> Relation {
648 Relation {
649 from: self.interner.lookup(r.from).to_string(),
650 to: self.interner.lookup(r.to).to_string(),
651 relation_type: self.interner.lookup(r.relation_type).to_string(),
652 }
653 }
654
655 pub fn open_nodes(&self, names: &[String]) -> KnowledgeGraphOut {
657 let name_ids: std::collections::HashSet<StrId> = names
658 .iter()
659 .filter_map(|n| self.interner.get_optional(n))
660 .collect();
661 let entities: Vec<Entity> = self
662 .entity_slots
663 .iter()
664 .filter_map(|s| {
665 let e = s.as_ref()?;
666 if name_ids.contains(&e.name) {
667 Some(self.entity_to_output(e))
668 } else {
669 None
670 }
671 })
672 .collect();
673 let matched: std::collections::HashSet<StrId> = entities.iter()
674 .filter_map(|e| self.interner.get_optional(&e.name))
675 .collect();
676 let relations: Vec<Relation> = self
677 .relations
678 .iter()
679 .filter(|r| matched.contains(&r.from) || matched.contains(&r.to))
680 .map(|r| self.relation_to_output(r))
681 .collect();
682 KnowledgeGraphOut { entities, relations }
683 }
684
685 pub fn read_graph(&self) -> KnowledgeGraphOut {
687 let entities: Vec<Entity> = self
688 .entity_slots
689 .iter()
690 .filter_map(|s| s.as_ref())
691 .map(|e| self.entity_to_output(e))
692 .collect();
693 let relations: Vec<Relation> = self
694 .relations
695 .iter()
696 .map(|r| self.relation_to_output(r))
697 .collect();
698 KnowledgeGraphOut { entities, relations }
699 }
700
701 pub fn get_entity(&self, name: &str) -> Option<Entity> {
703 self.lookup_live_slot(name)?;
704 let name_id = self.interner.get_optional(name)?;
705 self.entity_by_name_id(name_id)
706 }
707
708 pub fn neighbors(
710 &self,
711 name: &str,
712 direction: Direction,
713 rtype: Option<&str>,
714 depth: u32,
715 ) -> Result<KnowledgeGraphOut> {
716 self.lookup_live_slot(name)
717 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
718 let start = self.interner.get_optional(name).unwrap();
719
720 let rtype_id = match rtype {
721 Some(r) => match self.interner.get_optional(r) {
722 Some(id) => Some(id),
723 None => {
724 let entities = self.entity_by_name_id(start).into_iter().collect();
725 return Ok(KnowledgeGraphOut { entities, relations: Vec::new() });
726 }
727 },
728 None => None,
729 };
730
731 let mut visited: AHashSet<StrId> = AHashSet::new();
732 visited.insert(start);
733
734 let type_ok = |r: &StoredRelation, rt: Option<StrId>| rt.is_none_or(|rt_id| r.relation_type == rt_id);
735
736 if depth == 1 {
737 for r in self.relations.iter().filter(|r| type_ok(r, rtype_id)) {
738 match direction {
739 Direction::Out => {
740 if r.from == start { visited.insert(r.to); }
741 }
742 Direction::In => {
743 if r.to == start { visited.insert(r.from); }
744 }
745 Direction::Both => {
746 if r.from == start { visited.insert(r.to); }
747 else if r.to == start { visited.insert(r.from); }
748 }
749 }
750 }
751 } else if depth >= 2 {
752 let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
753 match direction {
754 Direction::Both => {
755 for (&node, edges) in &*self.adjacency {
756 for &(nb, rt) in edges {
757 if rtype_id.is_none_or(|rt_id| rt == rt_id) {
758 adj.entry(node).or_default().push(nb);
759 }
760 }
761 }
762 }
763 Direction::Out | Direction::In => {
764 for r in self.relations.iter().filter(|r| type_ok(r, rtype_id)) {
765 match direction {
766 Direction::Out => adj.entry(r.from).or_default().push(r.to),
767 Direction::In => adj.entry(r.to).or_default().push(r.from),
768 _ => unreachable!(),
769 }
770 }
771 }
772 }
773 let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
774 queue.push_back((start, 0));
775 while let Some((node, d)) = queue.pop_front() {
776 if d >= depth { continue; }
777 if let Some(nbrs) = adj.get(&node) {
778 for &nb in nbrs {
779 if visited.insert(nb) {
780 queue.push_back((nb, d + 1));
781 }
782 }
783 }
784 }
785 }
786
787 let mut entities = Vec::with_capacity(visited.len());
788 for &nid in &visited {
789 if let Some(e) = self.entity_by_name_id(nid) {
790 entities.push(e);
791 }
792 }
793 let relations: Vec<Relation> = self
794 .relations
795 .iter()
796 .filter(|r| type_ok(r, rtype_id) && visited.contains(&r.from) && visited.contains(&r.to))
797 .map(|r| self.relation_to_output(r))
798 .collect();
799 Ok(KnowledgeGraphOut { entities, relations })
800 }
801
802 pub fn describe_entity(&self, name: &str) -> Result<serde_json::Value> {
804 let name_id = self
805 .interner
806 .get_optional(name)
807 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
808 let entity = self
809 .entity_by_name_id(name_id)
810 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
811
812 let mut incident: Vec<Relation> = Vec::new();
813 let mut neighbor_seen: AHashSet<StrId> = AHashSet::new();
814 let mut neighbors: Vec<&str> = Vec::new();
815 for r in self.relations.iter() {
816 if r.from == name_id || r.to == name_id {
817 incident.push(self.relation_to_output(r));
818 let other = if r.from == name_id { r.to } else { r.from };
819 if other != name_id && neighbor_seen.insert(other) {
820 neighbors.push(self.interner.lookup(other));
821 }
822 }
823 }
824
825 Ok(serde_json::json!({
826 "entity": entity,
827 "relations": incident,
828 "neighbors": neighbors,
829 "degree": incident.len(),
830 }))
831 }
832
833 pub fn find_path(&self, from: &str, to: &str) -> Result<Vec<String>> {
835 let from_id = self
836 .interner
837 .get_optional(from)
838 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
839 let to_id = self
840 .interner
841 .get_optional(to)
842 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
843 if self.lookup_live_slot(from).is_none() {
844 return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
845 }
846 if self.lookup_live_slot(to).is_none() {
847 return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
848 }
849
850 let mut visited: AHashSet<StrId> = AHashSet::new();
852 let mut parent: AHashMap<StrId, StrId> = AHashMap::new();
853 let mut queue: VecDeque<StrId> = VecDeque::new();
854
855 visited.insert(from_id);
856 queue.push_back(from_id);
857
858 while let Some(current) = queue.pop_front() {
859 if current == to_id { break; }
860 if let Some(neighbors) = self.adjacency.get(¤t) {
861 for &(neighbor, _) in neighbors {
862 if visited.insert(neighbor) {
863 parent.insert(neighbor, current);
864 queue.push_back(neighbor);
865 }
866 }
867 }
868 }
869
870 if !visited.contains(&to_id) {
871 return Err(MCSError::MemoryError(format!(
872 "No path found between '{from}' and '{to}'"
873 )));
874 }
875
876 let mut path = Vec::new();
877 let mut cur = to_id;
878 path.push(self.interner.lookup(cur).to_string());
879 while let Some(&p) = parent.get(&cur) {
880 path.push(self.interner.lookup(p).to_string());
881 cur = p;
882 }
883 path.reverse();
884 Ok(path)
885 }
886
887 pub fn extract_subgraph(&self, names: &[String], depth: u32) -> Result<KnowledgeGraphOut> {
889 if names.is_empty() {
890 return Ok(KnowledgeGraphOut { entities: Vec::new(), relations: Vec::new() });
891 }
892 let mut visited: AHashSet<StrId> = AHashSet::new();
893 let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
894 for name in names {
895 if let Some(id) = self.interner.get_optional(name)
896 && visited.insert(id)
897 {
898 queue.push_back((id, 0));
899 }
900 }
901 let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
902 for (&node, edges) in &*self.adjacency {
903 let nbrs: Vec<StrId> = edges.iter().map(|(to, _)| *to).collect();
904 adj.insert(node, nbrs);
905 }
906 while let Some((node, d)) = queue.pop_front() {
907 if d >= depth { continue; }
908 if let Some(nbrs) = adj.get(&node) {
909 for &nb in nbrs {
910 if visited.insert(nb) {
911 queue.push_back((nb, d + 1));
912 }
913 }
914 }
915 }
916 let mut entities: Vec<Entity> = Vec::with_capacity(visited.len());
917 for &nid in &visited {
918 if let Some(e) = self.entity_by_name_id(nid) {
919 entities.push(e);
920 }
921 }
922 let relations: Vec<Relation> = self
923 .relations
924 .iter()
925 .filter(|r| visited.contains(&r.from) && visited.contains(&r.to))
926 .map(|r| self.relation_to_output(r))
927 .collect();
928 Ok(KnowledgeGraphOut { entities, relations })
929 }
930
931 pub fn batch_get_entities(&self, names: &[String]) -> Vec<Option<Entity>> {
933 names.iter().map(|n| self.get_entity(n)).collect()
934 }
935
936 pub fn graph_stats(&self) -> serde_json::Value {
938 let entity_count = self
939 .entity_slots
940 .iter()
941 .filter(|s| s.is_some())
942 .count();
943 let relation_count = self.relations.len();
944 let type_counts = self.entity_type_counts();
945 let relation_type_counts = self.relation_type_counts();
946 serde_json::json!({
947 "entities": entity_count,
948 "relations": relation_count,
949 "entityTypes": type_counts,
950 "relationTypes": relation_type_counts,
951 })
952 }
953
954 pub fn search_relations(&self, from: Option<&str>, to: Option<&str>, rtype: Option<&str>) -> Vec<Relation> {
956 let from_id = from.and_then(|n| self.interner.get_optional(n));
957 let to_id = to.and_then(|n| self.interner.get_optional(n));
958 let rtype_id = rtype.and_then(|n| self.interner.get_optional(n));
959 self.relations
960 .iter()
961 .filter(|r| {
962 from_id.is_none_or(|id| r.from == id)
963 && to_id.is_none_or(|id| r.to == id)
964 && rtype_id.is_none_or(|id| r.relation_type == id)
965 })
966 .map(|r| self.relation_to_output(r))
967 .collect()
968 }
969
970 pub fn entity_type_counts(&self) -> Vec<(String, usize)> {
972 let mut counts: AHashMap<StrId, usize> = AHashMap::new();
973 for slot in self.entity_slots.iter() {
974 if let Some(e) = slot.as_ref() {
975 *counts.entry(e.entity_type).or_default() += 1;
976 }
977 }
978 let mut result: Vec<(String, usize)> = counts
979 .into_iter()
980 .map(|(id, c)| (self.interner.lookup(id).to_string(), c))
981 .collect();
982 result.sort_by(|a, b| a.0.cmp(&b.0));
983 result
984 }
985
986 pub fn relation_type_counts(&self) -> Vec<(String, usize)> {
988 let mut counts: AHashMap<StrId, usize> = AHashMap::new();
989 for r in self.relations.iter() {
990 *counts.entry(r.relation_type).or_default() += 1;
991 }
992 let mut result: Vec<(String, usize)> = counts
993 .into_iter()
994 .map(|(id, c)| (self.interner.lookup(id).to_string(), c))
995 .collect();
996 result.sort_by(|a, b| a.0.cmp(&b.0));
997 result
998 }
999
1000 pub fn export(&self, format: &str) -> Result<String> {
1002 match format {
1003 "json" => serde_json::to_string(&self.read_graph()).map_err(MCSError::JsonError),
1004 "mermaid" => Ok(self.export_mermaid()),
1005 "dot" => Ok(self.export_dot()),
1006 other => Err(MCSError::InvalidParams(format!(
1007 "Unknown export format '{other}' (expected json|mermaid|dot)"
1008 ))),
1009 }
1010 }
1011
1012 fn export_mermaid(&self) -> String {
1013 let mut out = String::with_capacity(4096);
1014 out.push_str("graph LR\n");
1015 for r in self.relations.iter() {
1016 let from = sanitize_label(self.interner.lookup(r.from));
1017 let to = sanitize_label(self.interner.lookup(r.to));
1018 let rt = sanitize_label(self.interner.lookup(r.relation_type));
1019 out.push_str(&format!(" {} -- \"{}\" --> {}\n", from, rt, to));
1020 }
1021 out
1022 }
1023
1024 fn export_dot(&self) -> String {
1025 let mut out = String::with_capacity(4096);
1026 out.push_str("digraph KG {\n");
1027 out.push_str(" rankdir=LR;\n");
1028 for slot in self.entity_slots.iter() {
1029 if let Some(e) = slot.as_ref() {
1030 let name = sanitize_label(self.interner.lookup(e.name));
1031 let etype = sanitize_label(self.interner.lookup(e.entity_type));
1032 out.push_str(&format!(" \"{}\" [label=\"{}\n({})\"];\n", name, name, etype));
1033 }
1034 }
1035 for r in self.relations.iter() {
1036 let from = sanitize_label(self.interner.lookup(r.from));
1037 let to = sanitize_label(self.interner.lookup(r.to));
1038 let rt = sanitize_label(self.interner.lookup(r.relation_type));
1039 out.push_str(&format!(" \"{}\" -> \"{}\" [label=\"{}\"];\n", from, to, rt));
1040 }
1041 out.push_str("}\n");
1042 out
1043 }
1044
1045 pub fn find_all_paths(
1047 &self,
1048 from: &str,
1049 to: &str,
1050 max_depth: usize,
1051 max_paths: usize,
1052 ) -> Result<Vec<Vec<String>>> {
1053 let from_id = self
1054 .interner
1055 .get_optional(from)
1056 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
1057 let to_id = self
1058 .interner
1059 .get_optional(to)
1060 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
1061 if self.lookup_live_slot(from).is_none() {
1062 return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
1063 }
1064 if self.lookup_live_slot(to).is_none() {
1065 return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
1066 }
1067 if from_id == to_id {
1068 return Ok(vec![vec![from.to_string()]]);
1069 }
1070 let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::with_capacity(self.adjacency.len());
1071 for (&node, edges) in &*self.adjacency {
1072 let nbrs: Vec<StrId> = edges.iter().map(|(to, _)| *to).collect();
1073 adj.insert(node, nbrs);
1074 }
1075 let mut all_paths: Vec<Vec<StrId>> = Vec::new();
1076 let mut current_path = Vec::new();
1077 let mut visited: AHashSet<StrId> = AHashSet::new();
1078 visited.insert(from_id);
1079 current_path.push(from_id);
1080 Self::dfs_all_paths(
1081 &adj,
1082 from_id,
1083 to_id,
1084 max_depth,
1085 max_paths,
1086 &mut visited,
1087 &mut current_path,
1088 &mut all_paths,
1089 );
1090 if all_paths.is_empty() {
1091 return Err(MCSError::MemoryError(format!(
1092 "No path found between '{from}' and '{to}'"
1093 )));
1094 }
1095 let result: Vec<Vec<String>> = all_paths
1096 .into_iter()
1097 .map(|path| {
1098 path.into_iter()
1099 .map(|id| self.interner.lookup(id).to_string())
1100 .collect()
1101 })
1102 .collect();
1103 Ok(result)
1104 }
1105
1106 fn dfs_all_paths(
1107 adj: &AHashMap<StrId, Vec<StrId>>,
1108 current: StrId,
1109 target: StrId,
1110 max_depth: usize,
1111 max_paths: usize,
1112 visited: &mut AHashSet<StrId>,
1113 current_path: &mut Vec<StrId>,
1114 all_paths: &mut Vec<Vec<StrId>>,
1115 ) {
1116 if all_paths.len() >= max_paths { return; }
1117 if current == target && current_path.len() > 1 {
1118 all_paths.push(current_path.clone());
1119 return;
1120 }
1121 if current_path.len() > max_depth { return; }
1122 if let Some(neighbors) = adj.get(¤t) {
1123 for &nb in neighbors {
1124 if !visited.contains(&nb) {
1125 visited.insert(nb);
1126 current_path.push(nb);
1127 Self::dfs_all_paths(adj, nb, target, max_depth, max_paths, visited, current_path, all_paths);
1128 current_path.pop();
1129 visited.remove(&nb);
1130 }
1131 }
1132 }
1133 }
1134
1135 pub fn search_entities(&self, query: &str) -> Result<Vec<Entity>> {
1137 let token = query.to_lowercase();
1138 let matching = self.search.search(&token, &self.interner);
1139 Ok(matching
1140 .iter()
1141 .filter_map(|idx| {
1142 self.entity_slots
1143 .get(*idx as usize)?
1144 .as_ref()
1145 .map(|e| self.entity_to_output(e))
1146 })
1147 .collect())
1148 }
1149}
1150
1151impl KnowledgeGraph {
1152 pub fn new(path: &Path) -> std::io::Result<Self> {
1153 Self::open(path, None)
1154 }
1155
1156 fn open(
1159 path: &Path,
1160 sync_slot: Option<Arc<arc_swap::ArcSwap<std::fs::File>>>,
1161 ) -> std::io::Result<Self> {
1162 let store = BinaryStore::new_with_slot(path, sync_slot)?;
1163
1164 let mut interner = StringInterner::with_capacity(65536, 1024);
1166 let mut entity_slots: Vec<Option<StoredEntity>> = Vec::with_capacity(256);
1167 let mut name_table = ShardedNameTable::new(64);
1168 let mut relations: Vec<StoredRelation> = Vec::with_capacity(64);
1169 let mut search = SearchIndex::new();
1170
1171 let mut pending: Option<Vec<(RecordKind, Vec<u8>)>> = None;
1176 store.replay(|kind, data| {
1177 match kind {
1178 RecordKind::TxnBegin => pending = Some(Vec::new()),
1179 RecordKind::TxnCommit => {
1180 if let Some(buffered) = pending.take() {
1181 for (k, d) in &buffered {
1182 Self::apply_record(
1183 *k, d, &mut interner, &mut entity_slots, &mut search,
1184 &mut name_table, &mut relations,
1185 );
1186 }
1187 }
1188 }
1189 other => match pending.as_mut() {
1190 Some(buffered) => buffered.push((other, data.to_vec())),
1191 None => Self::apply_record(
1192 other, data, &mut interner, &mut entity_slots, &mut search,
1193 &mut name_table, &mut relations,
1194 ),
1195 },
1196 }
1197 })?;
1198
1199 let free_slots: Vec<u32> = entity_slots
1201 .iter()
1202 .enumerate()
1203 .filter(|(_, s)| s.is_none())
1204 .map(|(i, _)| i as u32)
1205 .collect();
1206
1207 let mut adjacency: AHashMap<StrId, Vec<(StrId, StrId)>> = AHashMap::new();
1208 for rel in &relations {
1209 adjacency.entry(rel.from).or_default().push((rel.to, rel.relation_type));
1210 adjacency.entry(rel.to).or_default().push((rel.from, rel.relation_type));
1211 }
1212
1213 Ok(Self {
1214 interner,
1215 entity_slots,
1216 free_slots,
1217 name_table,
1218 relations,
1219 adjacency,
1220 search,
1221 store,
1222 })
1223 }
1224
1225 #[allow(clippy::too_many_arguments)]
1233 fn apply_record(
1234 kind: RecordKind,
1235 data: &[u8],
1236 interner: &mut StringInterner,
1237 entity_slots: &mut Vec<Option<StoredEntity>>,
1238 search: &mut SearchIndex,
1239 name_table: &mut ShardedNameTable,
1240 relations: &mut Vec<StoredRelation>,
1241 ) {
1242 match kind {
1243 RecordKind::CreateEntity => {
1244 if let Some((name, etype, obs)) = store_enc::decode_create_entity(data) {
1245 Self::replay_create_entity(
1246 interner, entity_slots, search, name_table, name, etype, &obs,
1247 );
1248 }
1249 }
1250 RecordKind::CreateRelation => {
1251 if let Some((from, to, rtype)) = store_enc::decode_create_relation(data) {
1252 let from_id = interner.intern(from);
1253 let to_id = interner.intern(to);
1254 let type_id = interner.intern(rtype);
1255 relations.push(StoredRelation {
1256 from: from_id,
1257 to: to_id,
1258 relation_type: type_id,
1259 });
1260 }
1261 }
1262 RecordKind::AddObservations => {
1263 if let Some((name, obs)) = store_enc::decode_add_observations(data) {
1264 Self::replay_add_observations(
1265 interner, entity_slots, search, name_table, name, &obs,
1266 );
1267 }
1268 }
1269 RecordKind::DeleteEntity => {
1270 if let Some(name) = store_enc::decode_delete_entity(data) {
1271 Self::replay_delete_entity(
1272 interner, entity_slots, relations, search, name_table, name,
1273 );
1274 }
1275 }
1276 RecordKind::DeleteObservations => {
1277 if let Some((name, obs)) = store_enc::decode_delete_observations(data) {
1278 Self::replay_delete_observations(
1279 interner, entity_slots, search, name_table, name, &obs,
1280 );
1281 }
1282 }
1283 RecordKind::DeleteRelation => {
1284 if let Some((from, to, rtype)) = store_enc::decode_delete_relation(data) {
1285 let from_id = interner.intern(from);
1286 let to_id = interner.intern(to);
1287 let type_id = interner.intern(rtype);
1288 relations.retain(|r| {
1289 !(r.from == from_id && r.to == to_id && r.relation_type == type_id)
1290 });
1291 }
1292 }
1293 RecordKind::TxnBegin | RecordKind::TxnCommit => {}
1294 }
1295 }
1296
1297 #[allow(clippy::ptr_arg)]
1298 fn replay_create_entity(
1299 interner: &mut StringInterner,
1300 entities: &mut Vec<Option<StoredEntity>>,
1301 search: &mut SearchIndex,
1302 name_table: &mut ShardedNameTable,
1303 name: &str,
1304 etype: &str,
1305 observations: &[&str],
1306 ) {
1307 let name_id = interner.intern(name);
1308 let type_id = interner.intern(etype);
1309 let obs_ids: Vec<StrId> = observations.iter().map(|o| interner.intern(o)).collect();
1310 let slot = entities.len() as u32;
1311 entities.push(Some(StoredEntity {
1312 name: name_id,
1313 entity_type: type_id,
1314 observations: obs_ids.clone(),
1315 }));
1316 let hash = interner.get_hash(name_id);
1317 name_table.insert(&*interner, hash, name_id, slot);
1318 search.index_entity(interner, slot, name_id, type_id, &obs_ids);
1319 }
1320
1321 fn replay_add_observations(
1322 interner: &mut StringInterner,
1323 entities: &mut [Option<StoredEntity>],
1324 search: &mut SearchIndex,
1325 name_table: &mut ShardedNameTable,
1326 name: &str,
1327 observations: &[&str],
1328 ) {
1329 let name_id = interner.intern(name);
1330 let hash = interner.get_hash(name_id);
1331 if let Some(slot) = name_table.lookup(hash, name_id)
1332 && let Some(Some(entity)) = entities.get_mut(slot as usize)
1333 {
1334 for &o in observations {
1335 let oid = interner.intern(o);
1336 if !entity.observations.contains(&oid) {
1337 entity.observations.push(oid);
1338 }
1339 }
1340 search.remove_entity(slot);
1341 search.index_entity(
1342 interner,
1343 slot,
1344 entity.name,
1345 entity.entity_type,
1346 &entity.observations,
1347 );
1348 }
1349 }
1350
1351 fn replay_delete_entity(
1352 interner: &mut StringInterner,
1353 entities: &mut [Option<StoredEntity>],
1354 rels: &mut Vec<StoredRelation>,
1355 search: &mut SearchIndex,
1356 name_table: &mut ShardedNameTable,
1357 name: &str,
1358 ) {
1359 let name_id = interner.intern(name);
1360 let hash = interner.get_hash(name_id);
1361 if let Some(slot) = name_table.lookup(hash, name_id)
1362 && let Some(Some(_)) = entities.get(slot as usize)
1363 {
1364 entities[slot as usize] = None;
1365 search.remove_entity(slot);
1366 name_table.remove(&*interner, hash, name_id);
1367 }
1368 rels.retain(|r| r.from != name_id && r.to != name_id);
1369 }
1370
1371 fn replay_delete_observations(
1372 interner: &mut StringInterner,
1373 entities: &mut [Option<StoredEntity>],
1374 search: &mut SearchIndex,
1375 name_table: &mut ShardedNameTable,
1376 name: &str,
1377 observations: &[&str],
1378 ) {
1379 let name_id = interner.intern(name);
1380 let hash = interner.get_hash(name_id);
1381 if let Some(slot) = name_table.lookup(hash, name_id)
1382 && let Some(Some(entity)) = entities.get_mut(slot as usize)
1383 {
1384 let remove_ids: Vec<StrId> = observations.iter().map(|o| interner.intern(o)).collect();
1385 entity.observations.retain(|o| !remove_ids.contains(o));
1386 search.remove_entity(slot);
1387 search.index_entity(
1388 interner,
1389 slot,
1390 entity.name,
1391 entity.entity_type,
1392 &entity.observations,
1393 );
1394 }
1395 }
1396
1397 pub const fn interner(&self) -> &StringInterner {
1402 &self.interner
1403 }
1404
1405 pub fn get_entity(&self, name: &str) -> Option<Entity> {
1407 let name_id = self.interner.get_optional(name)?;
1408 let hash = self.interner.get_hash(name_id);
1409 let slot = self.name_table.lookup(hash, name_id)?;
1410 let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
1411 Some(self.entity_to_output(stored))
1412 }
1413
1414 pub fn graph_stats(&self) -> serde_json::Value {
1416 let live_entities = self
1417 .entity_slots
1418 .iter()
1419 .filter(|s| s.is_some())
1420 .count();
1421 let total_relations = self.relations.len();
1422 let index_entries = self.search.len();
1423 let total_obs: usize = self
1424 .entity_slots
1425 .iter()
1426 .filter_map(|s| s.as_ref())
1427 .map(|e| e.observations.len())
1428 .sum();
1429
1430 serde_json::json!({
1431 "entities": live_entities,
1432 "relations": total_relations,
1433 "totalObservations": total_obs,
1434 "searchIndexEntries": index_entries,
1435 "internedStrings": self.interner.len(),
1436 "internedBytes": self.interner.total_bytes(),
1437 })
1438 }
1439
1440 pub fn search_relations(&self, from: Option<&str>, to: Option<&str>, rtype: Option<&str>) -> Vec<Relation> {
1444 let from_id = match from {
1445 Some(f) => match self.interner.get_optional(f) {
1446 Some(id) => Some(id),
1447 None => return Vec::new(),
1448 },
1449 None => None,
1450 };
1451 let to_id = match to {
1452 Some(t) => match self.interner.get_optional(t) {
1453 Some(id) => Some(id),
1454 None => return Vec::new(),
1455 },
1456 None => None,
1457 };
1458 let rtype_id = match rtype {
1459 Some(r) => match self.interner.get_optional(r) {
1460 Some(id) => Some(id),
1461 None => return Vec::new(),
1462 },
1463 None => None,
1464 };
1465
1466 self.relations
1467 .iter()
1468 .filter(|r| {
1469 from_id.is_none_or(|f| r.from == f)
1470 && to_id.is_none_or(|t| r.to == t)
1471 && rtype_id.is_none_or(|rt| r.relation_type == rt)
1472 })
1473 .map(|r| Relation {
1474 from: self.interner.lookup(r.from).to_string(),
1475 to: self.interner.lookup(r.to).to_string(),
1476 relation_type: self.interner.lookup(r.relation_type).to_string(),
1477 })
1478 .collect()
1479 }
1480
1481 pub fn find_path(&self, from: &str, to: &str) -> Result<Vec<String>> {
1484 let from_id = self.interner.get_optional(from)
1485 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
1486 let to_id = self.interner.get_optional(to)
1487 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
1488 let hash_from = self.interner.get_hash(from_id);
1489 let hash_to = self.interner.get_hash(to_id);
1490
1491 if self.name_table.lookup(hash_from, from_id).is_none() {
1492 return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
1493 }
1494 if self.name_table.lookup(hash_to, to_id).is_none() {
1495 return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
1496 }
1497 if from_id == to_id {
1498 return Ok(vec![from.to_string()]);
1499 }
1500
1501 let mut visited: AHashSet<StrId> = AHashSet::new();
1503 let mut parent: AHashMap<StrId, StrId> = AHashMap::new();
1504 let mut queue: VecDeque<StrId> = VecDeque::new();
1505
1506 visited.insert(from_id);
1507 queue.push_back(from_id);
1508
1509 while let Some(current) = queue.pop_front() {
1510 if current == to_id {
1511 break;
1512 }
1513
1514 if let Some(neighbors) = self.adjacency.get(¤t) {
1515 for &(neighbor, _) in neighbors {
1516 if visited.insert(neighbor) {
1517 parent.insert(neighbor, current);
1518 queue.push_back(neighbor);
1519 }
1520 }
1521 }
1522 }
1523
1524 if !parent.contains_key(&to_id) && from_id != to_id {
1525 return Err(MCSError::MemoryError(format!(
1526 "No path found between '{from}' and '{to}'"
1527 )));
1528 }
1529
1530 let mut path: Vec<String> = Vec::new();
1532 let mut cur = to_id;
1533 loop {
1534 path.push(self.interner.lookup(cur).to_string());
1535 if cur == from_id {
1536 break;
1537 }
1538 cur = *parent.get(&cur).ok_or_else(|| {
1539 MCSError::MemoryError("Path reconstruction failed".into())
1540 })?;
1541 }
1542 path.reverse();
1543 Ok(path)
1544 }
1545
1546 pub fn compact(&mut self) -> Result<()> {
1551 let mut create_entities: Vec<Entity> = Vec::new();
1553 let mut create_relations: Vec<Relation> = Vec::new();
1554
1555 for slot in &self.entity_slots {
1556 if let Some(stored) = slot.as_ref() {
1557 create_entities.push(self.entity_to_output(stored));
1558 }
1559 }
1560 for rel in &self.relations {
1561 create_relations.push(Relation {
1562 from: self.interner.lookup(rel.from).to_string(),
1563 to: self.interner.lookup(rel.to).to_string(),
1564 relation_type: self.interner.lookup(rel.relation_type).to_string(),
1565 });
1566 }
1567
1568 let tmp_path = self.store.path().with_extension("tmp");
1575 if let Err(e) = std::fs::remove_file(&tmp_path)
1576 && e.kind() != std::io::ErrorKind::NotFound
1577 {
1578 return Err(MCSError::IoError(e));
1579 }
1580 let mut tmp_store = BinaryStore::new(&tmp_path).map_err(MCSError::IoError)?;
1581 for entity in &create_entities {
1582 let mut buf = Vec::new();
1583 store_enc::encode_create_entity(&mut buf, &entity.name, &entity.entity_type, &entity.observations)
1584 .map_err(MCSError::IoError)?;
1585 tmp_store.write_record(RecordKind::CreateEntity, &buf).map_err(MCSError::IoError)?;
1586 }
1587 for relation in &create_relations {
1588 let mut buf = Vec::new();
1589 store_enc::encode_create_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
1590 .map_err(MCSError::IoError)?;
1591 tmp_store.write_record(RecordKind::CreateRelation, &buf).map_err(MCSError::IoError)?;
1592 }
1593 tmp_store.flush_and_sync().map_err(MCSError::IoError)?;
1594 drop(tmp_store);
1595
1596 std::fs::rename(&tmp_path, self.store.path()).map_err(MCSError::IoError)?;
1601 sync_parent_dir(self.store.path()).map_err(MCSError::IoError)?;
1602
1603 let path = self.store.path().clone();
1609 let sync_slot = Arc::clone(&self.store.sync_slot);
1612 *self = KnowledgeGraph::open(&path, Some(sync_slot)).map_err(MCSError::IoError)?;
1613
1614 Ok(())
1615 }
1616
1617 pub fn compact_if_needed(&mut self) -> Result<()> {
1620 let total = self.entity_slots.len();
1624 let tombstones = self.free_slots.len();
1625 if total > 16 && tombstones * 10 > total * 3 {
1626 self.compact()?;
1627 }
1628 Ok(())
1629 }
1630
1631 pub fn create_entities(&mut self, entities: &[Entity]) -> Result<Vec<Entity>> {
1634 for entity in entities {
1636 if entity.name.is_empty() {
1637 return Err(MCSError::InvalidParams(
1638 "Entity name must not be empty".into(),
1639 ));
1640 }
1641 }
1642 let mut records: Vec<(RecordKind, Vec<u8>)> = Vec::new();
1643 let mut created = Vec::new();
1644 for entity in entities {
1645 let existing = self.interner.get_optional(&entity.name)
1646 .and_then(|id| {
1647 let hash = self.interner.get_hash(id);
1648 self.name_table.lookup(hash, id)
1649 });
1650 if existing.is_some() {
1651 continue;
1652 }
1653 let mut buf = Vec::new();
1654 store_enc::encode_create_entity(&mut buf, &entity.name, &entity.entity_type, &entity.observations)
1655 .map_err(MCSError::IoError)?;
1656 records.push((RecordKind::CreateEntity, buf));
1657 created.push(entity.clone());
1658 }
1659 if records.is_empty() {
1660 return Ok(created);
1661 }
1662 self.store.write_record(RecordKind::TxnBegin, &[]).map_err(MCSError::IoError)?;
1664 for (kind, data) in &records {
1665 self.store.write_record(*kind, data).map_err(MCSError::IoError)?;
1666 }
1667 self.store.write_record(RecordKind::TxnCommit, &[]).map_err(MCSError::IoError)?;
1668
1669 for entity in &created {
1671 let name_id = self.interner.intern(&entity.name);
1672 let hash = self.interner.get_hash(name_id);
1673 let type_id = self.interner.intern(&entity.entity_type);
1674 let obs_ids: Vec<StrId> = entity
1675 .observations
1676 .iter()
1677 .map(|o| self.interner.intern(o))
1678 .collect();
1679 let reused = self.free_slots.pop();
1680 let slot = reused.unwrap_or(self.entity_slots.len() as u32);
1681 self.search
1682 .index_entity(&mut self.interner, slot, name_id, type_id, &obs_ids);
1683 let stored = Some(StoredEntity {
1684 name: name_id,
1685 entity_type: type_id,
1686 observations: obs_ids,
1687 });
1688 match reused {
1689 Some(s) => self.entity_slots[s as usize] = stored,
1690 None => self.entity_slots.push(stored),
1691 }
1692 self.name_table.insert(&self.interner, hash, name_id, slot);
1693 }
1694 Ok(created)
1695 }
1696
1697 pub fn create_relations(&mut self, relations: &[Relation]) -> Result<Vec<Relation>> {
1698 for relation in relations {
1700 if relation.from.is_empty() || relation.to.is_empty() {
1701 return Err(MCSError::InvalidParams(
1702 "Relation endpoints must not be empty".into(),
1703 ));
1704 }
1705 }
1706 let mut rel_set: AHashSet<(StrId, StrId, StrId)> = AHashSet::new();
1708 for rel in &self.relations {
1709 rel_set.insert((rel.from, rel.to, rel.relation_type));
1710 }
1711 let mut records: Vec<(RecordKind, Vec<u8>)> = Vec::new();
1712 let mut interned: Vec<StoredRelation> = Vec::new();
1713 for relation in relations {
1714 let from_id = self.interner.intern(&relation.from);
1715 let to_id = self.interner.intern(&relation.to);
1716 let type_id = self.interner.intern(&relation.relation_type);
1717 if !rel_set.insert((from_id, to_id, type_id)) {
1718 continue;
1719 }
1720 let mut buf = Vec::new();
1721 store_enc::encode_create_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
1722 .map_err(MCSError::IoError)?;
1723 records.push((RecordKind::CreateRelation, buf));
1724 interned.push(StoredRelation {
1725 from: from_id,
1726 to: to_id,
1727 relation_type: type_id,
1728 });
1729 }
1730 if records.is_empty() {
1731 return Ok(Vec::new());
1732 }
1733 self.store.write_record(RecordKind::TxnBegin, &[]).map_err(MCSError::IoError)?;
1735 for (kind, data) in &records {
1736 self.store.write_record(*kind, data).map_err(MCSError::IoError)?;
1737 }
1738 self.store.write_record(RecordKind::TxnCommit, &[]).map_err(MCSError::IoError)?;
1739
1740 let mut created = Vec::new();
1742 for sr in &interned {
1743 self.relations.push(StoredRelation {
1744 from: sr.from,
1745 to: sr.to,
1746 relation_type: sr.relation_type,
1747 });
1748 self.adjacency.entry(sr.from).or_default().push((sr.to, sr.relation_type));
1749 self.adjacency.entry(sr.to).or_default().push((sr.from, sr.relation_type));
1750 created.push(Relation {
1751 from: self.interner.lookup(sr.from).to_string(),
1752 to: self.interner.lookup(sr.to).to_string(),
1753 relation_type: self.interner.lookup(sr.relation_type).to_string(),
1754 });
1755 }
1756 Ok(created)
1757 }
1758
1759 pub fn add_observations(&mut self, entity_name: &str, contents: &[String]) -> Result<Vec<String>> {
1760 let name_id = self.interner.get_optional(entity_name)
1761 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1762 let hash = self.interner.get_hash(name_id);
1763 let slot = self
1764 .name_table
1765 .lookup(hash, name_id)
1766 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1767 let existing: AHashSet<StrId> = self
1770 .entity_slots
1771 .get(slot as usize)
1772 .and_then(|e| e.as_ref())
1773 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?
1774 .observations
1775 .iter()
1776 .copied()
1777 .collect();
1778
1779 let mut added = Vec::new();
1782 let mut interned_added = Vec::new();
1783 let mut seen: AHashSet<StrId> = AHashSet::new();
1784 for content in contents {
1785 let cid = self.interner.intern(content);
1786 if existing.contains(&cid) || !seen.insert(cid) {
1787 continue;
1788 }
1789 interned_added.push(cid);
1790 added.push(content.clone());
1791 }
1792 if added.is_empty() {
1793 return Ok(added);
1794 }
1795
1796 let mut buf = Vec::new();
1799 store_enc::encode_add_observations(&mut buf, entity_name, &added)
1800 .map_err(MCSError::IoError)?;
1801 self.store.write_record(RecordKind::AddObservations, &buf)
1802 .map_err(MCSError::IoError)?;
1803
1804 let stored = self
1806 .entity_slots
1807 .get_mut(slot as usize)
1808 .and_then(|e| e.as_mut())
1809 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1810 stored.observations.extend_from_slice(&interned_added);
1811
1812 self.search
1815 .index_additional(&mut self.interner, slot, &interned_added);
1816 Ok(added)
1817 }
1818
1819 pub fn delete_entities(&mut self, entity_names: &[String]) -> Result<()> {
1820 let mut records: Vec<(RecordKind, Vec<u8>)> = Vec::new();
1821 let mut deleted_names = Vec::new();
1822 for name in entity_names {
1823 let name_id_opt = self.interner.get_optional(name);
1824 if let Some(name_id) = name_id_opt {
1825 let hash = self.interner.get_hash(name_id);
1826 if let Some(slot) = self.name_table.lookup(hash, name_id)
1827 && let Some(Some(_)) = self.entity_slots.get(slot as usize)
1828 {
1829 let mut buf = Vec::new();
1830 store_enc::encode_delete_entity(&mut buf, name)
1831 .map_err(MCSError::IoError)?;
1832 records.push((RecordKind::DeleteEntity, buf));
1833 deleted_names.push((name.clone(), name_id, hash, slot));
1834 }
1835 }
1836 }
1837 if records.is_empty() {
1838 return Ok(());
1839 }
1840 self.store.write_record(RecordKind::TxnBegin, &[]).map_err(MCSError::IoError)?;
1842 for (kind, data) in &records {
1843 self.store.write_record(*kind, data).map_err(MCSError::IoError)?;
1844 }
1845 self.store.write_record(RecordKind::TxnCommit, &[]).map_err(MCSError::IoError)?;
1846
1847 for (_name, _name_id, _hash, slot) in &deleted_names {
1849 self.entity_slots[*slot as usize] = None;
1850 self.free_slots.push(*slot);
1851 self.search.remove_entity(*slot);
1852 self.name_table.remove(&self.interner, *_hash, *_name_id);
1853 }
1854 let deleted_ids: AHashSet<StrId> = deleted_names.iter()
1855 .map(|(_, id, _, _)| *id)
1856 .collect();
1857 self.relations
1858 .retain(|r| !deleted_ids.contains(&r.from) && !deleted_ids.contains(&r.to));
1859 for id in &deleted_ids {
1860 self.adjacency.remove(id);
1861 }
1862 for list in self.adjacency.values_mut() {
1863 list.retain(|(to, _)| !deleted_ids.contains(to));
1864 }
1865 self.compact_if_needed()?;
1866 Ok(())
1867 }
1868
1869 pub fn delete_observations(&mut self, entity_name: &str, observations: &[String]) -> Result<()> {
1870 let name_id = self.interner.get_optional(entity_name)
1871 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1872 let hash = self.interner.get_hash(name_id);
1873 let slot = self
1874 .name_table
1875 .lookup(hash, name_id)
1876 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1877 self.entity_slots
1879 .get(slot as usize)
1880 .and_then(|e| e.as_ref())
1881 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1882 let remove_ids: AHashSet<StrId> = observations.iter().map(|o| self.interner.intern(o)).collect();
1883
1884 let mut buf = Vec::new();
1886 store_enc::encode_delete_observations(&mut buf, entity_name, observations)
1887 .map_err(MCSError::IoError)?;
1888 self.store.write_record(RecordKind::DeleteObservations, &buf)
1889 .map_err(MCSError::IoError)?;
1890
1891 let stored = self
1893 .entity_slots
1894 .get_mut(slot as usize)
1895 .and_then(|e| e.as_mut())
1896 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{entity_name}' not found")))?;
1897 stored.observations.retain(|o| !remove_ids.contains(o));
1898 self.search.remove_entity(slot);
1899 self.search
1900 .index_entity(&mut self.interner, slot, stored.name, stored.entity_type, &stored.observations);
1901 Ok(())
1902 }
1903
1904 pub fn delete_relations(&mut self, relations: &[Relation]) -> Result<()> {
1905 let rels: AHashSet<(StrId, StrId, StrId)> = relations
1907 .iter()
1908 .map(|r| {
1909 (
1910 self.interner.intern(&r.from),
1911 self.interner.intern(&r.to),
1912 self.interner.intern(&r.relation_type),
1913 )
1914 })
1915 .collect();
1916 if rels.is_empty() {
1917 return Ok(());
1918 }
1919 let mut records: Vec<(RecordKind, Vec<u8>)> = Vec::new();
1921 for relation in relations {
1922 let mut buf = Vec::new();
1923 store_enc::encode_delete_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
1924 .map_err(MCSError::IoError)?;
1925 records.push((RecordKind::DeleteRelation, buf));
1926 }
1927 self.store.write_record(RecordKind::TxnBegin, &[]).map_err(MCSError::IoError)?;
1928 for (kind, data) in &records {
1929 self.store.write_record(*kind, data).map_err(MCSError::IoError)?;
1930 }
1931 self.store.write_record(RecordKind::TxnCommit, &[]).map_err(MCSError::IoError)?;
1932
1933 self.relations
1935 .retain(|r| !rels.contains(&(r.from, r.to, r.relation_type)));
1936 for (f, t, rt) in &rels {
1937 if let Some(edges) = self.adjacency.get_mut(f) {
1938 edges.retain(|(to, rtype)| to != t || rtype != rt);
1939 if edges.is_empty() {
1940 self.adjacency.remove(f);
1941 }
1942 }
1943 if let Some(edges) = self.adjacency.get_mut(t) {
1944 edges.retain(|(to, rtype)| to != f || rtype != rt);
1945 if edges.is_empty() {
1946 self.adjacency.remove(t);
1947 }
1948 }
1949 }
1950 Ok(())
1951 }
1952
1953 pub fn read_graph(&self) -> KnowledgeGraphOut {
1954 self.read_graph_view().to_owned_out()
1955 }
1956
1957 pub fn read_graph_view(&self) -> GraphView<'_> {
1961 let entities: Vec<&StoredEntity> = self
1962 .entity_slots
1963 .iter()
1964 .filter_map(|s| s.as_ref())
1965 .collect();
1966 let relations: Vec<&StoredRelation> = self.relations.iter().collect();
1967 GraphView { kg: self, entities, relations }
1968 }
1969
1970 pub fn search_nodes(&self, query: &str) -> KnowledgeGraphOut {
1973 self.search_nodes_filtered(query, None, 0, usize::MAX)
1974 }
1975
1976 pub fn open_nodes(&self, names: &[String]) -> KnowledgeGraphOut {
1977 self.open_nodes_view(names).to_owned_out()
1978 }
1979
1980 pub fn open_nodes_view(&self, names: &[String]) -> GraphView<'_> {
1982 let name_ids: AHashSet<StrId> = names.iter()
1983 .filter_map(|n| self.interner.get_optional(n))
1984 .collect();
1985 let entities: Vec<&StoredEntity> = self
1986 .entity_slots
1987 .iter()
1988 .filter_map(|s| {
1989 s.as_ref()
1990 .filter(|stored| name_ids.contains(&stored.name))
1991 })
1992 .collect();
1993 let matched_names: AHashSet<StrId> = entities.iter().map(|e| e.name).collect();
1994 let relations: Vec<&StoredRelation> = self
1995 .relations
1996 .iter()
1997 .filter(|r| matched_names.contains(&r.from) || matched_names.contains(&r.to))
1998 .collect();
1999 GraphView { kg: self, entities, relations }
2000 }
2001
2002 fn entity_to_output(&self, stored: &StoredEntity) -> Entity {
2007 Entity {
2008 name: self.interner.lookup(stored.name).to_string(),
2009 entity_type: self.interner.lookup(stored.entity_type).to_string(),
2010 observations: stored
2011 .observations
2012 .iter()
2013 .map(|o| self.interner.lookup(*o).to_string())
2014 .collect(),
2015 }
2016 }
2017
2018 #[inline]
2019 fn relation_to_output(&self, r: &StoredRelation) -> Relation {
2020 Relation {
2021 from: self.interner.lookup(r.from).to_string(),
2022 to: self.interner.lookup(r.to).to_string(),
2023 relation_type: self.interner.lookup(r.relation_type).to_string(),
2024 }
2025 }
2026
2027 fn lookup_live_slot(&self, name: &str) -> Option<u32> {
2029 let name_id = self.interner.get_optional(name)?;
2030 let hash = self.interner.get_hash(name_id);
2031 let slot = self.name_table.lookup(hash, name_id)?;
2032 self.entity_slots.get(slot as usize)?.as_ref()?;
2033 Some(slot)
2034 }
2035
2036 fn entity_by_name_id(&self, name_id: StrId) -> Option<Entity> {
2038 let hash = self.interner.get_hash(name_id);
2039 let slot = self.name_table.lookup(hash, name_id)?;
2040 let stored = self.entity_slots.get(slot as usize)?.as_ref()?;
2041 Some(self.entity_to_output(stored))
2042 }
2043
2044 pub fn entity_type_counts(&self) -> Vec<(String, usize)> {
2048 let mut counts: AHashMap<StrId, usize> = AHashMap::new();
2049 for st in self
2050 .entity_slots
2051 .iter()
2052 .filter_map(|s| s.as_ref())
2053 {
2054 *counts.entry(st.entity_type).or_insert(0) += 1;
2055 }
2056 self.rank_counts(counts)
2057 }
2058
2059 pub fn relation_type_counts(&self) -> Vec<(String, usize)> {
2061 let mut counts: AHashMap<StrId, usize> = AHashMap::new();
2062 for r in &self.relations {
2063 *counts.entry(r.relation_type).or_insert(0) += 1;
2064 }
2065 self.rank_counts(counts)
2066 }
2067
2068 fn rank_counts(&self, counts: AHashMap<StrId, usize>) -> Vec<(String, usize)> {
2069 let mut out: Vec<(String, usize)> = counts
2070 .into_iter()
2071 .map(|(id, c)| (self.interner.lookup(id).to_string(), c))
2072 .collect();
2073 out.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
2074 out
2075 }
2076
2077 pub fn search_nodes_filtered(
2081 &self,
2082 query: &str,
2083 entity_type: Option<&str>,
2084 offset: usize,
2085 limit: usize,
2086 ) -> KnowledgeGraphOut {
2087 self.search_nodes_view(query, entity_type, offset, limit).to_owned_out()
2088 }
2089
2090 pub fn search_nodes_view(
2092 &self,
2093 query: &str,
2094 entity_type: Option<&str>,
2095 offset: usize,
2096 limit: usize,
2097 ) -> GraphView<'_> {
2098 let type_id = match entity_type {
2099 Some(t) => match self.interner.get_optional(t) {
2100 Some(id) => Some(id),
2101 None => return GraphView { kg: self, entities: Vec::new(), relations: Vec::new() },
2102 },
2103 None => None,
2104 };
2105
2106 let ranked = self.search.search_ranked(query, &self.interner);
2107 let mut selected: AHashSet<StrId> = AHashSet::new();
2108 let mut entities: Vec<&StoredEntity> = Vec::new();
2109 let mut skipped = 0usize;
2110 for (slot, _score) in ranked {
2111 let Some(st) = self
2112 .entity_slots
2113 .get(slot as usize)
2114 .and_then(|s| s.as_ref())
2115
2116 else {
2117 continue;
2118 };
2119 if type_id.is_some_and(|tid| st.entity_type != tid) {
2120 continue;
2121 }
2122 if skipped < offset {
2123 skipped += 1;
2124 continue;
2125 }
2126 if entities.len() >= limit {
2127 break;
2128 }
2129 selected.insert(st.name);
2130 entities.push(st);
2131 }
2132
2133 let relations: Vec<&StoredRelation> = self
2134 .relations
2135 .iter()
2136 .filter(|r| selected.contains(&r.from) || selected.contains(&r.to))
2137 .collect();
2138 GraphView { kg: self, entities, relations }
2139 }
2140
2141 pub fn read_graph_filtered(
2145 &self,
2146 entity_type: Option<&str>,
2147 offset: usize,
2148 limit: usize,
2149 ) -> KnowledgeGraphOut {
2150 self.read_graph_filtered_view(entity_type, offset, limit).to_owned_out()
2151 }
2152
2153 pub fn read_graph_filtered_view(
2155 &self,
2156 entity_type: Option<&str>,
2157 offset: usize,
2158 limit: usize,
2159 ) -> GraphView<'_> {
2160 let type_id = match entity_type {
2161 Some(t) => match self.interner.get_optional(t) {
2162 Some(id) => Some(id),
2163 None => return GraphView { kg: self, entities: Vec::new(), relations: Vec::new() },
2164 },
2165 None => None,
2166 };
2167
2168 let mut selected: AHashSet<StrId> = AHashSet::new();
2169 let mut entities: Vec<&StoredEntity> = Vec::new();
2170 let mut skipped = 0usize;
2171 for st in self
2172 .entity_slots
2173 .iter()
2174 .filter_map(|s| s.as_ref())
2175 {
2176 if type_id.is_some_and(|tid| st.entity_type != tid) {
2177 continue;
2178 }
2179 if skipped < offset {
2180 skipped += 1;
2181 continue;
2182 }
2183 if entities.len() >= limit {
2184 break;
2185 }
2186 selected.insert(st.name);
2187 entities.push(st);
2188 }
2189
2190 let relations: Vec<&StoredRelation> = self
2191 .relations
2192 .iter()
2193 .filter(|r| selected.contains(&r.from) && selected.contains(&r.to))
2194 .collect();
2195 GraphView { kg: self, entities, relations }
2196 }
2197
2198 pub fn neighbors(
2206 &self,
2207 name: &str,
2208 direction: Direction,
2209 rtype: Option<&str>,
2210 depth: u32,
2211 ) -> Result<KnowledgeGraphOut> {
2212 self.lookup_live_slot(name)
2213 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
2214 let start = self.interner.get_optional(name).unwrap();
2216
2217 let rtype_id = match rtype {
2219 Some(r) => match self.interner.get_optional(r) {
2220 Some(id) => Some(id),
2221 None => {
2222 let entities = self.entity_by_name_id(start).into_iter().collect();
2223 return Ok(KnowledgeGraphOut { entities, relations: Vec::new() });
2224 }
2225 },
2226 None => None,
2227 };
2228
2229 let mut visited: AHashSet<StrId> = AHashSet::new();
2230 visited.insert(start);
2231
2232 let type_ok = |r: &StoredRelation| rtype_id.is_none_or(|rt| r.relation_type == rt);
2233
2234 if depth == 1 {
2235 for r in self.relations.iter().filter(|r| type_ok(r)) {
2236 match direction {
2237 Direction::Out => {
2238 if r.from == start {
2239 visited.insert(r.to);
2240 }
2241 }
2242 Direction::In => {
2243 if r.to == start {
2244 visited.insert(r.from);
2245 }
2246 }
2247 Direction::Both => {
2248 if r.from == start {
2249 visited.insert(r.to);
2250 } else if r.to == start {
2251 visited.insert(r.from);
2252 }
2253 }
2254 }
2255 }
2256 } else if depth >= 2 {
2257 let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
2261 match direction {
2262 Direction::Both => {
2263 for (&node, edges) in &self.adjacency {
2264 for &(nb, rt) in edges {
2265 if rtype_id.is_none_or(|rt_id| rt == rt_id) {
2266 adj.entry(node).or_default().push(nb);
2267 }
2268 }
2269 }
2270 }
2271 Direction::Out | Direction::In => {
2272 for r in self.relations.iter().filter(|r| type_ok(r)) {
2273 match direction {
2274 Direction::Out => adj.entry(r.from).or_default().push(r.to),
2275 Direction::In => adj.entry(r.to).or_default().push(r.from),
2276 _ => unreachable!(),
2277 }
2278 }
2279 }
2280 }
2281 let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
2282 queue.push_back((start, 0));
2283 while let Some((node, d)) = queue.pop_front() {
2284 if d >= depth {
2285 continue;
2286 }
2287 if let Some(nbrs) = adj.get(&node) {
2288 for &nb in nbrs {
2289 if visited.insert(nb) {
2290 queue.push_back((nb, d + 1));
2291 }
2292 }
2293 }
2294 }
2295 }
2296
2297 let mut entities = Vec::with_capacity(visited.len());
2298 for &nid in &visited {
2299 if let Some(e) = self.entity_by_name_id(nid) {
2300 entities.push(e);
2301 }
2302 }
2303 let relations = self
2304 .relations
2305 .iter()
2306 .filter(|r| type_ok(r) && visited.contains(&r.from) && visited.contains(&r.to))
2307 .map(|r| self.relation_to_output(r))
2308 .collect();
2309 Ok(KnowledgeGraphOut { entities, relations })
2310 }
2311
2312 pub fn describe_entity(&self, name: &str) -> Result<serde_json::Value> {
2316 let name_id = self
2317 .interner
2318 .get_optional(name)
2319 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
2320 let entity = self
2321 .entity_by_name_id(name_id)
2322 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))?;
2323
2324 let mut incident: Vec<Relation> = Vec::new();
2325 let mut neighbor_seen: AHashSet<StrId> = AHashSet::new();
2326 let mut neighbors: Vec<&str> = Vec::new();
2327 for r in &self.relations {
2328 if r.from == name_id || r.to == name_id {
2329 incident.push(self.relation_to_output(r));
2330 let other = if r.from == name_id { r.to } else { r.from };
2331 if other != name_id && neighbor_seen.insert(other) {
2332 neighbors.push(self.interner.lookup(other));
2333 }
2334 }
2335 }
2336
2337 Ok(serde_json::json!({
2338 "entity": entity,
2339 "relations": incident,
2340 "neighbors": neighbors,
2341 "degree": incident.len(),
2342 }))
2343 }
2344
2345 pub fn upsert_entities(&mut self, entities: &[Entity]) -> Result<Vec<serde_json::Value>> {
2350 for e in entities {
2351 if e.name.is_empty() {
2352 return Err(MCSError::InvalidParams(
2353 "Entity name must not be empty".into(),
2354 ));
2355 }
2356 }
2357 let mut out = Vec::with_capacity(entities.len());
2358 for e in entities {
2359 if self.lookup_live_slot(&e.name).is_some() {
2360 let added = self.add_observations(&e.name, &e.observations)?;
2361 out.push(serde_json::json!({
2362 "name": e.name,
2363 "created": false,
2364 "addedObservations": added,
2365 }));
2366 } else {
2367 let created = self.create_entities(std::slice::from_ref(e))?;
2368 out.push(serde_json::json!({
2369 "name": e.name,
2370 "created": !created.is_empty(),
2371 "addedObservations": e.observations,
2372 }));
2373 }
2374 }
2375 Ok(out)
2376 }
2377
2378 pub fn export(&self, format: &str) -> Result<String> {
2380 match format {
2381 "json" => serde_json::to_string(&self.read_graph()).map_err(MCSError::JsonError),
2382 "mermaid" => Ok(self.export_mermaid()),
2383 "dot" => Ok(self.export_dot()),
2384 other => Err(MCSError::InvalidParams(format!(
2385 "Unknown export format '{other}' (expected json|mermaid|dot)"
2386 ))),
2387 }
2388 }
2389
2390 fn diagram_node_ids(&self) -> (AHashMap<StrId, usize>, Vec<(usize, StrId)>) {
2392 let mut ids: AHashMap<StrId, usize> = AHashMap::new();
2393 let mut order: Vec<(usize, StrId)> = Vec::new();
2394 for st in self
2395 .entity_slots
2396 .iter()
2397 .filter_map(|s| s.as_ref())
2398 {
2399 let n = ids.len();
2400 ids.insert(st.name, n);
2401 order.push((n, st.name));
2402 }
2403 (ids, order)
2404 }
2405
2406 fn export_mermaid(&self) -> String {
2407 let (ids, order) = self.diagram_node_ids();
2408 let mut s = String::with_capacity(64 + order.len() * 32 + self.relations.len() * 32);
2409 s.push_str("graph LR\n");
2410 for (n, name_id) in &order {
2411 let label = sanitize_label(self.interner.lookup(*name_id));
2412 s.push_str(&format!(" n{n}[\"{label}\"]\n"));
2413 }
2414 for r in &self.relations {
2415 if let (Some(&a), Some(&b)) = (ids.get(&r.from), ids.get(&r.to)) {
2416 let rel = sanitize_label(self.interner.lookup(r.relation_type));
2417 s.push_str(&format!(" n{a} -->|{rel}| n{b}\n"));
2418 }
2419 }
2420 s
2421 }
2422
2423 fn export_dot(&self) -> String {
2424 let (ids, order) = self.diagram_node_ids();
2425 let mut s = String::with_capacity(64 + order.len() * 32 + self.relations.len() * 32);
2426 s.push_str("digraph G {\n");
2427 for (n, name_id) in &order {
2428 let label = sanitize_label(self.interner.lookup(*name_id));
2429 s.push_str(&format!(" n{n} [label=\"{label}\"];\n"));
2430 }
2431 for r in &self.relations {
2432 if let (Some(&a), Some(&b)) = (ids.get(&r.from), ids.get(&r.to)) {
2433 let rel = sanitize_label(self.interner.lookup(r.relation_type));
2434 s.push_str(&format!(" n{a} -> n{b} [label=\"{rel}\"];\n"));
2435 }
2436 }
2437 s.push_str("}\n");
2438 s
2439 }
2440
    /// Merges the entity `source` into `target`, then deletes `source`.
    ///
    /// Observations of `source` that `target` does not already hold are appended
    /// to `target`. Every relation touching `source` is re-created with `target`
    /// in its place, unless that would produce a self-loop or a relation that
    /// already exists. All resulting records are written to the store between a
    /// `TxnBegin` and a `TxnCommit` record and only afterwards applied in memory.
    ///
    /// Returns a JSON report: `source`, `target`, `movedObservations` (number of
    /// observations `source` had), `addedObservations` (how many of them were new
    /// to `target`) and `redirectedRelations`.
    ///
    /// # Errors
    /// `MCSError::InvalidParams` when both names are equal or either entity is not
    /// live; `MCSError::IoError` when encoding or writing a record fails; any
    /// error from `compact_if_needed`.
    pub fn merge_entities(&mut self, source: &str, target: &str) -> Result<serde_json::Value> {
        if source == target {
            return Err(MCSError::InvalidParams(
                "Source and target must be different entities".into(),
            ));
        }
        // Both entities must be live; only the target's slot index is needed later.
        self.lookup_live_slot(source).ok_or_else(|| {
            MCSError::InvalidParams(format!("Source entity '{source}' not found"))
        })?;
        let target_slot = self.lookup_live_slot(target).ok_or_else(|| {
            MCSError::InvalidParams(format!("Target entity '{target}' not found"))
        })?;

        // The unwraps below rely on the liveness checks that just succeeded.
        let source_entity = self.get_entity(source).unwrap();
        let moved_obs_count = source_entity.observations.len();
        let source_id = self.interner.get_optional(source).unwrap();
        let target_id = self.interner.get_optional(target).unwrap();

        // Observations the target already has, as interned ids.
        let target_existing: AHashSet<StrId> = self.entity_slots[target_slot as usize]
            .as_ref()
            .unwrap()
            .observations
            .iter()
            .copied()
            .collect();
        // Keep each source observation once, and only if the target lacks it.
        let mut obs_seen: AHashSet<StrId> = AHashSet::new();
        let mut obs_to_add: Vec<String> = Vec::new();
        for o in &source_entity.observations {
            if let Some(oid) = self.interner.get_optional(o)
                && !target_existing.contains(&oid)
                && obs_seen.insert(oid)
            {
                obs_to_add.push(o.clone());
            }
        }

        // Plan the redirected relations: swap `source` for `target` on either end.
        let existing_rels: AHashSet<(StrId, StrId, StrId)> =
            self.relations.iter().map(|r| (r.from, r.to, r.relation_type)).collect();
        let mut rel_seen: AHashSet<(StrId, StrId, StrId)> = AHashSet::new();
        let mut redirect: Vec<Relation> = Vec::new();
        for r in &self.relations {
            if r.from != source_id && r.to != source_id {
                continue;
            }
            let new_from = if r.from == source_id { target_id } else { r.from };
            let new_to = if r.to == source_id { target_id } else { r.to };
            // Redirecting would collapse this relation into a self-loop on the target.
            if new_from == new_to {
                continue;
            }
            // Skip relations that already exist or were already planned in this merge.
            let key = (new_from, new_to, r.relation_type);
            if existing_rels.contains(&key) || !rel_seen.insert(key) {
                continue;
            }
            redirect.push(Relation {
                from: self.interner.lookup(new_from).to_string(),
                to: self.interner.lookup(new_to).to_string(),
                relation_type: self.interner.lookup(r.relation_type).to_string(),
            });
        }

        let added_count = obs_to_add.len();
        let redirected = redirect.len() as u32;

        // Encode every record up front so an encoding failure happens before any
        // record is written to the store.
        let mut records: Vec<(RecordKind, Vec<u8>)> = Vec::new();
        if !obs_to_add.is_empty() {
            let mut buf = Vec::new();
            store_enc::encode_add_observations(&mut buf, target, &obs_to_add)
                .map_err(MCSError::IoError)?;
            records.push((RecordKind::AddObservations, buf));
        }
        for r in &redirect {
            let mut buf = Vec::new();
            store_enc::encode_create_relation(&mut buf, &r.from, &r.to, &r.relation_type)
                .map_err(MCSError::IoError)?;
            records.push((RecordKind::CreateRelation, buf));
        }
        // The source is deleted last, after its data has been carried over.
        let mut del_buf = Vec::new();
        store_enc::encode_delete_entity(&mut del_buf, source).map_err(MCSError::IoError)?;
        records.push((RecordKind::DeleteEntity, del_buf));

        // Write the whole merge as one transaction bracketed by begin/commit markers.
        self.store.write_record(RecordKind::TxnBegin, &[]).map_err(MCSError::IoError)?;
        for (kind, data) in &records {
            self.store.write_record(*kind, data).map_err(MCSError::IoError)?;
        }
        self.store.write_record(RecordKind::TxnCommit, &[]).map_err(MCSError::IoError)?;

        // Only after the records are written are they applied to the in-memory state.
        for (kind, data) in &records {
            Self::apply_record(
                *kind, data, &mut self.interner, &mut self.entity_slots, &mut self.search,
                &mut self.name_table, &mut self.relations,
            );
        }

        // `apply_record` is not handed the adjacency index, so it is repaired here:
        // drop the source node, drop edges pointing at it, add the redirected edges
        // in both directions.
        self.adjacency.remove(&source_id);
        for list in self.adjacency.values_mut() {
            list.retain(|(to, _)| *to != source_id);
        }
        for r in &redirect {
            let from_id = self.interner.get_optional(&r.from).unwrap();
            let to_id = self.interner.get_optional(&r.to).unwrap();
            let type_id = self.interner.get_optional(&r.relation_type).unwrap();
            self.adjacency.entry(from_id).or_default().push((to_id, type_id));
            self.adjacency.entry(to_id).or_default().push((from_id, type_id));
        }

        self.compact_if_needed()?;

        Ok(serde_json::json!({
            "source": source,
            "target": target,
            "movedObservations": moved_obs_count,
            "addedObservations": added_count,
            "redirectedRelations": redirected,
        }))
    }
2583
2584 pub fn extract_subgraph(&self, names: &[String], depth: u32) -> Result<KnowledgeGraphOut> {
2588 if names.is_empty() {
2589 return Ok(KnowledgeGraphOut {
2590 entities: Vec::new(),
2591 relations: Vec::new(),
2592 });
2593 }
2594 let mut visited: AHashSet<StrId> = AHashSet::new();
2596 let mut queue: VecDeque<(StrId, u32)> = VecDeque::new();
2597 for name in names {
2598 if let Some(id) = self.interner.get_optional(name)
2599 && visited.insert(id)
2600 {
2601 queue.push_back((id, 0));
2602 }
2603 }
2604 let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
2606 for (&node, edges) in &self.adjacency {
2607 let nb: Vec<StrId> = edges.iter().map(|(to, _)| *to).collect();
2608 adj.insert(node, nb);
2609 }
2610 while let Some((node, d)) = queue.pop_front() {
2611 if d >= depth {
2612 continue;
2613 }
2614 if let Some(nbrs) = adj.get(&node) {
2615 for &nb in nbrs {
2616 if visited.insert(nb) {
2617 queue.push_back((nb, d + 1));
2618 }
2619 }
2620 }
2621 }
2622 let mut entities: Vec<Entity> = Vec::with_capacity(visited.len());
2623 for &nid in &visited {
2624 if let Some(e) = self.entity_by_name_id(nid) {
2625 entities.push(e);
2626 }
2627 }
2628 let relations: Vec<Relation> = self
2629 .relations
2630 .iter()
2631 .filter(|r| visited.contains(&r.from) && visited.contains(&r.to))
2632 .map(|r| self.relation_to_output(r))
2633 .collect();
2634 Ok(KnowledgeGraphOut { entities, relations })
2635 }
2636
2637 pub fn batch_get_entities(&self, names: &[String]) -> Vec<Option<Entity>> {
2639 names.iter().map(|n| self.get_entity(n)).collect()
2640 }
2641
2642 #[allow(clippy::too_many_arguments)]
2645 fn dfs_all_paths(
2646 adj: &AHashMap<StrId, Vec<StrId>>,
2647 current: StrId,
2648 target: StrId,
2649 max_depth: usize,
2650 max_paths: usize,
2651 visited: &mut AHashSet<StrId>,
2652 current_path: &mut Vec<StrId>,
2653 all_paths: &mut Vec<Vec<StrId>>,
2654 ) {
2655 if all_paths.len() >= max_paths {
2656 return;
2657 }
2658 if current == target && current_path.len() > 1 {
2659 all_paths.push(current_path.clone());
2660 return;
2661 }
2662 if current_path.len() > max_depth {
2663 return;
2664 }
2665 if let Some(neighbors) = adj.get(¤t) {
2666 for &nb in neighbors {
2667 if visited.insert(nb) {
2668 current_path.push(nb);
2669 Self::dfs_all_paths(
2670 adj, nb, target, max_depth, max_paths, visited, current_path, all_paths,
2671 );
2672 current_path.pop();
2673 visited.remove(&nb);
2674 }
2675 }
2676 }
2677 }
2678
2679 pub fn find_all_paths(
2683 &self,
2684 from: &str,
2685 to: &str,
2686 max_depth: usize,
2687 max_paths: usize,
2688 ) -> Result<Vec<Vec<String>>> {
2689 let from_id = self
2690 .interner
2691 .get_optional(from)
2692 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
2693 let to_id = self
2694 .interner
2695 .get_optional(to)
2696 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;
2697 if self.lookup_live_slot(from).is_none() {
2699 return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
2700 }
2701 if self.lookup_live_slot(to).is_none() {
2702 return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
2703 }
2704 if from_id == to_id {
2705 return Ok(vec![vec![from.to_string()]]);
2706 }
2707 let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::with_capacity(self.adjacency.len());
2709 for (&node, edges) in &self.adjacency {
2710 let nbrs: Vec<StrId> = edges.iter().map(|(to, _)| *to).collect();
2711 adj.insert(node, nbrs);
2712 }
2713 let mut all_paths: Vec<Vec<StrId>> = Vec::new();
2714 let mut current_path = Vec::new();
2715 let mut visited: AHashSet<StrId> = AHashSet::new();
2716 visited.insert(from_id);
2717 current_path.push(from_id);
2718 Self::dfs_all_paths(
2719 &adj,
2720 from_id,
2721 to_id,
2722 max_depth,
2723 max_paths,
2724 &mut visited,
2725 &mut current_path,
2726 &mut all_paths,
2727 );
2728 if all_paths.is_empty() {
2729 return Err(MCSError::MemoryError(format!(
2730 "No path found between '{from}' and '{to}'"
2731 )));
2732 }
2733 let result: Vec<Vec<String>> = all_paths
2734 .into_iter()
2735 .map(|path| {
2736 path.into_iter()
2737 .map(|id| self.interner.lookup(id).to_string())
2738 .collect()
2739 })
2740 .collect();
2741 Ok(result)
2742 }
2743
2744 pub fn snapshot(&self) -> ReadSnapshot {
2749 ReadSnapshot {
2750 interner: Arc::new(self.interner.clone()),
2751 entity_slots: Arc::from_iter(self.entity_slots.iter().cloned()),
2752 name_table: Arc::new(self.name_table.clone()),
2753 relations: Arc::from_iter(self.relations.iter().cloned()),
2754 adjacency: Arc::new(self.adjacency.clone()),
2755 search: Arc::new(self.search.clone()),
2756 }
2757 }
2758
2759 pub fn flush(&mut self) -> Result<()> {
2763 self.store.flush().map_err(MCSError::IoError)
2764 }
2765
2766 pub fn sync(&mut self) -> Result<()> {
2769 self.store.sync().map_err(MCSError::IoError)
2770 }
2771
2772 pub fn flush_and_sync(&mut self) -> Result<()> {
2774 self.store.flush_and_sync().map_err(MCSError::IoError)
2775 }
2776}
2777
2778
2779
/// Shared handle to a `KnowledgeGraph`: writers go through a mutex, readers use
/// a published snapshot.
pub struct GraphHandle {
    /// The authoritative graph; locked by `write()`.
    inner: Arc<parking_lot::Mutex<KnowledgeGraph>>,
    /// Most recently published read-only snapshot; replaced by `WriteGuard::publish`.
    snapshot: ArcSwap<ReadSnapshot>,
    /// Cached JSON rendering of the graph, or `None`; reset on every publish.
    read_cache: ArcSwap<Option<Arc<str>>>,
    /// "Sync pending" flag plus condition variable used to wake the background
    /// sync thread.
    sync_notify: Arc<(StdMutex<bool>, Condvar)>,
    /// Set when the handle is dropped to tell the background sync thread to exit.
    stop_sync: Arc<AtomicBool>,
    /// Durability mode copied into every `WriteGuard`.
    durability: Durability,
}
2811
/// Exclusive write access to the graph, handed out by `GraphHandle::write`.
///
/// Changes become visible to readers when `publish` runs, either explicitly or
/// when the guard is dropped without having published.
pub struct WriteGuard<'a> {
    /// The held lock on the graph.
    guard: parking_lot::MutexGuard<'a, KnowledgeGraph>,
    /// Where `publish` stores the new snapshot.
    snapshot: &'a ArcSwap<ReadSnapshot>,
    /// JSON cache that `publish` resets to `None`.
    read_cache: &'a ArcSwap<Option<Arc<str>>>,
    /// Flag and condition variable used to wake the background sync thread.
    sync_notify: &'a (StdMutex<bool>, Condvar),
    /// Selects `flush_and_sync` (sync mode) or plain `flush` on publish.
    durability: Durability,
    /// Whether `publish` has already run for this guard.
    did_publish: bool,
}
2821
2822impl WriteGuard<'_> {
2823 pub fn publish(&mut self) {
2827 if self.durability.is_sync() {
2828 if let Err(e) = self.guard.flush_and_sync() {
2829 tracing::error!("WAL sync failed: {e}");
2830 }
2831 } else if let Err(e) = self.guard.flush() {
2832 tracing::error!("WAL flush failed: {e}");
2833 }
2834 let snap = Arc::new(self.guard.snapshot());
2835 self.snapshot.store(snap);
2836 self.read_cache.store(Arc::new(None));
2837 self.did_publish = true;
2838 let (lock, cvar) = self.sync_notify;
2840 let mut pending = lock.lock().unwrap_or_else(|e| e.into_inner());
2841 *pending = true;
2842 cvar.notify_one();
2843 }
2844
2845 pub fn graph(&mut self) -> &mut KnowledgeGraph {
2847 &mut self.guard
2848 }
2849}
2850
2851impl std::ops::Deref for WriteGuard<'_> {
2852 type Target = KnowledgeGraph;
2853 fn deref(&self) -> &KnowledgeGraph {
2854 &self.guard
2855 }
2856}
2857
2858impl std::ops::DerefMut for WriteGuard<'_> {
2859 fn deref_mut(&mut self) -> &mut KnowledgeGraph {
2860 &mut self.guard
2861 }
2862}
2863
2864impl Drop for WriteGuard<'_> {
2865 fn drop(&mut self) {
2866 if !self.did_publish {
2867 self.publish();
2868 }
2869 }
2870}
2871
2872impl Drop for GraphHandle {
2873 fn drop(&mut self) {
2874 self.stop_sync.store(true, Ordering::Relaxed);
2875 let (lock, cvar) = &*self.sync_notify;
2878 let mut pending = lock.lock().unwrap_or_else(|e| e.into_inner());
2879 *pending = true;
2880 cvar.notify_one();
2881 }
2882}
2883
2884impl GraphHandle {
2885 pub fn new(path: &Path, durability: Durability) -> std::io::Result<Self> {
2889 let kg = KnowledgeGraph::new(path)?;
2890 let snapshot = Arc::new(kg.snapshot());
2891 let sync_slot = Arc::clone(&kg.store.sync_slot);
2895 let inner = Arc::new(parking_lot::Mutex::new(kg));
2896
2897 let sync_notify: Arc<(StdMutex<bool>, Condvar)> =
2898 Arc::new((StdMutex::new(false), Condvar::new()));
2899 let notify = Arc::clone(&sync_notify);
2900 let stop_sync = Arc::new(AtomicBool::new(false));
2901
2902 let sync_stop = Arc::clone(&stop_sync);
2906 std::thread::Builder::new()
2907 .name("mcp-memory-sync".into())
2908 .spawn(move || {
2909 let (lock, cvar) = &*notify;
2910 loop {
2911 let mut guard = cvar
2914 .wait_timeout_while(
2915 lock.lock().unwrap_or_else(|e| e.into_inner()),
2916 std::time::Duration::from_secs(1),
2917 |p| !*p,
2918 )
2919 .unwrap_or_else(|e| e.into_inner())
2920 .0;
2921
2922 let should_sync = *guard;
2923 *guard = false;
2924 drop(guard);
2927
2928 if should_sync {
2929 if let Err(e) = sync_slot.load().sync_data() {
2932 tracing::error!("WAL fsync failed: {e}");
2933 }
2934 }
2935
2936 if sync_stop.load(Ordering::Relaxed) {
2937 if let Err(e) = sync_slot.load().sync_data() {
2939 tracing::error!("WAL final fsync failed: {e}");
2940 }
2941 break;
2942 }
2943 }
2944 })
2945 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
2946
2947 Ok(Self {
2948 inner,
2949 snapshot: ArcSwap::new(snapshot),
2950 read_cache: ArcSwap::new(Arc::new(None)),
2951 sync_notify,
2952 stop_sync,
2953 durability,
2954 })
2955 }
2956
2957 pub fn read_graph_cached(&self) -> Arc<str> {
2960 if let Some(cached) = self.read_cache.load().as_ref() {
2961 return cached.clone();
2962 }
2963 let graph = self.read();
2964 let json: Arc<str> = Arc::from(graph.read_graph_json().into_boxed_str());
2965 self.read_cache.store(Arc::new(Some(json.clone())));
2966 json
2967 }
2968
2969 pub fn read(&self) -> ReadSnapshot {
2971 (**self.snapshot.load()).clone()
2972 }
2973
2974 pub fn write(&self) -> WriteGuard<'_> {
2977 WriteGuard {
2978 guard: self.inner.lock(),
2979 snapshot: &self.snapshot,
2980 read_cache: &self.read_cache,
2981 sync_notify: &self.sync_notify,
2982 durability: self.durability,
2983 did_publish: false,
2984 }
2985 }
2986}
2987
2988