Skip to main content

khive_runtime/
curation.rs

// Licensed under the Apache License, Version 2.0.

//! Curation operations: patch-style entity updates, entity merges, and the
//! edge-list filter type.
//!
//! See ADR-014 for the full specification and semantics.

7use std::collections::HashSet;
8
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use uuid::Uuid;
12
13use khive_storage::types::{
14    DeleteMode, EdgeFilter, EdgeSortField, LinkId, PageRequest, SortOrder, TextDocument,
15};
16use khive_storage::{Edge, EdgeRelation, Entity, SubstrateKind};
17
18use crate::error::{RuntimeError, RuntimeResult};
19use crate::runtime::KhiveRuntime;
20
21// ---------------------------------------------------------------------------
22// Public types
23// ---------------------------------------------------------------------------
24
25/// Patch for `update_entity`. Only `Some(_)` fields are applied; `None` means "leave unchanged".
26///
27/// For `description`:
28/// - `None` (outer) — leave the current description as-is
29/// - `Some(None)` — clear the description (set to NULL)
30/// - `Some(Some(s))` — set the description to `s`
31///
32/// For `properties` (deep-merge semantics):
33/// - `None` — leave properties as-is
34/// - `Some(value)` — deep-merge `value` into existing properties. Keys present in
35///   the patch overwrite existing keys; keys absent from the patch are preserved.
36///   Removing a key requires explicit replacement of the parent object (or a future
37///   `unset`/`null-marker` extension).
38///
39/// For `tags` — replace semantics: `Some(vec)` sets tags to exactly `vec`. To add
40/// a tag without losing existing tags, read the entity first, push the new tag,
41/// and pass the full list back.
42#[derive(Clone, Debug, Default)]
43pub struct EntityPatch {
44    pub name: Option<String>,
45    pub description: Option<Option<String>>,
46    pub properties: Option<Value>,
47    pub tags: Option<Vec<String>>,
48}
49
50/// Strategy used when merging two entities.
51#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
52#[serde(rename_all = "snake_case")]
53pub enum MergeStrategy {
54    /// `into` values win on conflict. Tags are unioned. Properties from `from` fill in
55    /// keys that `into` doesn't have. This is the default.
56    #[default]
57    PreferInto,
58    /// `from` values win on conflict.
59    PreferFrom,
60    /// Deep-merge: object properties merge recursively. Scalar conflicts go to `into`.
61    Union,
62}
63
64/// Result returned by `merge_entity`.
65#[derive(Clone, Debug, Serialize, Deserialize)]
66pub struct MergeSummary {
67    pub kept_id: Uuid,
68    pub removed_id: Uuid,
69    pub edges_rewired: usize,
70    pub properties_merged: usize,
71    pub tags_unioned: usize,
72}
73
74/// Filter for `list_edges` / `count_edges`.
75#[derive(Clone, Debug, Default)]
76pub struct EdgeListFilter {
77    pub source_id: Option<Uuid>,
78    pub target_id: Option<Uuid>,
79    /// Empty = any relation.
80    pub relations: Vec<EdgeRelation>,
81    pub min_weight: Option<f64>,
82    pub max_weight: Option<f64>,
83}
84
85impl From<EdgeListFilter> for EdgeFilter {
86    fn from(f: EdgeListFilter) -> Self {
87        EdgeFilter {
88            source_ids: f.source_id.into_iter().collect(),
89            target_ids: f.target_id.into_iter().collect(),
90            relations: f.relations,
91            min_weight: f.min_weight,
92            max_weight: f.max_weight,
93            ..Default::default()
94        }
95    }
96}
97
98// ---------------------------------------------------------------------------
99// Implementation
100// ---------------------------------------------------------------------------
101
102impl KhiveRuntime {
103    /// Patch-style entity update.
104    ///
105    /// Only fields set to `Some(_)` are changed. Re-indexes FTS5 (and vectors if configured)
106    /// when `name` or `description` changes; skips re-indexing for property/tag-only patches.
107    ///
108    /// Returns `RuntimeError::NotFound` if the entity does not exist or belongs to a different
109    /// namespace. This enforces ADR-007 namespace isolation at the runtime layer.
110    pub async fn update_entity(
111        &self,
112        namespace: Option<&str>,
113        id: Uuid,
114        patch: EntityPatch,
115    ) -> RuntimeResult<Entity> {
116        let store = self.entities(namespace)?;
117        let mut entity = store
118            .get_entity(id)
119            .await?
120            .ok_or_else(|| RuntimeError::NotFound(format!("entity {id}")))?;
121
122        if entity.namespace != self.ns(namespace) {
123            return Err(RuntimeError::NotFound(format!("entity {id}")));
124        }
125
126        let mut text_changed = false;
127
128        if let Some(name) = patch.name {
129            text_changed |= entity.name != name;
130            entity.name = name;
131        }
132        if let Some(desc_patch) = patch.description {
133            text_changed |= entity.description != desc_patch;
134            entity.description = desc_patch;
135        }
136        if let Some(props) = patch.properties {
137            let (merged, _) =
138                merge_properties(&entity.properties, &Some(props), MergeStrategy::PreferFrom);
139            entity.properties = merged;
140        }
141        if let Some(tags) = patch.tags {
142            entity.tags = tags;
143        }
144
145        entity.updated_at = chrono::Utc::now().timestamp_micros();
146        store.upsert_entity(entity.clone()).await?;
147
148        if text_changed {
149            self.reindex_entity(namespace, &entity).await?;
150        }
151
152        Ok(entity)
153    }
154
155    /// Merge `from_id` into `into_id`.
156    ///
157    /// All edges incident to `from_id` are rewired to `into_id`. Self-loops that would
158    /// result from the rewire are dropped. Properties and tags are merged per `strategy`.
159    /// `from_id` is hard-deleted and removed from indexes. Returns a summary.
160    ///
161    /// Not transactional in v0.1 — idempotent enough to re-run if interrupted mid-way.
162    pub async fn merge_entity(
163        &self,
164        namespace: Option<&str>,
165        into_id: Uuid,
166        from_id: Uuid,
167        strategy: MergeStrategy,
168    ) -> RuntimeResult<MergeSummary> {
169        let store = self.entities(namespace)?;
170        let graph = self.graph(namespace)?;
171
172        let ns = self.ns(namespace);
173        let into_entity = store
174            .get_entity(into_id)
175            .await?
176            .ok_or_else(|| RuntimeError::NotFound(format!("entity {into_id}")))?;
177        if into_entity.namespace != ns {
178            return Err(RuntimeError::NotFound(format!("entity {into_id}")));
179        }
180        let from_entity = store
181            .get_entity(from_id)
182            .await?
183            .ok_or_else(|| RuntimeError::NotFound(format!("entity {from_id}")))?;
184        if from_entity.namespace != ns {
185            return Err(RuntimeError::NotFound(format!("entity {from_id}")));
186        }
187
188        // Collect all edges incident to from_id (as source OR target).
189        // Use paginated loops so entities with more than PAGE_SIZE edges are fully covered.
190        const PAGE_SIZE: u32 = 1_000;
191        let sort = vec![SortOrder {
192            field: EdgeSortField::CreatedAt,
193            direction: khive_storage::types::SortDirection::Asc,
194        }];
195
196        let mut outbound: Vec<Edge> = Vec::new();
197        let mut offset: u64 = 0;
198        loop {
199            let page = graph
200                .query_edges(
201                    EdgeFilter {
202                        source_ids: vec![from_id],
203                        ..Default::default()
204                    },
205                    sort.clone(),
206                    PageRequest {
207                        offset,
208                        limit: PAGE_SIZE,
209                    },
210                )
211                .await?;
212            if page.items.is_empty() {
213                break;
214            }
215            offset += page.items.len() as u64;
216            outbound.extend(page.items);
217        }
218
219        let mut inbound: Vec<Edge> = Vec::new();
220        let mut offset: u64 = 0;
221        loop {
222            let page = graph
223                .query_edges(
224                    EdgeFilter {
225                        target_ids: vec![from_id],
226                        ..Default::default()
227                    },
228                    sort.clone(),
229                    PageRequest {
230                        offset,
231                        limit: PAGE_SIZE,
232                    },
233                )
234                .await?;
235            if page.items.is_empty() {
236                break;
237            }
238            offset += page.items.len() as u64;
239            inbound.extend(page.items);
240        }
241
242        // Deduplicate incident edges by ID (a self-edge from_id -> from_id
243        // appears in both outbound and inbound lists; process it once).
244        let mut seen_edge_ids: std::collections::HashSet<LinkId> = std::collections::HashSet::new();
245        let mut all_edges: Vec<Edge> = Vec::new();
246        for edge in outbound.into_iter().chain(inbound.into_iter()) {
247            if seen_edge_ids.insert(edge.id) {
248                all_edges.push(edge);
249            }
250        }
251
252        // Rewire edges in one pass. Replace from_id with into_id on both
253        // endpoints simultaneously, then drop self-loops.
254        let mut edges_rewired = 0usize;
255        for edge in all_edges {
256            let new_source = if edge.source_id == from_id {
257                into_id
258            } else {
259                edge.source_id
260            };
261            let new_target = if edge.target_id == from_id {
262                into_id
263            } else {
264                edge.target_id
265            };
266            if new_source == new_target {
267                graph.delete_edge(edge.id).await?;
268                continue;
269            }
270            let rewired = Edge {
271                source_id: new_source,
272                target_id: new_target,
273                ..edge
274            };
275            graph.upsert_edge(rewired).await?;
276            edges_rewired += 1;
277        }
278
279        // Merge properties.
280        let (merged_props, properties_merged) =
281            merge_properties(&into_entity.properties, &from_entity.properties, strategy);
282
283        // Merge description and name per strategy.
284        let merged_name = merge_string_field(&into_entity.name, &from_entity.name, strategy);
285        let merged_description =
286            merge_option_string_field(&into_entity.description, &from_entity.description, strategy);
287
288        // Union tags.
289        let (merged_tags, tags_unioned) = union_tags(&into_entity.tags, &from_entity.tags);
290
291        // Upsert updated into entity.
292        let mut updated_into = into_entity;
293        updated_into.name = merged_name;
294        updated_into.description = merged_description;
295        updated_into.properties = merged_props;
296        updated_into.tags = merged_tags;
297        updated_into.updated_at = chrono::Utc::now().timestamp_micros();
298        store.upsert_entity(updated_into.clone()).await?;
299        self.reindex_entity(namespace, &updated_into).await?;
300
301        // Hard-delete from entity and remove from indexes.
302        store.delete_entity(from_id, DeleteMode::Hard).await?;
303        self.remove_from_indexes(namespace, from_id).await?;
304
305        Ok(MergeSummary {
306            kept_id: into_id,
307            removed_id: from_id,
308            edges_rewired,
309            properties_merged,
310            tags_unioned,
311        })
312    }
313
314    // ---- Internal helpers ----
315
316    /// Re-upsert FTS5 document (and vector if model configured) for the entity.
317    ///
318    /// Uses `entity.namespace` — the authoritative namespace stored on the record — rather
319    /// than the caller-supplied `namespace` parameter. This prevents a cross-namespace
320    /// reindex from writing the search document into the wrong namespace's FTS index.
321    pub(crate) async fn reindex_entity(
322        &self,
323        namespace: Option<&str>,
324        entity: &Entity,
325    ) -> RuntimeResult<()> {
326        let body = match &entity.description {
327            Some(d) if !d.is_empty() => format!("{} {}", entity.name, d),
328            _ => entity.name.clone(),
329        };
330        // Use entity.namespace (authoritative) rather than self.ns(namespace) (caller claim).
331        let ns = entity.namespace.clone();
332        self.text(namespace)?
333            .upsert_document(TextDocument {
334                subject_id: entity.id,
335                kind: SubstrateKind::Entity,
336                title: Some(entity.name.clone()),
337                body: body.clone(),
338                tags: entity.tags.clone(),
339                namespace: ns.clone(),
340                metadata: entity.properties.clone(),
341                updated_at: chrono::Utc::now(),
342            })
343            .await?;
344
345        if self.config().embedding_model.is_some() {
346            let vector = self.embed(&body).await?;
347            self.vectors(namespace)?
348                .insert(entity.id, SubstrateKind::Entity, &ns, vector)
349                .await?;
350        }
351
352        Ok(())
353    }
354
355    /// Remove an entity from FTS5 and (if configured) vector indexes.
356    pub(crate) async fn remove_from_indexes(
357        &self,
358        namespace: Option<&str>,
359        id: Uuid,
360    ) -> RuntimeResult<()> {
361        let ns = self.ns(namespace).to_string();
362        self.text(namespace)?.delete_document(&ns, id).await?;
363        if self.config().embedding_model.is_some() {
364            self.vectors(namespace)?.delete(id).await?;
365        }
366        Ok(())
367    }
368}
369
370// ---------------------------------------------------------------------------
371// Merge helpers (pure functions — easier to unit test)
372// ---------------------------------------------------------------------------
373
374fn merge_string_field(into: &str, from: &str, strategy: MergeStrategy) -> String {
375    match strategy {
376        MergeStrategy::PreferInto | MergeStrategy::Union => into.to_string(),
377        MergeStrategy::PreferFrom => from.to_string(),
378    }
379}
380
381fn merge_option_string_field(
382    into: &Option<String>,
383    from: &Option<String>,
384    strategy: MergeStrategy,
385) -> Option<String> {
386    match strategy {
387        MergeStrategy::PreferInto => {
388            if into.is_some() {
389                into.clone()
390            } else {
391                from.clone()
392            }
393        }
394        MergeStrategy::PreferFrom => {
395            if from.is_some() {
396                from.clone()
397            } else {
398                into.clone()
399            }
400        }
401        MergeStrategy::Union => {
402            // Keep into's description; if empty, append from's.
403            match (into, from) {
404                (Some(a), _) if !a.is_empty() => Some(a.clone()),
405                (_, Some(b)) => Some(b.clone()),
406                _ => None,
407            }
408        }
409    }
410}
411
412/// Merge two property objects. Returns (merged, count_of_fields_from_from_that_were_added).
413fn merge_properties(
414    into: &Option<Value>,
415    from: &Option<Value>,
416    strategy: MergeStrategy,
417) -> (Option<Value>, usize) {
418    match (into, from) {
419        (None, None) => (None, 0),
420        (Some(a), None) => (Some(a.clone()), 0),
421        (None, Some(b)) => {
422            let count = if let Value::Object(m) = b { m.len() } else { 1 };
423            (Some(b.clone()), count)
424        }
425        (Some(into_val), Some(from_val)) => {
426            let (merged, added) = merge_json(into_val, from_val, strategy);
427            (Some(merged), added)
428        }
429    }
430}
431
432/// Deep-merge two JSON values per strategy. Returns (merged, keys_contributed_by_from).
433fn merge_json(into: &Value, from: &Value, strategy: MergeStrategy) -> (Value, usize) {
434    match (into, from, strategy) {
435        (Value::Object(a), Value::Object(b), MergeStrategy::Union) => {
436            let mut result = a.clone();
437            let mut added = 0usize;
438            for (k, v_from) in b {
439                if let Some(v_into) = a.get(k) {
440                    let (merged, sub_added) = merge_json(v_into, v_from, MergeStrategy::Union);
441                    result.insert(k.clone(), merged);
442                    added += sub_added;
443                } else {
444                    result.insert(k.clone(), v_from.clone());
445                    added += 1;
446                }
447            }
448            (Value::Object(result), added)
449        }
450        (Value::Object(a), Value::Object(b), MergeStrategy::PreferInto) => {
451            let mut result = a.clone();
452            let mut added = 0usize;
453            for (k, v) in b {
454                if !a.contains_key(k) {
455                    result.insert(k.clone(), v.clone());
456                    added += 1;
457                }
458            }
459            (Value::Object(result), added)
460        }
461        (Value::Object(a), Value::Object(b), MergeStrategy::PreferFrom) => {
462            let mut result = a.clone();
463            let mut added = 0usize;
464            for (k, v) in b {
465                result.insert(k.clone(), v.clone());
466                if !a.contains_key(k) {
467                    added += 1;
468                }
469            }
470            (Value::Object(result), added)
471        }
472        // Non-object scalars: apply strategy directly.
473        (_into_val, from_val, MergeStrategy::PreferFrom) => (from_val.clone(), 1),
474        _ => (into.clone(), 0),
475    }
476}
477
/// Append to `into` every tag of `from` that has not been seen yet, preserving order.
///
/// Returns `(tags, added)`. Duplicates already present inside `into` are kept as-is;
/// a tag repeated within `from` is added at most once.
fn union_tags(into: &[String], from: &[String]) -> (Vec<String>, usize) {
    let mut known: HashSet<&str> = into.iter().map(String::as_str).collect();
    let mut tags = into.to_vec();
    tags.extend(
        from.iter()
            .filter(|&tag| known.insert(tag.as_str()))
            .cloned(),
    );
    let added = tags.len() - into.len();
    (tags, added)
}

