use std::collections::VecDeque;
use ahash::{AHashMap, AHashSet};
use std::path::Path;
use serde::ser::{Serialize, SerializeSeq, SerializeStruct, Serializer};
use crate::errors::{MCSError, Result};
use crate::intern::{StrId, StringInterner};
use crate::types::{Entity, Relation, KnowledgeGraphOut};
use crate::search::SearchIndex;
use crate::store::{self as store_enc, BinaryStore, RecordKind};
/// State tag written into `StoredEntity::state` when an entity is created; `StoredEntity::is_live` compares against it.
const ENTITY_SLOT_LIVE: u8 = 1;
/// Number of shards in `ShardedNameTable`. `ShardedNameTable::shard` selects a shard with
/// `hash & (NAME_TABLE_SHARDS - 1)`, so this value is expected to be a power of two.
const NAME_TABLE_SHARDS: usize = 4;
/// Issues a best-effort cache prefetch hint for `addr` (x86_64 build).
///
/// # Safety
/// The function is kept `unsafe` so both cfg variants share one signature. The hint
/// itself never dereferences `addr`; callers in this file still pass only pointers
/// derived from a live allocation.
#[cfg(target_arch = "x86_64")]
#[inline(always)]
unsafe fn prefetch_addr(addr: *const u8) {
    // SAFETY: `_mm_prefetch` is a pure hint and does not read or write through the
    // pointer. The intrinsic is declared over `*const i8`, hence the cast.
    // Strategy 3 is `_MM_HINT_T0` (prefetch into all cache levels).
    unsafe {
        std::arch::x86_64::_mm_prefetch::<3>(addr.cast::<i8>());
    }
}

/// No-op stand-in for targets without the x86_64 prefetch intrinsic.
///
/// # Safety
/// Nothing is accessed; the function is `unsafe` only to match the x86_64 signature.
#[cfg(not(target_arch = "x86_64"))]
#[inline(always)]
const unsafe fn prefetch_addr(_addr: *const u8) {}
/// In-memory record for one entity; every string is held as an interned `StrId`.
/// With the `cache_align` feature enabled the struct is aligned to 64 bytes.
#[cfg_attr(feature = "cache_align", repr(align(64)))]
struct StoredEntity {
// Slot state tag; `ENTITY_SLOT_LIVE` marks a live entity (checked by `is_live`).
state: u8,
// Interned entity name.
name: StrId,
// Interned entity type label.
entity_type: StrId,
// Interned observation strings attached to this entity.
observations: Vec<StrId>,
}
impl StoredEntity {
const fn is_live(&self) -> bool {
self.state == ENTITY_SLOT_LIVE
}
}
/// In-memory record for one directed relation, stored as three interned ids.
/// With the `cache_align` feature enabled the struct is aligned to 16 bytes.
#[cfg_attr(feature = "cache_align", repr(align(16)))]
struct StoredRelation {
// Interned name of the source endpoint.
from: StrId,
// Interned name of the target endpoint.
to: StrId,
// Interned relation type label.
relation_type: StrId,
}
/// Borrowed selection of entities and relations from a `KnowledgeGraph`.
/// It can be serialized directly or converted to an owned `KnowledgeGraphOut`.
pub struct GraphView<'a> {
// Graph the selection borrows from; its interner resolves the stored ids to strings.
kg: &'a KnowledgeGraph,
// Selected entity records.
entities: Vec<&'a StoredEntity>,
// Selected relation records.
relations: Vec<&'a StoredRelation>,
}
impl GraphView<'_> {
    /// Copies the borrowed selection into an owned `KnowledgeGraphOut`, resolving
    /// every interned id to a `String`.
    pub fn to_owned_out(&self) -> KnowledgeGraphOut {
        let mut entities = Vec::with_capacity(self.entities.len());
        for &stored in &self.entities {
            entities.push(self.kg.entity_to_output(stored));
        }
        let mut relations = Vec::with_capacity(self.relations.len());
        for &stored in &self.relations {
            relations.push(self.kg.relation_to_output(stored));
        }
        KnowledgeGraphOut { entities, relations }
    }
}
/// Serializes the view as a two-field struct named `KnowledgeGraphOut`
/// (`entities`, then `relations`) without building owned strings first.
impl Serialize for GraphView<'_> {
    fn serialize<S>(&self, s: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let entity_list = EntityListRef { kg: self.kg, items: &self.entities };
        let relation_list = RelationListRef { kg: self.kg, items: &self.relations };
        let mut out = s.serialize_struct("KnowledgeGraphOut", 2)?;
        out.serialize_field("entities", &entity_list)?;
        out.serialize_field("relations", &relation_list)?;
        out.end()
    }
}
struct EntityListRef<'a> { kg: &'a KnowledgeGraph, items: &'a [&'a StoredEntity] }
impl Serialize for EntityListRef<'_> {
fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
let mut seq = s.serialize_seq(Some(self.items.len()))?;
for &e in self.items {
seq.serialize_element(&EntityRef { kg: self.kg, e })?;
}
seq.end()
}
}
struct RelationListRef<'a> { kg: &'a KnowledgeGraph, items: &'a [&'a StoredRelation] }
impl Serialize for RelationListRef<'_> {
fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
let mut seq = s.serialize_seq(Some(self.items.len()))?;
for &r in self.items {
seq.serialize_element(&RelationRef { kg: self.kg, r })?;
}
seq.end()
}
}
struct EntityRef<'a> { kg: &'a KnowledgeGraph, e: &'a StoredEntity }
impl Serialize for EntityRef<'_> {
fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
let mut st = s.serialize_struct("Entity", 3)?;
st.serialize_field("name", self.kg.interner.lookup(self.e.name))?;
st.serialize_field("entityType", self.kg.interner.lookup(self.e.entity_type))?;
st.serialize_field("observations", &ObsRef { kg: self.kg, obs: &self.e.observations })?;
st.end()
}
}
struct ObsRef<'a> { kg: &'a KnowledgeGraph, obs: &'a [StrId] }
impl Serialize for ObsRef<'_> {
fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
let mut seq = s.serialize_seq(Some(self.obs.len()))?;
for &o in self.obs {
seq.serialize_element(self.kg.interner.lookup(o))?;
}
seq.end()
}
}
struct RelationRef<'a> { kg: &'a KnowledgeGraph, r: &'a StoredRelation }
impl Serialize for RelationRef<'_> {
fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
let mut st = s.serialize_struct("Relation", 3)?;
st.serialize_field("from", self.kg.interner.lookup(self.r.from))?;
st.serialize_field("to", self.kg.interner.lookup(self.r.to))?;
st.serialize_field("relationType", self.kg.interner.lookup(self.r.relation_type))?;
st.end()
}
}
/// Which relation direction a neighbourhood traversal follows.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum Direction {
    /// Follow relations from their `from` endpoint to their `to` endpoint.
    Out,
    /// Follow relations from their `to` endpoint back to their `from` endpoint.
    In,
    /// Follow relations in either direction.
    Both,
}
impl Direction {
    /// Parses an optional direction string.
    ///
    /// Exactly `"out"` and `"in"` select the one-way variants; anything else,
    /// including `None`, yields [`Direction::Both`].
    pub fn parse(s: Option<&str>) -> Self {
        if s == Some("out") {
            Self::Out
        } else if s == Some("in") {
            Self::In
        } else {
            Self::Both
        }
    }
}
/// Returns a copy of `s` with every double quote replaced by a single quote and
/// every `\n` or `\r` replaced by a space; all other characters pass through.
fn sanitize_label(s: &str) -> String {
    s.chars()
        .map(|ch| match ch {
            '"' => '\'',
            '\n' | '\r' => ' ',
            other => other,
        })
        .collect()
}
/// Control byte marking an unused slot in `NameTableShard::ctrl`. Its high bit is set,
/// while `h2` stamps always have the high bit clear, so `ctrl & 0x80 != 0` tests for emptiness.
const EMPTY_SLOT: u8 = 0xFF;
/// Extracts the 7-bit stamp (the low seven bits of `hash`) kept in a control byte.
#[inline(always)]
const fn h2(hash: u64) -> u8 {
    // Truncating to `u8` keeps the low eight bits; the mask then drops bit 7.
    (hash as u8) & 0x7F
}
/// Computes the starting probe index: the hash bits above the 7-bit stamp,
/// reduced with `mask`.
#[inline(always)]
const fn h1(hash: u64, mask: usize) -> usize {
    let upper_bits = hash >> 7;
    mask & (upper_bits as usize)
}
/// One open-addressing hash table mapping an interned name to a `u32` slot value.
/// The three vectors are parallel: index `i` in each describes the same table slot.
struct NameTableShard {
// `ctrl`: per-slot control byte (`EMPTY_SLOT` or a 7-bit `h2` stamp); `names`: the key stored in each slot.
ctrl: Vec<u8>, names: Vec<StrId>,
// Value stored in each slot (`u32::MAX` while the slot is empty).
slots: Vec<u32>,
// Table length minus one; the length is always a power of two, so `& mask` wraps an index.
mask: usize,
// Number of occupied slots.
count: usize,
}
impl NameTableShard {
/// Creates an empty table whose length is `capacity` rounded up to a power of two, with a minimum of 16.
fn new(capacity: usize) -> Self {
let cap = capacity.next_power_of_two().max(16);
Self {
ctrl: vec![EMPTY_SLOT; cap],
names: vec![StrId::EMPTY; cap],
slots: vec![u32::MAX; cap],
mask: cap - 1,
count: 0,
}
}
/// Looks up `name` by linear probing from `h1(hash, mask)`.
///
/// Returns the stored value for the first slot whose stamp and name both match, or `None`
/// once an empty slot is reached (or after probing every slot).
#[inline(always)]
fn lookup(&self, hash: u64, name: StrId) -> Option<u32> {
let stamp = h2(hash);
let mask = self.mask;
let mut idx = h1(hash, mask);
let ctrl = self.ctrl.as_ptr();
let names = self.names.as_ptr();
let slots = self.slots.as_ptr();
let len = self.ctrl.len();
// Bounded by `len` so a table with no empty slot cannot loop forever.
for _ in 0..len {
// Hint the control byte four probes ahead; the index is wrapped with `mask`.
let prefetch_idx = idx.wrapping_add(4) & mask;
unsafe { prefetch_addr(ctrl.add(prefetch_idx)) };
// SAFETY (review note): `idx` is always `& mask`, and `new`/`grow` give all three vectors
// length `mask + 1`, so every `add(idx)` stays inside its allocation.
unsafe {
let c = *ctrl.add(idx);
// High bit set means the slot is empty: the probe run ends here.
if c & 0x80 != 0 {
return None;
}
if c == stamp && *names.add(idx) == name {
return Some(*slots.add(idx));
}
}
idx = (idx + 1) & mask;
}
None
}
/// Stores `name -> slot` in the first empty slot of the probe run starting at `h1(hash, mask)`.
///
/// Grows the table first when more than three quarters of the slots are occupied.
/// No check is made for an existing entry with the same name.
fn insert(&mut self, interner: &StringInterner, hash: u64, name: StrId, slot: u32) {
if self.count * 4 > self.ctrl.len() * 3 {
self.grow(interner);
}
let stamp = h2(hash);
// Read the mask after the possible `grow`, which replaces it.
let mask = self.mask;
let mut idx = h1(hash, mask);
// Unbounded loop: relies on the load-factor check above leaving at least one empty slot.
loop {
// SAFETY (review note): `idx` is masked and all vectors have length `mask + 1`.
unsafe {
if *self.ctrl.get_unchecked(idx) & 0x80 != 0 {
*self.ctrl.get_unchecked_mut(idx) = stamp;
*self.names.get_unchecked_mut(idx) = name;
*self.slots.get_unchecked_mut(idx) = slot;
self.count += 1;
return;
}
}
idx = (idx + 1) & mask;
}
}
/// Removes the entry for `name`, if present, and re-inserts the entries that follow it
/// in the same probe run so later lookups (which stop at the first empty slot) still find them.
fn remove(&mut self, interner: &StringInterner, hash: u64, name: StrId) {
let stamp = h2(hash);
let mask = self.mask;
let mut idx = h1(hash, mask);
let len = self.ctrl.len();
for _ in 0..len {
// Reached an empty slot: the name is not in the table.
if self.ctrl[idx] & 0x80 != 0 {
return;
}
if self.ctrl[idx] == stamp && self.names[idx] == name {
self.ctrl[idx] = EMPTY_SLOT;
self.names[idx] = StrId::EMPTY;
self.slots[idx] = u32::MAX;
self.count -= 1;
// Walk the occupied slots after the hole until the next empty slot.
let mut next = (idx + 1) & mask;
while self.ctrl[next] & 0x80 == 0 {
let nn = self.names[next];
let ns = self.slots[next];
// The hash is not stored in the table; it is fetched again from the interner.
let nh = interner.get_hash(nn);
// Take the entry out, then place it at the first empty slot of its own probe run.
self.ctrl[next] = EMPTY_SLOT;
self.names[next] = StrId::EMPTY;
self.slots[next] = u32::MAX;
self.count -= 1;
let nstamp = h2(nh);
let mut re_idx = h1(nh, mask);
while self.ctrl[re_idx] & 0x80 == 0 {
re_idx = (re_idx + 1) & mask;
}
self.ctrl[re_idx] = nstamp;
self.names[re_idx] = nn;
self.slots[re_idx] = ns;
self.count += 1;
next = (next + 1) & mask;
}
return;
}
idx = (idx + 1) & mask;
}
}
/// Doubles the table length and re-inserts every occupied slot, using hashes from `interner`.
/// `count` is left unchanged because the same entries are carried over.
fn grow(&mut self, interner: &StringInterner) {
let new_cap = self.ctrl.len() * 2;
let new_mask = new_cap - 1;
let mut new_ctrl = vec![EMPTY_SLOT; new_cap];
let mut new_names = vec![StrId::EMPTY; new_cap];
let mut new_slots = vec![u32::MAX; new_cap];
for i in 0..self.ctrl.len() {
// High bit clear means the old slot is occupied.
if self.ctrl[i] & 0x80 == 0 {
let name = self.names[i];
let hash = interner.get_hash(name);
let stamp = h2(hash);
let mut idx = h1(hash, new_mask);
while new_ctrl[idx] & 0x80 == 0 {
idx = (idx + 1) & new_mask;
}
new_ctrl[idx] = stamp;
new_names[idx] = name;
new_slots[idx] = self.slots[i];
}
}
self.ctrl = new_ctrl;
self.names = new_names;
self.slots = new_slots;
self.mask = new_mask;
}
}
/// Name table split into `NAME_TABLE_SHARDS` independent `NameTableShard`s; the shard for a
/// key is chosen from its hash by `ShardedNameTable::shard`.
struct ShardedNameTable {
shards: [NameTableShard; NAME_TABLE_SHARDS],
}
impl ShardedNameTable {
    /// Builds the table with every shard created at `capacity_per_shard`.
    fn new(capacity_per_shard: usize) -> Self {
        Self {
            shards: std::array::from_fn(|_| NameTableShard::new(capacity_per_shard)),
        }
    }

