Skip to main content

khive_runtime/
curation.rs

1// Licensed under the Apache License, Version 2.0.
2
3//! Curation operations: entity update/merge and edge-list filter type.
4//!
5//! See ADR-014 for the full specification and semantics.
6
7use std::collections::HashSet;
8
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use uuid::Uuid;
12
13use khive_storage::types::{
14    DeleteMode, EdgeFilter, EdgeSortField, LinkId, PageRequest, SortOrder, TextDocument,
15};
16use khive_storage::{Edge, EdgeRelation, Entity, SubstrateKind};
17
18use crate::error::{RuntimeError, RuntimeResult};
19use crate::runtime::KhiveRuntime;
20
21// ---------------------------------------------------------------------------
22// Public types
23// ---------------------------------------------------------------------------
24
25/// Patch for `update_entity`. Only `Some(_)` fields are applied; `None` means "leave unchanged".
26///
27/// For `description`:
28/// - `None` (outer) — leave the current description as-is
29/// - `Some(None)` — clear the description (set to NULL)
30/// - `Some(Some(s))` — set the description to `s`
31#[derive(Clone, Debug, Default)]
32pub struct EntityPatch {
33    pub name: Option<String>,
34    pub description: Option<Option<String>>,
35    pub properties: Option<Value>,
36    pub tags: Option<Vec<String>>,
37}
38
39/// Strategy used when merging two entities.
40#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
41#[serde(rename_all = "snake_case")]
42pub enum MergeStrategy {
43    /// `into` values win on conflict. Tags are unioned. Properties from `from` fill in
44    /// keys that `into` doesn't have. This is the default.
45    #[default]
46    PreferInto,
47    /// `from` values win on conflict.
48    PreferFrom,
49    /// Deep-merge: object properties merge recursively. Scalar conflicts go to `into`.
50    Union,
51}
52
53/// Result returned by `merge_entity`.
54#[derive(Clone, Debug, Serialize, Deserialize)]
55pub struct MergeSummary {
56    pub kept_id: Uuid,
57    pub removed_id: Uuid,
58    pub edges_rewired: usize,
59    pub properties_merged: usize,
60    pub tags_unioned: usize,
61}
62
63/// Filter for `list_edges` / `count_edges`.
64#[derive(Clone, Debug, Default)]
65pub struct EdgeListFilter {
66    pub source_id: Option<Uuid>,
67    pub target_id: Option<Uuid>,
68    /// Empty = any relation.
69    pub relations: Vec<EdgeRelation>,
70    pub min_weight: Option<f64>,
71    pub max_weight: Option<f64>,
72}
73
74impl From<EdgeListFilter> for EdgeFilter {
75    fn from(f: EdgeListFilter) -> Self {
76        EdgeFilter {
77            source_ids: f.source_id.into_iter().collect(),
78            target_ids: f.target_id.into_iter().collect(),
79            relations: f.relations,
80            min_weight: f.min_weight,
81            max_weight: f.max_weight,
82            ..Default::default()
83        }
84    }
85}
86
87// ---------------------------------------------------------------------------
88// Implementation
89// ---------------------------------------------------------------------------
90
91impl KhiveRuntime {
92    /// Patch-style entity update.
93    ///
94    /// Only fields set to `Some(_)` are changed. Re-indexes FTS5 (and vectors if configured)
95    /// when `name` or `description` changes; skips re-indexing for property/tag-only patches.
96    ///
97    /// Returns `RuntimeError::NotFound` if the entity does not exist or belongs to a different
98    /// namespace. This enforces ADR-007 namespace isolation at the runtime layer.
99    pub async fn update_entity(
100        &self,
101        namespace: Option<&str>,
102        id: Uuid,
103        patch: EntityPatch,
104    ) -> RuntimeResult<Entity> {
105        let store = self.entities(namespace)?;
106        let mut entity = store
107            .get_entity(id)
108            .await?
109            .ok_or_else(|| RuntimeError::NotFound(format!("entity {id}")))?;
110
111        if entity.namespace != self.ns(namespace) {
112            return Err(RuntimeError::NotFound(format!("entity {id}")));
113        }
114
115        let mut text_changed = false;
116
117        if let Some(name) = patch.name {
118            text_changed |= entity.name != name;
119            entity.name = name;
120        }
121        if let Some(desc_patch) = patch.description {
122            text_changed |= entity.description != desc_patch;
123            entity.description = desc_patch;
124        }
125        if let Some(props) = patch.properties {
126            entity.properties = Some(props);
127        }
128        if let Some(tags) = patch.tags {
129            entity.tags = tags;
130        }
131
132        entity.updated_at = chrono::Utc::now().timestamp_micros();
133        store.upsert_entity(entity.clone()).await?;
134
135        if text_changed {
136            self.reindex_entity(namespace, &entity).await?;
137        }
138
139        Ok(entity)
140    }
141
142    /// Merge `from_id` into `into_id`.
143    ///
144    /// All edges incident to `from_id` are rewired to `into_id`. Self-loops that would
145    /// result from the rewire are dropped. Properties and tags are merged per `strategy`.
146    /// `from_id` is hard-deleted and removed from indexes. Returns a summary.
147    ///
148    /// Not transactional in v0.1 — idempotent enough to re-run if interrupted mid-way.
149    pub async fn merge_entity(
150        &self,
151        namespace: Option<&str>,
152        into_id: Uuid,
153        from_id: Uuid,
154        strategy: MergeStrategy,
155    ) -> RuntimeResult<MergeSummary> {
156        let store = self.entities(namespace)?;
157        let graph = self.graph(namespace)?;
158
159        let ns = self.ns(namespace);
160        let into_entity = store
161            .get_entity(into_id)
162            .await?
163            .ok_or_else(|| RuntimeError::NotFound(format!("entity {into_id}")))?;
164        if into_entity.namespace != ns {
165            return Err(RuntimeError::NotFound(format!("entity {into_id}")));
166        }
167        let from_entity = store
168            .get_entity(from_id)
169            .await?
170            .ok_or_else(|| RuntimeError::NotFound(format!("entity {from_id}")))?;
171        if from_entity.namespace != ns {
172            return Err(RuntimeError::NotFound(format!("entity {from_id}")));
173        }
174
175        // Collect all edges incident to from_id (as source OR target).
176        // Use paginated loops so entities with more than PAGE_SIZE edges are fully covered.
177        const PAGE_SIZE: u32 = 1_000;
178        let sort = vec![SortOrder {
179            field: EdgeSortField::CreatedAt,
180            direction: khive_storage::types::SortDirection::Asc,
181        }];
182
183        let mut outbound: Vec<Edge> = Vec::new();
184        let mut offset: u64 = 0;
185        loop {
186            let page = graph
187                .query_edges(
188                    EdgeFilter {
189                        source_ids: vec![from_id],
190                        ..Default::default()
191                    },
192                    sort.clone(),
193                    PageRequest {
194                        offset,
195                        limit: PAGE_SIZE,
196                    },
197                )
198                .await?;
199            if page.items.is_empty() {
200                break;
201            }
202            offset += page.items.len() as u64;
203            outbound.extend(page.items);
204        }
205
206        let mut inbound: Vec<Edge> = Vec::new();
207        let mut offset: u64 = 0;
208        loop {
209            let page = graph
210                .query_edges(
211                    EdgeFilter {
212                        target_ids: vec![from_id],
213                        ..Default::default()
214                    },
215                    sort.clone(),
216                    PageRequest {
217                        offset,
218                        limit: PAGE_SIZE,
219                    },
220                )
221                .await?;
222            if page.items.is_empty() {
223                break;
224            }
225            offset += page.items.len() as u64;
226            inbound.extend(page.items);
227        }
228
229        // Deduplicate incident edges by ID (a self-edge from_id -> from_id
230        // appears in both outbound and inbound lists; process it once).
231        let mut seen_edge_ids: std::collections::HashSet<LinkId> = std::collections::HashSet::new();
232        let mut all_edges: Vec<Edge> = Vec::new();
233        for edge in outbound.into_iter().chain(inbound.into_iter()) {
234            if seen_edge_ids.insert(edge.id) {
235                all_edges.push(edge);
236            }
237        }
238
239        // Rewire edges in one pass. Replace from_id with into_id on both
240        // endpoints simultaneously, then drop self-loops.
241        let mut edges_rewired = 0usize;
242        for edge in all_edges {
243            let new_source = if edge.source_id == from_id {
244                into_id
245            } else {
246                edge.source_id
247            };
248            let new_target = if edge.target_id == from_id {
249                into_id
250            } else {
251                edge.target_id
252            };
253            if new_source == new_target {
254                graph.delete_edge(edge.id).await?;
255                continue;
256            }
257            let rewired = Edge {
258                source_id: new_source,
259                target_id: new_target,
260                ..edge
261            };
262            graph.upsert_edge(rewired).await?;
263            edges_rewired += 1;
264        }
265
266        // Merge properties.
267        let (merged_props, properties_merged) =
268            merge_properties(&into_entity.properties, &from_entity.properties, strategy);
269
270        // Merge description and name per strategy.
271        let merged_name = merge_string_field(&into_entity.name, &from_entity.name, strategy);
272        let merged_description =
273            merge_option_string_field(&into_entity.description, &from_entity.description, strategy);
274
275        // Union tags.
276        let (merged_tags, tags_unioned) = union_tags(&into_entity.tags, &from_entity.tags);
277
278        // Upsert updated into entity.
279        let mut updated_into = into_entity;
280        updated_into.name = merged_name;
281        updated_into.description = merged_description;
282        updated_into.properties = merged_props;
283        updated_into.tags = merged_tags;
284        updated_into.updated_at = chrono::Utc::now().timestamp_micros();
285        store.upsert_entity(updated_into.clone()).await?;
286        self.reindex_entity(namespace, &updated_into).await?;
287
288        // Hard-delete from entity and remove from indexes.
289        store.delete_entity(from_id, DeleteMode::Hard).await?;
290        self.remove_from_indexes(namespace, from_id).await?;
291
292        Ok(MergeSummary {
293            kept_id: into_id,
294            removed_id: from_id,
295            edges_rewired,
296            properties_merged,
297            tags_unioned,
298        })
299    }
300
301    // ---- Internal helpers ----
302
303    /// Re-upsert FTS5 document (and vector if model configured) for the entity.
304    ///
305    /// Uses `entity.namespace` — the authoritative namespace stored on the record — rather
306    /// than the caller-supplied `namespace` parameter. This prevents a cross-namespace
307    /// reindex from writing the search document into the wrong namespace's FTS index.
308    pub(crate) async fn reindex_entity(
309        &self,
310        namespace: Option<&str>,
311        entity: &Entity,
312    ) -> RuntimeResult<()> {
313        let body = match &entity.description {
314            Some(d) if !d.is_empty() => format!("{} {}", entity.name, d),
315            _ => entity.name.clone(),
316        };
317        // Use entity.namespace (authoritative) rather than self.ns(namespace) (caller claim).
318        let ns = entity.namespace.clone();
319        self.text(namespace)?
320            .upsert_document(TextDocument {
321                subject_id: entity.id,
322                kind: SubstrateKind::Entity,
323                title: Some(entity.name.clone()),
324                body: body.clone(),
325                tags: entity.tags.clone(),
326                namespace: ns.clone(),
327                metadata: entity.properties.clone(),
328                updated_at: chrono::Utc::now(),
329            })
330            .await?;
331
332        if self.config().embedding_model.is_some() {
333            let vector = self.embed(&body).await?;
334            self.vectors(namespace)?
335                .insert(entity.id, SubstrateKind::Entity, &ns, vector)
336                .await?;
337        }
338
339        Ok(())
340    }
341
342    /// Remove an entity from FTS5 and (if configured) vector indexes.
343    pub(crate) async fn remove_from_indexes(
344        &self,
345        namespace: Option<&str>,
346        id: Uuid,
347    ) -> RuntimeResult<()> {
348        let ns = self.ns(namespace).to_string();
349        self.text(namespace)?.delete_document(&ns, id).await?;
350        if self.config().embedding_model.is_some() {
351            self.vectors(namespace)?.delete(id).await?;
352        }
353        Ok(())
354    }
355}
356
357// ---------------------------------------------------------------------------
358// Merge helpers (pure functions — easier to unit test)
359// ---------------------------------------------------------------------------
360
361fn merge_string_field(into: &str, from: &str, strategy: MergeStrategy) -> String {
362    match strategy {
363        MergeStrategy::PreferInto | MergeStrategy::Union => into.to_string(),
364        MergeStrategy::PreferFrom => from.to_string(),
365    }
366}
367
368fn merge_option_string_field(
369    into: &Option<String>,
370    from: &Option<String>,
371    strategy: MergeStrategy,
372) -> Option<String> {
373    match strategy {
374        MergeStrategy::PreferInto => {
375            if into.is_some() {
376                into.clone()
377            } else {
378                from.clone()
379            }
380        }
381        MergeStrategy::PreferFrom => {
382            if from.is_some() {
383                from.clone()
384            } else {
385                into.clone()
386            }
387        }
388        MergeStrategy::Union => {
389            // Keep into's description; if empty, append from's.
390            match (into, from) {
391                (Some(a), _) if !a.is_empty() => Some(a.clone()),
392                (_, Some(b)) => Some(b.clone()),
393                _ => None,
394            }
395        }
396    }
397}
398
399/// Merge two property objects. Returns (merged, count_of_fields_from_from_that_were_added).
400fn merge_properties(
401    into: &Option<Value>,
402    from: &Option<Value>,
403    strategy: MergeStrategy,
404) -> (Option<Value>, usize) {
405    match (into, from) {
406        (None, None) => (None, 0),
407        (Some(a), None) => (Some(a.clone()), 0),
408        (None, Some(b)) => {
409            let count = if let Value::Object(m) = b { m.len() } else { 1 };
410            (Some(b.clone()), count)
411        }
412        (Some(into_val), Some(from_val)) => {
413            let (merged, added) = merge_json(into_val, from_val, strategy);
414            (Some(merged), added)
415        }
416    }
417}
418
419/// Deep-merge two JSON values per strategy. Returns (merged, keys_contributed_by_from).
420fn merge_json(into: &Value, from: &Value, strategy: MergeStrategy) -> (Value, usize) {
421    match (into, from, strategy) {
422        (Value::Object(a), Value::Object(b), MergeStrategy::Union) => {
423            let mut result = a.clone();
424            let mut added = 0usize;
425            for (k, v_from) in b {
426                if let Some(v_into) = a.get(k) {
427                    let (merged, sub_added) = merge_json(v_into, v_from, MergeStrategy::Union);
428                    result.insert(k.clone(), merged);
429                    added += sub_added;
430                } else {
431                    result.insert(k.clone(), v_from.clone());
432                    added += 1;
433                }
434            }
435            (Value::Object(result), added)
436        }
437        (Value::Object(a), Value::Object(b), MergeStrategy::PreferInto) => {
438            let mut result = a.clone();
439            let mut added = 0usize;
440            for (k, v) in b {
441                if !a.contains_key(k) {
442                    result.insert(k.clone(), v.clone());
443                    added += 1;
444                }
445            }
446            (Value::Object(result), added)
447        }
448        (Value::Object(a), Value::Object(b), MergeStrategy::PreferFrom) => {
449            let mut result = a.clone();
450            let mut added = 0usize;
451            for (k, v) in b {
452                result.insert(k.clone(), v.clone());
453                if !a.contains_key(k) {
454                    added += 1;
455                }
456            }
457            (Value::Object(result), added)
458        }
459        // Non-object scalars: apply strategy directly.
460        (_into_val, from_val, MergeStrategy::PreferFrom) => (from_val.clone(), 1),
461        _ => (into.clone(), 0),
462    }
463}
464
/// Append to `into` every tag of `from` that is not already present.
///
/// Returns the combined list — `into`'s tags in their original order, followed by the new
/// tags in the order they first appear in `from` — together with the number of tags added.
fn union_tags(into: &[String], from: &[String]) -> (Vec<String>, usize) {
    let mut known: HashSet<&str> = into.iter().map(String::as_str).collect();
    let mut merged = into.to_vec();
    // `insert` returns true only the first time a tag is seen, which also filters
    // repeats inside `from` itself.
    merged.extend(
        from.iter()
            .filter(|&tag| known.insert(tag.as_str()))
            .cloned(),
    );
    let added = merged.len() - into.len();
    (merged, added)
}
477
478// ---------------------------------------------------------------------------
479// Unit tests
480// ---------------------------------------------------------------------------
481
482#[cfg(test)]
483mod tests {
484    use super::*;
485    use crate::runtime::KhiveRuntime;
486    use khive_storage::types::{Direction, TextFilter, TextQueryMode, TextSearchRequest};
487
488    fn rt() -> KhiveRuntime {
489        KhiveRuntime::memory().unwrap()
490    }
491
492    // Helper: search FTS5 for `query` in a runtime namespace.
493    async fn fts_hit(rt: &KhiveRuntime, namespace: Option<&str>, query: &str) -> Vec<Uuid> {
494        let ns = rt.ns(namespace).to_string();
495        rt.text(namespace)
496            .unwrap()
497            .search(TextSearchRequest {
498                query: query.to_string(),
499                mode: TextQueryMode::Plain,
500                filter: Some(TextFilter {
501                    namespaces: vec![ns],
502                    ..Default::default()
503                }),
504                top_k: 50,
505                snippet_chars: 100,
506            })
507            .await
508            .unwrap()
509            .into_iter()
510            .map(|h| h.subject_id)
511            .collect()
512    }
513
514    #[tokio::test]
515    async fn update_entity_patch_changes_only_specified_fields() {
516        let rt = rt();
517        let entity = rt
518            .create_entity(
519                None,
520                "concept",
521                "OriginalName",
522                Some("orig desc"),
523                Some(serde_json::json!({"k":"v"})),
524                vec![],
525            )
526            .await
527            .unwrap();
528
529        let updated = rt
530            .update_entity(
531                None,
532                entity.id,
533                EntityPatch {
534                    description: Some(Some("new desc".to_string())),
535                    ..Default::default()
536                },
537            )
538            .await
539            .unwrap();
540
541        assert_eq!(updated.name, "OriginalName");
542        assert_eq!(updated.description.as_deref(), Some("new desc"));
543        assert_eq!(updated.properties, Some(serde_json::json!({"k":"v"})));
544    }
545
546    #[tokio::test]
547    async fn update_entity_clear_description_with_some_none() {
548        let rt = rt();
549        let entity = rt
550            .create_entity(
551                None,
552                "concept",
553                "ClearDesc",
554                Some("has description"),
555                None,
556                vec![],
557            )
558            .await
559            .unwrap();
560
561        let updated = rt
562            .update_entity(
563                None,
564                entity.id,
565                EntityPatch {
566                    description: Some(None),
567                    ..Default::default()
568                },
569            )
570            .await
571            .unwrap();
572
573        assert!(
574            updated.description.is_none(),
575            "description should be cleared"
576        );
577    }
578
579    #[tokio::test]
580    async fn update_entity_reindexes_when_name_changes() {
581        let rt = rt();
582        let entity = rt
583            .create_entity(None, "concept", "OldName", None, None, vec![])
584            .await
585            .unwrap();
586
587        // Old name is findable.
588        let hits_before = fts_hit(&rt, None, "OldName").await;
589        assert!(
590            hits_before.contains(&entity.id),
591            "entity should be findable by old name"
592        );
593
594        rt.update_entity(
595            None,
596            entity.id,
597            EntityPatch {
598                name: Some("NewName".to_string()),
599                ..Default::default()
600            },
601        )
602        .await
603        .unwrap();
604
605        let hits_old = fts_hit(&rt, None, "OldName").await;
606        let hits_new = fts_hit(&rt, None, "NewName").await;
607
608        // After rename, old name no longer matches this entity (FTS index updated).
609        assert!(
610            !hits_old.contains(&entity.id),
611            "old name should no longer match after rename"
612        );
613        assert!(
614            hits_new.contains(&entity.id),
615            "new name should be findable after rename"
616        );
617    }
618
619    #[tokio::test]
620    async fn update_entity_skips_reindex_when_only_properties_change() {
621        let rt = rt();
622        let entity = rt
623            .create_entity(None, "concept", "StableIndexed", None, None, vec![])
624            .await
625            .unwrap();
626
627        // Verify it's in the index before.
628        let hits_before = fts_hit(&rt, None, "StableIndexed").await;
629        assert!(hits_before.contains(&entity.id));
630
631        // Only patch properties — text index should be untouched (still findable).
632        rt.update_entity(
633            None,
634            entity.id,
635            EntityPatch {
636                properties: Some(serde_json::json!({"new": "prop"})),
637                ..Default::default()
638            },
639        )
640        .await
641        .unwrap();
642
643        let hits_after = fts_hit(&rt, None, "StableIndexed").await;
644        assert!(
645            hits_after.contains(&entity.id),
646            "still findable after props-only patch"
647        );
648    }
649
650    #[tokio::test]
651    async fn merge_entity_rewires_edges() {
652        let rt = rt();
653        let a = rt
654            .create_entity(None, "concept", "A", None, None, vec![])
655            .await
656            .unwrap();
657        let b = rt
658            .create_entity(None, "concept", "B", None, None, vec![])
659            .await
660            .unwrap();
661        let c = rt
662            .create_entity(None, "concept", "C", None, None, vec![])
663            .await
664            .unwrap();
665        let d = rt
666            .create_entity(None, "concept", "D", None, None, vec![])
667            .await
668            .unwrap();
669
670        // A→B and C→B; merge B into D → should become A→D and C→D.
671        rt.link(None, a.id, b.id, EdgeRelation::Extends, 1.0)
672            .await
673            .unwrap();
674        rt.link(None, c.id, b.id, EdgeRelation::Extends, 1.0)
675            .await
676            .unwrap();
677
678        let summary = rt
679            .merge_entity(None, d.id, b.id, MergeStrategy::PreferInto)
680            .await
681            .unwrap();
682
683        assert_eq!(summary.kept_id, d.id);
684        assert_eq!(summary.removed_id, b.id);
685        assert_eq!(summary.edges_rewired, 2);
686
687        // Verify edges now point to D.
688        let a_neighbors = rt
689            .neighbors(None, a.id, Direction::Out, None, None)
690            .await
691            .unwrap();
692        assert_eq!(a_neighbors.len(), 1);
693        assert_eq!(a_neighbors[0].node_id, d.id);
694
695        let c_neighbors = rt
696            .neighbors(None, c.id, Direction::Out, None, None)
697            .await
698            .unwrap();
699        assert_eq!(c_neighbors.len(), 1);
700        assert_eq!(c_neighbors[0].node_id, d.id);
701    }
702
703    #[tokio::test]
704    async fn merge_entity_prefer_into_strategy() {
705        let rt = rt();
706        let into = rt
707            .create_entity(
708                None,
709                "concept",
710                "Into",
711                None,
712                Some(serde_json::json!({"a": 1})),
713                vec![],
714            )
715            .await
716            .unwrap();
717        let from = rt
718            .create_entity(
719                None,
720                "concept",
721                "From",
722                None,
723                Some(serde_json::json!({"a": 2, "b": 3})),
724                vec![],
725            )
726            .await
727            .unwrap();
728
729        rt.merge_entity(None, into.id, from.id, MergeStrategy::PreferInto)
730            .await
731            .unwrap();
732
733        let kept = rt.get_entity(None, into.id).await.unwrap().unwrap();
734        let props = kept.properties.unwrap();
735        // a stays as 1 (into wins), b is added from from.
736        assert_eq!(props["a"], 1);
737        assert_eq!(props["b"], 3);
738    }
739
740    #[tokio::test]
741    async fn merge_entity_prefer_from_strategy() {
742        let rt = rt();
743        let into = rt
744            .create_entity(
745                None,
746                "concept",
747                "Into",
748                None,
749                Some(serde_json::json!({"a": 1})),
750                vec![],
751            )
752            .await
753            .unwrap();
754        let from = rt
755            .create_entity(
756                None,
757                "concept",
758                "From",
759                None,
760                Some(serde_json::json!({"a": 2, "b": 3})),
761                vec![],
762            )
763            .await
764            .unwrap();
765
766        rt.merge_entity(None, into.id, from.id, MergeStrategy::PreferFrom)
767            .await
768            .unwrap();
769
770        let kept = rt.get_entity(None, into.id).await.unwrap().unwrap();
771        let props = kept.properties.unwrap();
772        // from wins on a, b also from from.
773        assert_eq!(props["a"], 2);
774        assert_eq!(props["b"], 3);
775    }
776
777    #[tokio::test]
778    async fn merge_entity_union_strategy() {
779        let rt = rt();
780        let into = rt
781            .create_entity(
782                None,
783                "concept",
784                "Into",
785                None,
786                Some(serde_json::json!({"a": 1})),
787                vec![],
788            )
789            .await
790            .unwrap();
791        let from = rt
792            .create_entity(
793                None,
794                "concept",
795                "From",
796                None,
797                Some(serde_json::json!({"a": 2, "b": 3})),
798                vec![],
799            )
800            .await
801            .unwrap();
802
803        rt.merge_entity(None, into.id, from.id, MergeStrategy::Union)
804            .await
805            .unwrap();
806
807        let kept = rt.get_entity(None, into.id).await.unwrap().unwrap();
808        let props = kept.properties.unwrap();
809        // Scalar conflict: into wins → a=1. b added from from.
810        assert_eq!(props["a"], 1);
811        assert_eq!(props["b"], 3);
812    }
813
814    #[tokio::test]
815    async fn merge_entity_unions_tags() {
816        let rt = rt();
817        let into = rt
818            .create_entity(
819                None,
820                "concept",
821                "Into",
822                None,
823                None,
824                vec!["x".to_string(), "y".to_string()],
825            )
826            .await
827            .unwrap();
828        let from = rt
829            .create_entity(
830                None,
831                "concept",
832                "From",
833                None,
834                None,
835                vec!["y".to_string(), "z".to_string()],
836            )
837            .await
838            .unwrap();
839
840        rt.merge_entity(None, into.id, from.id, MergeStrategy::PreferInto)
841            .await
842            .unwrap();
843
844        let kept = rt.get_entity(None, into.id).await.unwrap().unwrap();
845        let mut tags = kept.tags.clone();
846        tags.sort();
847        assert_eq!(tags, vec!["x", "y", "z"]);
848    }
849
850    #[tokio::test]
851    async fn merge_entity_drops_self_loops() {
852        let rt = rt();
853        let a = rt
854            .create_entity(None, "concept", "A", None, None, vec![])
855            .await
856            .unwrap();
857        let b = rt
858            .create_entity(None, "concept", "B", None, None, vec![])
859            .await
860            .unwrap();
861
862        // A `extends` B — merging B into A would produce A `extends` A → drop it.
863        rt.link(None, a.id, b.id, EdgeRelation::Extends, 1.0)
864            .await
865            .unwrap();
866
867        let summary = rt
868            .merge_entity(None, a.id, b.id, MergeStrategy::PreferInto)
869            .await
870            .unwrap();
871
872        assert_eq!(
873            summary.edges_rewired, 0,
874            "self-loop should be dropped, not rewired"
875        );
876
877        let a_out = rt
878            .neighbors(None, a.id, Direction::Out, None, None)
879            .await
880            .unwrap();
881        assert!(a_out.is_empty(), "no self-loop should remain");
882    }
883
884    // ---- merge helper unit tests ----
885
886    #[test]
887    fn union_tags_deduplicates() {
888        let (tags, added) = union_tags(
889            &["x".to_string(), "y".to_string()],
890            &["y".to_string(), "z".to_string()],
891        );
892        let mut sorted = tags.clone();
893        sorted.sort();
894        assert_eq!(sorted, vec!["x", "y", "z"]);
895        assert_eq!(added, 1);
896    }
897
898    #[test]
899    fn merge_properties_prefer_into_fills_missing_keys() {
900        let a = serde_json::json!({"a": 1});
901        let b = serde_json::json!({"a": 99, "b": 2});
902        let (merged, added) = merge_properties(&Some(a), &Some(b), MergeStrategy::PreferInto);
903        let m = merged.unwrap();
904        assert_eq!(m["a"], 1);
905        assert_eq!(m["b"], 2);
906        assert_eq!(added, 1);
907    }
908}