// ---------------------------------------------------------------------------
// Unit tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::KhiveRuntime;
    use khive_storage::types::{Direction, TextFilter, TextQueryMode, TextSearchRequest};
    use serde_json::json;

    fn rt() -> KhiveRuntime {
        KhiveRuntime::memory().unwrap()
    }

    /// Create a `concept` entity in the default namespace and return its id.
    async fn concept(
        rt: &KhiveRuntime,
        name: &str,
        description: Option<&str>,
        properties: Option<Value>,
        tags: &[&str],
    ) -> Uuid {
        rt.create_entity(
            None,
            "concept",
            name,
            description,
            properties,
            tags.iter().map(|t| t.to_string()).collect(),
        )
        .await
        .unwrap()
        .id
    }

    // Helper: search FTS5 for `query` in a runtime namespace.
    async fn fts_hit(rt: &KhiveRuntime, namespace: Option<&str>, query: &str) -> Vec<Uuid> {
        let ns = rt.ns(namespace).to_string();
        rt.text(namespace)
            .unwrap()
            .search(TextSearchRequest {
                query: query.to_string(),
                mode: TextQueryMode::Plain,
                filter: Some(TextFilter {
                    namespaces: vec![ns],
                    ..Default::default()
                }),
                top_k: 50,
                snippet_chars: 100,
            })
            .await
            .unwrap()
            .into_iter()
            .map(|h| h.subject_id)
            .collect()
    }

    /// Merge `{"a": 2, "b": 3}` into `{"a": 1}` with `strategy`; return the kept props.
    async fn merged_props(strategy: MergeStrategy) -> Value {
        let rt = rt();
        let into = concept(&rt, "Into", None, Some(json!({"a": 1})), &[]).await;
        let from = concept(&rt, "From", None, Some(json!({"a": 2, "b": 3})), &[]).await;

        rt.merge_entity(None, into, from, strategy).await.unwrap();

        rt.get_entity(None, into)
            .await
            .unwrap()
            .unwrap()
            .properties
            .unwrap()
    }

    // ---- update_entity ----

    #[tokio::test]
    async fn update_entity_patch_changes_only_specified_fields() {
        let rt = rt();
        let id = concept(
            &rt,
            "OriginalName",
            Some("orig desc"),
            Some(json!({"k": "v"})),
            &[],
        )
        .await;

        let updated = rt
            .update_entity(
                None,
                id,
                EntityPatch {
                    description: Some(Some("new desc".to_string())),
                    ..Default::default()
                },
            )
            .await
            .unwrap();

        assert_eq!(updated.name, "OriginalName");
        assert_eq!(updated.description.as_deref(), Some("new desc"));
        assert_eq!(updated.properties, Some(json!({"k": "v"})));
    }

    #[tokio::test]
    async fn update_entity_clear_description_with_some_none() {
        let rt = rt();
        let id = concept(&rt, "ClearDesc", Some("has description"), None, &[]).await;

        let updated = rt
            .update_entity(
                None,
                id,
                EntityPatch {
                    description: Some(None),
                    ..Default::default()
                },
            )
            .await
            .unwrap();

        assert!(
            updated.description.is_none(),
            "description should be cleared"
        );
    }

    #[tokio::test]
    async fn update_entity_reindexes_when_name_changes() {
        let rt = rt();
        let id = concept(&rt, "OldName", None, None, &[]).await;

        assert!(
            fts_hit(&rt, None, "OldName").await.contains(&id),
            "entity should be findable by old name"
        );

        rt.update_entity(
            None,
            id,
            EntityPatch {
                name: Some("NewName".to_string()),
                ..Default::default()
            },
        )
        .await
        .unwrap();

        // After the rename the FTS document reflects the new name only.
        assert!(
            !fts_hit(&rt, None, "OldName").await.contains(&id),
            "old name should no longer match after rename"
        );
        assert!(
            fts_hit(&rt, None, "NewName").await.contains(&id),
            "new name should be findable after rename"
        );
    }

    #[tokio::test]
    async fn update_entity_properties_merges_preserving_existing_keys() {
        let rt = rt();
        let id = concept(
            &rt,
            "MergeProps",
            None,
            Some(json!({
                "domain": "inference",
                "repo": "lattice",
                "status": "researched",
            })),
            &[],
        )
        .await;

        let updated = rt
            .update_entity(
                None,
                id,
                EntityPatch {
                    properties: Some(json!({"status": "implemented"})),
                    ..Default::default()
                },
            )
            .await
            .unwrap();

        let props = updated.properties.expect("properties should remain set");
        assert_eq!(props["domain"], "inference", "domain key must be preserved");
        assert_eq!(props["repo"], "lattice", "repo key must be preserved");
        assert_eq!(
            props["status"], "implemented",
            "status key must be updated by patch"
        );
    }

    #[tokio::test]
    async fn update_entity_property_patch_is_shallow() {
        let rt = rt();
        let id = concept(
            &rt,
            "ShallowProps",
            None,
            Some(json!({"nested": {"a": 1}, "top": true})),
            &[],
        )
        .await;

        let updated = rt
            .update_entity(
                None,
                id,
                EntityPatch {
                    properties: Some(json!({"nested": {"b": 2}})),
                    ..Default::default()
                },
            )
            .await
            .unwrap();

        // The nested object is replaced wholesale; untouched top-level keys survive.
        assert_eq!(
            updated.properties,
            Some(json!({"nested": {"b": 2}, "top": true}))
        );
    }

    #[tokio::test]
    async fn update_entity_props_only_patch_keeps_entity_searchable() {
        let rt = rt();
        let id = concept(&rt, "StableIndexed", None, None, &[]).await;

        assert!(fts_hit(&rt, None, "StableIndexed").await.contains(&id));

        rt.update_entity(
            None,
            id,
            EntityPatch {
                properties: Some(json!({"new": "prop"})),
                ..Default::default()
            },
        )
        .await
        .unwrap();

        assert!(
            fts_hit(&rt, None, "StableIndexed").await.contains(&id),
            "still findable after props-only patch"
        );
    }

    #[tokio::test]
    async fn update_entity_tags_only_patch_keeps_entity_searchable() {
        let rt = rt();
        let id = concept(&rt, "TaggedIndexed", None, None, &["old"]).await;

        rt.update_entity(
            None,
            id,
            EntityPatch {
                tags: Some(vec!["new".to_string()]),
                ..Default::default()
            },
        )
        .await
        .unwrap();

        assert!(
            fts_hit(&rt, None, "TaggedIndexed").await.contains(&id),
            "still findable after tags-only patch"
        );
    }

    // ---- merge_entity ----

    #[tokio::test]
    async fn merge_entity_rewires_edges() {
        let rt = rt();
        let a = concept(&rt, "A", None, None, &[]).await;
        let b = concept(&rt, "B", None, None, &[]).await;
        let c = concept(&rt, "C", None, None, &[]).await;
        let d = concept(&rt, "D", None, None, &[]).await;

        // A→B and C→B; merge B into D → should become A→D and C→D.
        rt.link(None, a, b, EdgeRelation::Extends, 1.0).await.unwrap();
        rt.link(None, c, b, EdgeRelation::Extends, 1.0).await.unwrap();

        let summary = rt
            .merge_entity(None, d, b, MergeStrategy::PreferInto)
            .await
            .unwrap();

        assert_eq!(summary.kept_id, d);
        assert_eq!(summary.removed_id, b);
        assert_eq!(summary.edges_rewired, 2);

        for source in [a, c] {
            let out = rt
                .neighbors(None, source, Direction::Out, None, None)
                .await
                .unwrap();
            assert_eq!(out.len(), 1);
            assert_eq!(out[0].node_id, d);
        }
    }

    #[tokio::test]
    async fn merge_entity_prefer_into_strategy() {
        let props = merged_props(MergeStrategy::PreferInto).await;
        // a stays as 1 (into wins), b is added from from.
        assert_eq!(props["a"], 1);
        assert_eq!(props["b"], 3);
    }

    #[tokio::test]
    async fn merge_entity_prefer_from_strategy() {
        let props = merged_props(MergeStrategy::PreferFrom).await;
        // from wins on a, b also from from.
        assert_eq!(props["a"], 2);
        assert_eq!(props["b"], 3);
    }

    #[tokio::test]
    async fn merge_entity_union_strategy() {
        let props = merged_props(MergeStrategy::Union).await;
        // Scalar conflict: into wins → a=1. b added from from.
        assert_eq!(props["a"], 1);
        assert_eq!(props["b"], 3);
    }

    #[tokio::test]
    async fn merge_entity_unions_tags() {
        let rt = rt();
        let into = concept(&rt, "Into", None, None, &["x", "y"]).await;
        let from = concept(&rt, "From", None, None, &["y", "z"]).await;

        rt.merge_entity(None, into, from, MergeStrategy::PreferInto)
            .await
            .unwrap();

        let kept = rt.get_entity(None, into).await.unwrap().unwrap();
        let mut tags = kept.tags.clone();
        tags.sort();
        assert_eq!(tags, vec!["x", "y", "z"]);
    }

    #[tokio::test]
    async fn merge_entity_drops_self_loops() {
        let rt = rt();
        let a = concept(&rt, "A", None, None, &[]).await;
        let b = concept(&rt, "B", None, None, &[]).await;

        // A `extends` B — merging B into A would produce A `extends` A → drop it.
        rt.link(None, a, b, EdgeRelation::Extends, 1.0).await.unwrap();

        let summary = rt
            .merge_entity(None, a, b, MergeStrategy::PreferInto)
            .await
            .unwrap();

        assert_eq!(
            summary.edges_rewired, 0,
            "self-loop should be dropped, not rewired"
        );

        let a_out = rt
            .neighbors(None, a, Direction::Out, None, None)
            .await
            .unwrap();
        assert!(a_out.is_empty(), "no self-loop should remain");
    }

    // ---- merge helper unit tests ----

    #[test]
    fn union_tags_deduplicates() {
        let (tags, added) = union_tags(
            &["x".to_string(), "y".to_string()],
            &["y".to_string(), "z".to_string()],
        );
        let mut sorted = tags.clone();
        sorted.sort();
        assert_eq!(sorted, vec!["x", "y", "z"]);
        assert_eq!(added, 1);
    }

    #[test]
    fn merge_properties_prefer_into_fills_missing_keys() {
        let a = json!({"a": 1});
        let b = json!({"a": 99, "b": 2});
        let (merged, added) = merge_properties(&Some(a), &Some(b), MergeStrategy::PreferInto);
        let m = merged.unwrap();
        assert_eq!(m["a"], 1);
        assert_eq!(m["b"], 2);
        assert_eq!(added, 1);
    }

    #[test]
    fn merge_properties_prefer_from_overwrites_and_counts_new_keys() {
        let (merged, added) = merge_properties(
            &Some(json!({"a": 1})),
            &Some(json!({"a": 2, "b": 3})),
            MergeStrategy::PreferFrom,
        );
        assert_eq!(merged, Some(json!({"a": 2, "b": 3})));
        assert_eq!(added, 1, "only the new key `b` counts");
    }

    #[test]
    fn merge_properties_union_merges_nested_objects() {
        let (merged, added) = merge_properties(
            &Some(json!({"o": {"x": 1}, "s": "into"})),
            &Some(json!({"o": {"x": 9, "y": 2}, "s": "from", "z": 3})),
            MergeStrategy::Union,
        );
        assert_eq!(
            merged,
            Some(json!({"o": {"x": 1, "y": 2}, "s": "into", "z": 3}))
        );
        assert_eq!(added, 2, "nested `y` and top-level `z`");
    }

    #[test]
    fn merge_properties_only_from_present_counts_its_keys() {
        let (merged, added) = merge_properties(
            &None,
            &Some(json!({"a": 1, "b": 2})),
            MergeStrategy::PreferInto,
        );
        assert_eq!(merged, Some(json!({"a": 1, "b": 2})));
        assert_eq!(added, 2);

        let (merged, added) = merge_properties(&None, &None, MergeStrategy::PreferFrom);
        assert_eq!(merged, None);
        assert_eq!(added, 0);
    }

    #[test]
    fn merge_string_field_only_prefer_from_takes_from() {
        assert_eq!(merge_string_field("i", "f", MergeStrategy::PreferInto), "i");
        assert_eq!(merge_string_field("i", "f", MergeStrategy::Union), "i");
        assert_eq!(merge_string_field("i", "f", MergeStrategy::PreferFrom), "f");
    }

    #[test]
    fn merge_option_string_field_strategies() {
        let some = |s: &str| Some(s.to_string());
        assert_eq!(
            merge_option_string_field(&None, &some("f"), MergeStrategy::PreferInto),
            some("f")
        );
        assert_eq!(
            merge_option_string_field(&some("i"), &some("f"), MergeStrategy::PreferInto),
            some("i")
        );
        assert_eq!(
            merge_option_string_field(&some("i"), &None, MergeStrategy::PreferFrom),
            some("i")
        );
        assert_eq!(
            merge_option_string_field(&some(""), &some("f"), MergeStrategy::Union),
            some("f")
        );
        assert_eq!(
            merge_option_string_field(&some(""), &None, MergeStrategy::Union),
            None
        );
    }
}