1use std::collections::HashSet;
8
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use uuid::Uuid;
12
13use khive_storage::types::{
14 DeleteMode, EdgeFilter, EdgeSortField, LinkId, PageRequest, SortOrder, TextDocument,
15};
16use khive_storage::{Edge, EdgeRelation, Entity, SubstrateKind};
17
18use crate::error::{RuntimeError, RuntimeResult};
19use crate::runtime::KhiveRuntime;
20
21#[derive(Clone, Debug, Default)]
32pub struct EntityPatch {
33 pub name: Option<String>,
34 pub description: Option<Option<String>>,
35 pub properties: Option<Value>,
36 pub tags: Option<Vec<String>>,
37}
38
39#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
41#[serde(rename_all = "snake_case")]
42pub enum MergeStrategy {
43 #[default]
46 PreferInto,
47 PreferFrom,
49 Union,
51}
52
53#[derive(Clone, Debug, Serialize, Deserialize)]
55pub struct MergeSummary {
56 pub kept_id: Uuid,
57 pub removed_id: Uuid,
58 pub edges_rewired: usize,
59 pub properties_merged: usize,
60 pub tags_unioned: usize,
61}
62
63#[derive(Clone, Debug, Default)]
65pub struct EdgeListFilter {
66 pub source_id: Option<Uuid>,
67 pub target_id: Option<Uuid>,
68 pub relations: Vec<EdgeRelation>,
70 pub min_weight: Option<f64>,
71 pub max_weight: Option<f64>,
72}
73
74impl From<EdgeListFilter> for EdgeFilter {
75 fn from(f: EdgeListFilter) -> Self {
76 EdgeFilter {
77 source_ids: f.source_id.into_iter().collect(),
78 target_ids: f.target_id.into_iter().collect(),
79 relations: f.relations,
80 min_weight: f.min_weight,
81 max_weight: f.max_weight,
82 ..Default::default()
83 }
84 }
85}
86
87impl KhiveRuntime {
92 pub async fn update_entity(
100 &self,
101 namespace: Option<&str>,
102 id: Uuid,
103 patch: EntityPatch,
104 ) -> RuntimeResult<Entity> {
105 let store = self.entities(namespace)?;
106 let mut entity = store
107 .get_entity(id)
108 .await?
109 .ok_or_else(|| RuntimeError::NotFound(format!("entity {id}")))?;
110
111 if entity.namespace != self.ns(namespace) {
112 return Err(RuntimeError::NotFound(format!("entity {id}")));
113 }
114
115 let mut text_changed = false;
116
117 if let Some(name) = patch.name {
118 text_changed |= entity.name != name;
119 entity.name = name;
120 }
121 if let Some(desc_patch) = patch.description {
122 text_changed |= entity.description != desc_patch;
123 entity.description = desc_patch;
124 }
125 if let Some(props) = patch.properties {
126 entity.properties = Some(props);
127 }
128 if let Some(tags) = patch.tags {
129 entity.tags = tags;
130 }
131
132 entity.updated_at = chrono::Utc::now().timestamp_micros();
133 store.upsert_entity(entity.clone()).await?;
134
135 if text_changed {
136 self.reindex_entity(namespace, &entity).await?;
137 }
138
139 Ok(entity)
140 }
141
142 pub async fn merge_entity(
150 &self,
151 namespace: Option<&str>,
152 into_id: Uuid,
153 from_id: Uuid,
154 strategy: MergeStrategy,
155 ) -> RuntimeResult<MergeSummary> {
156 let store = self.entities(namespace)?;
157 let graph = self.graph(namespace)?;
158
159 let ns = self.ns(namespace);
160 let into_entity = store
161 .get_entity(into_id)
162 .await?
163 .ok_or_else(|| RuntimeError::NotFound(format!("entity {into_id}")))?;
164 if into_entity.namespace != ns {
165 return Err(RuntimeError::NotFound(format!("entity {into_id}")));
166 }
167 let from_entity = store
168 .get_entity(from_id)
169 .await?
170 .ok_or_else(|| RuntimeError::NotFound(format!("entity {from_id}")))?;
171 if from_entity.namespace != ns {
172 return Err(RuntimeError::NotFound(format!("entity {from_id}")));
173 }
174
175 const PAGE_SIZE: u32 = 1_000;
178 let sort = vec![SortOrder {
179 field: EdgeSortField::CreatedAt,
180 direction: khive_storage::types::SortDirection::Asc,
181 }];
182
183 let mut outbound: Vec<Edge> = Vec::new();
184 let mut offset: u64 = 0;
185 loop {
186 let page = graph
187 .query_edges(
188 EdgeFilter {
189 source_ids: vec![from_id],
190 ..Default::default()
191 },
192 sort.clone(),
193 PageRequest {
194 offset,
195 limit: PAGE_SIZE,
196 },
197 )
198 .await?;
199 if page.items.is_empty() {
200 break;
201 }
202 offset += page.items.len() as u64;
203 outbound.extend(page.items);
204 }
205
206 let mut inbound: Vec<Edge> = Vec::new();
207 let mut offset: u64 = 0;
208 loop {
209 let page = graph
210 .query_edges(
211 EdgeFilter {
212 target_ids: vec![from_id],
213 ..Default::default()
214 },
215 sort.clone(),
216 PageRequest {
217 offset,
218 limit: PAGE_SIZE,
219 },
220 )
221 .await?;
222 if page.items.is_empty() {
223 break;
224 }
225 offset += page.items.len() as u64;
226 inbound.extend(page.items);
227 }
228
229 let mut seen_edge_ids: std::collections::HashSet<LinkId> = std::collections::HashSet::new();
232 let mut all_edges: Vec<Edge> = Vec::new();
233 for edge in outbound.into_iter().chain(inbound.into_iter()) {
234 if seen_edge_ids.insert(edge.id) {
235 all_edges.push(edge);
236 }
237 }
238
239 let mut edges_rewired = 0usize;
242 for edge in all_edges {
243 let new_source = if edge.source_id == from_id {
244 into_id
245 } else {
246 edge.source_id
247 };
248 let new_target = if edge.target_id == from_id {
249 into_id
250 } else {
251 edge.target_id
252 };
253 if new_source == new_target {
254 graph.delete_edge(edge.id).await?;
255 continue;
256 }
257 let rewired = Edge {
258 source_id: new_source,
259 target_id: new_target,
260 ..edge
261 };
262 graph.upsert_edge(rewired).await?;
263 edges_rewired += 1;
264 }
265
266 let (merged_props, properties_merged) =
268 merge_properties(&into_entity.properties, &from_entity.properties, strategy);
269
270 let merged_name = merge_string_field(&into_entity.name, &from_entity.name, strategy);
272 let merged_description =
273 merge_option_string_field(&into_entity.description, &from_entity.description, strategy);
274
275 let (merged_tags, tags_unioned) = union_tags(&into_entity.tags, &from_entity.tags);
277
278 let mut updated_into = into_entity;
280 updated_into.name = merged_name;
281 updated_into.description = merged_description;
282 updated_into.properties = merged_props;
283 updated_into.tags = merged_tags;
284 updated_into.updated_at = chrono::Utc::now().timestamp_micros();
285 store.upsert_entity(updated_into.clone()).await?;
286 self.reindex_entity(namespace, &updated_into).await?;
287
288 store.delete_entity(from_id, DeleteMode::Hard).await?;
290 self.remove_from_indexes(namespace, from_id).await?;
291
292 Ok(MergeSummary {
293 kept_id: into_id,
294 removed_id: from_id,
295 edges_rewired,
296 properties_merged,
297 tags_unioned,
298 })
299 }
300
301 pub(crate) async fn reindex_entity(
309 &self,
310 namespace: Option<&str>,
311 entity: &Entity,
312 ) -> RuntimeResult<()> {
313 let body = match &entity.description {
314 Some(d) if !d.is_empty() => format!("{} {}", entity.name, d),
315 _ => entity.name.clone(),
316 };
317 let ns = entity.namespace.clone();
319 self.text(namespace)?
320 .upsert_document(TextDocument {
321 subject_id: entity.id,
322 kind: SubstrateKind::Entity,
323 title: Some(entity.name.clone()),
324 body: body.clone(),
325 tags: entity.tags.clone(),
326 namespace: ns.clone(),
327 metadata: entity.properties.clone(),
328 updated_at: chrono::Utc::now(),
329 })
330 .await?;
331
332 if self.config().embedding_model.is_some() {
333 let vector = self.embed(&body).await?;
334 self.vectors(namespace)?
335 .insert(entity.id, SubstrateKind::Entity, &ns, vector)
336 .await?;
337 }
338
339 Ok(())
340 }
341
342 pub(crate) async fn remove_from_indexes(
344 &self,
345 namespace: Option<&str>,
346 id: Uuid,
347 ) -> RuntimeResult<()> {
348 let ns = self.ns(namespace).to_string();
349 self.text(namespace)?.delete_document(&ns, id).await?;
350 if self.config().embedding_model.is_some() {
351 self.vectors(namespace)?.delete(id).await?;
352 }
353 Ok(())
354 }
355}
356
357fn merge_string_field(into: &str, from: &str, strategy: MergeStrategy) -> String {
362 match strategy {
363 MergeStrategy::PreferInto | MergeStrategy::Union => into.to_string(),
364 MergeStrategy::PreferFrom => from.to_string(),
365 }
366}
367
368fn merge_option_string_field(
369 into: &Option<String>,
370 from: &Option<String>,
371 strategy: MergeStrategy,
372) -> Option<String> {
373 match strategy {
374 MergeStrategy::PreferInto => {
375 if into.is_some() {
376 into.clone()
377 } else {
378 from.clone()
379 }
380 }
381 MergeStrategy::PreferFrom => {
382 if from.is_some() {
383 from.clone()
384 } else {
385 into.clone()
386 }
387 }
388 MergeStrategy::Union => {
389 match (into, from) {
391 (Some(a), _) if !a.is_empty() => Some(a.clone()),
392 (_, Some(b)) => Some(b.clone()),
393 _ => None,
394 }
395 }
396 }
397}
398
399fn merge_properties(
401 into: &Option<Value>,
402 from: &Option<Value>,
403 strategy: MergeStrategy,
404) -> (Option<Value>, usize) {
405 match (into, from) {
406 (None, None) => (None, 0),
407 (Some(a), None) => (Some(a.clone()), 0),
408 (None, Some(b)) => {
409 let count = if let Value::Object(m) = b { m.len() } else { 1 };
410 (Some(b.clone()), count)
411 }
412 (Some(into_val), Some(from_val)) => {
413 let (merged, added) = merge_json(into_val, from_val, strategy);
414 (Some(merged), added)
415 }
416 }
417}
418
419fn merge_json(into: &Value, from: &Value, strategy: MergeStrategy) -> (Value, usize) {
421 match (into, from, strategy) {
422 (Value::Object(a), Value::Object(b), MergeStrategy::Union) => {
423 let mut result = a.clone();
424 let mut added = 0usize;
425 for (k, v_from) in b {
426 if let Some(v_into) = a.get(k) {
427 let (merged, sub_added) = merge_json(v_into, v_from, MergeStrategy::Union);
428 result.insert(k.clone(), merged);
429 added += sub_added;
430 } else {
431 result.insert(k.clone(), v_from.clone());
432 added += 1;
433 }
434 }
435 (Value::Object(result), added)
436 }
437 (Value::Object(a), Value::Object(b), MergeStrategy::PreferInto) => {
438 let mut result = a.clone();
439 let mut added = 0usize;
440 for (k, v) in b {
441 if !a.contains_key(k) {
442 result.insert(k.clone(), v.clone());
443 added += 1;
444 }
445 }
446 (Value::Object(result), added)
447 }
448 (Value::Object(a), Value::Object(b), MergeStrategy::PreferFrom) => {
449 let mut result = a.clone();
450 let mut added = 0usize;
451 for (k, v) in b {
452 result.insert(k.clone(), v.clone());
453 if !a.contains_key(k) {
454 added += 1;
455 }
456 }
457 (Value::Object(result), added)
458 }
459 (_into_val, from_val, MergeStrategy::PreferFrom) => (from_val.clone(), 1),
461 _ => (into.clone(), 0),
462 }
463}
464
/// Appends to `into` every tag from `from` that is not already present and returns the
/// combined list together with the number of tags appended.
///
/// The order is preserved: `into`'s tags come first, unchanged (duplicates already inside
/// `into` are kept), followed by new tags in their first-seen order from `from`.
fn union_tags(into: &[String], from: &[String]) -> (Vec<String>, usize) {
    let mut present: HashSet<&str> = into.iter().map(String::as_str).collect();
    let fresh: Vec<&str> = from
        .iter()
        .map(String::as_str)
        .filter(|tag| present.insert(*tag))
        .collect();

    let added = fresh.len();
    let mut combined = into.to_vec();
    combined.extend(fresh.into_iter().map(str::to_owned));
    (combined, added)
}
477
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::KhiveRuntime;
    use khive_storage::types::{Direction, TextFilter, TextQueryMode, TextSearchRequest};

    /// Fresh in-memory runtime; every test builds its own.
    fn rt() -> KhiveRuntime {
        KhiveRuntime::memory().unwrap()
    }

    /// Owned copies of string literals (tag lists).
    fn strings(items: &[&str]) -> Vec<String> {
        items.iter().map(|s| s.to_string()).collect()
    }

    /// Creates a bare `concept` entity in the default namespace (no description,
    /// properties or tags).
    async fn concept(rt: &KhiveRuntime, name: &str) -> Entity {
        rt.create_entity(None, "concept", name, None, None, vec![])
            .await
            .unwrap()
    }

    /// Runs a plain full-text query scoped to `namespace` and returns the matching subject ids.
    async fn fts_hit(rt: &KhiveRuntime, namespace: Option<&str>, query: &str) -> Vec<Uuid> {
        let request = TextSearchRequest {
            query: query.to_string(),
            mode: TextQueryMode::Plain,
            filter: Some(TextFilter {
                namespaces: vec![rt.ns(namespace).to_string()],
                ..Default::default()
            }),
            top_k: 50,
            snippet_chars: 100,
        };
        let hits = rt.text(namespace).unwrap().search(request).await.unwrap();
        hits.into_iter().map(|hit| hit.subject_id).collect()
    }

    /// Merges `From {"a": 2, "b": 3}` into `Into {"a": 1}` with `strategy` and returns the
    /// surviving entity's properties.
    async fn merged_props(strategy: MergeStrategy) -> Value {
        let rt = rt();
        let into = rt
            .create_entity(
                None,
                "concept",
                "Into",
                None,
                Some(serde_json::json!({"a": 1})),
                vec![],
            )
            .await
            .unwrap();
        let from = rt
            .create_entity(
                None,
                "concept",
                "From",
                None,
                Some(serde_json::json!({"a": 2, "b": 3})),
                vec![],
            )
            .await
            .unwrap();

        rt.merge_entity(None, into.id, from.id, strategy)
            .await
            .unwrap();

        let kept = rt.get_entity(None, into.id).await.unwrap().unwrap();
        kept.properties.unwrap()
    }

    #[tokio::test]
    async fn update_entity_patch_changes_only_specified_fields() {
        let rt = rt();
        let original = rt
            .create_entity(
                None,
                "concept",
                "OriginalName",
                Some("orig desc"),
                Some(serde_json::json!({"k":"v"})),
                vec![],
            )
            .await
            .unwrap();

        let patch = EntityPatch {
            description: Some(Some("new desc".to_string())),
            ..Default::default()
        };
        let updated = rt.update_entity(None, original.id, patch).await.unwrap();

        assert_eq!(updated.name, "OriginalName");
        assert_eq!(updated.description.as_deref(), Some("new desc"));
        assert_eq!(updated.properties, Some(serde_json::json!({"k":"v"})));
    }

    #[tokio::test]
    async fn update_entity_clear_description_with_some_none() {
        let rt = rt();
        let entity = rt
            .create_entity(
                None,
                "concept",
                "ClearDesc",
                Some("has description"),
                None,
                vec![],
            )
            .await
            .unwrap();

        let patch = EntityPatch {
            description: Some(None),
            ..Default::default()
        };
        let updated = rt.update_entity(None, entity.id, patch).await.unwrap();

        assert!(
            updated.description.is_none(),
            "description should be cleared"
        );
    }

    #[tokio::test]
    async fn update_entity_reindexes_when_name_changes() {
        let rt = rt();
        let entity = concept(&rt, "OldName").await;

        let hits_before = fts_hit(&rt, None, "OldName").await;
        assert!(
            hits_before.contains(&entity.id),
            "entity should be findable by old name"
        );

        let rename = EntityPatch {
            name: Some("NewName".to_string()),
            ..Default::default()
        };
        rt.update_entity(None, entity.id, rename).await.unwrap();

        let hits_old = fts_hit(&rt, None, "OldName").await;
        let hits_new = fts_hit(&rt, None, "NewName").await;

        assert!(
            !hits_old.contains(&entity.id),
            "old name should no longer match after rename"
        );
        assert!(
            hits_new.contains(&entity.id),
            "new name should be findable after rename"
        );
    }

    #[tokio::test]
    async fn update_entity_skips_reindex_when_only_properties_change() {
        let rt = rt();
        let entity = concept(&rt, "StableIndexed").await;

        let hits_before = fts_hit(&rt, None, "StableIndexed").await;
        assert!(hits_before.contains(&entity.id));

        let props_only = EntityPatch {
            properties: Some(serde_json::json!({"new": "prop"})),
            ..Default::default()
        };
        rt.update_entity(None, entity.id, props_only).await.unwrap();

        let hits_after = fts_hit(&rt, None, "StableIndexed").await;
        assert!(
            hits_after.contains(&entity.id),
            "still findable after props-only patch"
        );
    }

    #[tokio::test]
    async fn merge_entity_rewires_edges() {
        let rt = rt();
        let a = concept(&rt, "A").await;
        let b = concept(&rt, "B").await;
        let c = concept(&rt, "C").await;
        let d = concept(&rt, "D").await;

        // A -> B and C -> B; after merging B into D both should point at D.
        for source in [a.id, c.id] {
            rt.link(None, source, b.id, EdgeRelation::Extends, 1.0)
                .await
                .unwrap();
        }

        let summary = rt
            .merge_entity(None, d.id, b.id, MergeStrategy::PreferInto)
            .await
            .unwrap();

        assert_eq!(summary.kept_id, d.id);
        assert_eq!(summary.removed_id, b.id);
        assert_eq!(summary.edges_rewired, 2);

        for source in [a.id, c.id] {
            let out = rt
                .neighbors(None, source, Direction::Out, None, None)
                .await
                .unwrap();
            assert_eq!(out.len(), 1);
            assert_eq!(out[0].node_id, d.id);
        }
    }

    #[tokio::test]
    async fn merge_entity_prefer_into_strategy() {
        let props = merged_props(MergeStrategy::PreferInto).await;
        assert_eq!(props["a"], 1);
        assert_eq!(props["b"], 3);
    }

    #[tokio::test]
    async fn merge_entity_prefer_from_strategy() {
        let props = merged_props(MergeStrategy::PreferFrom).await;
        assert_eq!(props["a"], 2);
        assert_eq!(props["b"], 3);
    }

    #[tokio::test]
    async fn merge_entity_union_strategy() {
        let props = merged_props(MergeStrategy::Union).await;
        assert_eq!(props["a"], 1);
        assert_eq!(props["b"], 3);
    }

    #[tokio::test]
    async fn merge_entity_unions_tags() {
        let rt = rt();
        let into = rt
            .create_entity(None, "concept", "Into", None, None, strings(&["x", "y"]))
            .await
            .unwrap();
        let from = rt
            .create_entity(None, "concept", "From", None, None, strings(&["y", "z"]))
            .await
            .unwrap();

        rt.merge_entity(None, into.id, from.id, MergeStrategy::PreferInto)
            .await
            .unwrap();

        let kept = rt.get_entity(None, into.id).await.unwrap().unwrap();
        let mut tags = kept.tags;
        tags.sort();
        assert_eq!(tags, vec!["x", "y", "z"]);
    }

    #[tokio::test]
    async fn merge_entity_drops_self_loops() {
        let rt = rt();
        let a = concept(&rt, "A").await;
        let b = concept(&rt, "B").await;

        rt.link(None, a.id, b.id, EdgeRelation::Extends, 1.0)
            .await
            .unwrap();

        let summary = rt
            .merge_entity(None, a.id, b.id, MergeStrategy::PreferInto)
            .await
            .unwrap();

        assert_eq!(
            summary.edges_rewired, 0,
            "self-loop should be dropped, not rewired"
        );

        let a_out = rt
            .neighbors(None, a.id, Direction::Out, None, None)
            .await
            .unwrap();
        assert!(a_out.is_empty(), "no self-loop should remain");
    }

    #[test]
    fn union_tags_deduplicates() {
        let (tags, added) = union_tags(&strings(&["x", "y"]), &strings(&["y", "z"]));
        let mut sorted = tags;
        sorted.sort();
        assert_eq!(sorted, vec!["x", "y", "z"]);
        assert_eq!(added, 1);
    }

    #[test]
    fn merge_properties_prefer_into_fills_missing_keys() {
        let (merged, added) = merge_properties(
            &Some(serde_json::json!({"a": 1})),
            &Some(serde_json::json!({"a": 99, "b": 2})),
            MergeStrategy::PreferInto,
        );
        let merged = merged.unwrap();
        assert_eq!(merged["a"], 1);
        assert_eq!(merged["b"], 2);
        assert_eq!(added, 1);
    }
}