Skip to main content

khive_runtime/
curation.rs

1// Licensed under the Apache License, Version 2.0.
2
3//! Curation operations: entity update/merge and edge-list filter type.
4//!
5//! See ADR-014 for the full specification and semantics.
6
7use std::collections::HashSet;
8
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use uuid::Uuid;
12
13use khive_db::SqliteError;
14use khive_storage::types::{EdgeFilter, TextDocument};
15use khive_storage::{EdgeRelation, Entity, SubstrateKind};
16use khive_types::EventKind;
17use rusqlite::OptionalExtension;
18
19use crate::error::{RuntimeError, RuntimeResult};
20use crate::operations::canonical_edge_endpoints;
21use crate::runtime::{KhiveRuntime, NamespaceToken};
22
23// ---------------------------------------------------------------------------
24// Public types
25// ---------------------------------------------------------------------------
26
27/// Patch for `update_entity`. Only `Some(_)` fields are applied; `None` means "leave unchanged".
28///
29/// For `description`:
30/// - `None` (outer) — leave the current description as-is
31/// - `Some(None)` — clear the description (set to NULL)
32/// - `Some(Some(s))` — set the description to `s`
33///
34/// For `properties` (deep-merge semantics):
35/// - `None` — leave properties as-is
36/// - `Some(value)` — deep-merge `value` into existing properties. Keys present in
37///   the patch overwrite existing keys; keys absent from the patch are preserved.
38///   Removing a key requires explicit replacement of the parent object (or a future
39///   `unset`/`null-marker` extension).
40///
41/// For `tags` — replace semantics: `Some(vec)` sets tags to exactly `vec`. To add
42/// a tag without losing existing tags, read the entity first, push the new tag,
43/// and pass the full list back.
44#[derive(Clone, Debug, Default)]
45pub struct EntityPatch {
46    pub name: Option<String>,
47    pub description: Option<Option<String>>,
48    pub properties: Option<Value>,
49    pub tags: Option<Vec<String>>,
50}
51
52/// Policy used when deduplicating two entities.
53#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
54#[serde(rename_all = "snake_case")]
55pub enum EntityDedupMergePolicy {
56    /// `into` values win on conflict. Tags are unioned. Properties from `from` fill in
57    /// keys that `into` doesn't have. This is the default.
58    #[default]
59    PreferInto,
60    /// `from` values win on conflict.
61    PreferFrom,
62    /// Deep-merge: object properties merge recursively. Scalar conflicts go to `into`.
63    Union,
64}
65
66/// Strategy for merging note content when two notes are combined.
67#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
68#[serde(rename_all = "snake_case")]
69pub enum ContentMergeStrategy {
70    #[default]
71    Append,
72    PreferInto,
73    PreferFrom,
74}
75
76/// Result returned by `merge_entity` / `merge_note`.
77#[derive(Clone, Debug, Serialize, Deserialize)]
78pub struct MergeSummary {
79    pub kept_id: Uuid,
80    pub removed_id: Uuid,
81    pub edges_rewired: usize,
82    pub properties_merged: usize,
83    pub tags_unioned: usize,
84    pub content_appended: bool,
85    pub dry_run: bool,
86}
87
88/// Patch for `update_edge`. Only `Some(_)` fields are applied; `None` means "leave unchanged".
89///
90/// For `properties` — replacement semantics (not deep merge): `Some(value)` replaces
91/// the entire metadata object. `None` leaves metadata unchanged.
92#[derive(Clone, Debug, Default)]
93pub struct EdgePatch {
94    pub relation: Option<EdgeRelation>,
95    pub weight: Option<f64>,
96    pub properties: Option<Value>,
97}
98
99/// Patch for `update_note`. Only `Some(_)` fields are applied; `None` means "leave unchanged".
100///
101/// For `salience`/`decay_factor`:
102/// - `None` (outer) — leave unchanged
103/// - `Some(None)` — clear the value
104/// - `Some(Some(v))` — set to v
105#[derive(Clone, Debug, Default)]
106pub struct NotePatch {
107    pub name: Option<Option<String>>,
108    pub content: Option<String>,
109    pub salience: Option<Option<f64>>,
110    pub decay_factor: Option<Option<f64>>,
111    pub properties: Option<Value>,
112    pub(crate) kind_status: Option<String>,
113}
114
115impl NotePatch {
116    /// Construct a `NotePatch` from the public fields only.
117    /// Use this from external crates; `kind_status` is set to `None`.
118    pub fn new(
119        name: Option<Option<String>>,
120        content: Option<String>,
121        salience: Option<Option<f64>>,
122        decay_factor: Option<Option<f64>>,
123        properties: Option<Value>,
124    ) -> Self {
125        Self {
126            name,
127            content,
128            salience,
129            decay_factor,
130            properties,
131            kind_status: None,
132        }
133    }
134}
135
136/// Filter for `list_edges` / `count_edges`.
137#[derive(Clone, Debug, Default)]
138pub struct EdgeListFilter {
139    pub source_id: Option<Uuid>,
140    pub target_id: Option<Uuid>,
141    /// Empty = any relation.
142    pub relations: Vec<EdgeRelation>,
143    pub min_weight: Option<f64>,
144    pub max_weight: Option<f64>,
145}
146
147impl From<EdgeListFilter> for EdgeFilter {
148    fn from(f: EdgeListFilter) -> Self {
149        EdgeFilter {
150            source_ids: f.source_id.into_iter().collect(),
151            target_ids: f.target_id.into_iter().collect(),
152            relations: f.relations,
153            min_weight: f.min_weight,
154            max_weight: f.max_weight,
155            ..Default::default()
156        }
157    }
158}
159
160// ---------------------------------------------------------------------------
161// Implementation
162// ---------------------------------------------------------------------------
163
164impl KhiveRuntime {
165    /// Patch-style entity update.
166    ///
167    /// Only fields set to `Some(_)` are changed. Re-indexes FTS5 (and vectors if configured)
168    /// when `name` or `description` changes; skips re-indexing for property/tag-only patches.
169    ///
170    /// Returns `RuntimeError::NotFound` if the entity does not exist or belongs to a different
171    /// namespace. This enforces ADR-007 namespace isolation at the runtime layer.
172    pub async fn update_entity(
173        &self,
174        token: &NamespaceToken,
175        id: Uuid,
176        patch: EntityPatch,
177    ) -> RuntimeResult<Entity> {
178        let store = self.entities(token)?;
179        let mut entity = store
180            .get_entity(id)
181            .await?
182            .ok_or_else(|| RuntimeError::NotFound(format!("entity {id}")))?;
183
184        Self::ensure_namespace(&entity.namespace, token.namespace().as_str())?;
185
186        let mut text_changed = false;
187        let mut changed_fields: Vec<&'static str> = Vec::new();
188
189        if let Some(name) = patch.name {
190            text_changed |= entity.name != name;
191            entity.name = name;
192            changed_fields.push("name");
193        }
194        if let Some(desc_patch) = patch.description {
195            text_changed |= entity.description != desc_patch;
196            entity.description = desc_patch;
197            changed_fields.push("description");
198        }
199        if let Some(props) = patch.properties {
200            let (merged, _) = merge_properties(
201                &entity.properties,
202                &Some(props),
203                EntityDedupMergePolicy::PreferFrom,
204            );
205            entity.properties = merged;
206            changed_fields.push("properties");
207        }
208        if let Some(tags) = patch.tags {
209            entity.tags = tags;
210            changed_fields.push("tags");
211        }
212
213        entity.updated_at = chrono::Utc::now().timestamp_micros();
214        store.upsert_entity(entity.clone()).await?;
215
216        if text_changed {
217            self.reindex_entity(token, &entity).await?;
218        }
219
220        let event_store = self.events(token)?;
221        let event = khive_storage::event::Event::new(
222            entity.namespace.clone(),
223            "update",
224            EventKind::EntityUpdated,
225            SubstrateKind::Entity,
226            "",
227        )
228        .with_target(entity.id)
229        .with_payload(serde_json::json!({
230            "id": entity.id,
231            "namespace": entity.namespace,
232            "changed_fields": changed_fields,
233        }));
234        event_store.append_event(event).await.map_err(|e| {
235            RuntimeError::Internal(format!("update_entity: event store write failed: {e}"))
236        })?;
237
238        Ok(entity)
239    }
240
241    /// Merge `from_id` into `into_id`.
242    ///
243    /// All edges incident to `from_id` are rewired to `into_id`. Self-loops that would
244    /// result from the rewire are dropped. Properties and tags are merged per `strategy`.
245    /// `from_id` is tombstoned with merge provenance and removed from indexes. Returns a summary.
246    ///
247    /// If `dry_run` is true, computes and returns the planned summary without mutating any rows.
248    ///
249    /// Atomic: all SQL (entity reads/writes, edge rewires, FTS updates, vec-index delete)
250    /// runs on a single pool connection inside one `BEGIN IMMEDIATE` transaction via
251    /// `merge_entity_sql`. If embedding vectors are configured, the vector re-insert for
252    /// `into_id` is performed after the transaction (requires async embedding computation).
253    pub async fn merge_entity(
254        &self,
255        token: &NamespaceToken,
256        into_id: Uuid,
257        from_id: Uuid,
258        strategy: EntityDedupMergePolicy,
259        dry_run: bool,
260    ) -> RuntimeResult<MergeSummary> {
261        if into_id == from_id {
262            return Err(RuntimeError::InvalidInput(
263                "cannot merge an entity into itself".into(),
264            ));
265        }
266        // H2 fix: enforce same-kind constraint at the runtime layer (ADR-014).
267        // The handler also checks this, but any direct runtime caller (CLI, tests,
268        // future SDK) would bypass the handler guard without this check here.
269        {
270            let into_entity = self.get_entity(token, into_id).await?;
271            let from_entity = self.get_entity(token, from_id).await?;
272            if into_entity.kind != from_entity.kind {
273                return Err(RuntimeError::InvalidInput(format!(
274                    "cannot merge entities of different kinds: into={} ({}), from={} ({}); \
275                     merge requires both entities to share the same kind",
276                    into_id, into_entity.kind, from_id, from_entity.kind
277                )));
278            }
279        }
280        let ns = token.namespace().as_str().to_owned();
281        let sanitized_ns: String = ns
282            .chars()
283            .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
284            .collect();
285        let fts_table = format!("fts_entities_{}", sanitized_ns);
286        let vec_table = self.config().embedding_model.map(|model| {
287            let key: String = model
288                .to_string()
289                .chars()
290                .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
291                .collect();
292            format!("vec_{}", key)
293        });
294
295        // Ensure all required tables exist before entering the transaction.
296        // Each accessor applies its DDL idempotently via `CREATE TABLE IF NOT EXISTS`.
297        let _ = self.entities(token)?;
298        let _ = self.graph(token)?;
299        let _ = self.text(token)?;
300        if self.config().embedding_model.is_some() {
301            let _ = self.vectors(token)?;
302        }
303
304        let pool = self.backend().pool_arc();
305
306        let (summary, updated_entity) = tokio::task::spawn_blocking(move || {
307            let guard = pool.writer()?;
308            guard.transaction(|conn| {
309                merge_entity_sql(
310                    conn, ns, fts_table, vec_table, into_id, from_id, strategy, dry_run,
311                )
312            })
313        })
314        .await
315        .map_err(|e| RuntimeError::Internal(e.to_string()))??;
316
317        // If vectors are configured, reindex into_entity (requires async embedding).
318        // FTS and vec-delete were already committed inside the transaction above.
319        if !dry_run && self.config().embedding_model.is_some() {
320            self.reindex_entity(token, &updated_entity).await?;
321        }
322
323        let event_store = self.events(token)?;
324        // Mirror the wire-level strategy spelling from MergeParams so consumers
325        // can round-trip the policy string back into a request.
326        let policy_str = match strategy {
327            EntityDedupMergePolicy::PreferInto => "prefer_into",
328            EntityDedupMergePolicy::PreferFrom => "prefer_from",
329            EntityDedupMergePolicy::Union => "union",
330        };
331        let event = khive_storage::event::Event::new(
332            updated_entity.namespace.clone(),
333            "merge",
334            EventKind::EntityMerged,
335            SubstrateKind::Entity,
336            "",
337        )
338        .with_target(summary.kept_id)
339        .with_payload(serde_json::json!({
340            "into_id": summary.kept_id,
341            "from_id": summary.removed_id,
342            "policy": policy_str,
343            "edges_rewired": summary.edges_rewired,
344        }));
345        event_store.append_event(event).await.map_err(|e| {
346            RuntimeError::Internal(format!("merge_entity: event store write failed: {e}"))
347        })?;
348
349        Ok(summary)
350    }
351
352    // ---- Internal helpers ----
353
354    /// Re-upsert FTS5 document (and vector if model configured) for the entity.
355    ///
356    /// Uses `entity.namespace` — the authoritative namespace stored on the record — rather
357    /// than the caller-supplied `namespace` parameter. This prevents a cross-namespace
358    /// reindex from writing the search document into the wrong namespace's FTS index.
359    pub(crate) async fn reindex_entity(
360        &self,
361        token: &NamespaceToken,
362        entity: &Entity,
363    ) -> RuntimeResult<()> {
364        let body = match &entity.description {
365            Some(d) if !d.is_empty() => format!("{} {}", entity.name, d),
366            _ => entity.name.clone(),
367        };
368        // Use entity.namespace (authoritative) rather than token.namespace().as_str() (caller claim).
369        let ns = entity.namespace.clone();
370        self.text(token)?
371            .upsert_document(TextDocument {
372                subject_id: entity.id,
373                kind: SubstrateKind::Entity,
374                title: Some(entity.name.clone()),
375                body: body.clone(),
376                tags: entity.tags.clone(),
377                namespace: ns.clone(),
378                metadata: entity.properties.clone(),
379                updated_at: chrono::Utc::now(),
380            })
381            .await?;
382
383        if self.config().embedding_model.is_some() {
384            let vector = self.embed(&body).await?;
385            self.vectors(token)?
386                .insert(
387                    entity.id,
388                    SubstrateKind::Entity,
389                    &ns,
390                    "entity.body",
391                    vec![vector],
392                )
393                .await?;
394        }
395
396        Ok(())
397    }
398
399    /// Remove an entity from FTS5 and (if configured) vector indexes.
400    pub(crate) async fn remove_from_indexes(
401        &self,
402        token: &NamespaceToken,
403        id: Uuid,
404    ) -> RuntimeResult<()> {
405        let ns = token.namespace().as_str().to_owned();
406        self.text(token)?.delete_document(&ns, id).await?;
407        if self.config().embedding_model.is_some() {
408            self.vectors(token)?.delete(id).await?;
409        }
410        Ok(())
411    }
412
413    /// Re-upsert FTS5 document (and vector if model configured) for the note.
414    pub(crate) async fn reindex_note(
415        &self,
416        token: &NamespaceToken,
417        note: &khive_storage::note::Note,
418    ) -> RuntimeResult<()> {
419        let ns = note.namespace.clone();
420        self.text_for_notes(token)?
421            .upsert_document(TextDocument {
422                subject_id: note.id,
423                kind: SubstrateKind::Note,
424                title: note.name.clone(),
425                body: note.content.clone(),
426                tags: Vec::new(),
427                namespace: ns.clone(),
428                metadata: note.properties.clone(),
429                updated_at: chrono::Utc::now(),
430            })
431            .await?;
432
433        if self.config().embedding_model.is_some() {
434            let vector = self.embed(&note.content).await?;
435            self.vectors(token)?
436                .insert(
437                    note.id,
438                    SubstrateKind::Note,
439                    &ns,
440                    "note.content",
441                    vec![vector],
442                )
443                .await?;
444        }
445        Ok(())
446    }
447
448    /// Patch-style note update.
449    pub async fn update_note(
450        &self,
451        token: &NamespaceToken,
452        id: Uuid,
453        patch: NotePatch,
454    ) -> RuntimeResult<khive_storage::note::Note> {
455        let store = self.notes(token)?;
456        let mut note = store
457            .get_note(id)
458            .await?
459            .ok_or_else(|| RuntimeError::NotFound(format!("note {id}")))?;
460
461        Self::ensure_namespace(&note.namespace, token.namespace().as_str())?;
462
463        let mut text_changed = false;
464
465        if let Some(name_patch) = patch.name {
466            text_changed |= note.name != name_patch;
467            note.name = name_patch;
468        }
469        if let Some(content) = patch.content {
470            text_changed |= note.content != content;
471            note.content = content;
472        }
473        if let Some(salience_patch) = patch.salience {
474            note.salience = salience_patch.map(|s| s.clamp(0.0, 1.0));
475        }
476        if let Some(decay_patch) = patch.decay_factor {
477            note.decay_factor = decay_patch.map(|d| d.max(0.0));
478        }
479        if let Some(props) = patch.properties {
480            let (merged, _) = merge_properties(
481                &note.properties,
482                &Some(props),
483                EntityDedupMergePolicy::PreferFrom,
484            );
485            note.properties = merged;
486        }
487        if let Some(status) = patch.kind_status {
488            note.status = status;
489        }
490
491        note.updated_at = chrono::Utc::now().timestamp_micros();
492        store.upsert_note(note.clone()).await?;
493
494        if text_changed {
495            self.reindex_note(token, &note).await?;
496        }
497
498        Ok(note)
499    }
500
501    /// Merge `from_id` note into `into_id` note.
502    ///
503    /// Both notes must exist in the namespace and have the same `kind`. Content is merged
504    /// per `content_strategy`. Properties are merged per `strategy`. `from_id` is
505    /// tombstoned (status='deleted', deleted_at set). Returns a summary.
506    ///
507    /// If `dry_run` is true, computes and returns the planned summary without mutating
508    /// any rows, edges, or indexes.
509    pub async fn merge_note(
510        &self,
511        token: &NamespaceToken,
512        into_id: Uuid,
513        from_id: Uuid,
514        strategy: EntityDedupMergePolicy,
515        content_strategy: ContentMergeStrategy,
516        dry_run: bool,
517    ) -> RuntimeResult<MergeSummary> {
518        if into_id == from_id {
519            return Err(RuntimeError::InvalidInput(
520                "cannot merge a note into itself".into(),
521            ));
522        }
523        let ns = token.namespace().as_str().to_string();
524        let sanitized_ns: String = ns
525            .chars()
526            .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
527            .collect();
528        let fts_table = format!("fts_notes_{}", sanitized_ns);
529        let vec_table = self.config().embedding_model.map(|model| {
530            let key: String = model
531                .to_string()
532                .chars()
533                .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
534                .collect();
535            format!("vec_{}", key)
536        });
537
538        let note_store = self.notes(token)?;
539        let into_note = note_store
540            .get_note(into_id)
541            .await?
542            .ok_or_else(|| RuntimeError::NotFound("not found in this namespace".into()))?;
543        Self::ensure_namespace(&into_note.namespace, &ns)?;
544
545        let from_note = note_store
546            .get_note(from_id)
547            .await?
548            .ok_or_else(|| RuntimeError::NotFound("not found in this namespace".into()))?;
549        Self::ensure_namespace(&from_note.namespace, &ns)?;
550
551        let _ = self.graph(token)?;
552        let _ = self.text_for_notes(token)?;
553        if self.config().embedding_model.is_some() {
554            let _ = self.vectors(token)?;
555        }
556
557        let pool = self.backend().pool_arc();
558        let (summary, updated_note) = tokio::task::spawn_blocking(move || {
559            let guard = pool.writer()?;
560            guard.transaction(|conn| {
561                merge_note_sql(
562                    conn,
563                    ns,
564                    fts_table,
565                    vec_table,
566                    into_id,
567                    from_id,
568                    strategy,
569                    content_strategy,
570                    dry_run,
571                )
572            })
573        })
574        .await
575        .map_err(|e| RuntimeError::Internal(e.to_string()))??;
576
577        if !dry_run && self.config().embedding_model.is_some() {
578            self.reindex_note(token, &updated_note).await?;
579        }
580        Ok(summary)
581    }
582}
583
584// ---------------------------------------------------------------------------
585// Transactional merge SQL helpers
586// ---------------------------------------------------------------------------
587
588/// Read one entity row by ID within a namespace, returning `SqliteError` on missing/wrong-ns.
589fn read_merge_entity(
590    conn: &rusqlite::Connection,
591    id: Uuid,
592    namespace: &str,
593) -> Result<Entity, SqliteError> {
594    let id_str = id.to_string();
595    let mut stmt = conn.prepare(
596        "SELECT id, namespace, kind, entity_type, name, description, properties, tags, \
597         created_at, updated_at, deleted_at, merged_into, merge_event_id \
598         FROM entities WHERE id = ?1 AND deleted_at IS NULL",
599    )?;
600    let mut rows = stmt.query(rusqlite::params![id_str])?;
601    let row = rows
602        .next()?
603        .ok_or_else(|| SqliteError::InvalidData(format!("entity {id} not found")))?;
604
605    let id_s: String = row.get(0)?;
606    let ns: String = row.get(1)?;
607    let kind: String = row.get(2)?;
608    let entity_type: Option<String> = row.get(3)?;
609    let name: String = row.get(4)?;
610    let description: Option<String> = row.get(5)?;
611    let properties_str: Option<String> = row.get(6)?;
612    let tags_str: String = row.get(7)?;
613    let created_at: i64 = row.get(8)?;
614    let updated_at: i64 = row.get(9)?;
615    let deleted_at: Option<i64> = row.get(10)?;
616    let merged_into_str: Option<String> = row.get(11)?;
617    let merge_event_id_str: Option<String> = row.get(12)?;
618
619    if ns != namespace {
620        return Err(SqliteError::InvalidData(format!(
621            "entity {id} belongs to namespace '{ns}', not '{namespace}'"
622        )));
623    }
624
625    let entity_id = Uuid::parse_str(&id_s).map_err(|e| SqliteError::InvalidData(e.to_string()))?;
626    let properties: Option<Value> = properties_str
627        .map(|s| {
628            serde_json::from_str::<Value>(&s).map_err(|e| SqliteError::InvalidData(e.to_string()))
629        })
630        .transpose()?;
631    let tags: Vec<String> =
632        serde_json::from_str(&tags_str).map_err(|e| SqliteError::InvalidData(e.to_string()))?;
633    let merged_into = merged_into_str
634        .as_deref()
635        .map(Uuid::parse_str)
636        .transpose()
637        .map_err(|e| SqliteError::InvalidData(e.to_string()))?;
638    let merge_event_id = merge_event_id_str
639        .as_deref()
640        .map(Uuid::parse_str)
641        .transpose()
642        .map_err(|e| SqliteError::InvalidData(e.to_string()))?;
643
644    Ok(Entity {
645        id: entity_id,
646        namespace: ns,
647        kind,
648        entity_type,
649        name,
650        description,
651        properties,
652        tags,
653        created_at,
654        updated_at,
655        deleted_at,
656        merged_into,
657        merge_event_id,
658    })
659}
660
661/// All merge SQL on one connection inside an already-open `BEGIN IMMEDIATE` transaction.
662///
663/// Reads both entities, rewires/drops incident edges, merges entity fields, updates FTS,
664/// deletes the `from` vec entry (if `vec_table` is Some), and tombstones `from` with merge
665/// provenance.  Returns the updated `into` entity so the caller can do the async vec re-insert.
666///
667/// When `dry_run` is true, all reads and computations are performed but no writes are issued.
668#[allow(clippy::too_many_arguments)]
669fn merge_entity_sql(
670    conn: &rusqlite::Connection,
671    namespace: String,
672    fts_table: String,
673    vec_table: Option<String>,
674    into_id: Uuid,
675    from_id: Uuid,
676    strategy: EntityDedupMergePolicy,
677    dry_run: bool,
678) -> Result<(MergeSummary, Entity), SqliteError> {
679    let into_entity = read_merge_entity(conn, into_id, &namespace)?;
680    let from_entity = read_merge_entity(conn, from_id, &namespace)?;
681
682    // --- Collect edges incident to from_id ---
683    #[allow(dead_code)]
684    struct EdgeRow {
685        id: Uuid,
686        source_id: Uuid,
687        target_id: Uuid,
688        relation: String,
689        weight: f64,
690        created_at: i64,
691        updated_at: i64,
692        deleted_at: Option<i64>,
693        target_backend: Option<String>,
694        metadata: Option<String>,
695    }
696
697    let parse_id =
698        |s: String| Uuid::parse_str(&s).map_err(|e| SqliteError::InvalidData(e.to_string()));
699
700    let from_str = from_id.to_string();
701
702    let mut outbound: Vec<EdgeRow> = Vec::new();
703    {
704        let mut stmt = conn.prepare(
705            "SELECT id, source_id, target_id, relation, weight, created_at, \
706                    updated_at, deleted_at, target_backend, metadata \
707             FROM graph_edges WHERE namespace = ?1 AND source_id = ?2",
708        )?;
709        let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
710        while let Some(row) = rows.next()? {
711            outbound.push(EdgeRow {
712                id: parse_id(row.get(0)?)?,
713                source_id: parse_id(row.get(1)?)?,
714                target_id: parse_id(row.get(2)?)?,
715                relation: row.get(3)?,
716                weight: row.get(4)?,
717                created_at: row.get(5)?,
718                updated_at: row.get(6)?,
719                deleted_at: row.get(7)?,
720                target_backend: row.get(8)?,
721                metadata: row.get(9)?,
722            });
723        }
724    }
725
726    let mut inbound: Vec<EdgeRow> = Vec::new();
727    {
728        let mut stmt = conn.prepare(
729            "SELECT id, source_id, target_id, relation, weight, created_at, \
730                    updated_at, deleted_at, target_backend, metadata \
731             FROM graph_edges WHERE namespace = ?1 AND target_id = ?2",
732        )?;
733        let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
734        while let Some(row) = rows.next()? {
735            inbound.push(EdgeRow {
736                id: parse_id(row.get(0)?)?,
737                source_id: parse_id(row.get(1)?)?,
738                target_id: parse_id(row.get(2)?)?,
739                relation: row.get(3)?,
740                weight: row.get(4)?,
741                created_at: row.get(5)?,
742                updated_at: row.get(6)?,
743                deleted_at: row.get(7)?,
744                target_backend: row.get(8)?,
745                metadata: row.get(9)?,
746            });
747        }
748    }
749
750    // Deduplicate by edge ID (a self-edge from_id→from_id appears in both lists).
751    let mut seen: HashSet<Uuid> = HashSet::new();
752    let mut all_edges: Vec<EdgeRow> = Vec::new();
753    for edge in outbound.into_iter().chain(inbound) {
754        if seen.insert(edge.id) {
755            all_edges.push(edge);
756        }
757    }
758
759    // --- Merge entity fields ---
760    let (merged_props, properties_merged) =
761        merge_properties(&into_entity.properties, &from_entity.properties, strategy);
762    let merged_name = merge_string_field(&into_entity.name, &from_entity.name, strategy);
763    let merged_description =
764        merge_option_string_field(&into_entity.description, &from_entity.description, strategy);
765    let (merged_tags, tags_unioned) = union_tags(&into_entity.tags, &from_entity.tags);
766
767    let now = chrono::Utc::now().timestamp_micros();
768    let into_str = into_id.to_string();
769    let props_str = merged_props
770        .as_ref()
771        .map(|v| serde_json::to_string(v).unwrap_or_default());
772    let tags_json = serde_json::to_string(&merged_tags).unwrap_or_else(|_| "[]".to_string());
773
774    // --- Rewire edges ---
775    let mut edges_rewired = 0usize;
776    if !dry_run {
777        for edge in all_edges {
778            let raw_src = if edge.source_id == from_id {
779                into_id
780            } else {
781                edge.source_id
782            };
783            let raw_tgt = if edge.target_id == from_id {
784                into_id
785            } else {
786                edge.target_id
787            };
788            // ADR-002 §134: symmetric relations must be stored with source_uuid < target_uuid.
789            // Apply canonicalization so the conflict check and UPDATE both use the canonical form.
790            let (new_src, new_tgt) = match edge.relation.parse::<EdgeRelation>() {
791                Ok(rel) => canonical_edge_endpoints(rel, raw_src, raw_tgt),
792                Err(_) => (raw_src, raw_tgt),
793            };
794
795            if new_src == new_tgt {
796                conn.execute(
797                    "DELETE FROM graph_edges WHERE namespace = ?1 AND id = ?2",
798                    rusqlite::params![&namespace, edge.id.to_string()],
799                )?;
800                continue;
801            }
802
803            let now_ts = chrono::Utc::now().timestamp();
804            // H3 fix (ADR-009/ADR-014): preserve the original edge ID by updating
805            // source_id/target_id in-place when no conflict exists.
806            //
807            // Two-step approach to handle all cases while keeping the original ID:
808            //   (a) No conflict (new triple): UPDATE source_id/target_id in-place.
809            //       The edge retains its original UUID — callers can still get() it
810            //       by the ID they received from link().
811            //   (b) Conflict: into_id already has an edge with this (source,target,
812            //       relation). Delete the from-edge (it is superseded) and UPDATE
813            //       the existing into-edge to refresh weight/metadata/deleted_at.
814            //       The surviving edge is the into-entity's original edge (correct).
815            //
816            // Check for a conflict: does into_id already have this natural key?
817            let conflict_id: Option<String> = {
818                let conflict_src = new_src.to_string();
819                let conflict_tgt = new_tgt.to_string();
820                conn.query_row(
821                    "SELECT id FROM graph_edges \
822                     WHERE namespace = ?1 AND source_id = ?2 AND target_id = ?3 \
823                     AND relation = ?4 AND id != ?5",
824                    rusqlite::params![
825                        &namespace,
826                        &conflict_src,
827                        &conflict_tgt,
828                        &edge.relation,
829                        edge.id.to_string(),
830                    ],
831                    |row| row.get(0),
832                )
833                .optional()
834                .map_err(SqliteError::Rusqlite)?
835            };
836
837            let changed = if let Some(existing_id) = conflict_id {
838                // Case (b): a live or soft-deleted row already owns this natural key.
839                // Delete the from-edge and refresh the existing row.
840                conn.execute(
841                    "DELETE FROM graph_edges WHERE namespace = ?1 AND id = ?2",
842                    rusqlite::params![&namespace, edge.id.to_string()],
843                )?;
844                conn.execute(
845                    "UPDATE graph_edges SET \
846                     weight = ?1, updated_at = ?2, deleted_at = NULL, \
847                     target_backend = ?3, metadata = ?4 \
848                     WHERE namespace = ?5 AND id = ?6",
849                    rusqlite::params![
850                        edge.weight,
851                        now_ts,
852                        edge.target_backend,
853                        edge.metadata,
854                        &namespace,
855                        &existing_id,
856                    ],
857                )?
858            } else {
859                // Case (a): no conflict — update source_id/target_id in-place,
860                // preserving the original edge ID for callers.
861                conn.execute(
862                    "UPDATE graph_edges SET \
863                     source_id = ?1, target_id = ?2, updated_at = ?3 \
864                     WHERE namespace = ?4 AND id = ?5",
865                    rusqlite::params![
866                        new_src.to_string(),
867                        new_tgt.to_string(),
868                        now_ts,
869                        &namespace,
870                        edge.id.to_string(),
871                    ],
872                )?
873            };
874            if changed > 0 {
875                edges_rewired += 1;
876            }
877        }
878
879        // --- Upsert merged entity ---
880        conn.execute(
881            "INSERT OR REPLACE INTO entities \
882             (id, namespace, kind, name, description, properties, tags, \
883              created_at, updated_at, deleted_at, merged_into, merge_event_id) \
884             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
885            rusqlite::params![
886                &into_str,
887                &namespace,
888                &into_entity.kind,
889                &merged_name,
890                &merged_description,
891                &props_str,
892                &tags_json,
893                into_entity.created_at,
894                now,
895                into_entity.deleted_at,
896                Option::<String>::None,
897                Option::<String>::None,
898            ],
899        )?;
900
901        // --- Reindex into_id in FTS (delete existing, insert updated) ---
902        let fts_body = match &merged_description {
903            Some(d) if !d.is_empty() => format!("{} {}", merged_name, d),
904            _ => merged_name.clone(),
905        };
906        let kind_str = SubstrateKind::Entity.to_string();
907
908        conn.execute(
909            &format!(
910                "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
911                fts_table
912            ),
913            rusqlite::params![&namespace, &into_str],
914        )?;
915        conn.execute(
916            &format!(
917                "INSERT INTO {} \
918                 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
919                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
920                fts_table
921            ),
922            rusqlite::params![
923                &into_str,
924                &kind_str,
925                &merged_name,
926                &fts_body,
927                &tags_json,
928                &namespace,
929                &props_str,
930                now,
931            ],
932        )?;
933
934        // --- Delete from_id from FTS ---
935        conn.execute(
936            &format!(
937                "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
938                fts_table
939            ),
940            rusqlite::params![&namespace, &from_str],
941        )?;
942
943        // --- Delete from_id from vector index if configured ---
944        if let Some(ref vec_tbl) = vec_table {
945            conn.execute(
946                &format!(
947                    "DELETE FROM {} WHERE subject_id = ?1 AND namespace = ?2",
948                    vec_tbl
949                ),
950                rusqlite::params![&from_str, &namespace],
951            )?;
952        }
953
954        // --- Tombstone from entity (ADR-014: soft-delete with provenance) ---
955        let merge_event_id = Uuid::new_v4();
956        conn.execute(
957            "UPDATE entities \
958             SET deleted_at = ?1, merged_into = ?2, merge_event_id = ?3, updated_at = ?1 \
959             WHERE namespace = ?4 AND id = ?5 AND deleted_at IS NULL",
960            rusqlite::params![
961                now,
962                into_str,
963                merge_event_id.to_string(),
964                &namespace,
965                &from_str,
966            ],
967        )?;
968    }
969
970    let updated_entity = Entity {
971        id: into_id,
972        namespace,
973        kind: into_entity.kind,
974        entity_type: into_entity.entity_type,
975        name: merged_name,
976        description: merged_description,
977        properties: merged_props,
978        tags: merged_tags,
979        created_at: into_entity.created_at,
980        updated_at: now,
981        deleted_at: into_entity.deleted_at,
982        merged_into: None,
983        merge_event_id: None,
984    };
985
986    Ok((
987        MergeSummary {
988            kept_id: into_id,
989            removed_id: from_id,
990            edges_rewired,
991            properties_merged,
992            tags_unioned,
993            content_appended: false,
994            dry_run,
995        },
996        updated_entity,
997    ))
998}
999
1000// ---------------------------------------------------------------------------
1001// Note merge SQL helpers
1002// ---------------------------------------------------------------------------
1003
1004/// Read one note row by ID within a namespace, returning `SqliteError` on missing/wrong-ns.
1005fn read_merge_note(
1006    conn: &rusqlite::Connection,
1007    id: Uuid,
1008    namespace: &str,
1009) -> Result<khive_storage::note::Note, SqliteError> {
1010    use khive_storage::note::Note;
1011    let id_str = id.to_string();
1012    let mut stmt = conn.prepare(
1013        "SELECT id, namespace, kind, status, name, content, salience, decay_factor, \
1014         expires_at, properties, created_at, updated_at, deleted_at \
1015         FROM notes WHERE id = ?1 AND deleted_at IS NULL",
1016    )?;
1017    let mut rows = stmt.query(rusqlite::params![id_str])?;
1018    let row = rows
1019        .next()?
1020        .ok_or_else(|| SqliteError::InvalidData(format!("note {id} not found")))?;
1021
1022    let id_s: String = row.get(0)?;
1023    let ns: String = row.get(1)?;
1024    let kind: String = row.get(2)?;
1025    let status: String = row.get(3)?;
1026    let name: Option<String> = row.get(4)?;
1027    let content: String = row.get(5)?;
1028    let salience: Option<f64> = row.get(6)?;
1029    let decay_factor: Option<f64> = row.get(7)?;
1030    let expires_at: Option<i64> = row.get(8)?;
1031    let properties_str: Option<String> = row.get(9)?;
1032    let created_at: i64 = row.get(10)?;
1033    let updated_at: i64 = row.get(11)?;
1034    let deleted_at: Option<i64> = row.get(12)?;
1035
1036    if ns != namespace {
1037        return Err(SqliteError::InvalidData(format!(
1038            "note {id} belongs to namespace '{ns}', not '{namespace}'"
1039        )));
1040    }
1041
1042    let note_id = Uuid::parse_str(&id_s).map_err(|e| SqliteError::InvalidData(e.to_string()))?;
1043    let properties: Option<serde_json::Value> = properties_str
1044        .map(|s| serde_json::from_str(&s).map_err(|e| SqliteError::InvalidData(e.to_string())))
1045        .transpose()?;
1046
1047    Ok(Note {
1048        id: note_id,
1049        namespace: ns,
1050        kind,
1051        status,
1052        name,
1053        content,
1054        salience,
1055        decay_factor,
1056        expires_at,
1057        properties,
1058        created_at,
1059        updated_at,
1060        deleted_at,
1061    })
1062}
1063
/// Larger of two optional floats; a lone `Some` wins over `None`.
///
/// When both are present the result is `f64::max` of the two values.
fn max_option_f64(a: Option<f64>, b: Option<f64>) -> Option<f64> {
    a.zip(b).map(|(x, y)| x.max(y)).or(a).or(b)
}
1072
1073fn append_merge_history(props: Option<Value>, entry: Value) -> Result<Option<Value>, SqliteError> {
1074    use serde_json::{json, Map};
1075    let mut obj: Map<String, Value> = match props {
1076        Some(Value::Object(m)) => m,
1077        Some(other) => {
1078            let mut m = Map::new();
1079            m.insert("_value".into(), other);
1080            m
1081        }
1082        None => Map::new(),
1083    };
1084    let history = obj
1085        .entry("_merge_history".to_string())
1086        .or_insert_with(|| json!([]));
1087    if let Value::Array(arr) = history {
1088        arr.push(entry);
1089    }
1090    Ok(Some(Value::Object(obj)))
1091}
1092
/// All note merge SQL on one connection inside a `BEGIN IMMEDIATE` transaction.
///
/// This function issues no `BEGIN`/`COMMIT` itself; the transaction mentioned
/// above is presumably opened by the caller — confirm at the call site.
///
/// Reads both notes (must have same `kind`), rewires/drops incident edges, merges content
/// per `content_strategy`, tombstones `from`. Returns the updated `into` note for async
/// re-embedding.
///
/// Merge rules applied to the surviving note:
/// - `content` follows `content_strategy` (`Append` joins with `\n\n---\n\n`,
///   unless `from` has empty content).
/// - `name` prefers `from` under `PreferFrom`, otherwise `into`, falling back
///   to the other side when the preferred one is `None`.
/// - `properties` are merged via `merge_properties`, then a `_merge_history`
///   entry is appended.
/// - `salience` and `expires_at` take the larger of the two values.
/// - `kind`, `status`, `decay_factor`, `created_at` and `deleted_at` are taken
///   from `into`.
///
/// When `dry_run` is true, all reads and computations are performed but no writes are issued.
/// Because edge rewiring happens only in the write path, `edges_rewired` in the
/// returned summary is always 0 for a dry run.
///
/// # Errors
///
/// Returns `SqliteError::InvalidData` if either note cannot be read, if the
/// kinds differ, or if an edge row holds an unparseable UUID; SQL errors are
/// propagated.
#[allow(clippy::too_many_arguments)]
fn merge_note_sql(
    conn: &rusqlite::Connection,
    namespace: String,
    fts_table: String,
    vec_table: Option<String>,
    into_id: Uuid,
    from_id: Uuid,
    strategy: EntityDedupMergePolicy,
    content_strategy: ContentMergeStrategy,
    dry_run: bool,
) -> Result<(MergeSummary, khive_storage::note::Note), SqliteError> {
    let into_note = read_merge_note(conn, into_id, &namespace)?;
    let from_note = read_merge_note(conn, from_id, &namespace)?;

    if into_note.kind != from_note.kind {
        return Err(SqliteError::InvalidData(format!(
            "cannot merge notes of different kinds: {} vs {}",
            into_note.kind, from_note.kind
        )));
    }

    // Note rows are stamped in microseconds.
    let now = chrono::Utc::now().timestamp_micros();
    let into_str = into_id.to_string();
    let from_str = from_id.to_string();

    // Collect edges incident to from_id.
    // Several fields are read but never used afterwards, hence the allow.
    #[allow(dead_code)]
    struct EdgeRow {
        id: Uuid,
        source_id: Uuid,
        target_id: Uuid,
        relation: String,
        weight: f64,
        created_at: i64,
        updated_at: i64,
        deleted_at: Option<i64>,
        target_backend: Option<String>,
        metadata: Option<String>,
    }
    let parse_id =
        |s: String| Uuid::parse_str(&s).map_err(|e| SqliteError::InvalidData(e.to_string()));

    // Edges where from_id is the source. There is no `deleted_at` filter, so
    // soft-deleted edges are collected as well.
    let mut outbound: Vec<EdgeRow> = Vec::new();
    {
        let mut stmt = conn.prepare(
            "SELECT id, source_id, target_id, relation, weight, created_at, updated_at, deleted_at, target_backend, metadata \
             FROM graph_edges WHERE namespace = ?1 AND source_id = ?2",
        )?;
        let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
        while let Some(row) = rows.next()? {
            outbound.push(EdgeRow {
                id: parse_id(row.get(0)?)?,
                source_id: parse_id(row.get(1)?)?,
                target_id: parse_id(row.get(2)?)?,
                relation: row.get(3)?,
                weight: row.get(4)?,
                created_at: row.get(5)?,
                updated_at: row.get(6)?,
                deleted_at: row.get(7)?,
                target_backend: row.get(8)?,
                metadata: row.get(9)?,
            });
        }
    }
    // Edges where from_id is the target (same columns, same lack of filter).
    let mut inbound: Vec<EdgeRow> = Vec::new();
    {
        let mut stmt = conn.prepare(
            "SELECT id, source_id, target_id, relation, weight, created_at, updated_at, deleted_at, target_backend, metadata \
             FROM graph_edges WHERE namespace = ?1 AND target_id = ?2",
        )?;
        let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
        while let Some(row) = rows.next()? {
            inbound.push(EdgeRow {
                id: parse_id(row.get(0)?)?,
                source_id: parse_id(row.get(1)?)?,
                target_id: parse_id(row.get(2)?)?,
                relation: row.get(3)?,
                weight: row.get(4)?,
                created_at: row.get(5)?,
                updated_at: row.get(6)?,
                deleted_at: row.get(7)?,
                target_backend: row.get(8)?,
                metadata: row.get(9)?,
            });
        }
    }
    // De-duplicate by edge ID: a self-loop on from_id appears in both lists.
    let mut seen: HashSet<Uuid> = HashSet::new();
    let mut all_edges: Vec<EdgeRow> = Vec::new();
    for edge in outbound.into_iter().chain(inbound) {
        if seen.insert(edge.id) {
            all_edges.push(edge);
        }
    }

    // Merge note fields.
    // `content_appended` is true only when Append actually added from's text.
    let (merged_content, content_appended) = match content_strategy {
        ContentMergeStrategy::Append => {
            if from_note.content.is_empty() {
                (into_note.content.clone(), false)
            } else {
                (
                    format!("{}\n\n---\n\n{}", into_note.content, from_note.content),
                    true,
                )
            }
        }
        ContentMergeStrategy::PreferInto => (into_note.content.clone(), false),
        ContentMergeStrategy::PreferFrom => (from_note.content.clone(), false),
    };

    // The preferred side's name wins; the other side is the fallback when it is None.
    let merged_name = match strategy {
        EntityDedupMergePolicy::PreferFrom => from_note.name.clone().or(into_note.name.clone()),
        _ => into_note.name.clone().or(from_note.name.clone()),
    };

    let (merged_props, properties_merged) =
        merge_properties(&into_note.properties, &from_note.properties, strategy);

    // Append merge history to properties.
    let merge_history_entry = serde_json::json!({
        "merged_from": from_id.to_string(),
        "merged_at": now,
        "strategy": format!("{:?}", strategy),
        "content_strategy": format!("{:?}", content_strategy),
    });
    let merged_props = append_merge_history(merged_props, merge_history_entry)?;

    // Keep the larger salience and the later expiry; a lone value wins over None.
    let merged_salience = max_option_f64(into_note.salience, from_note.salience);
    let merged_expires_at = match (into_note.expires_at, from_note.expires_at) {
        (Some(a), Some(b)) => Some(a.max(b)),
        (Some(a), None) => Some(a),
        (None, Some(b)) => Some(b),
        (None, None) => None,
    };

    // NOTE(review): a serialization failure would store an empty string here
    // rather than surfacing an error.
    let props_str = merged_props
        .as_ref()
        .map(|v| serde_json::to_string(v).unwrap_or_default());

    let mut edges_rewired = 0usize;
    if !dry_run {
        // Rewire and upsert.
        for edge in all_edges {
            // Replace from_id with into_id on whichever endpoint(s) reference it.
            let raw_src = if edge.source_id == from_id {
                into_id
            } else {
                edge.source_id
            };
            let raw_tgt = if edge.target_id == from_id {
                into_id
            } else {
                edge.target_id
            };
            // ADR-002 §134: canonicalize symmetric relations before conflict check + UPDATE.
            // A relation string that does not parse is left un-canonicalized.
            let (new_src, new_tgt) = match edge.relation.parse::<EdgeRelation>() {
                Ok(rel) => canonical_edge_endpoints(rel, raw_src, raw_tgt),
                Err(_) => (raw_src, raw_tgt),
            };
            // Rewiring produced a self-loop (e.g. an edge between the two merged
            // notes): drop it. Dropped edges are not counted in `edges_rewired`.
            if new_src == new_tgt {
                conn.execute(
                    "DELETE FROM graph_edges WHERE namespace = ?1 AND id = ?2",
                    rusqlite::params![&namespace, edge.id.to_string()],
                )?;
                continue;
            }
            // NOTE(review): edge timestamps are whole seconds (`timestamp()`),
            // whereas the note rows above use microseconds — confirm that
            // graph_edges.updated_at really stores seconds.
            let now_ts = chrono::Utc::now().timestamp();
            // Same two-step approach as entity merge rewire: preserve original edge ID
            // when no conflict, merge into existing row when conflict exists.
            let conflict_id: Option<String> = {
                let conflict_src = new_src.to_string();
                let conflict_tgt = new_tgt.to_string();
                conn.query_row(
                    "SELECT id FROM graph_edges \
                     WHERE namespace = ?1 AND source_id = ?2 AND target_id = ?3 \
                     AND relation = ?4 AND id != ?5",
                    rusqlite::params![
                        &namespace,
                        &conflict_src,
                        &conflict_tgt,
                        &edge.relation,
                        edge.id.to_string(),
                    ],
                    |row| row.get(0),
                )
                .optional()
                .map_err(SqliteError::Rusqlite)?
            };

            let changed = if let Some(existing_id) = conflict_id {
                // Conflict: another row already holds this (source, target, relation).
                // Remove the from-edge, then overwrite the surviving row's weight,
                // target_backend and metadata with the from-edge's values and clear
                // its deleted_at.
                conn.execute(
                    "DELETE FROM graph_edges WHERE namespace = ?1 AND id = ?2",
                    rusqlite::params![&namespace, edge.id.to_string()],
                )?;
                conn.execute(
                    "UPDATE graph_edges SET \
                     weight = ?1, updated_at = ?2, deleted_at = NULL, \
                     target_backend = ?3, metadata = ?4 \
                     WHERE namespace = ?5 AND id = ?6",
                    rusqlite::params![
                        edge.weight,
                        now_ts,
                        edge.target_backend,
                        edge.metadata,
                        &namespace,
                        &existing_id,
                    ],
                )?
            } else {
                // No conflict: repoint the endpoints in place so the edge keeps its ID.
                conn.execute(
                    "UPDATE graph_edges SET \
                     source_id = ?1, target_id = ?2, updated_at = ?3 \
                     WHERE namespace = ?4 AND id = ?5",
                    rusqlite::params![
                        new_src.to_string(),
                        new_tgt.to_string(),
                        now_ts,
                        &namespace,
                        edge.id.to_string(),
                    ],
                )?
            };
            // `changed` is the row count of the final UPDATE in either branch.
            if changed > 0 {
                edges_rewired += 1;
            }
        }

        // Upsert merged into-note.
        conn.execute(
            "INSERT OR REPLACE INTO notes \
             (id, namespace, kind, status, name, content, salience, decay_factor, \
              expires_at, properties, created_at, updated_at, deleted_at) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
            rusqlite::params![
                &into_str,
                &namespace,
                &into_note.kind,
                &into_note.status,
                &merged_name,
                &merged_content,
                merged_salience,
                into_note.decay_factor,
                merged_expires_at,
                &props_str,
                into_note.created_at,
                now,
                into_note.deleted_at,
            ],
        )?;

        // Update FTS for into-note.
        // Delete-then-insert; the tags column is written as the literal "[]".
        conn.execute(
            &format!(
                "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
                fts_table
            ),
            rusqlite::params![&namespace, &into_str],
        )?;
        conn.execute(
            &format!(
                "INSERT INTO {} \
                 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
                fts_table
            ),
            rusqlite::params![
                &into_str,
                SubstrateKind::Note.to_string(),
                &merged_name,
                &merged_content,
                "[]",
                &namespace,
                &props_str,
                now,
            ],
        )?;

        // Delete from-note from FTS.
        conn.execute(
            &format!(
                "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
                fts_table
            ),
            rusqlite::params![&namespace, &from_str],
        )?;

        // Delete from-note from vector index if configured.
        if let Some(ref vec_tbl) = vec_table {
            conn.execute(
                &format!(
                    "DELETE FROM {} WHERE subject_id = ?1 AND namespace = ?2",
                    vec_tbl
                ),
                rusqlite::params![&from_str, &namespace],
            )?;
        }

        // Tombstone the from-note.
        // The `deleted_at IS NULL` guard leaves an already-tombstoned row untouched.
        conn.execute(
            "UPDATE notes SET status = 'deleted', deleted_at = ?1, updated_at = ?1 \
             WHERE namespace = ?2 AND id = ?3 AND deleted_at IS NULL",
            rusqlite::params![now, &namespace, &from_str],
        )?;
    }

    // Built from the in-memory merge result (not re-read from the database), so
    // it is returned for dry runs as well.
    let updated_note = khive_storage::note::Note {
        id: into_id,
        namespace: namespace.clone(),
        kind: into_note.kind.clone(),
        status: into_note.status.clone(),
        name: merged_name,
        content: merged_content,
        salience: merged_salience,
        decay_factor: into_note.decay_factor,
        expires_at: merged_expires_at,
        properties: merged_props,
        created_at: into_note.created_at,
        updated_at: now,
        deleted_at: into_note.deleted_at,
    };

    Ok((
        MergeSummary {
            kept_id: into_id,
            removed_id: from_id,
            edges_rewired,
            properties_merged,
            // This function never merges tags, so the count is fixed at zero.
            tags_unioned: 0,
            content_appended,
            dry_run,
        },
        updated_note,
    ))
}
1434
1435// ---------------------------------------------------------------------------
1436// Merge helpers (pure functions — easier to unit test)
1437// ---------------------------------------------------------------------------
1438
1439fn merge_string_field(into: &str, from: &str, strategy: EntityDedupMergePolicy) -> String {
1440    match strategy {
1441        EntityDedupMergePolicy::PreferInto | EntityDedupMergePolicy::Union => into.to_string(),
1442        EntityDedupMergePolicy::PreferFrom => from.to_string(),
1443    }
1444}
1445
1446fn merge_option_string_field(
1447    into: &Option<String>,
1448    from: &Option<String>,
1449    strategy: EntityDedupMergePolicy,
1450) -> Option<String> {
1451    match strategy {
1452        EntityDedupMergePolicy::PreferInto => {
1453            if into.is_some() {
1454                into.clone()
1455            } else {
1456                from.clone()
1457            }
1458        }
1459        EntityDedupMergePolicy::PreferFrom => {
1460            if from.is_some() {
1461                from.clone()
1462            } else {
1463                into.clone()
1464            }
1465        }
1466        EntityDedupMergePolicy::Union => {
1467            // Keep into's description; if empty, append from's.
1468            match (into, from) {
1469                (Some(a), _) if !a.is_empty() => Some(a.clone()),
1470                (_, Some(b)) => Some(b.clone()),
1471                _ => None,
1472            }
1473        }
1474    }
1475}
1476
1477/// Merge two property objects. Returns (merged, count_of_fields_from_from_that_were_added).
1478fn merge_properties(
1479    into: &Option<Value>,
1480    from: &Option<Value>,
1481    strategy: EntityDedupMergePolicy,
1482) -> (Option<Value>, usize) {
1483    match (into, from) {
1484        (None, None) => (None, 0),
1485        (Some(a), None) => (Some(a.clone()), 0),
1486        (None, Some(b)) => {
1487            let count = if let Value::Object(m) = b { m.len() } else { 1 };
1488            (Some(b.clone()), count)
1489        }
1490        (Some(into_val), Some(from_val)) => {
1491            let (merged, added) = merge_json(into_val, from_val, strategy);
1492            (Some(merged), added)
1493        }
1494    }
1495}
1496
1497/// Deep-merge two JSON values per strategy. Returns (merged, keys_contributed_by_from).
1498fn merge_json(into: &Value, from: &Value, strategy: EntityDedupMergePolicy) -> (Value, usize) {
1499    match (into, from, strategy) {
1500        (Value::Object(a), Value::Object(b), EntityDedupMergePolicy::Union) => {
1501            let mut result = a.clone();
1502            let mut added = 0usize;
1503            for (k, v_from) in b {
1504                if let Some(v_into) = a.get(k) {
1505                    let (merged, sub_added) =
1506                        merge_json(v_into, v_from, EntityDedupMergePolicy::Union);
1507                    result.insert(k.clone(), merged);
1508                    added += sub_added;
1509                } else {
1510                    result.insert(k.clone(), v_from.clone());
1511                    added += 1;
1512                }
1513            }
1514            (Value::Object(result), added)
1515        }
1516        (Value::Object(a), Value::Object(b), EntityDedupMergePolicy::PreferInto) => {
1517            let mut result = a.clone();
1518            let mut added = 0usize;
1519            for (k, v) in b {
1520                if !a.contains_key(k) {
1521                    result.insert(k.clone(), v.clone());
1522                    added += 1;
1523                }
1524            }
1525            (Value::Object(result), added)
1526        }
1527        (Value::Object(a), Value::Object(b), EntityDedupMergePolicy::PreferFrom) => {
1528            let mut result = a.clone();
1529            let mut added = 0usize;
1530            for (k, v) in b {
1531                result.insert(k.clone(), v.clone());
1532                if !a.contains_key(k) {
1533                    added += 1;
1534                }
1535            }
1536            (Value::Object(result), added)
1537        }
1538        // Non-object scalars: apply strategy directly.
1539        (_into_val, from_val, EntityDedupMergePolicy::PreferFrom) => (from_val.clone(), 1),
1540        _ => (into.clone(), 0),
1541    }
1542}
1543
/// Order-preserving union of two tag lists.
///
/// The result starts with `into` exactly as given, followed by each tag from
/// `from` that has not been seen yet, in first-occurrence order. The second
/// element of the tuple is how many tags were appended.
fn union_tags(into: &[String], from: &[String]) -> (Vec<String>, usize) {
    let mut known: HashSet<&str> = into.iter().map(String::as_str).collect();
    let mut merged = into.to_vec();
    // `insert` returns true only for a tag not seen before, which also filters
    // out repeats within `from` itself.
    merged.extend(
        from.iter()
            .filter(|&tag| known.insert(tag.as_str()))
            .cloned(),
    );
    let appended = merged.len() - into.len();
    (merged, appended)
}
1556
1557// ---------------------------------------------------------------------------
1558// Unit tests
1559// ---------------------------------------------------------------------------
1560
1561#[cfg(test)]
1562mod tests {
1563    use super::*;
1564    use crate::runtime::{KhiveRuntime, NamespaceToken};
1565    use khive_storage::types::{Direction, TextFilter, TextQueryMode, TextSearchRequest};
1566
    /// Build a runtime via `KhiveRuntime::memory()` for a test; panics if
    /// construction fails.
    fn rt() -> KhiveRuntime {
        KhiveRuntime::memory().unwrap()
    }
1570
1571    // Helper: search FTS5 for `query` in a runtime namespace.
1572    async fn fts_hit(rt: &KhiveRuntime, token: &NamespaceToken, query: &str) -> Vec<Uuid> {
1573        let ns = token.namespace().as_str().to_string();
1574        rt.text(token)
1575            .unwrap()
1576            .search(TextSearchRequest {
1577                query: query.to_string(),
1578                mode: TextQueryMode::Plain,
1579                filter: Some(TextFilter {
1580                    namespaces: vec![ns],
1581                    ..Default::default()
1582                }),
1583                top_k: 50,
1584                snippet_chars: 100,
1585            })
1586            .await
1587            .unwrap()
1588            .into_iter()
1589            .map(|h| h.subject_id)
1590            .collect()
1591    }
1592
1593    #[tokio::test]
1594    async fn update_entity_patch_changes_only_specified_fields() {
1595        let rt = rt();
1596        let tok = NamespaceToken::local();
1597        let entity = rt
1598            .create_entity(
1599                &tok,
1600                "concept",
1601                None,
1602                "OriginalName",
1603                Some("orig desc"),
1604                Some(serde_json::json!({"k":"v"})),
1605                vec![],
1606            )
1607            .await
1608            .unwrap();
1609
1610        let updated = rt
1611            .update_entity(
1612                &tok,
1613                entity.id,
1614                EntityPatch {
1615                    description: Some(Some("new desc".to_string())),
1616                    ..Default::default()
1617                },
1618            )
1619            .await
1620            .unwrap();
1621
1622        assert_eq!(updated.name, "OriginalName");
1623        assert_eq!(updated.description.as_deref(), Some("new desc"));
1624        assert_eq!(updated.properties, Some(serde_json::json!({"k":"v"})));
1625    }
1626
1627    #[tokio::test]
1628    async fn update_entity_clear_description_with_some_none() {
1629        let rt = rt();
1630        let tok = NamespaceToken::local();
1631        let entity = rt
1632            .create_entity(
1633                &tok,
1634                "concept",
1635                None,
1636                "ClearDesc",
1637                Some("has description"),
1638                None,
1639                vec![],
1640            )
1641            .await
1642            .unwrap();
1643
1644        let updated = rt
1645            .update_entity(
1646                &tok,
1647                entity.id,
1648                EntityPatch {
1649                    description: Some(None),
1650                    ..Default::default()
1651                },
1652            )
1653            .await
1654            .unwrap();
1655
1656        assert!(
1657            updated.description.is_none(),
1658            "description should be cleared"
1659        );
1660    }
1661
1662    #[tokio::test]
1663    async fn update_entity_reindexes_when_name_changes() {
1664        let rt = rt();
1665        let tok = NamespaceToken::local();
1666        let entity = rt
1667            .create_entity(&tok, "concept", None, "OldName", None, None, vec![])
1668            .await
1669            .unwrap();
1670
1671        // Old name is findable.
1672        let hits_before = fts_hit(&rt, &tok, "OldName").await;
1673        assert!(
1674            hits_before.contains(&entity.id),
1675            "entity should be findable by old name"
1676        );
1677
1678        rt.update_entity(
1679            &tok,
1680            entity.id,
1681            EntityPatch {
1682                name: Some("NewName".to_string()),
1683                ..Default::default()
1684            },
1685        )
1686        .await
1687        .unwrap();
1688
1689        let hits_old = fts_hit(&rt, &tok, "OldName").await;
1690        let hits_new = fts_hit(&rt, &tok, "NewName").await;
1691
1692        // After rename, old name no longer matches this entity (FTS index updated).
1693        assert!(
1694            !hits_old.contains(&entity.id),
1695            "old name should no longer match after rename"
1696        );
1697        assert!(
1698            hits_new.contains(&entity.id),
1699            "new name should be findable after rename"
1700        );
1701    }
1702
1703    #[tokio::test]
1704    async fn update_entity_properties_merges_preserving_existing_keys() {
1705        let rt = rt();
1706        let tok = NamespaceToken::local();
1707        let entity = rt
1708            .create_entity(
1709                &tok,
1710                "concept",
1711                None,
1712                "MergeProps",
1713                None,
1714                Some(serde_json::json!({
1715                    "domain": "inference",
1716                    "repo": "lattice",
1717                    "status": "researched",
1718                })),
1719                vec![],
1720            )
1721            .await
1722            .unwrap();
1723
1724        let updated = rt
1725            .update_entity(
1726                &tok,
1727                entity.id,
1728                EntityPatch {
1729                    properties: Some(serde_json::json!({"status": "implemented"})),
1730                    ..Default::default()
1731                },
1732            )
1733            .await
1734            .unwrap();
1735
1736        let props = updated.properties.expect("properties should remain set");
1737        assert_eq!(props["domain"], "inference", "domain key must be preserved");
1738        assert_eq!(props["repo"], "lattice", "repo key must be preserved");
1739        assert_eq!(
1740            props["status"], "implemented",
1741            "status key must be updated by patch"
1742        );
1743    }
1744
1745    #[tokio::test]
1746    async fn update_entity_skips_reindex_when_only_properties_change() {
1747        let rt = rt();
1748        let tok = NamespaceToken::local();
1749        let entity = rt
1750            .create_entity(&tok, "concept", None, "StableIndexed", None, None, vec![])
1751            .await
1752            .unwrap();
1753
1754        // Verify it's in the index before.
1755        let hits_before = fts_hit(&rt, &tok, "StableIndexed").await;
1756        assert!(hits_before.contains(&entity.id));
1757
1758        // Only patch properties — text index should be untouched (still findable).
1759        rt.update_entity(
1760            &tok,
1761            entity.id,
1762            EntityPatch {
1763                properties: Some(serde_json::json!({"new": "prop"})),
1764                ..Default::default()
1765            },
1766        )
1767        .await
1768        .unwrap();
1769
1770        let hits_after = fts_hit(&rt, &tok, "StableIndexed").await;
1771        assert!(
1772            hits_after.contains(&entity.id),
1773            "still findable after props-only patch"
1774        );
1775    }
1776
1777    #[tokio::test]
1778    async fn merge_entity_rewires_edges() {
1779        let rt = rt();
1780        let tok = NamespaceToken::local();
1781        let a = rt
1782            .create_entity(&tok, "concept", None, "A", None, None, vec![])
1783            .await
1784            .unwrap();
1785        let b = rt
1786            .create_entity(&tok, "concept", None, "B", None, None, vec![])
1787            .await
1788            .unwrap();
1789        let c = rt
1790            .create_entity(&tok, "concept", None, "C", None, None, vec![])
1791            .await
1792            .unwrap();
1793        let d = rt
1794            .create_entity(&tok, "concept", None, "D", None, None, vec![])
1795            .await
1796            .unwrap();
1797
1798        // A→B and C→B; merge B into D → should become A→D and C→D.
1799        rt.link(&tok, a.id, b.id, EdgeRelation::Extends, 1.0, None)
1800            .await
1801            .unwrap();
1802        rt.link(&tok, c.id, b.id, EdgeRelation::Extends, 1.0, None)
1803            .await
1804            .unwrap();
1805
1806        let summary = rt
1807            .merge_entity(&tok, d.id, b.id, EntityDedupMergePolicy::PreferInto, false)
1808            .await
1809            .unwrap();
1810
1811        assert_eq!(summary.kept_id, d.id);
1812        assert_eq!(summary.removed_id, b.id);
1813        assert_eq!(summary.edges_rewired, 2);
1814
1815        // Verify edges now point to D.
1816        let a_neighbors = rt
1817            .neighbors(&tok, a.id, Direction::Out, None, None)
1818            .await
1819            .unwrap();
1820        assert_eq!(a_neighbors.len(), 1);
1821        assert_eq!(a_neighbors[0].node_id, d.id);
1822
1823        let c_neighbors = rt
1824            .neighbors(&tok, c.id, Direction::Out, None, None)
1825            .await
1826            .unwrap();
1827        assert_eq!(c_neighbors.len(), 1);
1828        assert_eq!(c_neighbors[0].node_id, d.id);
1829    }
1830
1831    #[tokio::test]
1832    async fn merge_entity_self_merge_rejected() {
1833        let rt = rt();
1834        let tok = NamespaceToken::local();
1835        let a = rt
1836            .create_entity(&tok, "concept", None, "A", None, None, vec![])
1837            .await
1838            .unwrap();
1839        let err = rt
1840            .merge_entity(&tok, a.id, a.id, EntityDedupMergePolicy::PreferInto, false)
1841            .await
1842            .unwrap_err();
1843        assert!(
1844            format!("{err:?}").contains("cannot merge an entity into itself"),
1845            "expected self-merge rejection, got: {err:?}"
1846        );
1847    }
1848
1849    #[tokio::test]
1850    async fn merge_entity_prefer_into_strategy() {
1851        let rt = rt();
1852        let tok = NamespaceToken::local();
1853        let into = rt
1854            .create_entity(
1855                &tok,
1856                "concept",
1857                None,
1858                "Into",
1859                None,
1860                Some(serde_json::json!({"a": 1})),
1861                vec![],
1862            )
1863            .await
1864            .unwrap();
1865        let from = rt
1866            .create_entity(
1867                &tok,
1868                "concept",
1869                None,
1870                "From",
1871                None,
1872                Some(serde_json::json!({"a": 2, "b": 3})),
1873                vec![],
1874            )
1875            .await
1876            .unwrap();
1877
1878        rt.merge_entity(
1879            &tok,
1880            into.id,
1881            from.id,
1882            EntityDedupMergePolicy::PreferInto,
1883            false,
1884        )
1885        .await
1886        .unwrap();
1887
1888        let kept = rt.get_entity(&tok, into.id).await.unwrap();
1889        let props = kept.properties.unwrap();
1890        // a stays as 1 (into wins), b is added from from.
1891        assert_eq!(props["a"], 1);
1892        assert_eq!(props["b"], 3);
1893    }
1894
1895    #[tokio::test]
1896    async fn merge_entity_prefer_from_strategy() {
1897        let rt = rt();
1898        let tok = NamespaceToken::local();
1899        let into = rt
1900            .create_entity(
1901                &tok,
1902                "concept",
1903                None,
1904                "Into",
1905                None,
1906                Some(serde_json::json!({"a": 1})),
1907                vec![],
1908            )
1909            .await
1910            .unwrap();
1911        let from = rt
1912            .create_entity(
1913                &tok,
1914                "concept",
1915                None,
1916                "From",
1917                None,
1918                Some(serde_json::json!({"a": 2, "b": 3})),
1919                vec![],
1920            )
1921            .await
1922            .unwrap();
1923
1924        rt.merge_entity(
1925            &tok,
1926            into.id,
1927            from.id,
1928            EntityDedupMergePolicy::PreferFrom,
1929            false,
1930        )
1931        .await
1932        .unwrap();
1933
1934        let kept = rt.get_entity(&tok, into.id).await.unwrap();
1935        let props = kept.properties.unwrap();
1936        // from wins on a, b also from from.
1937        assert_eq!(props["a"], 2);
1938        assert_eq!(props["b"], 3);
1939    }
1940
1941    #[tokio::test]
1942    async fn merge_entity_union_strategy() {
1943        let rt = rt();
1944        let tok = NamespaceToken::local();
1945        let into = rt
1946            .create_entity(
1947                &tok,
1948                "concept",
1949                None,
1950                "Into",
1951                None,
1952                Some(serde_json::json!({"a": 1})),
1953                vec![],
1954            )
1955            .await
1956            .unwrap();
1957        let from = rt
1958            .create_entity(
1959                &tok,
1960                "concept",
1961                None,
1962                "From",
1963                None,
1964                Some(serde_json::json!({"a": 2, "b": 3})),
1965                vec![],
1966            )
1967            .await
1968            .unwrap();
1969
1970        rt.merge_entity(&tok, into.id, from.id, EntityDedupMergePolicy::Union, false)
1971            .await
1972            .unwrap();
1973
1974        let kept = rt.get_entity(&tok, into.id).await.unwrap();
1975        let props = kept.properties.unwrap();
1976        // Scalar conflict: into wins → a=1. b added from from.
1977        assert_eq!(props["a"], 1);
1978        assert_eq!(props["b"], 3);
1979    }
1980
1981    #[tokio::test]
1982    async fn merge_entity_unions_tags() {
1983        let rt = rt();
1984        let tok = NamespaceToken::local();
1985        let into = rt
1986            .create_entity(
1987                &tok,
1988                "concept",
1989                None,
1990                "Into",
1991                None,
1992                None,
1993                vec!["x".to_string(), "y".to_string()],
1994            )
1995            .await
1996            .unwrap();
1997        let from = rt
1998            .create_entity(
1999                &tok,
2000                "concept",
2001                None,
2002                "From",
2003                None,
2004                None,
2005                vec!["y".to_string(), "z".to_string()],
2006            )
2007            .await
2008            .unwrap();
2009
2010        rt.merge_entity(
2011            &tok,
2012            into.id,
2013            from.id,
2014            EntityDedupMergePolicy::PreferInto,
2015            false,
2016        )
2017        .await
2018        .unwrap();
2019
2020        let kept = rt.get_entity(&tok, into.id).await.unwrap();
2021        let mut tags = kept.tags.clone();
2022        tags.sort();
2023        assert_eq!(tags, vec!["x", "y", "z"]);
2024    }
2025
2026    #[tokio::test]
2027    async fn merge_entity_drops_self_loops() {
2028        let rt = rt();
2029        let tok = NamespaceToken::local();
2030        let a = rt
2031            .create_entity(&tok, "concept", None, "A", None, None, vec![])
2032            .await
2033            .unwrap();
2034        let b = rt
2035            .create_entity(&tok, "concept", None, "B", None, None, vec![])
2036            .await
2037            .unwrap();
2038
2039        // A `extends` B — merging B into A would produce A `extends` A → drop it.
2040        rt.link(&tok, a.id, b.id, EdgeRelation::Extends, 1.0, None)
2041            .await
2042            .unwrap();
2043
2044        let summary = rt
2045            .merge_entity(&tok, a.id, b.id, EntityDedupMergePolicy::PreferInto, false)
2046            .await
2047            .unwrap();
2048
2049        assert_eq!(
2050            summary.edges_rewired, 0,
2051            "self-loop should be dropped, not rewired"
2052        );
2053
2054        let a_out = rt
2055            .neighbors(&tok, a.id, Direction::Out, None, None)
2056            .await
2057            .unwrap();
2058        assert!(a_out.is_empty(), "no self-loop should remain");
2059    }
2060
2061    // ---- merge helper unit tests ----
2062
2063    #[test]
2064    fn union_tags_deduplicates() {
2065        let (tags, added) = union_tags(
2066            &["x".to_string(), "y".to_string()],
2067            &["y".to_string(), "z".to_string()],
2068        );
2069        let mut sorted = tags.clone();
2070        sorted.sort();
2071        assert_eq!(sorted, vec!["x", "y", "z"]);
2072        assert_eq!(added, 1);
2073    }
2074
2075    #[test]
2076    fn merge_properties_prefer_into_fills_missing_keys() {
2077        let a = serde_json::json!({"a": 1});
2078        let b = serde_json::json!({"a": 99, "b": 2});
2079        let (merged, added) =
2080            merge_properties(&Some(a), &Some(b), EntityDedupMergePolicy::PreferInto);
2081        let m = merged.unwrap();
2082        assert_eq!(m["a"], 1);
2083        assert_eq!(m["b"], 2);
2084        assert_eq!(added, 1);
2085    }
2086
2087    // ---- tombstone and note merge tests ----
2088
2089    #[tokio::test]
2090    async fn merge_entity_tombstones_source_with_provenance() {
2091        let rt = rt();
2092        let tok = NamespaceToken::local();
2093        let into = rt
2094            .create_entity(&tok, "concept", None, "Into", None, None, vec![])
2095            .await
2096            .unwrap();
2097        let from = rt
2098            .create_entity(&tok, "concept", None, "From", None, None, vec![])
2099            .await
2100            .unwrap();
2101        let from_id = from.id;
2102
2103        rt.merge_entity(
2104            &tok,
2105            into.id,
2106            from_id,
2107            EntityDedupMergePolicy::PreferInto,
2108            false,
2109        )
2110        .await
2111        .unwrap();
2112
2113        // After merge, get_entity returns an error (soft-deleted rows are excluded).
2114        assert!(
2115            rt.get_entity(&tok, from_id).await.is_err(),
2116            "tombstoned source should not be returned by get_entity"
2117        );
2118
2119        // Verify the source row still exists in SQL with provenance.
2120        let pool = rt.backend().pool_arc();
2121        let (deleted_at, merged_into): (Option<i64>, Option<String>) =
2122            tokio::task::spawn_blocking(move || {
2123                let guard = pool.writer().unwrap();
2124                guard
2125                    .conn()
2126                    .query_row(
2127                        "SELECT deleted_at, merged_into FROM entities WHERE id = ?1",
2128                        [from_id.to_string()],
2129                        |row| Ok((row.get(0)?, row.get(1)?)),
2130                    )
2131                    .unwrap()
2132            })
2133            .await
2134            .unwrap();
2135        assert!(
2136            deleted_at.is_some(),
2137            "tombstoned entity must have deleted_at set"
2138        );
2139        assert_eq!(
2140            merged_into.as_deref(),
2141            Some(into.id.to_string().as_str()),
2142            "merged_into must point to into_id"
2143        );
2144    }
2145
2146    #[tokio::test]
2147    async fn merge_note_same_kind_appends_content() {
2148        let rt = rt();
2149        let tok = NamespaceToken::local();
2150        let into = rt
2151            .create_note(
2152                &tok,
2153                "observation",
2154                None,
2155                "Into content",
2156                None,
2157                None,
2158                vec![],
2159            )
2160            .await
2161            .unwrap();
2162        let from = rt
2163            .create_note(
2164                &tok,
2165                "observation",
2166                None,
2167                "From content",
2168                None,
2169                None,
2170                vec![],
2171            )
2172            .await
2173            .unwrap();
2174        let from_id = from.id;
2175
2176        let summary = rt
2177            .merge_note(
2178                &tok,
2179                into.id,
2180                from_id,
2181                EntityDedupMergePolicy::PreferInto,
2182                ContentMergeStrategy::Append,
2183                false,
2184            )
2185            .await
2186            .unwrap();
2187
2188        assert_eq!(summary.kept_id, into.id);
2189        assert_eq!(summary.removed_id, from_id);
2190        assert!(summary.content_appended);
2191        assert!(!summary.dry_run);
2192
2193        // Source is no longer findable.
2194        let from_store = rt.notes(&tok).unwrap();
2195        assert!(
2196            from_store.get_note(from_id).await.unwrap().is_none(),
2197            "merged-from note should be soft-deleted"
2198        );
2199    }
2200
2201    #[tokio::test]
2202    async fn merge_note_different_kinds_rejected() {
2203        let rt = rt();
2204        let tok = NamespaceToken::local();
2205        let into = rt
2206            .create_note(&tok, "observation", None, "Into", None, None, vec![])
2207            .await
2208            .unwrap();
2209        let from = rt
2210            .create_note(&tok, "decision", None, "From", None, None, vec![])
2211            .await
2212            .unwrap();
2213
2214        let result = rt
2215            .merge_note(
2216                &tok,
2217                into.id,
2218                from.id,
2219                EntityDedupMergePolicy::PreferInto,
2220                ContentMergeStrategy::Append,
2221                false,
2222            )
2223            .await;
2224        assert!(result.is_err(), "merging different note kinds must fail");
2225    }
2226
2227    #[tokio::test]
2228    async fn merge_note_dry_run_leaves_notes_unchanged() {
2229        let rt = rt();
2230        let tok = NamespaceToken::local();
2231        let into = rt
2232            .create_note(
2233                &tok,
2234                "observation",
2235                None,
2236                "Into content",
2237                None,
2238                None,
2239                vec![],
2240            )
2241            .await
2242            .unwrap();
2243        let from = rt
2244            .create_note(
2245                &tok,
2246                "observation",
2247                None,
2248                "From content",
2249                None,
2250                None,
2251                vec![],
2252            )
2253            .await
2254            .unwrap();
2255        let into_id = into.id;
2256        let from_id = from.id;
2257
2258        let summary = rt
2259            .merge_note(
2260                &tok,
2261                into_id,
2262                from_id,
2263                EntityDedupMergePolicy::PreferInto,
2264                ContentMergeStrategy::Append,
2265                true,
2266            )
2267            .await
2268            .unwrap();
2269
2270        assert!(summary.dry_run);
2271
2272        // Both notes still exist unchanged.
2273        let store = rt.notes(&tok).unwrap();
2274        let into_after = store.get_note(into_id).await.unwrap().unwrap();
2275        let from_after = store.get_note(from_id).await.unwrap().unwrap();
2276        assert_eq!(
2277            into_after.content, "Into content",
2278            "dry_run must not mutate into-note"
2279        );
2280        assert_eq!(
2281            from_after.content, "From content",
2282            "dry_run must not mutate from-note"
2283        );
2284    }
2285
2286    #[tokio::test]
2287    async fn update_edge_updates_properties() {
2288        use khive_storage::EdgeRelation;
2289        let rt = rt();
2290        let tok = NamespaceToken::local();
2291        let a = rt
2292            .create_entity(&tok, "concept", None, "A", None, None, vec![])
2293            .await
2294            .unwrap();
2295        let b = rt
2296            .create_entity(&tok, "concept", None, "B", None, None, vec![])
2297            .await
2298            .unwrap();
2299        let edge = rt
2300            .link(&tok, a.id, b.id, EdgeRelation::Extends, 0.5, None)
2301            .await
2302            .unwrap();
2303        let edge_id: Uuid = edge.id.into();
2304
2305        let updated = rt
2306            .update_edge(
2307                &tok,
2308                edge_id,
2309                EdgePatch {
2310                    properties: Some(serde_json::json!({"source": "manual"})),
2311                    ..Default::default()
2312                },
2313            )
2314            .await
2315            .unwrap();
2316
2317        assert_eq!(updated.metadata.as_ref().unwrap()["source"], "manual");
2318        assert!((updated.weight - 0.5).abs() < 0.001, "weight unchanged");
2319    }
2320
2321    // scenario-kg-maintenance C1 regression: merge must not crash when both
2322    // entities share a common third-party edge (duplicate triple after rewire).
2323    // Before the fix, the double-ON-CONFLICT INSERT raised a UNIQUE constraint
2324    // error at the SQLite layer and the merge aborted mid-transaction.
2325    #[tokio::test]
2326    async fn merge_entity_survives_shared_edge_to_third_party() {
2327        use khive_storage::EdgeRelation;
2328        let rt = rt();
2329        let tok = NamespaceToken::local();
2330
2331        // Create three entities: A and B will be merged; shared is the common target.
2332        // Use `extends` (concept→concept) which is in the ADR-002 base allowlist.
2333        let a = rt
2334            .create_entity(&tok, "concept", None, "A", None, None, vec![])
2335            .await
2336            .unwrap();
2337        let b = rt
2338            .create_entity(&tok, "concept", None, "B", None, None, vec![])
2339            .await
2340            .unwrap();
2341        let shared = rt
2342            .create_entity(&tok, "concept", None, "Shared", None, None, vec![])
2343            .await
2344            .unwrap();
2345
2346        // Both A and B extend the same shared concept — this creates a duplicate
2347        // triple (A/B → shared, extends) that triggers the crash on rewire.
2348        rt.link(&tok, a.id, shared.id, EdgeRelation::Extends, 1.0, None)
2349            .await
2350            .unwrap();
2351        rt.link(&tok, b.id, shared.id, EdgeRelation::Extends, 1.0, None)
2352            .await
2353            .unwrap();
2354
2355        // Before the fix this would return Err with "UNIQUE constraint failed".
2356        let summary = rt
2357            .merge_entity(
2358                &tok,
2359                a.id,
2360                b.id,
2361                crate::EntityDedupMergePolicy::PreferInto,
2362                false,
2363            )
2364            .await
2365            .expect(
2366                "C1: merge must succeed even when both entities share an edge to a third party",
2367            );
2368
2369        assert_eq!(summary.kept_id, a.id);
2370        assert_eq!(summary.removed_id, b.id);
2371        // A already had the Extends edge to shared; when B→shared is rewired to
2372        // A→shared, the ON CONFLICT DO UPDATE refreshes the existing row (clears
2373        // deleted_at, updates weight). rusqlite reports this as 1 change, so
2374        // edges_rewired will be >= 0. The important invariant is that the merge
2375        // did NOT crash and exactly one live edge A→shared remains.
2376
2377        // One live edge A→shared must exist after merge.
2378        let a_edges = rt
2379            .list_edges(
2380                &tok,
2381                crate::EdgeListFilter {
2382                    source_id: Some(a.id),
2383                    target_id: Some(shared.id),
2384                    relations: vec![EdgeRelation::Extends],
2385                    ..Default::default()
2386                },
2387                10,
2388            )
2389            .await
2390            .unwrap();
2391        assert_eq!(
2392            a_edges.len(),
2393            1,
2394            "C1: exactly one live A→shared Extends edge must exist after merge; got: {a_edges:?}"
2395        );
2396
2397        // Tombstone check: B must be soft-deleted after successful merge (C3).
2398        // get_entity filters deleted_at IS NULL, so a tombstoned entity returns None.
2399        let b_after = rt.entities(&tok).unwrap().get_entity(b.id).await.unwrap();
2400        assert!(
2401            b_after.is_none(),
2402            "C3: from_entity must be tombstoned (get_entity returns None for deleted) after merge; got: {b_after:?}"
2403        );
2404    }
2405
2406    // H2 regression: merge_entity at the runtime level must reject cross-kind merges.
2407    // Before the H2 fix, only the pack handler had this guard; a direct runtime caller
2408    // could still merge concept+project, silently tombstoning the source entity.
2409    #[tokio::test]
2410    async fn merge_entity_cross_kind_rejected_at_runtime() {
2411        let rt = rt();
2412        let tok = NamespaceToken::local();
2413
2414        let concept = rt
2415            .create_entity(&tok, "concept", None, "H2Concept", None, None, vec![])
2416            .await
2417            .unwrap();
2418        let project = rt
2419            .create_entity(&tok, "project", None, "H2Project", None, None, vec![])
2420            .await
2421            .unwrap();
2422
2423        // Cross-kind merge must return InvalidInput at the runtime level.
2424        let err = rt
2425            .merge_entity(
2426                &tok,
2427                concept.id,
2428                project.id,
2429                crate::EntityDedupMergePolicy::PreferInto,
2430                false,
2431            )
2432            .await
2433            .expect_err("H2: cross-kind merge must be rejected by runtime");
2434        assert!(
2435            matches!(err, crate::RuntimeError::InvalidInput(_)),
2436            "H2: expected InvalidInput, got: {err:?}"
2437        );
2438
2439        // Both entities must survive the failed merge attempt with no tombstone.
2440        let concept_after = rt.get_entity(&tok, concept.id).await;
2441        let project_after = rt.get_entity(&tok, project.id).await;
2442        assert!(
2443            concept_after.is_ok(),
2444            "H2: concept must remain live after rejected merge; got: {concept_after:?}"
2445        );
2446        assert!(
2447            project_after.is_ok(),
2448            "H2: project must remain live after rejected merge; got: {project_after:?}"
2449        );
2450    }
2451
2452    // scenario-kg-maintenance C2 regression: same-kind merge must succeed.
2453    #[tokio::test]
2454    async fn merge_entity_same_kind_succeeds() {
2455        let rt = rt();
2456        let tok = NamespaceToken::local();
2457
2458        let c1 = rt
2459            .create_entity(&tok, "concept", None, "Concept1", None, None, vec![])
2460            .await
2461            .unwrap();
2462        let c2 = rt
2463            .create_entity(&tok, "concept", None, "Concept2", None, None, vec![])
2464            .await
2465            .unwrap();
2466
2467        let summary = rt
2468            .merge_entity(
2469                &tok,
2470                c1.id,
2471                c2.id,
2472                crate::EntityDedupMergePolicy::PreferInto,
2473                false,
2474            )
2475            .await
2476            .expect("same-kind merge must succeed");
2477        assert_eq!(summary.kept_id, c1.id);
2478        assert_eq!(summary.removed_id, c2.id);
2479
2480        // c2 must be tombstoned.
2481        let c2_after = rt.entities(&tok).unwrap().get_entity(c2.id).await.unwrap();
2482        assert!(c2_after.is_none(), "from_entity must be tombstoned");
2483    }
2484
2485    // ── #567 regression: cross-namespace merge_note must be denied on either ID ──
2486
2487    #[tokio::test]
2488    async fn merge_note_cross_namespace_either_id_returns_not_found() {
2489        use crate::error::RuntimeError;
2490        use crate::Namespace;
2491
2492        let rt = rt();
2493        let ns_a = NamespaceToken::for_namespace(Namespace::parse("ns-a").unwrap());
2494        let ns_b = NamespaceToken::for_namespace(Namespace::parse("ns-b").unwrap());
2495
2496        let into_a = rt
2497            .create_note(&ns_a, "observation", None, "Into A", None, None, vec![])
2498            .await
2499            .unwrap();
2500        let from_a = rt
2501            .create_note(&ns_a, "observation", None, "From A", None, None, vec![])
2502            .await
2503            .unwrap();
2504        let note_b = rt
2505            .create_note(&ns_b, "observation", None, "Note B", None, None, vec![])
2506            .await
2507            .unwrap();
2508
2509        // foreign into_id: note_b belongs to ns_b, caller token is ns_a
2510        let foreign_into = rt
2511            .merge_note(
2512                &ns_a,
2513                note_b.id,
2514                from_a.id,
2515                EntityDedupMergePolicy::PreferInto,
2516                ContentMergeStrategy::Append,
2517                false,
2518            )
2519            .await;
2520        assert!(
2521            matches!(foreign_into, Err(RuntimeError::NotFound(_))),
2522            "foreign into_id must be denied before merge, got {foreign_into:?}"
2523        );
2524
2525        // foreign from_id: note_b belongs to ns_b, caller token is ns_a
2526        let foreign_from = rt
2527            .merge_note(
2528                &ns_a,
2529                into_a.id,
2530                note_b.id,
2531                EntityDedupMergePolicy::PreferInto,
2532                ContentMergeStrategy::Append,
2533                false,
2534            )
2535            .await;
2536        assert!(
2537            matches!(foreign_from, Err(RuntimeError::NotFound(_))),
2538            "foreign from_id must be denied before merge, got {foreign_from:?}"
2539        );
2540    }
2541}