    /// Picks the shard index from the low bits of `hash`.
    #[inline(always)]
    const fn shard(hash: u64) -> usize {
        let shard_mask = NAME_TABLE_SHARDS - 1;
        (hash as usize) & shard_mask
    }

    /// Looks `name` up in the shard selected by `hash`.
    #[inline(always)]
    fn lookup(&self, hash: u64, name: StrId) -> Option<u32> {
        let target = &self.shards[Self::shard(hash)];
        target.lookup(hash, name)
    }

    /// Inserts `name -> slot` into the shard selected by `hash`.
    #[inline(always)]
    fn insert(&mut self, interner: &StringInterner, hash: u64, name: StrId, slot: u32) {
        let target = &mut self.shards[Self::shard(hash)];
        target.insert(interner, hash, name, slot);
    }

    /// Removes `name` from the shard selected by `hash`.
    #[inline(always)]
    fn remove(&mut self, interner: &StringInterner, hash: u64, name: StrId) {
        let target = &mut self.shards[Self::shard(hash)];
        target.remove(interner, hash, name);
    }
}
/// In-memory knowledge graph backed by a `BinaryStore` record log that `new` replays on open.
pub struct KnowledgeGraph {
// Interner that owns every string; the rest of the graph refers to strings by `StrId`.
interner: StringInterner,
// Entity storage indexed by slot number; `None` marks an unused slot.
entity_slots: Vec<Option<StoredEntity>>,
// Slot numbers currently `None`; `create_entities` pops from here before appending.
free_slots: Vec<u32>,
// Maps an entity's interned name to its slot number.
name_table: ShardedNameTable,
// All relations, in insertion order.
relations: Vec<StoredRelation>,
// Search index keyed by slot number.
search: SearchIndex,
// Record log that mutations are written to.
store: BinaryStore,
}
impl KnowledgeGraph {
/// Opens the store at `path` and rebuilds the in-memory graph by replaying
/// every record in it.
///
/// Records whose payload fails to decode are skipped. After replay, every
/// `None` entity slot is recorded as reusable.
///
/// # Errors
/// Propagates I/O errors from opening or replaying the store.
pub fn new(path: &Path) -> std::io::Result<Self> {
    let store = BinaryStore::new(path)?;
    let mut interner = StringInterner::with_capacity(65536, 1024);
    let mut entity_slots: Vec<Option<StoredEntity>> = Vec::with_capacity(256);
    let mut name_table = ShardedNameTable::new(64);
    let mut relations: Vec<StoredRelation> = Vec::with_capacity(64);
    let mut search = SearchIndex::new();
    store.replay(|kind, data| match kind {
        RecordKind::CreateEntity => {
            let Some((name, etype, obs)) = store_enc::decode_create_entity(data) else {
                return;
            };
            Self::replay_create_entity(
                &mut interner,
                &mut entity_slots,
                &mut search,
                &mut name_table,
                name,
                etype,
                &obs,
            );
        }
        RecordKind::CreateRelation => {
            let Some((from, to, rtype)) = store_enc::decode_create_relation(data) else {
                return;
            };
            let from_id = interner.intern(from);
            let to_id = interner.intern(to);
            let type_id = interner.intern(rtype);
            relations.push(StoredRelation {
                from: from_id,
                to: to_id,
                relation_type: type_id,
            });
        }
        RecordKind::AddObservations => {
            let Some((name, obs)) = store_enc::decode_add_observations(data) else {
                return;
            };
            Self::replay_add_observations(
                &mut interner,
                &mut entity_slots,
                &mut search,
                &mut name_table,
                name,
                &obs,
            );
        }
        RecordKind::DeleteEntity => {
            let Some(name) = store_enc::decode_delete_entity(data) else {
                return;
            };
            Self::replay_delete_entity(
                &mut interner,
                &mut entity_slots,
                &mut relations,
                &mut search,
                &mut name_table,
                name,
            );
        }
        RecordKind::DeleteObservations => {
            let Some((name, obs)) = store_enc::decode_delete_observations(data) else {
                return;
            };
            Self::replay_delete_observations(
                &mut interner,
                &mut entity_slots,
                &mut search,
                &mut name_table,
                name,
                &obs,
            );
        }
        RecordKind::DeleteRelation => {
            let Some((from, to, rtype)) = store_enc::decode_delete_relation(data) else {
                return;
            };
            let from_id = interner.intern(from);
            let to_id = interner.intern(to);
            let type_id = interner.intern(rtype);
            // Keep every relation that differs from the deleted triple in any field.
            relations.retain(|r| {
                r.from != from_id || r.to != to_id || r.relation_type != type_id
            });
        }
    })?;
    // Slots vacated by replayed deletions can be reused by later creations.
    let mut free_slots: Vec<u32> = Vec::new();
    for (index, slot) in entity_slots.iter().enumerate() {
        if slot.is_none() {
            free_slots.push(index as u32);
        }
    }
    Ok(Self {
        interner,
        entity_slots,
        free_slots,
        name_table,
        relations,
        search,
        store,
    })
}
/// Applies a replayed `CreateEntity` record: interns its strings, appends a new
/// live slot, and registers the entity in the name table and the search index.
///
/// The entity always goes into a fresh slot at the end of `entities`; no check
/// is made for an existing entity with the same name.
// Lint override kept from the original signature.
#[allow(clippy::ptr_arg)]
fn replay_create_entity(
    interner: &mut StringInterner,
    entities: &mut Vec<Option<StoredEntity>>,
    search: &mut SearchIndex,
    name_table: &mut ShardedNameTable,
    name: &str,
    etype: &str,
    observations: &[&str],
) {
    let name_id = interner.intern(name);
    let type_id = interner.intern(etype);
    let obs_ids: Vec<StrId> = observations.iter().map(|o| interner.intern(o)).collect();
    let slot = entities.len() as u32;
    let hash = interner.get_hash(name_id);
    name_table.insert(&*interner, hash, name_id, slot);
    // Index while `obs_ids` is still borrowable, then move it into the record;
    // this avoids cloning the id vector.
    search.index_entity(interner, slot, name_id, type_id, &obs_ids);
    entities.push(Some(StoredEntity {
        state: ENTITY_SLOT_LIVE,
        name: name_id,
        entity_type: type_id,
        observations: obs_ids,
    }));
}
fn replay_add_observations(
interner: &mut StringInterner,
entities: &mut [Option<StoredEntity>],
search: &mut SearchIndex,
name_table: &mut ShardedNameTable,
name: &str,
observations: &[&str],
) {
let name_id = interner.intern(name);
let hash = interner.get_hash(name_id);
if let Some(slot) = name_table.lookup(hash, name_id)
&& let Some(Some(entity)) = entities.get_mut(slot as usize)
{
for &o in observations {
let oid = interner.intern(o);
if !entity.observations.contains(&oid) {
entity.observations.push(oid);
}
}
search.remove_entity(slot);
search.index_entity(
interner,
slot,
entity.name,
entity.entity_type,
&entity.observations,
);
}
}
fn replay_delete_entity(
interner: &mut StringInterner,
entities: &mut [Option<StoredEntity>],
rels: &mut Vec<StoredRelation>,
search: &mut SearchIndex,
name_table: &mut ShardedNameTable,
name: &str,
) {
let name_id = interner.intern(name);
let hash = interner.get_hash(name_id);
if let Some(slot) = name_table.lookup(hash, name_id)
&& let Some(Some(_)) = entities.get(slot as usize)
{
entities[slot as usize] = None;
search.remove_entity(slot);
name_table.remove(&*interner, hash, name_id);
}
rels.retain(|r| r.from != name_id && r.to != name_id);
}
fn replay_delete_observations(
interner: &mut StringInterner,
entities: &mut [Option<StoredEntity>],
search: &mut SearchIndex,
name_table: &mut ShardedNameTable,
name: &str,
observations: &[&str],
) {
let name_id = interner.intern(name);
let hash = interner.get_hash(name_id);
if let Some(slot) = name_table.lookup(hash, name_id)
&& let Some(Some(entity)) = entities.get_mut(slot as usize)
{
let remove_ids: Vec<StrId> = observations.iter().map(|o| interner.intern(o)).collect();
entity.observations.retain(|o| !remove_ids.contains(o));
search.remove_entity(slot);
search.index_entity(
interner,
slot,
entity.name,
entity.entity_type,
&entity.observations,
);
}
}
/// Returns a shared reference to the graph's string interner.
pub const fn interner(&self) -> &StringInterner {
&self.interner
}
/// Returns an owned copy of the live entity called `name`, or `None` when the
/// name was never interned, is not in the name table, or its slot is not live.
pub fn get_entity(&self, name: &str) -> Option<Entity> {
    let name_id = self.interner.get_optional(name)?;
    let slot = self
        .name_table
        .lookup(self.interner.get_hash(name_id), name_id)?;
    self.entity_slots
        .get(slot as usize)
        .and_then(Option::as_ref)
        .filter(|stored| stored.is_live())
        .map(|stored| self.entity_to_output(stored))
}
/// Builds a JSON object with summary counters: live entities, relations, total
/// observations on live entities, search-index entries, and interner size.
pub fn graph_stats(&self) -> serde_json::Value {
    // One pass over the slots gathers both per-entity counters.
    let mut live_entities = 0usize;
    let mut total_obs = 0usize;
    for stored in self.entity_slots.iter().flatten() {
        if stored.is_live() {
            live_entities += 1;
            total_obs += stored.observations.len();
        }
    }
    let total_relations = self.relations.len();
    let index_entries = self.search.len();
    serde_json::json!({
        "entities": live_entities,
        "relations": total_relations,
        "totalObservations": total_obs,
        "searchIndexEntries": index_entries,
        "internedStrings": self.interner.len(),
        "internedBytes": self.interner.total_bytes(),
    })
}
pub fn search_relations(&self, from: Option<&str>, to: Option<&str>, rtype: Option<&str>) -> Vec<Relation> {
let from_id = match from {
Some(f) => match self.interner.get_optional(f) {
Some(id) => Some(id),
None => return Vec::new(),
},
None => None,
};
let to_id = match to {
Some(t) => match self.interner.get_optional(t) {
Some(id) => Some(id),
None => return Vec::new(),
},
None => None,
};
let rtype_id = match rtype {
Some(r) => match self.interner.get_optional(r) {
Some(id) => Some(id),
None => return Vec::new(),
},
None => None,
};
self.relations
.iter()
.filter(|r| {
from_id.is_none_or(|f| r.from == f)
&& to_id.is_none_or(|t| r.to == t)
&& rtype_id.is_none_or(|rt| r.relation_type == rt)
})
.map(|r| Relation {
from: self.interner.lookup(r.from).to_string(),
to: self.interner.lookup(r.to).to_string(),
relation_type: self.interner.lookup(r.relation_type).to_string(),
})
.collect()
}
/// Finds a shortest path between two entities, treating every relation as an
/// undirected edge, and returns the entity names along it (endpoints included).
///
/// # Errors
/// * `MCSError::InvalidParams` when either name is unknown to the interner or
///   absent from the name table.
/// * `MCSError::MemoryError` when the two entities are not connected.
pub fn find_path(&self, from: &str, to: &str) -> Result<Vec<String>> {
    let not_found = |name: &str| MCSError::InvalidParams(format!("Entity '{name}' not found"));
    let from_id = self
        .interner
        .get_optional(from)
        .ok_or_else(|| not_found(from))?;
    let to_id = self
        .interner
        .get_optional(to)
        .ok_or_else(|| not_found(to))?;
    let hash_from = self.interner.get_hash(from_id);
    let hash_to = self.interner.get_hash(to_id);
    if self.name_table.lookup(hash_from, from_id).is_none() {
        return Err(not_found(from));
    }
    if self.name_table.lookup(hash_to, to_id).is_none() {
        return Err(not_found(to));
    }
    if from_id == to_id {
        return Ok(vec![from.to_string()]);
    }
    // Undirected adjacency list; the relation type plays no part in path search.
    let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
    for rel in &self.relations {
        adj.entry(rel.from).or_default().push(rel.to);
        adj.entry(rel.to).or_default().push(rel.from);
    }
    // Breadth-first search recording each node's predecessor.
    let mut visited: AHashSet<StrId> = AHashSet::new();
    let mut parent: AHashMap<StrId, StrId> = AHashMap::new();
    let mut queue: VecDeque<StrId> = VecDeque::new();
    visited.insert(from_id);
    queue.push_back(from_id);
    while let Some(current) = queue.pop_front() {
        if current == to_id {
            break;
        }
        if let Some(neighbors) = adj.get(&current) {
            for &neighbor in neighbors {
                if visited.insert(neighbor) {
                    parent.insert(neighbor, current);
                    queue.push_back(neighbor);
                }
            }
        }
    }
    // `from_id != to_id` here, so a reached target always has a predecessor.
    if !parent.contains_key(&to_id) {
        return Err(MCSError::MemoryError(format!(
            "No path found between '{from}' and '{to}'"
        )));
    }
    // Walk predecessors back from the target, then flip into start-to-end order.
    let mut path: Vec<String> = Vec::new();
    let mut cur = to_id;
    loop {
        path.push(self.interner.lookup(cur).to_string());
        if cur == from_id {
            break;
        }
        cur = *parent.get(&cur).ok_or_else(|| {
            MCSError::MemoryError("Path reconstruction failed".into())
        })?;
    }
    path.reverse();
    Ok(path)
}
/// Rewrites the store so it contains only one create record per live entity
/// and per relation, then reloads the graph from the rewritten file.
///
/// The new log is written to a sibling path with the extension `tmp`, flushed
/// and synced, and renamed over the current store path.
///
/// # Errors
/// Any I/O failure is returned as `MCSError::IoError`.
pub fn compact(&mut self) -> Result<()> {
    // Snapshot the current state as owned values before touching the files.
    let live_entities: Vec<Entity> = self
        .entity_slots
        .iter()
        .flatten()
        .filter(|stored| stored.is_live())
        .map(|stored| self.entity_to_output(stored))
        .collect();
    let live_relations: Vec<Relation> = self
        .relations
        .iter()
        .map(|rel| Relation {
            from: self.interner.lookup(rel.from).to_string(),
            to: self.interner.lookup(rel.to).to_string(),
            relation_type: self.interner.lookup(rel.relation_type).to_string(),
        })
        .collect();

    let tmp_path = self.store.path().with_extension("tmp");
    let mut tmp_store = BinaryStore::new(&tmp_path).map_err(MCSError::IoError)?;
    for entity in &live_entities {
        let mut record = Vec::new();
        store_enc::encode_create_entity(&mut record, &entity.name, &entity.entity_type, &entity.observations)
            .map_err(MCSError::IoError)?;
        tmp_store
            .write_record(RecordKind::CreateEntity, &record)
            .map_err(MCSError::IoError)?;
    }
    for relation in &live_relations {
        let mut record = Vec::new();
        store_enc::encode_create_relation(&mut record, &relation.from, &relation.to, &relation.relation_type)
            .map_err(MCSError::IoError)?;
        tmp_store
            .write_record(RecordKind::CreateRelation, &record)
            .map_err(MCSError::IoError)?;
    }
    tmp_store.flush_and_sync().map_err(MCSError::IoError)?;
    // Close the temporary store before it is renamed into place.
    drop(tmp_store);
    std::fs::rename(&tmp_path, self.store.path()).map_err(MCSError::IoError)?;
    let path = self.store.path().clone();
    *self = KnowledgeGraph::new(&path).map_err(MCSError::IoError)?;
    Ok(())
}
/// Creates every entity in `entities` whose name is not already present and
/// returns copies of the ones actually created.
///
/// Each creation is written to the store before the in-memory state changes.
/// Freed slots are reused before new ones are appended.
///
/// # Errors
/// * `MCSError::InvalidParams` when any entity has an empty name (checked up
///   front, before anything is created).
/// * `MCSError::IoError` when encoding or writing a record fails; entities
///   created earlier in the same call stay created.
pub fn create_entities(&mut self, entities: &[Entity]) -> Result<Vec<Entity>> {
    if entities.iter().any(|candidate| candidate.name.is_empty()) {
        return Err(MCSError::InvalidParams(
            "Entity name must not be empty".into(),
        ));
    }
    let mut created = Vec::new();
    for entity in entities {
        let already_present = match self.interner.get_optional(&entity.name) {
            Some(id) => self
                .name_table
                .lookup(self.interner.get_hash(id), id)
                .is_some(),
            None => false,
        };
        if already_present {
            continue;
        }
        let mut record = Vec::new();
        store_enc::encode_create_entity(&mut record, &entity.name, &entity.entity_type, &entity.observations)
            .map_err(MCSError::IoError)?;
        self.store
            .write_record(RecordKind::CreateEntity, &record)
            .map_err(MCSError::IoError)?;

        let name_id = self.interner.intern(&entity.name);
        let hash = self.interner.get_hash(name_id);
        let type_id = self.interner.intern(&entity.entity_type);
        let mut obs_ids: Vec<StrId> = Vec::new();
        for text in entity.observations.iter() {
            obs_ids.push(self.interner.intern(text));
        }

        // Prefer a previously freed slot; otherwise the entity goes at the end.
        let recycled = self.free_slots.pop();
        let slot = match recycled {
            Some(free) => free,
            None => self.entity_slots.len() as u32,
        };
        self.search
            .index_entity(&mut self.interner, slot, name_id, type_id, &obs_ids);
        let record_in_memory = Some(StoredEntity {
            state: ENTITY_SLOT_LIVE,
            name: name_id,
            entity_type: type_id,
            observations: obs_ids,
        });
        if recycled.is_some() {
            self.entity_slots[slot as usize] = record_in_memory;
        } else {
            self.entity_slots.push(record_in_memory);
        }
        self.name_table.insert(&self.interner, hash, name_id, slot);
        created.push(Entity {
            name: entity.name.clone(),
            entity_type: entity.entity_type.clone(),
            observations: entity.observations.clone(),
        });
    }
    Ok(created)
}
/// Creates every relation in `relations` that does not already exist (same
/// `from`, `to` and type) and returns copies of the ones actually created.
///
/// # Errors
/// * `MCSError::InvalidParams` when any relation has an empty endpoint
///   (checked up front).
/// * `MCSError::IoError` when encoding or writing a record fails.
pub fn create_relations(&mut self, relations: &[Relation]) -> Result<Vec<Relation>> {
    let has_blank_endpoint = relations
        .iter()
        .any(|candidate| candidate.from.is_empty() || candidate.to.is_empty());
    if has_blank_endpoint {
        return Err(MCSError::InvalidParams(
            "Relation endpoints must not be empty".into(),
        ));
    }
    // Seed the duplicate filter with every relation already in the graph.
    let mut known: AHashSet<(StrId, StrId, StrId)> = self
        .relations
        .iter()
        .map(|rel| (rel.from, rel.to, rel.relation_type))
        .collect();
    let mut created = Vec::new();
    for relation in relations {
        let from_id = self.interner.intern(&relation.from);
        let to_id = self.interner.intern(&relation.to);
        let type_id = self.interner.intern(&relation.relation_type);
        let is_new = known.insert((from_id, to_id, type_id));
        if !is_new {
            continue;
        }
        let mut record = Vec::new();
        store_enc::encode_create_relation(&mut record, &relation.from, &relation.to, &relation.relation_type)
            .map_err(MCSError::IoError)?;
        self.store
            .write_record(RecordKind::CreateRelation, &record)
            .map_err(MCSError::IoError)?;
        self.relations.push(StoredRelation {
            from: from_id,
            to: to_id,
            relation_type: type_id,
        });
        created.push(Relation {
            from: relation.from.clone(),
            to: relation.to.clone(),
            relation_type: relation.relation_type.clone(),
        });
    }
    Ok(created)
}
/// Adds the observations in `contents` that the entity does not already hold
/// and returns the strings that were actually added, in input order.
///
/// A string repeated within `contents` is added only once, matching what a
/// replay of the logged record produces. The record is written to the store
/// before the in-memory entity is changed, so a failed write leaves the entity
/// untouched.
///
/// # Errors
/// * `MCSError::InvalidParams` when the entity does not exist.
/// * `MCSError::IoError` when encoding or writing the record fails.
pub fn add_observations(&mut self, entity_name: &str, contents: &[String]) -> Result<Vec<String>> {
    let not_found = || MCSError::InvalidParams(format!("Entity '{entity_name}' not found"));
    let name_id = self
        .interner
        .get_optional(entity_name)
        .ok_or_else(&not_found)?;
    let hash = self.interner.get_hash(name_id);
    let slot = self
        .name_table
        .lookup(hash, name_id)
        .ok_or_else(&not_found)?;
    let stored = self
        .entity_slots
        .get_mut(slot as usize)
        .and_then(|e| e.as_mut())
        .ok_or_else(&not_found)?;
    // `seen` starts as the current observations and grows as new ones are
    // accepted, so duplicates inside `contents` are filtered as well.
    let mut seen: AHashSet<StrId> = stored.observations.iter().copied().collect();
    let mut added = Vec::new();
    let mut interned_added = Vec::new();
    for content in contents {
        let cid = self.interner.intern(content);
        if seen.insert(cid) {
            interned_added.push(cid);
            added.push(content.clone());
        }
    }
    if added.is_empty() {
        return Ok(added);
    }
    let mut buf = Vec::new();
    store_enc::encode_add_observations(&mut buf, entity_name, &added)
        .map_err(MCSError::IoError)?;
    self.store
        .write_record(RecordKind::AddObservations, &buf)
        .map_err(MCSError::IoError)?;
    // Only mutate memory once the record has been accepted by the store.
    stored.observations.extend_from_slice(&interned_added);
    self.search
        .index_additional(&mut self.interner, slot, &interned_added);
    Ok(added)
}
pub fn delete_entities(&mut self, entity_names: &[String]) -> Result<()> {
let mut deleted_names = Vec::new();
for name in entity_names {
let name_id_opt = self.interner.get_optional(name);
if let Some(name_id) = name_id_opt {
let hash = self.interner.get_hash(name_id);
if let Some(slot) = self.name_table.lookup(hash, name_id)
&& let Some(Some(_)) = self.entity_slots.get(slot as usize)
{
let mut buf = Vec::new();
store_enc::encode_delete_entity(&mut buf, name)
.map_err(MCSError::IoError)?;
self.store.write_record(RecordKind::DeleteEntity, &buf)
.map_err(MCSError::IoError)?;
self.entity_slots[slot as usize] = None;
self.free_slots.push(slot);
self.search.remove_entity(slot);
self.name_table.remove(&self.interner, hash, name_id);
deleted_names.push(name.clone());
}
}
}
if !deleted_names.is_empty() {
let deleted_ids: AHashSet<StrId> = deleted_names.iter()
.map(|n| self.interner.intern(n))
.collect();
self.relations
.retain(|r| !deleted_ids.contains(&r.from) && !deleted_ids.contains(&r.to));
}
Ok(())
}
/// Removes the listed observations from an entity and re-indexes it.
///
/// The record is written to the store before the in-memory entity is changed,
/// so a failed write leaves the entity untouched. Observation strings that
/// were never interned cannot be attached to any entity and are skipped
/// without being added to the interner.
///
/// # Errors
/// * `MCSError::InvalidParams` when the entity does not exist.
/// * `MCSError::IoError` when encoding or writing the record fails.
pub fn delete_observations(&mut self, entity_name: &str, observations: &[String]) -> Result<()> {
    let not_found = || MCSError::InvalidParams(format!("Entity '{entity_name}' not found"));
    let name_id = self
        .interner
        .get_optional(entity_name)
        .ok_or_else(&not_found)?;
    let hash = self.interner.get_hash(name_id);
    let slot = self
        .name_table
        .lookup(hash, name_id)
        .ok_or_else(&not_found)?;
    let stored = self
        .entity_slots
        .get_mut(slot as usize)
        .and_then(|e| e.as_mut())
        .ok_or_else(&not_found)?;
    let mut buf = Vec::new();
    store_enc::encode_delete_observations(&mut buf, entity_name, observations)
        .map_err(MCSError::IoError)?;
    self.store
        .write_record(RecordKind::DeleteObservations, &buf)
        .map_err(MCSError::IoError)?;
    // Only mutate memory once the record has been accepted by the store.
    let remove_ids: AHashSet<StrId> = observations
        .iter()
        .filter_map(|o| self.interner.get_optional(o))
        .collect();
    stored.observations.retain(|o| !remove_ids.contains(o));
    self.search.remove_entity(slot);
    self.search
        .index_entity(&mut self.interner, slot, stored.name, stored.entity_type, &stored.observations);
    Ok(())
}
pub fn delete_relations(&mut self, relations: &[Relation]) -> Result<()> {
let rels: AHashSet<(StrId, StrId, StrId)> = relations
.iter()
.map(|r| {
(
self.interner.intern(&r.from),
self.interner.intern(&r.to),
self.interner.intern(&r.relation_type),
)
})
.collect();
self.relations
.retain(|r| !rels.contains(&(r.from, r.to, r.relation_type)));
for relation in relations {
let mut buf = Vec::new();
store_enc::encode_delete_relation(&mut buf, &relation.from, &relation.to, &relation.relation_type)
.map_err(MCSError::IoError)?;
self.store.write_record(RecordKind::DeleteRelation, &buf)
.map_err(MCSError::IoError)?;
}
Ok(())
}
/// Returns an owned copy of the whole graph; delegates to `read_graph_view`.
pub fn read_graph(&self) -> KnowledgeGraphOut {
self.read_graph_view().to_owned_out()
}
/// Returns a borrowed view of every live entity (in slot order) and every relation.
pub fn read_graph_view(&self) -> GraphView<'_> {
    let mut entities: Vec<&StoredEntity> = Vec::new();
    for stored in self.entity_slots.iter().flatten() {
        if stored.is_live() {
            entities.push(stored);
        }
    }
    let relations = self.relations.iter().collect::<Vec<&StoredRelation>>();
    GraphView { kg: self, entities, relations }
}
/// Searches for `query` with no entity-type filter, no offset and no limit
/// (delegates to `search_nodes_filtered` with `None`, `0`, `usize::MAX`).
pub fn search_nodes(&self, query: &str) -> KnowledgeGraphOut {
self.search_nodes_filtered(query, None, 0, usize::MAX)
}
/// Returns an owned copy of the named entities and their relations; delegates to `open_nodes_view`.
pub fn open_nodes(&self, names: &[String]) -> KnowledgeGraphOut {
self.open_nodes_view(names).to_owned_out()
}
pub fn open_nodes_view(&self, names: &[String]) -> GraphView<'_> {
let name_ids: AHashSet<StrId> = names.iter()
.filter_map(|n| self.interner.get_optional(n))
.collect();
let entities: Vec<&StoredEntity> = self
.entity_slots
.iter()
.filter_map(|s| {
s.as_ref()
.filter(|stored| stored.is_live() && name_ids.contains(&stored.name))
})
.collect();
let matched_names: AHashSet<StrId> = entities.iter().map(|e| e.name).collect();
let relations: Vec<&StoredRelation> = self
.relations
.iter()
.filter(|r| matched_names.contains(&r.from) || matched_names.contains(&r.to))
.collect();
GraphView { kg: self, entities, relations }
}
fn entity_to_output(&self, stored: &StoredEntity) -> Entity {
Entity {
name: self.interner.lookup(stored.name).to_string(),
entity_type: self.interner.lookup(stored.entity_type).to_string(),
observations: stored
.observations
.iter()
.map(|o| self.interner.lookup(*o).to_string())
.collect(),
}
}
#[inline]
fn relation_to_output(&self, r: &StoredRelation) -> Relation {
Relation {
from: self.interner.lookup(r.from).to_string(),
to: self.interner.lookup(r.to).to_string(),
relation_type: self.interner.lookup(r.relation_type).to_string(),
}
}
/// Returns the slot number of the live entity called `name`, or `None` when the
/// name is unknown, unmapped, or its slot is empty or not live.
fn lookup_live_slot(&self, name: &str) -> Option<u32> {
    let name_id = self.interner.get_optional(name)?;
    let slot = self
        .name_table
        .lookup(self.interner.get_hash(name_id), name_id)?;
    match self.entity_slots.get(slot as usize) {
        Some(Some(stored)) if stored.is_live() => Some(slot),
        _ => None,
    }
}
/// Returns an owned copy of the live entity whose interned name is `name_id`,
/// or `None` when the id is unmapped or its slot is empty or not live.
fn entity_by_name_id(&self, name_id: StrId) -> Option<Entity> {
    let slot = self
        .name_table
        .lookup(self.interner.get_hash(name_id), name_id)?;
    match self.entity_slots.get(slot as usize) {
        Some(Some(stored)) if stored.is_live() => Some(self.entity_to_output(stored)),
        _ => None,
    }
}
/// Counts live entities per entity type and returns the ranking produced by
/// `rank_counts` (highest count first, ties by type name).
pub fn entity_type_counts(&self) -> Vec<(String, usize)> {
    let mut tally: AHashMap<StrId, usize> = AHashMap::new();
    self.entity_slots
        .iter()
        .flatten()
        .filter(|stored| stored.is_live())
        .for_each(|stored| *tally.entry(stored.entity_type).or_default() += 1);
    self.rank_counts(tally)
}
/// Counts relations per relation type and returns the ranking produced by
/// `rank_counts` (highest count first, ties by type name).
pub fn relation_type_counts(&self) -> Vec<(String, usize)> {
    let mut tally: AHashMap<StrId, usize> = AHashMap::new();
    self.relations
        .iter()
        .for_each(|rel| *tally.entry(rel.relation_type).or_default() += 1);
    self.rank_counts(tally)
}
/// Resolves each counted id to its string and sorts the pairs by descending
/// count, breaking ties by ascending string.
fn rank_counts(&self, counts: AHashMap<StrId, usize>) -> Vec<(String, usize)> {
    let mut ranked: Vec<(String, usize)> = Vec::with_capacity(counts.len());
    for (id, total) in counts {
        ranked.push((self.interner.lookup(id).to_string(), total));
    }
    ranked.sort_unstable_by(|(left_name, left_total), (right_name, right_total)| {
        right_total
            .cmp(left_total)
            .then_with(|| left_name.cmp(right_name))
    });
    ranked
}
/// Owned-output form of `search_nodes_view`: runs the same search with the same
/// arguments and converts the resulting view with `to_owned_out`.
pub fn search_nodes_filtered(
&self,
query: &str,
entity_type: Option<&str>,
offset: usize,
limit: usize,
) -> KnowledgeGraphOut {
self.search_nodes_view(query, entity_type, offset, limit).to_owned_out()
}
/// Runs a ranked search for `query` and returns a borrowed view of the hits.
///
/// Hits are taken in ranking order, restricted to live entities and, when
/// `entity_type` is given, to that type. The first `offset` matching hits are
/// skipped and at most `limit` are returned. Relations with at least one
/// endpoint among the returned entities are included. An `entity_type` unknown
/// to the interner yields an empty view.
pub fn search_nodes_view(
    &self,
    query: &str,
    entity_type: Option<&str>,
    offset: usize,
    limit: usize,
) -> GraphView<'_> {
    let type_id = match entity_type {
        None => None,
        Some(label) => {
            let Some(id) = self.interner.get_optional(label) else {
                return GraphView { kg: self, entities: Vec::new(), relations: Vec::new() };
            };
            Some(id)
        }
    };
    let ranked = self.search.search_ranked(query, &self.interner);
    let mut selected: AHashSet<StrId> = AHashSet::new();
    let mut entities: Vec<&StoredEntity> = Vec::new();
    let mut still_to_skip = offset;
    for (slot, _score) in ranked {
        let candidate = match self.entity_slots.get(slot as usize) {
            Some(Some(stored)) if stored.is_live() => stored,
            _ => continue,
        };
        if let Some(tid) = type_id {
            if candidate.entity_type != tid {
                continue;
            }
        }
        if still_to_skip > 0 {
            still_to_skip -= 1;
            continue;
        }
        if entities.len() >= limit {
            break;
        }
        selected.insert(candidate.name);
        entities.push(candidate);
    }
    let mut relations: Vec<&StoredRelation> = Vec::new();
    for rel in &self.relations {
        if selected.contains(&rel.from) || selected.contains(&rel.to) {
            relations.push(rel);
        }
    }
    GraphView { kg: self, entities, relations }
}
/// Owned-output form of `read_graph_filtered_view`: applies the same filter and
/// paging arguments and converts the resulting view with `to_owned_out`.
pub fn read_graph_filtered(
&self,
entity_type: Option<&str>,
offset: usize,
limit: usize,
) -> KnowledgeGraphOut {
self.read_graph_filtered_view(entity_type, offset, limit).to_owned_out()
}
/// Returns a borrowed page of the graph.
///
/// Live entities are taken in slot order, restricted to `entity_type` when
/// given; the first `offset` matches are skipped and at most `limit` are
/// returned. Only relations whose two endpoints are both among the returned
/// entities are included. An `entity_type` unknown to the interner yields an
/// empty view.
pub fn read_graph_filtered_view(
    &self,
    entity_type: Option<&str>,
    offset: usize,
    limit: usize,
) -> GraphView<'_> {
    let type_id = match entity_type {
        None => None,
        Some(label) => {
            let Some(id) = self.interner.get_optional(label) else {
                return GraphView { kg: self, entities: Vec::new(), relations: Vec::new() };
            };
            Some(id)
        }
    };
    let entities: Vec<&StoredEntity> = self
        .entity_slots
        .iter()
        .flatten()
        .filter(|stored| stored.is_live())
        .filter(|stored| type_id.is_none_or(|tid| stored.entity_type == tid))
        .skip(offset)
        .take(limit)
        .collect();
    let selected: AHashSet<StrId> = entities.iter().map(|stored| stored.name).collect();
    let relations: Vec<&StoredRelation> = self
        .relations
        .iter()
        .filter(|rel| selected.contains(&rel.from) && selected.contains(&rel.to))
        .collect();
    GraphView { kg: self, entities, relations }
}
/// Collects the entities reachable from `name` within `depth` hops, plus the relations
/// connecting them.
///
/// `direction` selects which relation ends are followed: `Out` walks from -> to, `In`
/// walks to -> from, and `Both` walks either way. When `rtype` is given, only relations
/// of that type are followed and returned. With `depth == 0` only the start node is
/// visited. If `rtype` is given but the interner has no id for it, the result holds at
/// most the start entity and no relations.
///
/// Returned entities follow the iteration order of the internal visited hash set.
///
/// # Errors
/// Returns `MCSError::InvalidParams` when `name` has no live slot.
pub fn neighbors(
    &self,
    name: &str,
    direction: Direction,
    rtype: Option<&str>,
    depth: u32,
) -> Result<KnowledgeGraphOut> {
    if self.lookup_live_slot(name).is_none() {
        return Err(MCSError::InvalidParams(format!("Entity '{name}' not found")));
    }
    let start = self.interner.get_optional(name).unwrap();

    let rtype_id = match rtype.map(|r| self.interner.get_optional(r)) {
        None => None,
        Some(Some(id)) => Some(id),
        Some(None) => {
            // Relation type string is not interned: answer with the start entity alone.
            return Ok(KnowledgeGraphOut {
                entities: self.entity_by_name_id(start).into_iter().collect(),
                relations: Vec::new(),
            });
        }
    };
    let type_ok = |r: &StoredRelation| match rtype_id {
        Some(rt) => r.relation_type == rt,
        None => true,
    };

    let follow_out = matches!(direction, Direction::Out | Direction::Both);
    let follow_in = matches!(direction, Direction::In | Direction::Both);

    let mut visited: AHashSet<StrId> = AHashSet::new();
    visited.insert(start);

    match depth {
        0 => {}
        1 => {
            // Single hop: one scan over the relations, no adjacency map needed.
            for rel in self.relations.iter().filter(|r| type_ok(r)) {
                if follow_out && rel.from == start {
                    visited.insert(rel.to);
                } else if follow_in && rel.to == start {
                    visited.insert(rel.from);
                }
            }
        }
        _ => {
            let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
            for rel in self.relations.iter().filter(|r| type_ok(r)) {
                if follow_out {
                    adj.entry(rel.from).or_default().push(rel.to);
                }
                if follow_in {
                    adj.entry(rel.to).or_default().push(rel.from);
                }
            }

            // Breadth-first walk; each queue entry carries its distance from `start`.
            let mut frontier: VecDeque<(StrId, u32)> = VecDeque::new();
            frontier.push_back((start, 0));
            while let Some((node, dist)) = frontier.pop_front() {
                if dist >= depth {
                    continue;
                }
                let Some(nbrs) = adj.get(&node) else {
                    continue;
                };
                for &nb in nbrs {
                    if visited.insert(nb) {
                        frontier.push_back((nb, dist + 1));
                    }
                }
            }
        }
    }

    let mut entities = Vec::with_capacity(visited.len());
    entities.extend(visited.iter().filter_map(|&nid| self.entity_by_name_id(nid)));

    let relations = self
        .relations
        .iter()
        .filter(|r| type_ok(r) && visited.contains(&r.from) && visited.contains(&r.to))
        .map(|r| self.relation_to_output(r))
        .collect();

    Ok(KnowledgeGraphOut { entities, relations })
}
/// Builds a JSON summary of one entity and its immediate surroundings.
///
/// The returned object has four keys:
/// - `"entity"`: the entity itself;
/// - `"relations"`: every relation with this entity as either endpoint;
/// - `"neighbors"`: the distinct names at the other end of those relations, in
///   first-seen order, never including the entity itself;
/// - `"degree"`: the number of entries in `"relations"`.
///
/// # Errors
/// Returns `MCSError::InvalidParams` when `name` is not interned or when no entity is
/// found for its id.
pub fn describe_entity(&self, name: &str) -> Result<serde_json::Value> {
    let Some(name_id) = self.interner.get_optional(name) else {
        return Err(MCSError::InvalidParams(format!("Entity '{name}' not found")));
    };
    let Some(entity) = self.entity_by_name_id(name_id) else {
        return Err(MCSError::InvalidParams(format!("Entity '{name}' not found")));
    };

    let mut incident: Vec<Relation> = Vec::new();
    let mut neighbor_seen: AHashSet<StrId> = AHashSet::new();
    let mut neighbors: Vec<&str> = Vec::new();

    let touching = self
        .relations
        .iter()
        .filter(|rel| rel.from == name_id || rel.to == name_id);
    for rel in touching {
        incident.push(self.relation_to_output(rel));
        let other = if rel.from == name_id { rel.to } else { rel.from };
        // A self-loop contributes a relation but not a neighbour.
        if other == name_id {
            continue;
        }
        if neighbor_seen.insert(other) {
            neighbors.push(self.interner.lookup(other));
        }
    }

    let degree = incident.len();
    Ok(serde_json::json!({
        "entity": entity,
        "relations": incident,
        "neighbors": neighbors,
        "degree": degree,
    }))
}
/// Creates each entity that has no live slot and adds observations to each one that does.
///
/// All names are validated before any change is made. One JSON report is produced per
/// input entity, in input order, with the keys `"name"`, `"created"` and
/// `"addedObservations"`:
/// - for an entity with a live slot, `"created"` is `false` and `"addedObservations"` is
///   whatever `add_observations` returned;
/// - otherwise `"created"` tells whether `create_entities` returned a non-empty list and
///   `"addedObservations"` echoes the observations supplied by the caller.
///
/// # Errors
/// Returns `MCSError::InvalidParams` if any entity has an empty name, and propagates
/// errors from `add_observations` / `create_entities`.
pub fn upsert_entities(&mut self, entities: &[Entity]) -> Result<Vec<serde_json::Value>> {
    if entities.iter().any(|ent| ent.name.is_empty()) {
        return Err(MCSError::InvalidParams(
            "Entity name must not be empty".into(),
        ));
    }

    let mut out = Vec::with_capacity(entities.len());
    for ent in entities {
        let already_live = self.lookup_live_slot(&ent.name).is_some();
        let report = if already_live {
            let added = self.add_observations(&ent.name, &ent.observations)?;
            serde_json::json!({
                "name": ent.name,
                "created": false,
                "addedObservations": added,
            })
        } else {
            let created = self.create_entities(std::slice::from_ref(ent))?;
            serde_json::json!({
                "name": ent.name,
                "created": !created.is_empty(),
                "addedObservations": ent.observations,
            })
        };
        out.push(report);
    }
    Ok(out)
}
/// Renders the graph as text in the requested `format`.
///
/// Accepted values are `"json"` (serialised result of `read_graph`), `"mermaid"` and
/// `"dot"`.
///
/// # Errors
/// Returns `MCSError::InvalidParams` for any other format string and
/// `MCSError::JsonError` when JSON serialisation fails.
pub fn export(&self, format: &str) -> Result<String> {
    let rendered = match format {
        "json" => {
            return serde_json::to_string(&self.read_graph()).map_err(MCSError::JsonError);
        }
        "mermaid" => self.export_mermaid(),
        "dot" => self.export_dot(),
        other => {
            return Err(MCSError::InvalidParams(format!(
                "Unknown export format '{other}' (expected json|mermaid|dot)"
            )));
        }
    };
    Ok(rendered)
}
/// Assigns a small numeric id to every live entity for use in diagram exports.
///
/// Returns a map from entity name id to diagram id, plus the `(diagram id, name id)`
/// pairs in slot order. Each diagram id is the size of the map at the moment the entity
/// is recorded.
fn diagram_node_ids(&self) -> (AHashMap<StrId, usize>, Vec<(usize, StrId)>) {
    let mut ids: AHashMap<StrId, usize> = AHashMap::new();
    let mut order: Vec<(usize, StrId)> = Vec::new();
    for slot in self.entity_slots.iter() {
        let Some(ent) = slot.as_ref() else {
            continue;
        };
        if !ent.is_live() {
            continue;
        }
        let next_id = ids.len();
        ids.insert(ent.name, next_id);
        order.push((next_id, ent.name));
    }
    (ids, order)
}
/// Renders the graph as a Mermaid `graph LR` diagram.
///
/// Emits one node line per live entity (ids from `diagram_node_ids`), followed by one
/// edge line per relation whose endpoints both have a diagram id. Names and relation
/// types are passed through `sanitize_label` before being written.
fn export_mermaid(&self) -> String {
    let (ids, order) = self.diagram_node_ids();
    let estimated_len = 64 + order.len() * 32 + self.relations.len() * 32;
    let mut s = String::with_capacity(estimated_len);
    s.push_str("graph LR\n");

    for &(n, name_id) in &order {
        let label = sanitize_label(self.interner.lookup(name_id));
        let node_line = format!(" n{n}[\"{label}\"]\n");
        s.push_str(&node_line);
    }

    for r in self.relations.iter() {
        // Relations touching an entity without a diagram id are left out.
        let Some(&a) = ids.get(&r.from) else {
            continue;
        };
        let Some(&b) = ids.get(&r.to) else {
            continue;
        };
        let rel = sanitize_label(self.interner.lookup(r.relation_type));
        let edge_line = format!(" n{a} -->|{rel}| n{b}\n");
        s.push_str(&edge_line);
    }
    s
}
/// Renders the graph as a Graphviz `digraph G { ... }` document.
///
/// Emits one labelled node statement per live entity (ids from `diagram_node_ids`),
/// followed by one labelled edge statement per relation whose endpoints both have a
/// diagram id. Names and relation types are passed through `sanitize_label` before
/// being written.
fn export_dot(&self) -> String {
    let (ids, order) = self.diagram_node_ids();
    let estimated_len = 64 + order.len() * 32 + self.relations.len() * 32;
    let mut s = String::with_capacity(estimated_len);
    s.push_str("digraph G {\n");

    for &(n, name_id) in &order {
        let label = sanitize_label(self.interner.lookup(name_id));
        let node_stmt = format!(" n{n} [label=\"{label}\"];\n");
        s.push_str(&node_stmt);
    }

    for r in self.relations.iter() {
        // Relations touching an entity without a diagram id are left out.
        let Some(&a) = ids.get(&r.from) else {
            continue;
        };
        let Some(&b) = ids.get(&r.to) else {
            continue;
        };
        let rel = sanitize_label(self.interner.lookup(r.relation_type));
        let edge_stmt = format!(" n{a} -> n{b} [label=\"{rel}\"];\n");
        s.push_str(&edge_stmt);
    }

    s.push_str("}\n");
    s
}
/// Merges `source` into `target`, then deletes `source`.
///
/// Steps, in order:
/// 1. the observations of `source` are added to `target` via `add_observations`;
/// 2. every relation touching `source` is re-created with `target` in its place
///    (relations that would become `target -> target` are dropped);
/// 3. `source` is removed via `delete_entities`.
///
/// This function performs no rollback: if a later step fails, the effects of earlier
/// steps are left as they are.
///
/// The returned JSON object has the keys `"source"`, `"target"`,
/// `"movedObservations"` (observation count on `source`), `"addedObservations"`
/// (length of what `add_observations` returned) and `"redirectedRelations"` (length of
/// what `create_relations` returned).
///
/// # Errors
/// Returns `MCSError::InvalidParams` when `source == target` or when either entity
/// cannot be found, and propagates errors from the mutation steps.
pub fn merge_entities(&mut self, source: &str, target: &str) -> Result<serde_json::Value> {
    if source == target {
        return Err(MCSError::InvalidParams(
            "Source and target must be different entities".into(),
        ));
    }
    self.lookup_live_slot(source).ok_or_else(|| {
        MCSError::InvalidParams(format!("Source entity '{source}' not found"))
    })?;
    self.lookup_live_slot(target).ok_or_else(|| {
        MCSError::InvalidParams(format!("Target entity '{target}' not found"))
    })?;

    // Report a lookup miss as an error rather than panicking on `unwrap()`.
    let source_entity = self.get_entity(source).ok_or_else(|| {
        MCSError::InvalidParams(format!("Source entity '{source}' not found"))
    })?;
    let source_id = self.interner.get_optional(source).ok_or_else(|| {
        MCSError::InvalidParams(format!("Source entity '{source}' not found"))
    })?;

    let moved_obs_count = source_entity.observations.len();
    let added_count = if source_entity.observations.is_empty() {
        0
    } else {
        self.add_observations(target, &source_entity.observations)?.len()
    };

    // Build owned replacements first so the immutable borrow of `self` ends before
    // `create_relations` needs `&mut self`.
    let source_rels: Vec<Relation> = self
        .relations
        .iter()
        .filter(|r| r.from == source_id || r.to == source_id)
        .filter_map(|r| {
            let new_from = if r.from == source_id {
                target
            } else {
                self.interner.lookup(r.from)
            };
            let new_to = if r.to == source_id {
                target
            } else {
                self.interner.lookup(r.to)
            };
            if new_from == new_to {
                // Would collapse into a self-loop on `target`; skip it.
                None
            } else {
                Some(Relation {
                    from: new_from.to_string(),
                    to: new_to.to_string(),
                    relation_type: self.interner.lookup(r.relation_type).to_string(),
                })
            }
        })
        .collect();

    // Keep the count as `usize`: `json!` serialises it directly, and a narrowing
    // `as u32` cast would silently truncate.
    let redirected = self.create_relations(&source_rels)?.len();
    self.delete_entities(&[source.to_string()])?;

    Ok(serde_json::json!({
        "source": source,
        "target": target,
        "movedObservations": moved_obs_count,
        "addedObservations": added_count,
        "redirectedRelations": redirected,
    }))
}
/// Extracts the subgraph within `depth` hops of the given seed names.
///
/// Seeds that the interner does not know are ignored. Relations are followed in both
/// directions. The result contains every visited id for which `entity_by_name_id`
/// yields an entity, plus every relation whose two endpoints were both visited.
/// Returned entities follow the iteration order of the internal visited hash set.
///
/// An empty `names` slice yields an empty graph.
pub fn extract_subgraph(&self, names: &[String], depth: u32) -> Result<KnowledgeGraphOut> {
    if names.is_empty() {
        return Ok(KnowledgeGraphOut { entities: Vec::new(), relations: Vec::new() });
    }

    let mut visited: AHashSet<StrId> = AHashSet::new();
    let mut frontier: VecDeque<(StrId, u32)> = VecDeque::new();
    for seed in names.iter().filter_map(|n| self.interner.get_optional(n)) {
        if visited.insert(seed) {
            frontier.push_back((seed, 0));
        }
    }

    // Undirected adjacency: each relation is recorded from both of its ends.
    let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
    for rel in self.relations.iter() {
        for (a, b) in [(rel.from, rel.to), (rel.to, rel.from)] {
            adj.entry(a).or_default().push(b);
        }
    }

    // Breadth-first walk; each queue entry carries its distance from the nearest seed.
    while let Some((node, dist)) = frontier.pop_front() {
        if dist >= depth {
            continue;
        }
        let Some(nbrs) = adj.get(&node) else {
            continue;
        };
        for &nb in nbrs {
            if visited.insert(nb) {
                frontier.push_back((nb, dist + 1));
            }
        }
    }

    let mut entities: Vec<Entity> = Vec::with_capacity(visited.len());
    entities.extend(visited.iter().filter_map(|&nid| self.entity_by_name_id(nid)));

    let relations: Vec<Relation> = self
        .relations
        .iter()
        .filter(|rel| visited.contains(&rel.from) && visited.contains(&rel.to))
        .map(|rel| self.relation_to_output(rel))
        .collect();

    Ok(KnowledgeGraphOut { entities, relations })
}
/// Looks up each name with `get_entity`, returning one result per input in input order.
///
/// An entry is `None` wherever `get_entity` finds nothing for that name.
pub fn batch_get_entities(&self, names: &[String]) -> Vec<Option<Entity>> {
    let mut found = Vec::with_capacity(names.len());
    for n in names {
        found.push(self.get_entity(n));
    }
    found
}
/// Depth-first enumeration of paths from `current` to `target` over `adj`.
///
/// `current_path` holds the nodes walked so far and `visited` the nodes that may not be
/// re-entered; the caller in this file seeds both with the start node. Both are restored
/// to their entry state before this function returns. Each completed path is cloned
/// into `all_paths`.
///
/// The search does not expand a node once `current_path` holds more than `max_depth`
/// nodes, and stops entirely once `all_paths` holds `max_paths` entries.
fn dfs_all_paths(
    adj: &AHashMap<StrId, Vec<StrId>>,
    current: StrId,
    target: StrId,
    max_depth: usize,
    max_paths: usize,
    visited: &mut AHashSet<StrId>,
    current_path: &mut Vec<StrId>,
    all_paths: &mut Vec<Vec<StrId>>,
) {
    if all_paths.len() >= max_paths {
        return;
    }
    // `len() > 1` keeps the bare start node from counting as a path.
    if current == target && current_path.len() > 1 {
        all_paths.push(current_path.clone());
        return;
    }
    if current_path.len() > max_depth {
        return;
    }
    let Some(neighbors) = adj.get(&current) else {
        return;
    };
    for &nb in neighbors {
        // Once the cap is reached every further recursive call would return
        // immediately, so stop scanning the remaining neighbours.
        if all_paths.len() >= max_paths {
            break;
        }
        if visited.insert(nb) {
            current_path.push(nb);
            Self::dfs_all_paths(
                adj, nb, target, max_depth, max_paths, visited, current_path, all_paths,
            );
            // Backtrack so sibling branches may pass through `nb` again.
            current_path.pop();
            visited.remove(&nb);
        }
    }
}
/// Enumerates paths between two entities, treating relations as undirected.
///
/// Each path is returned as the list of entity names from `from` to `to`. The search is
/// delegated to `dfs_all_paths` with the given `max_depth` and `max_paths` limits. When
/// `from` and `to` resolve to the same id, the result is a single one-node path.
///
/// # Errors
/// Returns `MCSError::InvalidParams` when either name is not interned or has no live
/// slot, and `MCSError::MemoryError` when the search finds no path.
pub fn find_all_paths(
    &self,
    from: &str,
    to: &str,
    max_depth: usize,
    max_paths: usize,
) -> Result<Vec<Vec<String>>> {
    let Some(from_id) = self.interner.get_optional(from) else {
        return Err(MCSError::InvalidParams(format!("Entity '{from}' not found")));
    };
    let Some(to_id) = self.interner.get_optional(to) else {
        return Err(MCSError::InvalidParams(format!("Entity '{to}' not found")));
    };
    self.lookup_live_slot(from)
        .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{from}' not found")))?;
    self.lookup_live_slot(to)
        .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{to}' not found")))?;

    if from_id == to_id {
        return Ok(vec![vec![from.to_string()]]);
    }

    // Undirected adjacency: each relation is recorded from both of its ends.
    let mut adj: AHashMap<StrId, Vec<StrId>> = AHashMap::new();
    for rel in self.relations.iter() {
        adj.entry(rel.from).or_default().push(rel.to);
        adj.entry(rel.to).or_default().push(rel.from);
    }

    let mut visited: AHashSet<StrId> = AHashSet::new();
    visited.insert(from_id);
    let mut current_path = vec![from_id];
    let mut all_paths: Vec<Vec<StrId>> = Vec::new();
    Self::dfs_all_paths(
        &adj,
        from_id,
        to_id,
        max_depth,
        max_paths,
        &mut visited,
        &mut current_path,
        &mut all_paths,
    );

    if all_paths.is_empty() {
        return Err(MCSError::MemoryError(format!(
            "No path found between '{from}' and '{to}'"
        )));
    }

    // Translate interned ids back into owned names for the caller.
    let mut result: Vec<Vec<String>> = Vec::with_capacity(all_paths.len());
    for path in all_paths {
        let names: Vec<String> = path
            .into_iter()
            .map(|id| self.interner.lookup(id).to_string())
            .collect();
        result.push(names);
    }
    Ok(result)
}
/// Calls `flush_and_sync` on the backing store.
///
/// # Errors
/// Wraps any error reported by the store in `MCSError::IoError`.
pub fn flush_and_sync(&mut self) -> Result<()> {
    match self.store.flush_and_sync() {
        Ok(()) => Ok(()),
        Err(io_err) => Err(MCSError::IoError(io_err)),
    }
}
}