use std::collections::HashSet;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use uuid::Uuid;
use khive_db::SqliteError;
use khive_storage::types::{EdgeFilter, TextDocument};
use khive_storage::{EdgeRelation, Entity, SubstrateKind};
use crate::error::{RuntimeError, RuntimeResult};
use crate::runtime::KhiveRuntime;
/// Partial update applied to an entity by `KhiveRuntime::update_entity`.
///
/// Every field is optional: a field left as `None` leaves the stored value
/// untouched.
#[derive(Clone, Debug, Default)]
pub struct EntityPatch {
/// New entity name, when `Some`.
pub name: Option<String>,
/// Description change: `Some(Some(text))` sets it, `Some(None)` clears it.
pub description: Option<Option<String>>,
/// Properties merged into the existing ones with `MergeStrategy::PreferFrom`,
/// so keys present in the patch win and keys absent from it are kept.
pub properties: Option<Value>,
/// Replacement tag list; overwrites the existing tags as a whole.
pub tags: Option<Vec<String>>,
}
/// Conflict-resolution policy used when two entities (or two property
/// values) are merged. Serialized in `snake_case`.
#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum MergeStrategy {
/// Keep the surviving ("into") side on conflict; object keys and a
/// description that only the "from" side has are still carried over.
#[default]
PreferInto,
/// Let the "from" side win on conflict; object keys that only the "into"
/// side has are kept, and its description is kept when "from" has none.
PreferFrom,
/// Merge JSON objects recursively, key by key; conflicting non-object
/// values keep the "into" side. The name stays the "into" name, and the
/// description is the "into" one unless it is missing or empty.
Union,
}
/// Outcome of `KhiveRuntime::merge_entity`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MergeSummary {
/// Id of the entity that survived the merge (the `into_id` argument).
pub kept_id: Uuid,
/// Id of the entity that was deleted (the `from_id` argument).
pub removed_id: Uuid,
/// Number of edges re-pointed from the removed entity to the kept one;
/// edges that would have become self-loops are deleted and not counted.
pub edges_rewired: usize,
/// Count reported by the property merge (keys taken over from the removed
/// entity).
pub properties_merged: usize,
/// Number of tags added to the kept entity from the removed one.
pub tags_unioned: usize,
}
/// Edge-listing filter with at most one source and one target endpoint.
///
/// Converted into the storage-layer `EdgeFilter` through the `From` impl in
/// this file.
#[derive(Clone, Debug, Default)]
pub struct EdgeListFilter {
/// Optional source endpoint; becomes the single entry of `source_ids`.
pub source_id: Option<Uuid>,
/// Optional target endpoint; becomes the single entry of `target_ids`.
pub target_id: Option<Uuid>,
/// Relations forwarded unchanged to `EdgeFilter::relations`.
pub relations: Vec<EdgeRelation>,
/// Lower weight bound, forwarded unchanged.
pub min_weight: Option<f64>,
/// Upper weight bound, forwarded unchanged.
pub max_weight: Option<f64>,
}
impl From<EdgeListFilter> for EdgeFilter {
fn from(f: EdgeListFilter) -> Self {
EdgeFilter {
source_ids: f.source_id.into_iter().collect(),
target_ids: f.target_id.into_iter().collect(),
relations: f.relations,
min_weight: f.min_weight,
max_weight: f.max_weight,
..Default::default()
}
}
}
impl KhiveRuntime {
pub async fn update_entity(
&self,
namespace: Option<&str>,
id: Uuid,
patch: EntityPatch,
) -> RuntimeResult<Entity> {
let store = self.entities(namespace)?;
let mut entity = store
.get_entity(id)
.await?
.ok_or_else(|| RuntimeError::NotFound(format!("entity {id}")))?;
if entity.namespace != self.ns(namespace) {
return Err(RuntimeError::NotFound(format!("entity {id}")));
}
let mut text_changed = false;
if let Some(name) = patch.name {
text_changed |= entity.name != name;
entity.name = name;
}
if let Some(desc_patch) = patch.description {
text_changed |= entity.description != desc_patch;
entity.description = desc_patch;
}
if let Some(props) = patch.properties {
let (merged, _) =
merge_properties(&entity.properties, &Some(props), MergeStrategy::PreferFrom);
entity.properties = merged;
}
if let Some(tags) = patch.tags {
entity.tags = tags;
}
entity.updated_at = chrono::Utc::now().timestamp_micros();
store.upsert_entity(entity.clone()).await?;
if text_changed {
self.reindex_entity(namespace, &entity).await?;
}
Ok(entity)
}
pub async fn merge_entity(
&self,
namespace: Option<&str>,
into_id: Uuid,
from_id: Uuid,
strategy: MergeStrategy,
) -> RuntimeResult<MergeSummary> {
let ns = self.ns(namespace).to_string();
let sanitized_ns: String = ns
.chars()
.map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
.collect();
let fts_table = format!("fts_entities_{}", sanitized_ns);
let vec_table = self.config().embedding_model.map(|model| {
let key: String = model
.to_string()
.chars()
.map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
.collect();
format!("vec_{}", key)
});
let _ = self.entities(namespace)?;
let _ = self.graph(namespace)?;
let _ = self.text(namespace)?;
if self.config().embedding_model.is_some() {
let _ = self.vectors(namespace)?;
}
let pool = self.backend().pool_arc();
let (summary, updated_entity) = tokio::task::spawn_blocking(move || {
let guard = pool.writer()?;
guard.transaction(|conn| {
merge_entity_sql(conn, ns, fts_table, vec_table, into_id, from_id, strategy)
})
})
.await
.map_err(|e| RuntimeError::Internal(e.to_string()))??;
if self.config().embedding_model.is_some() {
self.reindex_entity(namespace, &updated_entity).await?;
}
Ok(summary)
}
pub(crate) async fn reindex_entity(
&self,
namespace: Option<&str>,
entity: &Entity,
) -> RuntimeResult<()> {
let body = match &entity.description {
Some(d) if !d.is_empty() => format!("{} {}", entity.name, d),
_ => entity.name.clone(),
};
let ns = entity.namespace.clone();
self.text(namespace)?
.upsert_document(TextDocument {
subject_id: entity.id,
kind: SubstrateKind::Entity,
title: Some(entity.name.clone()),
body: body.clone(),
tags: entity.tags.clone(),
namespace: ns.clone(),
metadata: entity.properties.clone(),
updated_at: chrono::Utc::now(),
})
.await?;
if self.config().embedding_model.is_some() {
let vector = self.embed(&body).await?;
self.vectors(namespace)?
.insert(entity.id, SubstrateKind::Entity, &ns, vector)
.await?;
}
Ok(())
}
pub(crate) async fn remove_from_indexes(
&self,
namespace: Option<&str>,
id: Uuid,
) -> RuntimeResult<()> {
let ns = self.ns(namespace).to_string();
self.text(namespace)?.delete_document(&ns, id).await?;
if self.config().embedding_model.is_some() {
self.vectors(namespace)?.delete(id).await?;
}
Ok(())
}
}
/// Loads the live (not soft-deleted) entity `id` and checks that it belongs to
/// `namespace`.
///
/// # Errors
/// Returns `SqliteError::InvalidData` when no live row exists, when the row
/// belongs to another namespace, or when the stored id, properties or tags
/// cannot be parsed. Database errors are propagated.
fn read_merge_entity(
    conn: &rusqlite::Connection,
    id: Uuid,
    namespace: &str,
) -> Result<Entity, SqliteError> {
    const SELECT_LIVE_ENTITY: &str =
        "SELECT id, namespace, kind, name, description, properties, tags, \
         created_at, updated_at, deleted_at \
         FROM entities WHERE id = ?1 AND deleted_at IS NULL";

    let id_text = id.to_string();
    let mut stmt = conn.prepare(SELECT_LIVE_ENTITY)?;
    let mut rows = stmt.query(rusqlite::params![id_text])?;
    let row = match rows.next()? {
        Some(row) => row,
        None => {
            return Err(SqliteError::InvalidData(format!(
                "entity {id} not found"
            )))
        }
    };

    // Pull every column out first; decoding of the JSON/UUID payloads follows
    // the namespace check.
    let raw_id: String = row.get(0)?;
    let stored_ns: String = row.get(1)?;
    let kind: String = row.get(2)?;
    let name: String = row.get(3)?;
    let description: Option<String> = row.get(4)?;
    let raw_properties: Option<String> = row.get(5)?;
    let raw_tags: String = row.get(6)?;
    let created_at: i64 = row.get(7)?;
    let updated_at: i64 = row.get(8)?;
    let deleted_at: Option<i64> = row.get(9)?;

    if stored_ns != namespace {
        return Err(SqliteError::InvalidData(format!(
            "entity {id} belongs to namespace '{stored_ns}', not '{namespace}'"
        )));
    }

    let entity_id = match Uuid::parse_str(&raw_id) {
        Ok(parsed) => parsed,
        Err(e) => return Err(SqliteError::InvalidData(e.to_string())),
    };
    let properties: Option<Value> = match raw_properties {
        Some(json) => Some(
            serde_json::from_str::<Value>(&json)
                .map_err(|e| SqliteError::InvalidData(e.to_string()))?,
        ),
        None => None,
    };
    let tags: Vec<String> = match serde_json::from_str(&raw_tags) {
        Ok(parsed) => parsed,
        Err(e) => return Err(SqliteError::InvalidData(e.to_string())),
    };

    Ok(Entity {
        id: entity_id,
        namespace: stored_ns,
        kind,
        name,
        description,
        properties,
        tags,
        created_at,
        updated_at,
        deleted_at,
    })
}
fn merge_entity_sql(
conn: &rusqlite::Connection,
namespace: String,
fts_table: String,
vec_table: Option<String>,
into_id: Uuid,
from_id: Uuid,
strategy: MergeStrategy,
) -> Result<(MergeSummary, Entity), SqliteError> {
let into_entity = read_merge_entity(conn, into_id, &namespace)?;
let from_entity = read_merge_entity(conn, from_id, &namespace)?;
struct EdgeRow {
id: Uuid,
source_id: Uuid,
target_id: Uuid,
relation: String,
weight: f64,
created_at: i64,
metadata: Option<String>,
}
let parse_id =
|s: String| Uuid::parse_str(&s).map_err(|e| SqliteError::InvalidData(e.to_string()));
let from_str = from_id.to_string();
let mut outbound: Vec<EdgeRow> = Vec::new();
{
let mut stmt = conn.prepare(
"SELECT id, source_id, target_id, relation, weight, created_at, metadata \
FROM graph_edges WHERE namespace = ?1 AND source_id = ?2",
)?;
let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
while let Some(row) = rows.next()? {
outbound.push(EdgeRow {
id: parse_id(row.get(0)?)?,
source_id: parse_id(row.get(1)?)?,
target_id: parse_id(row.get(2)?)?,
relation: row.get(3)?,
weight: row.get(4)?,
created_at: row.get(5)?,
metadata: row.get(6)?,
});
}
}
let mut inbound: Vec<EdgeRow> = Vec::new();
{
let mut stmt = conn.prepare(
"SELECT id, source_id, target_id, relation, weight, created_at, metadata \
FROM graph_edges WHERE namespace = ?1 AND target_id = ?2",
)?;
let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
while let Some(row) = rows.next()? {
inbound.push(EdgeRow {
id: parse_id(row.get(0)?)?,
source_id: parse_id(row.get(1)?)?,
target_id: parse_id(row.get(2)?)?,
relation: row.get(3)?,
weight: row.get(4)?,
created_at: row.get(5)?,
metadata: row.get(6)?,
});
}
}
let mut seen: HashSet<Uuid> = HashSet::new();
let mut all_edges: Vec<EdgeRow> = Vec::new();
for edge in outbound.into_iter().chain(inbound) {
if seen.insert(edge.id) {
all_edges.push(edge);
}
}
let mut edges_rewired = 0usize;
for edge in all_edges {
let new_src = if edge.source_id == from_id {
into_id
} else {
edge.source_id
};
let new_tgt = if edge.target_id == from_id {
into_id
} else {
edge.target_id
};
if new_src == new_tgt {
conn.execute(
"DELETE FROM graph_edges WHERE namespace = ?1 AND id = ?2",
rusqlite::params![&namespace, edge.id.to_string()],
)?;
continue;
}
conn.execute(
"INSERT INTO graph_edges \
(namespace, id, source_id, target_id, relation, weight, created_at, metadata) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8) \
ON CONFLICT(namespace, id) DO UPDATE SET \
source_id = excluded.source_id, \
target_id = excluded.target_id, \
relation = excluded.relation, \
weight = excluded.weight, \
created_at = excluded.created_at, \
metadata = excluded.metadata \
ON CONFLICT(namespace, source_id, target_id, relation) DO NOTHING",
rusqlite::params![
&namespace,
edge.id.to_string(),
new_src.to_string(),
new_tgt.to_string(),
&edge.relation,
edge.weight,
edge.created_at,
edge.metadata,
],
)?;
edges_rewired += 1;
}
let (merged_props, properties_merged) =
merge_properties(&into_entity.properties, &from_entity.properties, strategy);
let merged_name = merge_string_field(&into_entity.name, &from_entity.name, strategy);
let merged_description =
merge_option_string_field(&into_entity.description, &from_entity.description, strategy);
let (merged_tags, tags_unioned) = union_tags(&into_entity.tags, &from_entity.tags);
let now = chrono::Utc::now().timestamp_micros();
let into_str = into_id.to_string();
let props_str = merged_props
.as_ref()
.map(|v| serde_json::to_string(v).unwrap_or_default());
let tags_json = serde_json::to_string(&merged_tags).unwrap_or_else(|_| "[]".to_string());
conn.execute(
"INSERT OR REPLACE INTO entities \
(id, namespace, kind, name, description, properties, tags, \
created_at, updated_at, deleted_at) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
rusqlite::params![
&into_str,
&namespace,
&into_entity.kind,
&merged_name,
&merged_description,
&props_str,
&tags_json,
into_entity.created_at,
now,
into_entity.deleted_at,
],
)?;
let fts_body = match &merged_description {
Some(d) if !d.is_empty() => format!("{} {}", merged_name, d),
_ => merged_name.clone(),
};
let kind_str = SubstrateKind::Entity.to_string();
conn.execute(
&format!(
"DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
fts_table
),
rusqlite::params![&namespace, &into_str],
)?;
conn.execute(
&format!(
"INSERT INTO {} \
(subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
fts_table
),
rusqlite::params![
&into_str,
&kind_str,
&merged_name,
&fts_body,
&tags_json,
&namespace,
&props_str,
now,
],
)?;
conn.execute(
&format!(
"DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
fts_table
),
rusqlite::params![&namespace, &from_str],
)?;
if let Some(ref vec_tbl) = vec_table {
conn.execute(
&format!(
"DELETE FROM {} WHERE subject_id = ?1 AND namespace = ?2",
vec_tbl
),
rusqlite::params![&from_str, &namespace],
)?;
}
conn.execute(
"DELETE FROM entities WHERE id = ?1",
rusqlite::params![&from_str],
)?;
let updated_entity = Entity {
id: into_id,
namespace,
kind: into_entity.kind,
name: merged_name,
description: merged_description,
properties: merged_props,
tags: merged_tags,
created_at: into_entity.created_at,
updated_at: now,
deleted_at: into_entity.deleted_at,
};
Ok((
MergeSummary {
kept_id: into_id,
removed_id: from_id,
edges_rewired,
properties_merged,
tags_unioned,
},
updated_entity,
))
}
/// Picks the value of a required string field for a merge.
///
/// `PreferFrom` takes `from`; `PreferInto` and `Union` both keep `into`.
fn merge_string_field(into: &str, from: &str, strategy: MergeStrategy) -> String {
    let winner = match strategy {
        MergeStrategy::PreferFrom => from,
        MergeStrategy::PreferInto | MergeStrategy::Union => into,
    };
    winner.to_owned()
}
/// Picks the value of an optional string field for a merge.
///
/// * `PreferInto` — `into` when it is `Some` (even if empty), otherwise `from`.
/// * `PreferFrom` — `from` when it is `Some` (even if empty), otherwise `into`.
/// * `Union` — `into` when it is `Some` and non-empty, otherwise whatever
///   `from` holds (which may be `None`).
fn merge_option_string_field(
    into: &Option<String>,
    from: &Option<String>,
    strategy: MergeStrategy,
) -> Option<String> {
    match strategy {
        MergeStrategy::PreferInto => into.as_ref().or(from.as_ref()).cloned(),
        MergeStrategy::PreferFrom => from.as_ref().or(into.as_ref()).cloned(),
        MergeStrategy::Union => match into.as_deref() {
            Some(kept) if !kept.is_empty() => Some(kept.to_owned()),
            _ => from.clone(),
        },
    }
}
/// Merges two optional property values and reports how many entries were
/// taken over from `from`.
///
/// * Only `into` present (or neither): `into` is returned unchanged, count 0.
/// * Only `from` present: `from` is returned; the count is its number of
///   top-level keys when it is an object, otherwise 1.
/// * Both present: delegated to `merge_json` with `strategy`.
fn merge_properties(
    into: &Option<Value>,
    from: &Option<Value>,
    strategy: MergeStrategy,
) -> (Option<Value>, usize) {
    match (into.as_ref(), from.as_ref()) {
        (Some(base), Some(incoming)) => {
            let (merged, added) = merge_json(base, incoming, strategy);
            (Some(merged), added)
        }
        (None, Some(incoming)) => {
            let count = match incoming {
                Value::Object(fields) => fields.len(),
                _ => 1,
            };
            (Some(incoming.clone()), count)
        }
        (kept, None) => (kept.cloned(), 0),
    }
}
/// Merges `from` into `into` according to `strategy`, returning the merged
/// value and the number of entries taken over from `from`.
///
/// When both sides are objects the merge is key by key:
/// * a key only `from` has is always copied and counted;
/// * a key both have is kept (`PreferInto`), overwritten without being counted
///   (`PreferFrom`), or merged recursively (`Union`).
///
/// For every other combination `PreferFrom` returns `from` with count 1 and
/// the other strategies return `into` with count 0.
fn merge_json(into: &Value, from: &Value, strategy: MergeStrategy) -> (Value, usize) {
    if let (Value::Object(base), Value::Object(incoming)) = (into, from) {
        let mut merged = base.clone();
        let mut added = 0usize;
        for (key, incoming_val) in incoming {
            match (base.get(key), strategy) {
                (None, _) => {
                    merged.insert(key.clone(), incoming_val.clone());
                    added += 1;
                }
                (Some(_), MergeStrategy::PreferInto) => {}
                (Some(_), MergeStrategy::PreferFrom) => {
                    merged.insert(key.clone(), incoming_val.clone());
                }
                (Some(existing), MergeStrategy::Union) => {
                    let (combined, nested_added) =
                        merge_json(existing, incoming_val, MergeStrategy::Union);
                    merged.insert(key.clone(), combined);
                    added += nested_added;
                }
            }
        }
        return (Value::Object(merged), added);
    }

    // At least one side is not an object: there is nothing to merge key-wise.
    match strategy {
        MergeStrategy::PreferFrom => (from.clone(), 1),
        MergeStrategy::PreferInto | MergeStrategy::Union => (into.clone(), 0),
    }
}
/// Appends to `into` every tag of `from` that is not already present,
/// preserving order, and returns the combined list with the number of tags
/// appended.
///
/// Duplicates inside `from` are appended once; duplicates already inside
/// `into` are left as they are.
fn union_tags(into: &[String], from: &[String]) -> (Vec<String>, usize) {
    let mut known: HashSet<&str> = into.iter().map(String::as_str).collect();
    // `insert` returns true only the first time a tag is seen.
    let fresh: Vec<String> = from
        .iter()
        .filter(|tag| known.insert(tag.as_str()))
        .cloned()
        .collect();
    let added = fresh.len();

    let mut merged = Vec::with_capacity(into.len() + added);
    merged.extend_from_slice(into);
    merged.extend(fresh);
    (merged, added)
}
// Tests for entity update and merge: async tests run against an in-memory
// runtime, the last two exercise the pure helpers directly.
#[cfg(test)]
mod tests {
use super::*;
use crate::runtime::KhiveRuntime;
use khive_storage::types::{Direction, TextFilter, TextQueryMode, TextSearchRequest};
// Builds a fresh in-memory runtime for one test.
fn rt() -> KhiveRuntime {
KhiveRuntime::memory().unwrap()
}
// Runs a plain-mode text search (top_k 50) restricted to the resolved
// namespace and returns the subject ids of the hits.
async fn fts_hit(rt: &KhiveRuntime, namespace: Option<&str>, query: &str) -> Vec<Uuid> {
let ns = rt.ns(namespace).to_string();
rt.text(namespace)
.unwrap()
.search(TextSearchRequest {
query: query.to_string(),
mode: TextQueryMode::Plain,
filter: Some(TextFilter {
namespaces: vec![ns],
..Default::default()
}),
top_k: 50,
snippet_chars: 100,
})
.await
.unwrap()
.into_iter()
.map(|h| h.subject_id)
.collect()
}
// Patching only the description must leave name and properties untouched.
#[tokio::test]
async fn update_entity_patch_changes_only_specified_fields() {
let rt = rt();
let entity = rt
.create_entity(
None,
"concept",
"OriginalName",
Some("orig desc"),
Some(serde_json::json!({"k":"v"})),
vec![],
)
.await
.unwrap();
let updated = rt
.update_entity(
None,
entity.id,
EntityPatch {
description: Some(Some("new desc".to_string())),
..Default::default()
},
)
.await
.unwrap();
assert_eq!(updated.name, "OriginalName");
assert_eq!(updated.description.as_deref(), Some("new desc"));
assert_eq!(updated.properties, Some(serde_json::json!({"k":"v"})));
}
// `description: Some(None)` is the explicit "clear the description" patch.
#[tokio::test]
async fn update_entity_clear_description_with_some_none() {
let rt = rt();
let entity = rt
.create_entity(
None,
"concept",
"ClearDesc",
Some("has description"),
None,
vec![],
)
.await
.unwrap();
let updated = rt
.update_entity(
None,
entity.id,
EntityPatch {
description: Some(None),
..Default::default()
},
)
.await
.unwrap();
assert!(
updated.description.is_none(),
"description should be cleared"
);
}
// A rename must be reflected in text search: the old name stops matching
// and the new name starts matching.
#[tokio::test]
async fn update_entity_reindexes_when_name_changes() {
let rt = rt();
let entity = rt
.create_entity(None, "concept", "OldName", None, None, vec![])
.await
.unwrap();
let hits_before = fts_hit(&rt, None, "OldName").await;
assert!(
hits_before.contains(&entity.id),
"entity should be findable by old name"
);
rt.update_entity(
None,
entity.id,
EntityPatch {
name: Some("NewName".to_string()),
..Default::default()
},
)
.await
.unwrap();
let hits_old = fts_hit(&rt, None, "OldName").await;
let hits_new = fts_hit(&rt, None, "NewName").await;
assert!(
!hits_old.contains(&entity.id),
"old name should no longer match after rename"
);
assert!(
hits_new.contains(&entity.id),
"new name should be findable after rename"
);
}
// A properties patch is merged: patched keys change, other keys survive.
#[tokio::test]
async fn update_entity_properties_merges_preserving_existing_keys() {
let rt = rt();
let entity = rt
.create_entity(
None,
"concept",
"MergeProps",
None,
Some(serde_json::json!({
"domain": "inference",
"repo": "lattice",
"status": "researched",
})),
vec![],
)
.await
.unwrap();
let updated = rt
.update_entity(
None,
entity.id,
EntityPatch {
properties: Some(serde_json::json!({"status": "implemented"})),
..Default::default()
},
)
.await
.unwrap();
let props = updated.properties.expect("properties should remain set");
assert_eq!(props["domain"], "inference", "domain key must be preserved");
assert_eq!(props["repo"], "lattice", "repo key must be preserved");
assert_eq!(
props["status"], "implemented",
"status key must be updated by patch"
);
}
// A properties-only patch must leave the entity findable by its name.
#[tokio::test]
async fn update_entity_skips_reindex_when_only_properties_change() {
let rt = rt();
let entity = rt
.create_entity(None, "concept", "StableIndexed", None, None, vec![])
.await
.unwrap();
let hits_before = fts_hit(&rt, None, "StableIndexed").await;
assert!(hits_before.contains(&entity.id));
rt.update_entity(
None,
entity.id,
EntityPatch {
properties: Some(serde_json::json!({"new": "prop"})),
..Default::default()
},
)
.await
.unwrap();
let hits_after = fts_hit(&rt, None, "StableIndexed").await;
assert!(
hits_after.contains(&entity.id),
"still findable after props-only patch"
);
}
// Edges A -> B and C -> B must point at D after B is merged into D.
#[tokio::test]
async fn merge_entity_rewires_edges() {
let rt = rt();
let a = rt
.create_entity(None, "concept", "A", None, None, vec![])
.await
.unwrap();
let b = rt
.create_entity(None, "concept", "B", None, None, vec![])
.await
.unwrap();
let c = rt
.create_entity(None, "concept", "C", None, None, vec![])
.await
.unwrap();
let d = rt
.create_entity(None, "concept", "D", None, None, vec![])
.await
.unwrap();
rt.link(None, a.id, b.id, EdgeRelation::Extends, 1.0)
.await
.unwrap();
rt.link(None, c.id, b.id, EdgeRelation::Extends, 1.0)
.await
.unwrap();
let summary = rt
.merge_entity(None, d.id, b.id, MergeStrategy::PreferInto)
.await
.unwrap();
assert_eq!(summary.kept_id, d.id);
assert_eq!(summary.removed_id, b.id);
assert_eq!(summary.edges_rewired, 2);
let a_neighbors = rt
.neighbors(None, a.id, Direction::Out, None, None)
.await
.unwrap();
assert_eq!(a_neighbors.len(), 1);
assert_eq!(a_neighbors[0].node_id, d.id);
let c_neighbors = rt
.neighbors(None, c.id, Direction::Out, None, None)
.await
.unwrap();
assert_eq!(c_neighbors.len(), 1);
assert_eq!(c_neighbors[0].node_id, d.id);
}
// PreferInto: the conflicting key keeps the "into" value, the missing key
// is filled from "from".
#[tokio::test]
async fn merge_entity_prefer_into_strategy() {
let rt = rt();
let into = rt
.create_entity(
None,
"concept",
"Into",
None,
Some(serde_json::json!({"a": 1})),
vec![],
)
.await
.unwrap();
let from = rt
.create_entity(
None,
"concept",
"From",
None,
Some(serde_json::json!({"a": 2, "b": 3})),
vec![],
)
.await
.unwrap();
rt.merge_entity(None, into.id, from.id, MergeStrategy::PreferInto)
.await
.unwrap();
let kept = rt.get_entity(None, into.id).await.unwrap().unwrap();
let props = kept.properties.unwrap();
assert_eq!(props["a"], 1);
assert_eq!(props["b"], 3);
}
// PreferFrom: the conflicting key takes the "from" value.
#[tokio::test]
async fn merge_entity_prefer_from_strategy() {
let rt = rt();
let into = rt
.create_entity(
None,
"concept",
"Into",
None,
Some(serde_json::json!({"a": 1})),
vec![],
)
.await
.unwrap();
let from = rt
.create_entity(
None,
"concept",
"From",
None,
Some(serde_json::json!({"a": 2, "b": 3})),
vec![],
)
.await
.unwrap();
rt.merge_entity(None, into.id, from.id, MergeStrategy::PreferFrom)
.await
.unwrap();
let kept = rt.get_entity(None, into.id).await.unwrap().unwrap();
let props = kept.properties.unwrap();
assert_eq!(props["a"], 2);
assert_eq!(props["b"], 3);
}
// Union: a conflicting scalar keeps the "into" value, new keys are added.
#[tokio::test]
async fn merge_entity_union_strategy() {
let rt = rt();
let into = rt
.create_entity(
None,
"concept",
"Into",
None,
Some(serde_json::json!({"a": 1})),
vec![],
)
.await
.unwrap();
let from = rt
.create_entity(
None,
"concept",
"From",
None,
Some(serde_json::json!({"a": 2, "b": 3})),
vec![],
)
.await
.unwrap();
rt.merge_entity(None, into.id, from.id, MergeStrategy::Union)
.await
.unwrap();
let kept = rt.get_entity(None, into.id).await.unwrap().unwrap();
let props = kept.properties.unwrap();
assert_eq!(props["a"], 1);
assert_eq!(props["b"], 3);
}
// Tags are unioned regardless of the strategy passed to the merge.
#[tokio::test]
async fn merge_entity_unions_tags() {
let rt = rt();
let into = rt
.create_entity(
None,
"concept",
"Into",
None,
None,
vec!["x".to_string(), "y".to_string()],
)
.await
.unwrap();
let from = rt
.create_entity(
None,
"concept",
"From",
None,
None,
vec!["y".to_string(), "z".to_string()],
)
.await
.unwrap();
rt.merge_entity(None, into.id, from.id, MergeStrategy::PreferInto)
.await
.unwrap();
let kept = rt.get_entity(None, into.id).await.unwrap().unwrap();
let mut tags = kept.tags.clone();
tags.sort();
assert_eq!(tags, vec!["x", "y", "z"]);
}
// Merging B into A while A -> B exists would yield A -> A; that edge must
// be deleted and must not count as rewired.
#[tokio::test]
async fn merge_entity_drops_self_loops() {
let rt = rt();
let a = rt
.create_entity(None, "concept", "A", None, None, vec![])
.await
.unwrap();
let b = rt
.create_entity(None, "concept", "B", None, None, vec![])
.await
.unwrap();
rt.link(None, a.id, b.id, EdgeRelation::Extends, 1.0)
.await
.unwrap();
let summary = rt
.merge_entity(None, a.id, b.id, MergeStrategy::PreferInto)
.await
.unwrap();
assert_eq!(
summary.edges_rewired, 0,
"self-loop should be dropped, not rewired"
);
let a_out = rt
.neighbors(None, a.id, Direction::Out, None, None)
.await
.unwrap();
assert!(a_out.is_empty(), "no self-loop should remain");
}
// `union_tags` adds only the tags missing from the first list and reports
// how many were added.
#[test]
fn union_tags_deduplicates() {
let (tags, added) = union_tags(
&["x".to_string(), "y".to_string()],
&["y".to_string(), "z".to_string()],
);
let mut sorted = tags.clone();
sorted.sort();
assert_eq!(sorted, vec!["x", "y", "z"]);
assert_eq!(added, 1);
}
// PreferInto on objects: the existing key wins, the missing key is added
// and counted once.
#[test]
fn merge_properties_prefer_into_fills_missing_keys() {
let a = serde_json::json!({"a": 1});
let b = serde_json::json!({"a": 99, "b": 2});
let (merged, added) = merge_properties(&Some(a), &Some(b), MergeStrategy::PreferInto);
let m = merged.unwrap();
assert_eq!(m["a"], 1);
assert_eq!(m["b"], 2);
assert_eq!(added, 1);
}
}