use std::collections::HashSet;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use uuid::Uuid;
use khive_storage::types::{
DeleteMode, EdgeFilter, EdgeSortField, LinkId, PageRequest, SortOrder, TextDocument,
};
use khive_storage::{Edge, EdgeRelation, Entity, SubstrateKind};
use crate::error::{RuntimeError, RuntimeResult};
use crate::runtime::KhiveRuntime;
/// Partial update consumed by [`KhiveRuntime::update_entity`].
///
/// Every field is optional; a `None` field leaves the stored value untouched.
#[derive(Clone, Debug, Default)]
pub struct EntityPatch {
/// Replacement for the entity name.
pub name: Option<String>,
/// Replacement for the description: `Some(Some(text))` sets it, `Some(None)` clears it.
pub description: Option<Option<String>>,
/// JSON properties merged into the stored ones with [`MergeStrategy::PreferFrom`]:
/// when both sides are JSON objects the patch wins per key and keys missing from
/// the patch are kept; otherwise the patch value replaces the stored value.
pub properties: Option<Value>,
/// Replacement tag list; overwrites the stored tags wholesale.
pub tags: Option<Vec<String>>,
}
/// Conflict-resolution policy used when two entities (or a patch and an entity)
/// are combined. Serialized in `snake_case` (e.g. `prefer_into`).
#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum MergeStrategy {
/// Default. Values of the surviving ("into") side win; the other side only
/// fills in a missing description or missing top-level property keys.
#[default]
PreferInto,
/// Values of the "from" side win; the "into" side only supplies a description
/// or property keys that "from" lacks.
PreferFrom,
/// Keeps the "into" name, takes the "into" description unless it is absent or
/// empty, and merges JSON objects recursively (on conflicting non-object
/// values the "into" value is kept).
Union,
}
/// Outcome report returned by [`KhiveRuntime::merge_entity`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MergeSummary {
/// Id of the entity that survived the merge (the `into_id` argument).
pub kept_id: Uuid,
/// Id of the entity that was merged away (the `from_id` argument).
pub removed_id: Uuid,
/// Number of edges re-pointed at the surviving entity; edges dropped because
/// they would have become self-loops are not counted.
pub edges_rewired: usize,
/// Count reported by the property merge (property keys contributed by the
/// removed entity; see `merge_properties` for the exact counting rules).
pub properties_merged: usize,
/// Number of tags of the removed entity that the surviving entity did not
/// already carry.
pub tags_unioned: usize,
}
/// Caller-facing edge filter that converts into the storage-level [`EdgeFilter`].
///
/// NOTE(review): how the storage layer interprets unset fields (presumably
/// "no restriction") is not visible in this file — confirm against `EdgeFilter`.
#[derive(Clone, Debug, Default)]
pub struct EdgeListFilter {
/// Optional single source entity id; becomes a zero- or one-element `source_ids` list.
pub source_id: Option<Uuid>,
/// Optional single target entity id; becomes a zero- or one-element `target_ids` list.
pub target_id: Option<Uuid>,
/// Relations passed through unchanged to `EdgeFilter::relations`.
pub relations: Vec<EdgeRelation>,
/// Weight bound passed through unchanged to `EdgeFilter::min_weight`.
pub min_weight: Option<f64>,
/// Weight bound passed through unchanged to `EdgeFilter::max_weight`.
pub max_weight: Option<f64>,
}
impl From<EdgeListFilter> for EdgeFilter {
fn from(f: EdgeListFilter) -> Self {
EdgeFilter {
source_ids: f.source_id.into_iter().collect(),
target_ids: f.target_id.into_iter().collect(),
relations: f.relations,
min_weight: f.min_weight,
max_weight: f.max_weight,
..Default::default()
}
}
}
impl KhiveRuntime {
pub async fn update_entity(
&self,
namespace: Option<&str>,
id: Uuid,
patch: EntityPatch,
) -> RuntimeResult<Entity> {
let store = self.entities(namespace)?;
let mut entity = store
.get_entity(id)
.await?
.ok_or_else(|| RuntimeError::NotFound(format!("entity {id}")))?;
if entity.namespace != self.ns(namespace) {
return Err(RuntimeError::NotFound(format!("entity {id}")));
}
let mut text_changed = false;
if let Some(name) = patch.name {
text_changed |= entity.name != name;
entity.name = name;
}
if let Some(desc_patch) = patch.description {
text_changed |= entity.description != desc_patch;
entity.description = desc_patch;
}
if let Some(props) = patch.properties {
let (merged, _) =
merge_properties(&entity.properties, &Some(props), MergeStrategy::PreferFrom);
entity.properties = merged;
}
if let Some(tags) = patch.tags {
entity.tags = tags;
}
entity.updated_at = chrono::Utc::now().timestamp_micros();
store.upsert_entity(entity.clone()).await?;
if text_changed {
self.reindex_entity(namespace, &entity).await?;
}
Ok(entity)
}
pub async fn merge_entity(
&self,
namespace: Option<&str>,
into_id: Uuid,
from_id: Uuid,
strategy: MergeStrategy,
) -> RuntimeResult<MergeSummary> {
let store = self.entities(namespace)?;
let graph = self.graph(namespace)?;
let ns = self.ns(namespace);
let into_entity = store
.get_entity(into_id)
.await?
.ok_or_else(|| RuntimeError::NotFound(format!("entity {into_id}")))?;
if into_entity.namespace != ns {
return Err(RuntimeError::NotFound(format!("entity {into_id}")));
}
let from_entity = store
.get_entity(from_id)
.await?
.ok_or_else(|| RuntimeError::NotFound(format!("entity {from_id}")))?;
if from_entity.namespace != ns {
return Err(RuntimeError::NotFound(format!("entity {from_id}")));
}
const PAGE_SIZE: u32 = 1_000;
let sort = vec![SortOrder {
field: EdgeSortField::CreatedAt,
direction: khive_storage::types::SortDirection::Asc,
}];
let mut outbound: Vec<Edge> = Vec::new();
let mut offset: u64 = 0;
loop {
let page = graph
.query_edges(
EdgeFilter {
source_ids: vec![from_id],
..Default::default()
},
sort.clone(),
PageRequest {
offset,
limit: PAGE_SIZE,
},
)
.await?;
if page.items.is_empty() {
break;
}
offset += page.items.len() as u64;
outbound.extend(page.items);
}
let mut inbound: Vec<Edge> = Vec::new();
let mut offset: u64 = 0;
loop {
let page = graph
.query_edges(
EdgeFilter {
target_ids: vec![from_id],
..Default::default()
},
sort.clone(),
PageRequest {
offset,
limit: PAGE_SIZE,
},
)
.await?;
if page.items.is_empty() {
break;
}
offset += page.items.len() as u64;
inbound.extend(page.items);
}
let mut seen_edge_ids: std::collections::HashSet<LinkId> = std::collections::HashSet::new();
let mut all_edges: Vec<Edge> = Vec::new();
for edge in outbound.into_iter().chain(inbound.into_iter()) {
if seen_edge_ids.insert(edge.id) {
all_edges.push(edge);
}
}
let mut edges_rewired = 0usize;
for edge in all_edges {
let new_source = if edge.source_id == from_id {
into_id
} else {
edge.source_id
};
let new_target = if edge.target_id == from_id {
into_id
} else {
edge.target_id
};
if new_source == new_target {
graph.delete_edge(edge.id).await?;
continue;
}
let rewired = Edge {
source_id: new_source,
target_id: new_target,
..edge
};
graph.upsert_edge(rewired).await?;
edges_rewired += 1;
}
let (merged_props, properties_merged) =
merge_properties(&into_entity.properties, &from_entity.properties, strategy);
let merged_name = merge_string_field(&into_entity.name, &from_entity.name, strategy);
let merged_description =
merge_option_string_field(&into_entity.description, &from_entity.description, strategy);
let (merged_tags, tags_unioned) = union_tags(&into_entity.tags, &from_entity.tags);
let mut updated_into = into_entity;
updated_into.name = merged_name;
updated_into.description = merged_description;
updated_into.properties = merged_props;
updated_into.tags = merged_tags;
updated_into.updated_at = chrono::Utc::now().timestamp_micros();
store.upsert_entity(updated_into.clone()).await?;
self.reindex_entity(namespace, &updated_into).await?;
store.delete_entity(from_id, DeleteMode::Hard).await?;
self.remove_from_indexes(namespace, from_id).await?;
Ok(MergeSummary {
kept_id: into_id,
removed_id: from_id,
edges_rewired,
properties_merged,
tags_unioned,
})
}
/// Writes the entity's current state into the search indexes.
///
/// The text document is always upserted; its body is the entity name,
/// followed by a space and the description when a non-empty description is
/// present. When an embedding model is configured, the same body is embedded
/// and the vector is inserted for the entity as well.
///
/// # Errors
/// Propagates failures from the text index, the embedder and the vector index.
pub(crate) async fn reindex_entity(
    &self,
    namespace: Option<&str>,
    entity: &Entity,
) -> RuntimeResult<()> {
    let mut body = entity.name.clone();
    if let Some(description) = entity.description.as_deref().filter(|d| !d.is_empty()) {
        body.push(' ');
        body.push_str(description);
    }

    let text_index = self.text(namespace)?;
    let document = TextDocument {
        subject_id: entity.id,
        kind: SubstrateKind::Entity,
        title: Some(entity.name.clone()),
        // Cloned because the body is still needed for the embedding below.
        body: body.clone(),
        tags: entity.tags.clone(),
        namespace: entity.namespace.clone(),
        metadata: entity.properties.clone(),
        updated_at: chrono::Utc::now(),
    };
    text_index.upsert_document(document).await?;

    // Vector indexing is optional: without a configured model we are done.
    if self.config().embedding_model.is_none() {
        return Ok(());
    }
    let vector = self.embed(&body).await?;
    let vector_index = self.vectors(namespace)?;
    vector_index
        .insert(entity.id, SubstrateKind::Entity, &entity.namespace, vector)
        .await?;
    Ok(())
}
/// Removes the index entries for `id`.
///
/// The text document is deleted under the resolved namespace; the vector entry
/// is deleted only when an embedding model is configured.
///
/// # Errors
/// Propagates failures from the text index and the vector index.
pub(crate) async fn remove_from_indexes(
    &self,
    namespace: Option<&str>,
    id: Uuid,
) -> RuntimeResult<()> {
    let resolved_ns = self.ns(namespace).to_string();
    let text_index = self.text(namespace)?;
    text_index.delete_document(&resolved_ns, id).await?;

    if self.config().embedding_model.is_none() {
        return Ok(());
    }
    let vector_index = self.vectors(namespace)?;
    vector_index.delete(id).await?;
    Ok(())
}
}
/// Picks the value of a mandatory string field for a merge.
///
/// `PreferFrom` takes `from`; `PreferInto` and `Union` both keep `into`.
fn merge_string_field(into: &str, from: &str, strategy: MergeStrategy) -> String {
    let chosen = match strategy {
        MergeStrategy::PreferFrom => from,
        MergeStrategy::PreferInto | MergeStrategy::Union => into,
    };
    chosen.to_owned()
}
/// Picks the value of an optional string field for a merge.
///
/// Each strategy names a primary side and a fallback that is used when the
/// primary yields nothing:
/// * `PreferInto` — `into`, falling back to `from`.
/// * `PreferFrom` — `from`, falling back to `into`.
/// * `Union` — `into` only when it is non-empty, otherwise whatever `from`
///   holds (so an empty `into` paired with an absent `from` yields `None`).
fn merge_option_string_field(
    into: &Option<String>,
    from: &Option<String>,
    strategy: MergeStrategy,
) -> Option<String> {
    let (primary, fallback) = match strategy {
        MergeStrategy::PreferInto => (into.as_ref(), from.as_ref()),
        MergeStrategy::PreferFrom => (from.as_ref(), into.as_ref()),
        MergeStrategy::Union => (
            into.as_ref().filter(|text| !text.is_empty()),
            from.as_ref(),
        ),
    };
    primary.or(fallback).cloned()
}
/// Merges two optional JSON property bags and reports how many properties the
/// `from` side contributed.
///
/// * `from` absent — `into` is returned unchanged with a count of 0.
/// * only `from` present — it is taken as-is, whatever the strategy; the count
///   is its number of top-level keys when it is an object and 1 otherwise.
/// * both present — delegated to `merge_json` with the given strategy.
fn merge_properties(
    into: &Option<Value>,
    from: &Option<Value>,
    strategy: MergeStrategy,
) -> (Option<Value>, usize) {
    match (into.as_ref(), from.as_ref()) {
        (existing, None) => (existing.cloned(), 0),
        (None, Some(incoming)) => {
            let contributed = match incoming {
                Value::Object(fields) => fields.len(),
                _ => 1,
            };
            (Some(incoming.clone()), contributed)
        }
        (Some(existing), Some(incoming)) => {
            let (combined, contributed) = merge_json(existing, incoming, strategy);
            (Some(combined), contributed)
        }
    }
}
/// Merges two JSON values and counts the entries contributed by `from`.
///
/// When both values are objects the result starts as a copy of `into` and
/// every key of `from` is handled as follows:
/// * key missing in `into` — inserted, counted as 1 (all strategies);
/// * key present, `PreferInto` — `into`'s value is kept;
/// * key present, `PreferFrom` — `from`'s value overwrites it (not counted);
/// * key present, `Union` — the two values are merged recursively and the
///   nested count is added.
///
/// When at least one side is not an object, `PreferFrom` returns `from` with a
/// count of 1, while `PreferInto` and `Union` return `into` with a count of 0.
fn merge_json(into: &Value, from: &Value, strategy: MergeStrategy) -> (Value, usize) {
    match (into, from) {
        (Value::Object(base), Value::Object(incoming)) => {
            let mut merged = base.clone();
            let mut added = 0usize;
            for (key, incoming_value) in incoming {
                match (base.get(key), strategy) {
                    (None, _) => {
                        merged.insert(key.clone(), incoming_value.clone());
                        added += 1;
                    }
                    (Some(_), MergeStrategy::PreferInto) => {}
                    (Some(_), MergeStrategy::PreferFrom) => {
                        merged.insert(key.clone(), incoming_value.clone());
                    }
                    (Some(existing), MergeStrategy::Union) => {
                        let (value, nested_added) =
                            merge_json(existing, incoming_value, MergeStrategy::Union);
                        merged.insert(key.clone(), value);
                        added += nested_added;
                    }
                }
            }
            (Value::Object(merged), added)
        }
        // Scalars, arrays and mixed shapes cannot be combined key by key.
        _ => match strategy {
            MergeStrategy::PreferFrom => (from.clone(), 1),
            MergeStrategy::PreferInto | MergeStrategy::Union => (into.clone(), 0),
        },
    }
}
/// Appends to `into` every tag of `from` that is not already present.
///
/// Returns the combined list together with the number of tags appended. The
/// order of `into` is preserved (including any duplicates it already holds);
/// new tags follow in the order of their first appearance in `from`.
fn union_tags(into: &[String], from: &[String]) -> (Vec<String>, usize) {
    let mut known: HashSet<&str> = into.iter().map(String::as_str).collect();
    let mut combined = into.to_vec();
    let before = combined.len();
    // `insert` returns false for tags already seen, which filters them out.
    combined.extend(
        from.iter()
            .filter(|&tag| known.insert(tag.as_str()))
            .cloned(),
    );
    let appended = combined.len() - before;
    (combined, appended)
}
// Tests for entity update/merge, run against a runtime built by
// `KhiveRuntime::memory()`, plus unit tests for the pure merge helpers.
#[cfg(test)]
mod tests {
use super::*;
use crate::runtime::KhiveRuntime;
use khive_storage::types::{Direction, TextFilter, TextQueryMode, TextSearchRequest};
// Builds a fresh runtime for one test; panics if construction fails.
fn rt() -> KhiveRuntime {
KhiveRuntime::memory().unwrap()
}
// Runs a plain-mode text search for `query`, restricted to the resolved
// namespace, and returns the subject ids of the hits (at most 50).
async fn fts_hit(rt: &KhiveRuntime, namespace: Option<&str>, query: &str) -> Vec<Uuid> {
let ns = rt.ns(namespace).to_string();
rt.text(namespace)
.unwrap()
.search(TextSearchRequest {
query: query.to_string(),
mode: TextQueryMode::Plain,
filter: Some(TextFilter {
namespaces: vec![ns],
..Default::default()
}),
top_k: 50,
snippet_chars: 100,
})
.await
.unwrap()
.into_iter()
.map(|h| h.subject_id)
.collect()
}
// A patch that only sets the description must leave name and properties alone.
#[tokio::test]
async fn update_entity_patch_changes_only_specified_fields() {
let rt = rt();
let entity = rt
.create_entity(
None,
"concept",
"OriginalName",
Some("orig desc"),
Some(serde_json::json!({"k":"v"})),
vec![],
)
.await
.unwrap();
let updated = rt
.update_entity(
None,
entity.id,
EntityPatch {
description: Some(Some("new desc".to_string())),
..Default::default()
},
)
.await
.unwrap();
assert_eq!(updated.name, "OriginalName");
assert_eq!(updated.description.as_deref(), Some("new desc"));
assert_eq!(updated.properties, Some(serde_json::json!({"k":"v"})));
}
// `description: Some(None)` is the explicit "clear the description" request.
#[tokio::test]
async fn update_entity_clear_description_with_some_none() {
let rt = rt();
let entity = rt
.create_entity(
None,
"concept",
"ClearDesc",
Some("has description"),
None,
vec![],
)
.await
.unwrap();
let updated = rt
.update_entity(
None,
entity.id,
EntityPatch {
description: Some(None),
..Default::default()
},
)
.await
.unwrap();
assert!(
updated.description.is_none(),
"description should be cleared"
);
}
// Renaming must refresh the text index: the old name stops matching and the
// new name starts matching.
#[tokio::test]
async fn update_entity_reindexes_when_name_changes() {
let rt = rt();
let entity = rt
.create_entity(None, "concept", "OldName", None, None, vec![])
.await
.unwrap();
let hits_before = fts_hit(&rt, None, "OldName").await;
assert!(
hits_before.contains(&entity.id),
"entity should be findable by old name"
);
rt.update_entity(
None,
entity.id,
EntityPatch {
name: Some("NewName".to_string()),
..Default::default()
},
)
.await
.unwrap();
let hits_old = fts_hit(&rt, None, "OldName").await;
let hits_new = fts_hit(&rt, None, "NewName").await;
assert!(
!hits_old.contains(&entity.id),
"old name should no longer match after rename"
);
assert!(
hits_new.contains(&entity.id),
"new name should be findable after rename"
);
}
// A properties patch overrides the keys it names and keeps all other keys.
#[tokio::test]
async fn update_entity_properties_merges_preserving_existing_keys() {
let rt = rt();
let entity = rt
.create_entity(
None,
"concept",
"MergeProps",
None,
Some(serde_json::json!({
"domain": "inference",
"repo": "lattice",
"status": "researched",
})),
vec![],
)
.await
.unwrap();
let updated = rt
.update_entity(
None,
entity.id,
EntityPatch {
properties: Some(serde_json::json!({"status": "implemented"})),
..Default::default()
},
)
.await
.unwrap();
let props = updated.properties.expect("properties should remain set");
assert_eq!(props["domain"], "inference", "domain key must be preserved");
assert_eq!(props["repo"], "lattice", "repo key must be preserved");
assert_eq!(
props["status"], "implemented",
"status key must be updated by patch"
);
}
// A properties-only patch must leave the entity findable by name. Note that
// this checks the search result only; it does not observe whether a reindex
// actually ran.
#[tokio::test]
async fn update_entity_skips_reindex_when_only_properties_change() {
let rt = rt();
let entity = rt
.create_entity(None, "concept", "StableIndexed", None, None, vec![])
.await
.unwrap();
let hits_before = fts_hit(&rt, None, "StableIndexed").await;
assert!(hits_before.contains(&entity.id));
rt.update_entity(
None,
entity.id,
EntityPatch {
properties: Some(serde_json::json!({"new": "prop"})),
..Default::default()
},
)
.await
.unwrap();
let hits_after = fts_hit(&rt, None, "StableIndexed").await;
assert!(
hits_after.contains(&entity.id),
"still findable after props-only patch"
);
}
// Edges A->B and C->B must point at D after B is merged into D.
#[tokio::test]
async fn merge_entity_rewires_edges() {
let rt = rt();
let a = rt
.create_entity(None, "concept", "A", None, None, vec![])
.await
.unwrap();
let b = rt
.create_entity(None, "concept", "B", None, None, vec![])
.await
.unwrap();
let c = rt
.create_entity(None, "concept", "C", None, None, vec![])
.await
.unwrap();
let d = rt
.create_entity(None, "concept", "D", None, None, vec![])
.await
.unwrap();
rt.link(None, a.id, b.id, EdgeRelation::Extends, 1.0)
.await
.unwrap();
rt.link(None, c.id, b.id, EdgeRelation::Extends, 1.0)
.await
.unwrap();
let summary = rt
.merge_entity(None, d.id, b.id, MergeStrategy::PreferInto)
.await
.unwrap();
assert_eq!(summary.kept_id, d.id);
assert_eq!(summary.removed_id, b.id);
assert_eq!(summary.edges_rewired, 2);
let a_neighbors = rt
.neighbors(None, a.id, Direction::Out, None, None)
.await
.unwrap();
assert_eq!(a_neighbors.len(), 1);
assert_eq!(a_neighbors[0].node_id, d.id);
let c_neighbors = rt
.neighbors(None, c.id, Direction::Out, None, None)
.await
.unwrap();
assert_eq!(c_neighbors.len(), 1);
assert_eq!(c_neighbors[0].node_id, d.id);
}
// PreferInto: the kept entity's value wins on conflict, missing keys are added.
#[tokio::test]
async fn merge_entity_prefer_into_strategy() {
let rt = rt();
let into = rt
.create_entity(
None,
"concept",
"Into",
None,
Some(serde_json::json!({"a": 1})),
vec![],
)
.await
.unwrap();
let from = rt
.create_entity(
None,
"concept",
"From",
None,
Some(serde_json::json!({"a": 2, "b": 3})),
vec![],
)
.await
.unwrap();
rt.merge_entity(None, into.id, from.id, MergeStrategy::PreferInto)
.await
.unwrap();
let kept = rt.get_entity(None, into.id).await.unwrap().unwrap();
let props = kept.properties.unwrap();
assert_eq!(props["a"], 1);
assert_eq!(props["b"], 3);
}
// PreferFrom: the removed entity's value wins on conflict.
#[tokio::test]
async fn merge_entity_prefer_from_strategy() {
let rt = rt();
let into = rt
.create_entity(
None,
"concept",
"Into",
None,
Some(serde_json::json!({"a": 1})),
vec![],
)
.await
.unwrap();
let from = rt
.create_entity(
None,
"concept",
"From",
None,
Some(serde_json::json!({"a": 2, "b": 3})),
vec![],
)
.await
.unwrap();
rt.merge_entity(None, into.id, from.id, MergeStrategy::PreferFrom)
.await
.unwrap();
let kept = rt.get_entity(None, into.id).await.unwrap().unwrap();
let props = kept.properties.unwrap();
assert_eq!(props["a"], 2);
assert_eq!(props["b"], 3);
}
// Union: conflicting scalar values keep the "into" side, new keys are added.
#[tokio::test]
async fn merge_entity_union_strategy() {
let rt = rt();
let into = rt
.create_entity(
None,
"concept",
"Into",
None,
Some(serde_json::json!({"a": 1})),
vec![],
)
.await
.unwrap();
let from = rt
.create_entity(
None,
"concept",
"From",
None,
Some(serde_json::json!({"a": 2, "b": 3})),
vec![],
)
.await
.unwrap();
rt.merge_entity(None, into.id, from.id, MergeStrategy::Union)
.await
.unwrap();
let kept = rt.get_entity(None, into.id).await.unwrap().unwrap();
let props = kept.properties.unwrap();
assert_eq!(props["a"], 1);
assert_eq!(props["b"], 3);
}
// Tags of both entities end up on the kept entity without duplicates.
#[tokio::test]
async fn merge_entity_unions_tags() {
let rt = rt();
let into = rt
.create_entity(
None,
"concept",
"Into",
None,
None,
vec!["x".to_string(), "y".to_string()],
)
.await
.unwrap();
let from = rt
.create_entity(
None,
"concept",
"From",
None,
None,
vec!["y".to_string(), "z".to_string()],
)
.await
.unwrap();
rt.merge_entity(None, into.id, from.id, MergeStrategy::PreferInto)
.await
.unwrap();
let kept = rt.get_entity(None, into.id).await.unwrap().unwrap();
let mut tags = kept.tags.clone();
tags.sort();
assert_eq!(tags, vec!["x", "y", "z"]);
}
// An edge between the two merged entities would become a self-loop; it must
// be removed and must not count as rewired.
#[tokio::test]
async fn merge_entity_drops_self_loops() {
let rt = rt();
let a = rt
.create_entity(None, "concept", "A", None, None, vec![])
.await
.unwrap();
let b = rt
.create_entity(None, "concept", "B", None, None, vec![])
.await
.unwrap();
rt.link(None, a.id, b.id, EdgeRelation::Extends, 1.0)
.await
.unwrap();
let summary = rt
.merge_entity(None, a.id, b.id, MergeStrategy::PreferInto)
.await
.unwrap();
assert_eq!(
summary.edges_rewired, 0,
"self-loop should be dropped, not rewired"
);
let a_out = rt
.neighbors(None, a.id, Direction::Out, None, None)
.await
.unwrap();
assert!(a_out.is_empty(), "no self-loop should remain");
}
// Pure helper: the shared tag "y" is kept once and only "z" counts as added.
#[test]
fn union_tags_deduplicates() {
let (tags, added) = union_tags(
&["x".to_string(), "y".to_string()],
&["y".to_string(), "z".to_string()],
);
let mut sorted = tags.clone();
sorted.sort();
assert_eq!(sorted, vec!["x", "y", "z"]);
assert_eq!(added, 1);
}
// Pure helper: PreferInto keeps "a" from the first bag and adds the missing "b".
#[test]
fn merge_properties_prefer_into_fills_missing_keys() {
let a = serde_json::json!({"a": 1});
let b = serde_json::json!({"a": 99, "b": 2});
let (merged, added) = merge_properties(&Some(a), &Some(b), MergeStrategy::PreferInto);
let m = merged.unwrap();
assert_eq!(m["a"], 1);
assert_eq!(m["b"], 2);
assert_eq!(added, 1);
}
}