1use std::collections::HashSet;
8
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use uuid::Uuid;
12
13use khive_db::SqliteError;
14use khive_storage::types::{EdgeFilter, TextDocument};
15use khive_storage::{EdgeRelation, Entity, SubstrateKind};
16
17use crate::error::{RuntimeError, RuntimeResult};
18use crate::runtime::KhiveRuntime;
19
/// Partial update applied by `KhiveRuntime::update_entity`.
///
/// Every field is optional: a field left as `None` keeps the stored value
/// untouched.
#[derive(Clone, Debug, Default)]
pub struct EntityPatch {
    /// Replacement name; `Some(name)` overwrites the stored name.
    pub name: Option<String>,
    /// Replacement description. The outer `Option` says whether to touch the
    /// field at all; the inner one is the new value, so `Some(None)` clears
    /// the description and `Some(Some(text))` sets it.
    pub description: Option<Option<String>>,
    /// Properties merged into the stored ones with `MergeStrategy::PreferFrom`:
    /// for JSON objects the patch's top-level keys overwrite matching stored
    /// keys and all other stored keys are preserved.
    pub properties: Option<Value>,
    /// Replacement tag list; `Some(tags)` replaces the stored tags wholesale
    /// (it is not unioned with them).
    pub tags: Option<Vec<String>>,
}
48
/// How conflicting field values are resolved when one entity (`from`) is
/// merged into another (`into`).
///
/// Serialized in `snake_case` (`prefer_into`, `prefer_from`, `union`).
#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum MergeStrategy {
    /// Default. The `into` side wins on conflict: its name is kept, its
    /// description is kept when present, and for object properties only keys
    /// missing from `into` are copied over from `from`.
    #[default]
    PreferInto,
    /// The `from` side wins on conflict: its name is used, its description is
    /// used when present, and its top-level property keys overwrite `into`'s.
    PreferFrom,
    /// Object properties are merged recursively; keys missing from `into` are
    /// added, and a conflict between non-object values keeps `into`'s value.
    /// The name is taken from `into`; the description is `into`'s when it is
    /// non-empty, otherwise `from`'s.
    Union,
}
62
/// Outcome of a successful `KhiveRuntime::merge_entity` call.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MergeSummary {
    /// Id of the entity that survived the merge (the `into` side).
    pub kept_id: Uuid,
    /// Id of the entity that was removed (the `from` side).
    pub removed_id: Uuid,
    /// Number of edges re-pointed from the removed entity to the kept one.
    /// Edges that were dropped because they would have become self-loops are
    /// not counted.
    pub edges_rewired: usize,
    /// Count reported by the property merge: property keys taken over from
    /// the removed entity.
    pub properties_merged: usize,
    /// Number of tags from the removed entity that the kept entity did not
    /// already have.
    pub tags_unioned: usize,
}
72
/// Caller-facing edge filter with at most one source and one target id.
///
/// Converts into the storage-level `EdgeFilter` via `From`; all other
/// `EdgeFilter` fields take their default values in that conversion.
#[derive(Clone, Debug, Default)]
pub struct EdgeListFilter {
    /// Restrict to edges leaving this node; `None` adds no source restriction.
    pub source_id: Option<Uuid>,
    /// Restrict to edges arriving at this node; `None` adds no target
    /// restriction.
    pub target_id: Option<Uuid>,
    /// Relations to match; passed through to `EdgeFilter::relations` unchanged.
    pub relations: Vec<EdgeRelation>,
    /// Lower weight bound, passed through unchanged.
    /// NOTE(review): inclusive vs. exclusive is decided by the storage layer — confirm.
    pub min_weight: Option<f64>,
    /// Upper weight bound, passed through unchanged.
    /// NOTE(review): inclusive vs. exclusive is decided by the storage layer — confirm.
    pub max_weight: Option<f64>,
}
83
84impl From<EdgeListFilter> for EdgeFilter {
85 fn from(f: EdgeListFilter) -> Self {
86 EdgeFilter {
87 source_ids: f.source_id.into_iter().collect(),
88 target_ids: f.target_id.into_iter().collect(),
89 relations: f.relations,
90 min_weight: f.min_weight,
91 max_weight: f.max_weight,
92 ..Default::default()
93 }
94 }
95}
96
97impl KhiveRuntime {
102 pub async fn update_entity(
110 &self,
111 namespace: Option<&str>,
112 id: Uuid,
113 patch: EntityPatch,
114 ) -> RuntimeResult<Entity> {
115 let store = self.entities(namespace)?;
116 let mut entity = store
117 .get_entity(id)
118 .await?
119 .ok_or_else(|| RuntimeError::NotFound(format!("entity {id}")))?;
120
121 if entity.namespace != self.ns(namespace) {
122 return Err(RuntimeError::NotFound(format!("entity {id}")));
123 }
124
125 let mut text_changed = false;
126
127 if let Some(name) = patch.name {
128 text_changed |= entity.name != name;
129 entity.name = name;
130 }
131 if let Some(desc_patch) = patch.description {
132 text_changed |= entity.description != desc_patch;
133 entity.description = desc_patch;
134 }
135 if let Some(props) = patch.properties {
136 let (merged, _) =
137 merge_properties(&entity.properties, &Some(props), MergeStrategy::PreferFrom);
138 entity.properties = merged;
139 }
140 if let Some(tags) = patch.tags {
141 entity.tags = tags;
142 }
143
144 entity.updated_at = chrono::Utc::now().timestamp_micros();
145 store.upsert_entity(entity.clone()).await?;
146
147 if text_changed {
148 self.reindex_entity(namespace, &entity).await?;
149 }
150
151 Ok(entity)
152 }
153
154 pub async fn merge_entity(
165 &self,
166 namespace: Option<&str>,
167 into_id: Uuid,
168 from_id: Uuid,
169 strategy: MergeStrategy,
170 ) -> RuntimeResult<MergeSummary> {
171 let ns = self.ns(namespace).to_string();
172 let sanitized_ns: String = ns
173 .chars()
174 .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
175 .collect();
176 let fts_table = format!("fts_entities_{}", sanitized_ns);
177 let vec_table = self.config().embedding_model.map(|model| {
178 let key: String = model
179 .to_string()
180 .chars()
181 .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
182 .collect();
183 format!("vec_{}", key)
184 });
185
186 let _ = self.entities(namespace)?;
189 let _ = self.graph(namespace)?;
190 let _ = self.text(namespace)?;
191 if self.config().embedding_model.is_some() {
192 let _ = self.vectors(namespace)?;
193 }
194
195 let pool = self.backend().pool_arc();
196
197 let (summary, updated_entity) = tokio::task::spawn_blocking(move || {
198 let guard = pool.writer()?;
199 guard.transaction(|conn| {
200 merge_entity_sql(conn, ns, fts_table, vec_table, into_id, from_id, strategy)
201 })
202 })
203 .await
204 .map_err(|e| RuntimeError::Internal(e.to_string()))??;
205
206 if self.config().embedding_model.is_some() {
209 self.reindex_entity(namespace, &updated_entity).await?;
210 }
211
212 Ok(summary)
213 }
214
215 pub(crate) async fn reindex_entity(
223 &self,
224 namespace: Option<&str>,
225 entity: &Entity,
226 ) -> RuntimeResult<()> {
227 let body = match &entity.description {
228 Some(d) if !d.is_empty() => format!("{} {}", entity.name, d),
229 _ => entity.name.clone(),
230 };
231 let ns = entity.namespace.clone();
233 self.text(namespace)?
234 .upsert_document(TextDocument {
235 subject_id: entity.id,
236 kind: SubstrateKind::Entity,
237 title: Some(entity.name.clone()),
238 body: body.clone(),
239 tags: entity.tags.clone(),
240 namespace: ns.clone(),
241 metadata: entity.properties.clone(),
242 updated_at: chrono::Utc::now(),
243 })
244 .await?;
245
246 if self.config().embedding_model.is_some() {
247 let vector = self.embed(&body).await?;
248 self.vectors(namespace)?
249 .insert(entity.id, SubstrateKind::Entity, &ns, vector)
250 .await?;
251 }
252
253 Ok(())
254 }
255
256 pub(crate) async fn remove_from_indexes(
258 &self,
259 namespace: Option<&str>,
260 id: Uuid,
261 ) -> RuntimeResult<()> {
262 let ns = self.ns(namespace).to_string();
263 self.text(namespace)?.delete_document(&ns, id).await?;
264 if self.config().embedding_model.is_some() {
265 self.vectors(namespace)?.delete(id).await?;
266 }
267 Ok(())
268 }
269}
270
271fn read_merge_entity(
277 conn: &rusqlite::Connection,
278 id: Uuid,
279 namespace: &str,
280) -> Result<Entity, SqliteError> {
281 let id_str = id.to_string();
282 let mut stmt = conn.prepare(
283 "SELECT id, namespace, kind, name, description, properties, tags, \
284 created_at, updated_at, deleted_at \
285 FROM entities WHERE id = ?1 AND deleted_at IS NULL",
286 )?;
287 let mut rows = stmt.query(rusqlite::params![id_str])?;
288 let row = rows
289 .next()?
290 .ok_or_else(|| SqliteError::InvalidData(format!("entity {id} not found")))?;
291
292 let id_s: String = row.get(0)?;
293 let ns: String = row.get(1)?;
294 let kind: String = row.get(2)?;
295 let name: String = row.get(3)?;
296 let description: Option<String> = row.get(4)?;
297 let properties_str: Option<String> = row.get(5)?;
298 let tags_str: String = row.get(6)?;
299 let created_at: i64 = row.get(7)?;
300 let updated_at: i64 = row.get(8)?;
301 let deleted_at: Option<i64> = row.get(9)?;
302
303 if ns != namespace {
304 return Err(SqliteError::InvalidData(format!(
305 "entity {id} belongs to namespace '{ns}', not '{namespace}'"
306 )));
307 }
308
309 let entity_id = Uuid::parse_str(&id_s).map_err(|e| SqliteError::InvalidData(e.to_string()))?;
310 let properties: Option<Value> = properties_str
311 .map(|s| {
312 serde_json::from_str::<Value>(&s).map_err(|e| SqliteError::InvalidData(e.to_string()))
313 })
314 .transpose()?;
315 let tags: Vec<String> =
316 serde_json::from_str(&tags_str).map_err(|e| SqliteError::InvalidData(e.to_string()))?;
317
318 Ok(Entity {
319 id: entity_id,
320 namespace: ns,
321 kind,
322 name,
323 description,
324 properties,
325 tags,
326 created_at,
327 updated_at,
328 deleted_at,
329 })
330}
331
332fn merge_entity_sql(
338 conn: &rusqlite::Connection,
339 namespace: String,
340 fts_table: String,
341 vec_table: Option<String>,
342 into_id: Uuid,
343 from_id: Uuid,
344 strategy: MergeStrategy,
345) -> Result<(MergeSummary, Entity), SqliteError> {
346 let into_entity = read_merge_entity(conn, into_id, &namespace)?;
347 let from_entity = read_merge_entity(conn, from_id, &namespace)?;
348
349 struct EdgeRow {
351 id: Uuid,
352 source_id: Uuid,
353 target_id: Uuid,
354 relation: String,
355 weight: f64,
356 created_at: i64,
357 metadata: Option<String>,
358 }
359
360 let parse_id =
361 |s: String| Uuid::parse_str(&s).map_err(|e| SqliteError::InvalidData(e.to_string()));
362
363 let from_str = from_id.to_string();
364
365 let mut outbound: Vec<EdgeRow> = Vec::new();
366 {
367 let mut stmt = conn.prepare(
368 "SELECT id, source_id, target_id, relation, weight, created_at, metadata \
369 FROM graph_edges WHERE namespace = ?1 AND source_id = ?2",
370 )?;
371 let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
372 while let Some(row) = rows.next()? {
373 outbound.push(EdgeRow {
374 id: parse_id(row.get(0)?)?,
375 source_id: parse_id(row.get(1)?)?,
376 target_id: parse_id(row.get(2)?)?,
377 relation: row.get(3)?,
378 weight: row.get(4)?,
379 created_at: row.get(5)?,
380 metadata: row.get(6)?,
381 });
382 }
383 }
384
385 let mut inbound: Vec<EdgeRow> = Vec::new();
386 {
387 let mut stmt = conn.prepare(
388 "SELECT id, source_id, target_id, relation, weight, created_at, metadata \
389 FROM graph_edges WHERE namespace = ?1 AND target_id = ?2",
390 )?;
391 let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
392 while let Some(row) = rows.next()? {
393 inbound.push(EdgeRow {
394 id: parse_id(row.get(0)?)?,
395 source_id: parse_id(row.get(1)?)?,
396 target_id: parse_id(row.get(2)?)?,
397 relation: row.get(3)?,
398 weight: row.get(4)?,
399 created_at: row.get(5)?,
400 metadata: row.get(6)?,
401 });
402 }
403 }
404
405 let mut seen: HashSet<Uuid> = HashSet::new();
407 let mut all_edges: Vec<EdgeRow> = Vec::new();
408 for edge in outbound.into_iter().chain(inbound) {
409 if seen.insert(edge.id) {
410 all_edges.push(edge);
411 }
412 }
413
414 let mut edges_rewired = 0usize;
416 for edge in all_edges {
417 let new_src = if edge.source_id == from_id {
418 into_id
419 } else {
420 edge.source_id
421 };
422 let new_tgt = if edge.target_id == from_id {
423 into_id
424 } else {
425 edge.target_id
426 };
427
428 if new_src == new_tgt {
429 conn.execute(
430 "DELETE FROM graph_edges WHERE namespace = ?1 AND id = ?2",
431 rusqlite::params![&namespace, edge.id.to_string()],
432 )?;
433 continue;
434 }
435
436 conn.execute(
437 "INSERT INTO graph_edges \
438 (namespace, id, source_id, target_id, relation, weight, created_at, metadata) \
439 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8) \
440 ON CONFLICT(namespace, id) DO UPDATE SET \
441 source_id = excluded.source_id, \
442 target_id = excluded.target_id, \
443 relation = excluded.relation, \
444 weight = excluded.weight, \
445 created_at = excluded.created_at, \
446 metadata = excluded.metadata \
447 ON CONFLICT(namespace, source_id, target_id, relation) DO NOTHING",
448 rusqlite::params![
449 &namespace,
450 edge.id.to_string(),
451 new_src.to_string(),
452 new_tgt.to_string(),
453 &edge.relation,
454 edge.weight,
455 edge.created_at,
456 edge.metadata,
457 ],
458 )?;
459 edges_rewired += 1;
460 }
461
462 let (merged_props, properties_merged) =
464 merge_properties(&into_entity.properties, &from_entity.properties, strategy);
465 let merged_name = merge_string_field(&into_entity.name, &from_entity.name, strategy);
466 let merged_description =
467 merge_option_string_field(&into_entity.description, &from_entity.description, strategy);
468 let (merged_tags, tags_unioned) = union_tags(&into_entity.tags, &from_entity.tags);
469
470 let now = chrono::Utc::now().timestamp_micros();
471 let into_str = into_id.to_string();
472 let props_str = merged_props
473 .as_ref()
474 .map(|v| serde_json::to_string(v).unwrap_or_default());
475 let tags_json = serde_json::to_string(&merged_tags).unwrap_or_else(|_| "[]".to_string());
476
477 conn.execute(
479 "INSERT OR REPLACE INTO entities \
480 (id, namespace, kind, name, description, properties, tags, \
481 created_at, updated_at, deleted_at) \
482 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
483 rusqlite::params![
484 &into_str,
485 &namespace,
486 &into_entity.kind,
487 &merged_name,
488 &merged_description,
489 &props_str,
490 &tags_json,
491 into_entity.created_at,
492 now,
493 into_entity.deleted_at,
494 ],
495 )?;
496
497 let fts_body = match &merged_description {
499 Some(d) if !d.is_empty() => format!("{} {}", merged_name, d),
500 _ => merged_name.clone(),
501 };
502 let kind_str = SubstrateKind::Entity.to_string();
503
504 conn.execute(
505 &format!(
506 "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
507 fts_table
508 ),
509 rusqlite::params![&namespace, &into_str],
510 )?;
511 conn.execute(
512 &format!(
513 "INSERT INTO {} \
514 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
515 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
516 fts_table
517 ),
518 rusqlite::params![
519 &into_str,
520 &kind_str,
521 &merged_name,
522 &fts_body,
523 &tags_json,
524 &namespace,
525 &props_str,
526 now,
527 ],
528 )?;
529
530 conn.execute(
532 &format!(
533 "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
534 fts_table
535 ),
536 rusqlite::params![&namespace, &from_str],
537 )?;
538
539 if let Some(ref vec_tbl) = vec_table {
541 conn.execute(
542 &format!(
543 "DELETE FROM {} WHERE subject_id = ?1 AND namespace = ?2",
544 vec_tbl
545 ),
546 rusqlite::params![&from_str, &namespace],
547 )?;
548 }
549
550 conn.execute(
552 "DELETE FROM entities WHERE id = ?1",
553 rusqlite::params![&from_str],
554 )?;
555
556 let updated_entity = Entity {
557 id: into_id,
558 namespace,
559 kind: into_entity.kind,
560 name: merged_name,
561 description: merged_description,
562 properties: merged_props,
563 tags: merged_tags,
564 created_at: into_entity.created_at,
565 updated_at: now,
566 deleted_at: into_entity.deleted_at,
567 };
568
569 Ok((
570 MergeSummary {
571 kept_id: into_id,
572 removed_id: from_id,
573 edges_rewired,
574 properties_merged,
575 tags_unioned,
576 },
577 updated_entity,
578 ))
579}
580
581fn merge_string_field(into: &str, from: &str, strategy: MergeStrategy) -> String {
586 match strategy {
587 MergeStrategy::PreferInto | MergeStrategy::Union => into.to_string(),
588 MergeStrategy::PreferFrom => from.to_string(),
589 }
590}
591
592fn merge_option_string_field(
593 into: &Option<String>,
594 from: &Option<String>,
595 strategy: MergeStrategy,
596) -> Option<String> {
597 match strategy {
598 MergeStrategy::PreferInto => {
599 if into.is_some() {
600 into.clone()
601 } else {
602 from.clone()
603 }
604 }
605 MergeStrategy::PreferFrom => {
606 if from.is_some() {
607 from.clone()
608 } else {
609 into.clone()
610 }
611 }
612 MergeStrategy::Union => {
613 match (into, from) {
615 (Some(a), _) if !a.is_empty() => Some(a.clone()),
616 (_, Some(b)) => Some(b.clone()),
617 _ => None,
618 }
619 }
620 }
621}
622
623fn merge_properties(
625 into: &Option<Value>,
626 from: &Option<Value>,
627 strategy: MergeStrategy,
628) -> (Option<Value>, usize) {
629 match (into, from) {
630 (None, None) => (None, 0),
631 (Some(a), None) => (Some(a.clone()), 0),
632 (None, Some(b)) => {
633 let count = if let Value::Object(m) = b { m.len() } else { 1 };
634 (Some(b.clone()), count)
635 }
636 (Some(into_val), Some(from_val)) => {
637 let (merged, added) = merge_json(into_val, from_val, strategy);
638 (Some(merged), added)
639 }
640 }
641}
642
643fn merge_json(into: &Value, from: &Value, strategy: MergeStrategy) -> (Value, usize) {
645 match (into, from, strategy) {
646 (Value::Object(a), Value::Object(b), MergeStrategy::Union) => {
647 let mut result = a.clone();
648 let mut added = 0usize;
649 for (k, v_from) in b {
650 if let Some(v_into) = a.get(k) {
651 let (merged, sub_added) = merge_json(v_into, v_from, MergeStrategy::Union);
652 result.insert(k.clone(), merged);
653 added += sub_added;
654 } else {
655 result.insert(k.clone(), v_from.clone());
656 added += 1;
657 }
658 }
659 (Value::Object(result), added)
660 }
661 (Value::Object(a), Value::Object(b), MergeStrategy::PreferInto) => {
662 let mut result = a.clone();
663 let mut added = 0usize;
664 for (k, v) in b {
665 if !a.contains_key(k) {
666 result.insert(k.clone(), v.clone());
667 added += 1;
668 }
669 }
670 (Value::Object(result), added)
671 }
672 (Value::Object(a), Value::Object(b), MergeStrategy::PreferFrom) => {
673 let mut result = a.clone();
674 let mut added = 0usize;
675 for (k, v) in b {
676 result.insert(k.clone(), v.clone());
677 if !a.contains_key(k) {
678 added += 1;
679 }
680 }
681 (Value::Object(result), added)
682 }
683 (_into_val, from_val, MergeStrategy::PreferFrom) => (from_val.clone(), 1),
685 _ => (into.clone(), 0),
686 }
687}
688
/// Appends to `into` every tag of `from` that is not already present,
/// preserving order, and returns the result with the number of tags appended.
///
/// Tags in `into` are copied verbatim; duplicates within `from` are appended
/// only once.
fn union_tags(into: &[String], from: &[String]) -> (Vec<String>, usize) {
    let mut known: HashSet<&str> = into.iter().map(String::as_str).collect();
    let mut merged = into.to_vec();
    let original_len = merged.len();

    merged.extend(
        from.iter()
            .filter(|tag| known.insert(tag.as_str()))
            .cloned(),
    );

    let appended = merged.len() - original_len;
    (merged, appended)
}
701
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::KhiveRuntime;
    use khive_storage::types::{Direction, TextFilter, TextQueryMode, TextSearchRequest};

    /// Fresh in-memory runtime for a single test.
    fn rt() -> KhiveRuntime {
        KhiveRuntime::memory().unwrap()
    }

    /// Runs a plain text query restricted to the resolved namespace and
    /// returns the subject ids of the hits.
    async fn fts_hit(rt: &KhiveRuntime, namespace: Option<&str>, query: &str) -> Vec<Uuid> {
        let request = TextSearchRequest {
            query: query.to_string(),
            mode: TextQueryMode::Plain,
            filter: Some(TextFilter {
                namespaces: vec![rt.ns(namespace).to_string()],
                ..Default::default()
            }),
            top_k: 50,
            snippet_chars: 100,
        };
        let hits = rt.text(namespace).unwrap().search(request).await.unwrap();
        hits.into_iter().map(|hit| hit.subject_id).collect()
    }

    /// Creates a `concept` entity in the default namespace and returns its id.
    async fn concept(
        rt: &KhiveRuntime,
        name: &str,
        description: Option<&str>,
        properties: Option<Value>,
        tags: &[&str],
    ) -> Uuid {
        let tags: Vec<String> = tags.iter().map(|tag| tag.to_string()).collect();
        rt.create_entity(None, "concept", name, description, properties, tags)
            .await
            .unwrap()
            .id
    }

    /// Merges `From` (`{"a": 2, "b": 3}`) into `Into` (`{"a": 1}`) with
    /// `strategy` and returns the kept entity's properties.
    async fn merged_props(strategy: MergeStrategy) -> Value {
        let rt = rt();
        let into = concept(&rt, "Into", None, Some(serde_json::json!({"a": 1})), &[]).await;
        let from = concept(
            &rt,
            "From",
            None,
            Some(serde_json::json!({"a": 2, "b": 3})),
            &[],
        )
        .await;

        rt.merge_entity(None, into, from, strategy).await.unwrap();

        let kept = rt.get_entity(None, into).await.unwrap().unwrap();
        kept.properties.unwrap()
    }

    #[tokio::test]
    async fn update_entity_patch_changes_only_specified_fields() {
        let rt = rt();
        let id = concept(
            &rt,
            "OriginalName",
            Some("orig desc"),
            Some(serde_json::json!({"k":"v"})),
            &[],
        )
        .await;

        let patch = EntityPatch {
            description: Some(Some("new desc".to_string())),
            ..Default::default()
        };
        let updated = rt.update_entity(None, id, patch).await.unwrap();

        assert_eq!(updated.name, "OriginalName");
        assert_eq!(updated.description.as_deref(), Some("new desc"));
        assert_eq!(updated.properties, Some(serde_json::json!({"k":"v"})));
    }

    #[tokio::test]
    async fn update_entity_clear_description_with_some_none() {
        let rt = rt();
        let id = concept(&rt, "ClearDesc", Some("has description"), None, &[]).await;

        let patch = EntityPatch {
            description: Some(None),
            ..Default::default()
        };
        let updated = rt.update_entity(None, id, patch).await.unwrap();

        assert!(
            updated.description.is_none(),
            "description should be cleared"
        );
    }

    #[tokio::test]
    async fn update_entity_reindexes_when_name_changes() {
        let rt = rt();
        let id = concept(&rt, "OldName", None, None, &[]).await;

        assert!(
            fts_hit(&rt, None, "OldName").await.contains(&id),
            "entity should be findable by old name"
        );

        let patch = EntityPatch {
            name: Some("NewName".to_string()),
            ..Default::default()
        };
        rt.update_entity(None, id, patch).await.unwrap();

        let hits_old = fts_hit(&rt, None, "OldName").await;
        let hits_new = fts_hit(&rt, None, "NewName").await;

        assert!(
            !hits_old.contains(&id),
            "old name should no longer match after rename"
        );
        assert!(
            hits_new.contains(&id),
            "new name should be findable after rename"
        );
    }

    #[tokio::test]
    async fn update_entity_properties_merges_preserving_existing_keys() {
        let rt = rt();
        let initial = serde_json::json!({
            "domain": "inference",
            "repo": "lattice",
            "status": "researched",
        });
        let id = concept(&rt, "MergeProps", None, Some(initial), &[]).await;

        let patch = EntityPatch {
            properties: Some(serde_json::json!({"status": "implemented"})),
            ..Default::default()
        };
        let updated = rt.update_entity(None, id, patch).await.unwrap();

        let props = updated.properties.expect("properties should remain set");
        assert_eq!(props["domain"], "inference", "domain key must be preserved");
        assert_eq!(props["repo"], "lattice", "repo key must be preserved");
        assert_eq!(
            props["status"], "implemented",
            "status key must be updated by patch"
        );
    }

    #[tokio::test]
    async fn update_entity_skips_reindex_when_only_properties_change() {
        let rt = rt();
        let id = concept(&rt, "StableIndexed", None, None, &[]).await;

        assert!(fts_hit(&rt, None, "StableIndexed").await.contains(&id));

        let patch = EntityPatch {
            properties: Some(serde_json::json!({"new": "prop"})),
            ..Default::default()
        };
        rt.update_entity(None, id, patch).await.unwrap();

        assert!(
            fts_hit(&rt, None, "StableIndexed").await.contains(&id),
            "still findable after props-only patch"
        );
    }

    #[tokio::test]
    async fn merge_entity_rewires_edges() {
        let rt = rt();
        let a = concept(&rt, "A", None, None, &[]).await;
        let b = concept(&rt, "B", None, None, &[]).await;
        let c = concept(&rt, "C", None, None, &[]).await;
        let d = concept(&rt, "D", None, None, &[]).await;

        // a -> b and c -> b; after merging b into d both must point at d.
        for source in [a, c] {
            rt.link(None, source, b, EdgeRelation::Extends, 1.0)
                .await
                .unwrap();
        }

        let summary = rt
            .merge_entity(None, d, b, MergeStrategy::PreferInto)
            .await
            .unwrap();

        assert_eq!(summary.kept_id, d);
        assert_eq!(summary.removed_id, b);
        assert_eq!(summary.edges_rewired, 2);

        for source in [a, c] {
            let out = rt
                .neighbors(None, source, Direction::Out, None, None)
                .await
                .unwrap();
            assert_eq!(out.len(), 1);
            assert_eq!(out[0].node_id, d);
        }
    }

    #[tokio::test]
    async fn merge_entity_prefer_into_strategy() {
        let props = merged_props(MergeStrategy::PreferInto).await;
        assert_eq!(props["a"], 1);
        assert_eq!(props["b"], 3);
    }

    #[tokio::test]
    async fn merge_entity_prefer_from_strategy() {
        let props = merged_props(MergeStrategy::PreferFrom).await;
        assert_eq!(props["a"], 2);
        assert_eq!(props["b"], 3);
    }

    #[tokio::test]
    async fn merge_entity_union_strategy() {
        let props = merged_props(MergeStrategy::Union).await;
        assert_eq!(props["a"], 1);
        assert_eq!(props["b"], 3);
    }

    #[tokio::test]
    async fn merge_entity_unions_tags() {
        let rt = rt();
        let into = concept(&rt, "Into", None, None, &["x", "y"]).await;
        let from = concept(&rt, "From", None, None, &["y", "z"]).await;

        rt.merge_entity(None, into, from, MergeStrategy::PreferInto)
            .await
            .unwrap();

        let kept = rt.get_entity(None, into).await.unwrap().unwrap();
        let mut tags = kept.tags.clone();
        tags.sort();
        assert_eq!(tags, vec!["x", "y", "z"]);
    }

    #[tokio::test]
    async fn merge_entity_drops_self_loops() {
        let rt = rt();
        let a = concept(&rt, "A", None, None, &[]).await;
        let b = concept(&rt, "B", None, None, &[]).await;

        // a -> b collapses to a -> a once b is merged into a.
        rt.link(None, a, b, EdgeRelation::Extends, 1.0)
            .await
            .unwrap();

        let summary = rt
            .merge_entity(None, a, b, MergeStrategy::PreferInto)
            .await
            .unwrap();

        assert_eq!(
            summary.edges_rewired, 0,
            "self-loop should be dropped, not rewired"
        );

        let a_out = rt
            .neighbors(None, a, Direction::Out, None, None)
            .await
            .unwrap();
        assert!(a_out.is_empty(), "no self-loop should remain");
    }

    #[test]
    fn union_tags_deduplicates() {
        let left = ["x".to_string(), "y".to_string()];
        let right = ["y".to_string(), "z".to_string()];

        let (mut tags, added) = union_tags(&left, &right);
        tags.sort();

        assert_eq!(tags, vec!["x", "y", "z"]);
        assert_eq!(added, 1);
    }

    #[test]
    fn merge_properties_prefer_into_fills_missing_keys() {
        let into = Some(serde_json::json!({"a": 1}));
        let from = Some(serde_json::json!({"a": 99, "b": 2}));

        let (merged, added) = merge_properties(&into, &from, MergeStrategy::PreferInto);

        let merged = merged.unwrap();
        assert_eq!(merged["a"], 1);
        assert_eq!(merged["b"], 2);
        assert_eq!(added, 1);
    }
}