Skip to main content

khive_runtime/
curation.rs

1// Licensed under the Apache License, Version 2.0.
2
3// FILE SIZE JUSTIFICATION: curation.rs holds entity/note/edge patch types alongside
4// their update and merge implementations. The implementations share private helpers
5// (merge_properties, namespace checks, dedup policy) that need pub(crate) access to
6// runtime internals. Inline tests cover merge semantics that require direct access to
7// those helpers. Split plan: extract patch types into `curation/patch.rs` and merge
8// logic into `curation/merge.rs` once the dedup policy API stabilises.
//! Curation operations: entity and note update/merge, plus the edge patch and
//! edge-list filter types.
10
11use std::collections::HashSet;
12
13use serde::{Deserialize, Serialize};
14use serde_json::Value;
15use uuid::Uuid;
16
17use khive_db::SqliteError;
18use khive_storage::types::{EdgeFilter, TextDocument};
19use khive_storage::{EdgeRelation, Entity, SubstrateKind};
20use khive_types::EventKind;
21use rusqlite::OptionalExtension;
22
23use crate::error::{RuntimeError, RuntimeResult};
24use crate::operations::canonical_edge_endpoints;
25use crate::runtime::{KhiveRuntime, NamespaceToken};
26
27// ---------------------------------------------------------------------------
28// Public types
29// ---------------------------------------------------------------------------
30
/// Patch for `update_entity`. Only `Some(_)` fields are applied; `None` means "leave unchanged".
///
/// For `description`:
/// - `None` (outer) — leave the current description as-is
/// - `Some(None)` — clear the description (set to NULL)
/// - `Some(Some(s))` — set the description to `s`
///
/// For `properties` (deep-merge semantics):
/// - `None` — leave properties as-is
/// - `Some(value)` — deep-merge `value` into existing properties. Keys present in
///   the patch overwrite existing keys; keys absent from the patch are preserved.
///   Removing a key requires explicit replacement of the parent object (or a future
///   `unset`/`null-marker` extension).
///
/// For `tags` — replace semantics: `Some(vec)` sets tags to exactly `vec`. To add
/// a tag without losing existing tags, read the entity first, push the new tag,
/// and pass the full list back.
#[derive(Clone, Debug, Default)]
pub struct EntityPatch {
    /// Replacement name; `None` leaves the stored name untouched.
    pub name: Option<String>,
    /// Tri-state description change: outer `None` = unchanged, `Some(None)` = clear,
    /// `Some(Some(s))` = set to `s`.
    pub description: Option<Option<String>>,
    /// Properties merged into the stored properties by `update_entity`, with the
    /// patch values winning on conflict; `None` leaves properties untouched.
    pub properties: Option<Value>,
    /// Complete replacement tag list; `None` leaves the stored tags untouched.
    pub tags: Option<Vec<String>>,
}
55
/// Policy used when deduplicating two entities.
///
/// Serialized in `snake_case` (`prefer_into`, `prefer_from`, `union`); `merge_entity`
/// writes the same spelling into the `policy` field of its merge event payload.
#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum EntityDedupMergePolicy {
    /// `into` values win on conflict. Tags are unioned. Properties from `from` fill in
    /// keys that `into` doesn't have. This is the default.
    #[default]
    PreferInto,
    /// `from` values win on conflict.
    ///
    /// Also the policy `update_entity` / `update_note` use to apply a properties
    /// patch, with the patch playing the `from` role.
    PreferFrom,
    /// Deep-merge: object properties merge recursively. Scalar conflicts go to `into`.
    Union,
}
69
/// Strategy for merging note content when two notes are combined.
///
/// Passed to `merge_note`, which forwards it unchanged to `merge_note_sql`.
/// Serialized in `snake_case`.
// NOTE(review): the per-variant behavior is implemented in `merge_note_sql`, which is
// not visible from this section; the variant notes below are inferred from the names
// and should be confirmed against that function.
#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ContentMergeStrategy {
    /// Default strategy. Presumably combines the content of both notes — TODO confirm.
    #[default]
    Append,
    /// Presumably keeps the `into` note's content — TODO confirm.
    PreferInto,
    /// Presumably keeps the `from` note's content — TODO confirm.
    PreferFrom,
}
79
/// Result returned by `merge_entity` / `merge_note`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MergeSummary {
    /// ID of the surviving record; `merge_entity` reports it as `into_id` in its event.
    pub kept_id: Uuid,
    /// ID of the record merged away; `merge_entity` reports it as `from_id` in its event.
    pub removed_id: Uuid,
    /// Number of edges rewired onto the surviving record.
    pub edges_rewired: usize,
    /// Count reported by the property-merge step (second value returned by
    /// `merge_properties`).
    pub properties_merged: usize,
    /// Count reported by the tag-union step (second value returned by `union_tags`).
    pub tags_unioned: usize,
    /// Presumably set when note content from the removed note was appended to the
    /// kept note — TODO confirm against `merge_note_sql`.
    pub content_appended: bool,
    /// Presumably echoes the `dry_run` flag of the request that produced this
    /// summary — TODO confirm where the summary is constructed.
    pub dry_run: bool,
}
91
/// Patch for `update_edge`. Only `Some(_)` fields are applied; `None` means "leave unchanged".
///
/// For `properties` — replacement semantics (not deep merge): `Some(value)` replaces
/// the entire metadata object. `None` leaves metadata unchanged.
#[derive(Clone, Debug, Default)]
pub struct EdgePatch {
    /// New relation for the edge; `None` keeps the current relation.
    pub relation: Option<EdgeRelation>,
    /// New edge weight; `None` keeps the current weight.
    pub weight: Option<f64>,
    /// Replacement metadata object (not merged); `None` keeps the current metadata.
    pub properties: Option<Value>,
}
102
/// Patch for `update_note`. Only `Some(_)` fields are applied; `None` means "leave unchanged".
///
/// For `salience`/`decay_factor`:
/// - `None` (outer) — leave unchanged
/// - `Some(None)` — clear the value
/// - `Some(Some(v))` — set to v
///
/// `name` follows the same three-state convention.
#[derive(Clone, Debug, Default)]
pub struct NotePatch {
    /// Tri-state name change: outer `None` = unchanged, `Some(None)` = clear,
    /// `Some(Some(s))` = set to `s`.
    pub name: Option<Option<String>>,
    /// Replacement note content; `None` leaves the content untouched.
    pub content: Option<String>,
    /// Tri-state salience change. `update_note` rejects a value that is not finite
    /// or lies outside `[0.0, 1.0]`.
    pub salience: Option<Option<f64>>,
    /// Tri-state decay-factor change. `update_note` rejects a value that is not
    /// finite or is negative.
    pub decay_factor: Option<Option<f64>>,
    /// Properties merged into the stored properties by `update_note`, with the
    /// patch values winning on conflict; `None` leaves properties untouched.
    pub properties: Option<Value>,
    /// Crate-internal status override: when set, `update_note` writes it to the
    /// note's `status`. External callers cannot set it (see `NotePatch::new`).
    pub(crate) kind_status: Option<String>,
}
118
119impl NotePatch {
120    /// Construct a `NotePatch` from the public fields only.
121    /// Use this from external crates; `kind_status` is set to `None`.
122    pub fn new(
123        name: Option<Option<String>>,
124        content: Option<String>,
125        salience: Option<Option<f64>>,
126        decay_factor: Option<Option<f64>>,
127        properties: Option<Value>,
128    ) -> Self {
129        Self {
130            name,
131            content,
132            salience,
133            decay_factor,
134            properties,
135            kind_status: None,
136        }
137    }
138}
139
/// Filter for `list_edges` / `count_edges`.
///
/// Converts into the storage-level `EdgeFilter` via `From`; fields that have no
/// counterpart here keep their `EdgeFilter` defaults.
#[derive(Clone, Debug, Default)]
pub struct EdgeListFilter {
    /// Restrict to edges with this source; `None` = any source.
    pub source_id: Option<Uuid>,
    /// Restrict to edges with this target; `None` = any target.
    pub target_id: Option<Uuid>,
    /// Empty = any relation.
    pub relations: Vec<EdgeRelation>,
    /// Lower weight bound, passed through to `EdgeFilter::min_weight`; `None` = no bound.
    pub min_weight: Option<f64>,
    /// Upper weight bound, passed through to `EdgeFilter::max_weight`; `None` = no bound.
    pub max_weight: Option<f64>,
}
150
151impl From<EdgeListFilter> for EdgeFilter {
152    fn from(f: EdgeListFilter) -> Self {
153        EdgeFilter {
154            source_ids: f.source_id.into_iter().collect(),
155            target_ids: f.target_id.into_iter().collect(),
156            relations: f.relations,
157            min_weight: f.min_weight,
158            max_weight: f.max_weight,
159            ..Default::default()
160        }
161    }
162}
163
164// ---------------------------------------------------------------------------
165// Implementation
166// ---------------------------------------------------------------------------
167
168impl KhiveRuntime {
169    /// Patch-style entity update.
170    ///
171    /// Only fields set to `Some(_)` are changed. Re-indexes FTS5 (and vectors if configured)
172    /// when `name` or `description` changes; skips re-indexing for property/tag-only patches.
173    ///
174    /// Returns `RuntimeError::NotFound` if the entity does not exist or belongs to a different
175    /// namespace. Namespace isolation is enforced at the runtime layer.
176    pub async fn update_entity(
177        &self,
178        token: &NamespaceToken,
179        id: Uuid,
180        patch: EntityPatch,
181    ) -> RuntimeResult<Entity> {
182        let store = self.entities(token)?;
183        let mut entity = store
184            .get_entity(id)
185            .await?
186            .ok_or_else(|| RuntimeError::NotFound(format!("entity {id}")))?;
187
188        Self::ensure_namespace(&entity.namespace, token.namespace().as_str())?;
189
190        let mut text_changed = false;
191        let mut changed_fields: Vec<&'static str> = Vec::new();
192
193        if let Some(name) = patch.name {
194            text_changed |= entity.name != name;
195            entity.name = name;
196            changed_fields.push("name");
197        }
198        if let Some(desc_patch) = patch.description {
199            text_changed |= entity.description != desc_patch;
200            entity.description = desc_patch;
201            changed_fields.push("description");
202        }
203        if let Some(props) = patch.properties {
204            let (merged, _) = merge_properties(
205                &entity.properties,
206                &Some(props),
207                EntityDedupMergePolicy::PreferFrom,
208            );
209            entity.properties = merged;
210            changed_fields.push("properties");
211        }
212        if let Some(tags) = patch.tags {
213            entity.tags = tags;
214            changed_fields.push("tags");
215        }
216
217        entity.updated_at = chrono::Utc::now().timestamp_micros();
218        store.upsert_entity(entity.clone()).await?;
219
220        if text_changed {
221            self.reindex_entity(token, &entity).await?;
222        }
223
224        let event_store = self.events(token)?;
225        let event = khive_storage::event::Event::new(
226            entity.namespace.clone(),
227            "update",
228            EventKind::EntityUpdated,
229            SubstrateKind::Entity,
230            "",
231        )
232        .with_target(entity.id)
233        .with_payload(serde_json::json!({
234            "id": entity.id,
235            "namespace": entity.namespace,
236            "changed_fields": changed_fields,
237        }));
238        event_store.append_event(event).await.map_err(|e| {
239            RuntimeError::Internal(format!("update_entity: event store write failed: {e}"))
240        })?;
241
242        Ok(entity)
243    }
244
245    /// Merge `from_id` into `into_id`.
246    ///
247    /// All edges incident to `from_id` are rewired to `into_id`. Self-loops that would
248    /// result from the rewire are dropped. Properties and tags are merged per `strategy`.
249    /// `from_id` is tombstoned with merge provenance and removed from indexes. Returns a summary.
250    ///
251    /// If `dry_run` is true, computes and returns the planned summary without mutating any rows.
252    ///
253    /// Atomic: all SQL (entity reads/writes, edge rewires, FTS updates, vec-index delete)
254    /// runs on a single pool connection inside one `BEGIN IMMEDIATE` transaction via
255    /// `merge_entity_sql`. If embedding vectors are configured, the vector re-insert for
256    /// `into_id` is performed after the transaction (requires async embedding computation).
257    pub async fn merge_entity(
258        &self,
259        token: &NamespaceToken,
260        into_id: Uuid,
261        from_id: Uuid,
262        strategy: EntityDedupMergePolicy,
263        dry_run: bool,
264    ) -> RuntimeResult<MergeSummary> {
265        if into_id == from_id {
266            return Err(RuntimeError::InvalidInput(
267                "cannot merge an entity into itself".into(),
268            ));
269        }
270        // H2 fix: enforce same-kind constraint at the runtime layer.
271        // The handler also checks this, but any direct runtime caller (CLI, tests,
272        // future SDK) would bypass the handler guard without this check here.
273        {
274            let into_entity = self.get_entity(token, into_id).await?;
275            let from_entity = self.get_entity(token, from_id).await?;
276            if into_entity.kind != from_entity.kind {
277                return Err(RuntimeError::InvalidInput(format!(
278                    "cannot merge entities of different kinds: into={} ({}), from={} ({}); \
279                     merge requires both entities to share the same kind",
280                    into_id, into_entity.kind, from_id, from_entity.kind
281                )));
282            }
283        }
284        let ns = token.namespace().as_str().to_owned();
285        let sanitized_ns: String = ns
286            .chars()
287            .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
288            .collect();
289        let fts_table = format!("fts_entities_{}", sanitized_ns);
290        let vec_table = self.config().embedding_model.map(|model| {
291            let key: String = model
292                .to_string()
293                .chars()
294                .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
295                .collect();
296            format!("vec_{}", key)
297        });
298
299        // Ensure all required tables exist before entering the transaction.
300        // Each accessor applies its DDL idempotently via `CREATE TABLE IF NOT EXISTS`.
301        let _ = self.entities(token)?;
302        let _ = self.graph(token)?;
303        let _ = self.text(token)?;
304        if self.config().embedding_model.is_some() {
305            let _ = self.vectors(token)?;
306        }
307
308        let pool = self.backend().pool_arc();
309
310        let (summary, updated_entity) = tokio::task::spawn_blocking(move || {
311            let guard = pool.writer()?;
312            guard.transaction(|conn| {
313                merge_entity_sql(
314                    conn, ns, fts_table, vec_table, into_id, from_id, strategy, dry_run,
315                )
316            })
317        })
318        .await
319        .map_err(|e| RuntimeError::Internal(e.to_string()))??;
320
321        // If vectors are configured, reindex into_entity (requires async embedding).
322        // FTS and vec-delete were already committed inside the transaction above.
323        if !dry_run && self.config().embedding_model.is_some() {
324            self.reindex_entity(token, &updated_entity).await?;
325        }
326
327        let event_store = self.events(token)?;
328        // Mirror the wire-level strategy spelling from MergeParams so consumers
329        // can round-trip the policy string back into a request.
330        let policy_str = match strategy {
331            EntityDedupMergePolicy::PreferInto => "prefer_into",
332            EntityDedupMergePolicy::PreferFrom => "prefer_from",
333            EntityDedupMergePolicy::Union => "union",
334        };
335        let event = khive_storage::event::Event::new(
336            updated_entity.namespace.clone(),
337            "merge",
338            EventKind::EntityMerged,
339            SubstrateKind::Entity,
340            "",
341        )
342        .with_target(summary.kept_id)
343        .with_payload(serde_json::json!({
344            "into_id": summary.kept_id,
345            "from_id": summary.removed_id,
346            "policy": policy_str,
347            "edges_rewired": summary.edges_rewired,
348        }));
349        event_store.append_event(event).await.map_err(|e| {
350            RuntimeError::Internal(format!("merge_entity: event store write failed: {e}"))
351        })?;
352
353        Ok(summary)
354    }
355
356    // ---- Internal helpers ----
357
358    /// Re-upsert FTS5 document (and vector if model configured) for the entity.
359    ///
360    /// Uses `entity.namespace` — the authoritative namespace stored on the record — rather
361    /// than the caller-supplied `namespace` parameter. This prevents a cross-namespace
362    /// reindex from writing the search document into the wrong namespace's FTS index.
363    pub(crate) async fn reindex_entity(
364        &self,
365        token: &NamespaceToken,
366        entity: &Entity,
367    ) -> RuntimeResult<()> {
368        let body = match &entity.description {
369            Some(d) if !d.is_empty() => format!("{} {}", entity.name, d),
370            _ => entity.name.clone(),
371        };
372        // Use entity.namespace (authoritative) rather than token.namespace().as_str() (caller claim).
373        let ns = entity.namespace.clone();
374        self.text(token)?
375            .upsert_document(TextDocument {
376                subject_id: entity.id,
377                kind: SubstrateKind::Entity,
378                title: Some(entity.name.clone()),
379                body: body.clone(),
380                tags: entity.tags.clone(),
381                namespace: ns.clone(),
382                metadata: entity.properties.clone(),
383                updated_at: chrono::Utc::now(),
384            })
385            .await?;
386
387        if self.config().embedding_model.is_some() {
388            let vector = self.embed(&body).await?;
389            self.vectors(token)?
390                .insert(
391                    entity.id,
392                    SubstrateKind::Entity,
393                    &ns,
394                    "entity.body",
395                    vec![vector],
396                )
397                .await?;
398        }
399
400        Ok(())
401    }
402
403    /// Remove an entity from FTS5 and (if configured) vector indexes.
404    pub(crate) async fn remove_from_indexes(
405        &self,
406        token: &NamespaceToken,
407        id: Uuid,
408    ) -> RuntimeResult<()> {
409        let ns = token.namespace().as_str().to_owned();
410        self.text(token)?.delete_document(&ns, id).await?;
411        if self.config().embedding_model.is_some() {
412            self.vectors(token)?.delete(id).await?;
413        }
414        Ok(())
415    }
416
417    /// Re-upsert FTS5 document (and vector if model configured) for the note.
418    pub(crate) async fn reindex_note(
419        &self,
420        token: &NamespaceToken,
421        note: &khive_storage::note::Note,
422    ) -> RuntimeResult<()> {
423        let ns = note.namespace.clone();
424        self.text_for_notes(token)?
425            .upsert_document(TextDocument {
426                subject_id: note.id,
427                kind: SubstrateKind::Note,
428                title: note.name.clone(),
429                body: note.content.clone(),
430                tags: Vec::new(),
431                namespace: ns.clone(),
432                metadata: note.properties.clone(),
433                updated_at: chrono::Utc::now(),
434            })
435            .await?;
436
437        if self.config().embedding_model.is_some() {
438            let vector = self.embed(&note.content).await?;
439            self.vectors(token)?
440                .insert(
441                    note.id,
442                    SubstrateKind::Note,
443                    &ns,
444                    "note.content",
445                    vec![vector],
446                )
447                .await?;
448        }
449        Ok(())
450    }
451
452    /// Patch-style note update.
453    pub async fn update_note(
454        &self,
455        token: &NamespaceToken,
456        id: Uuid,
457        patch: NotePatch,
458    ) -> RuntimeResult<khive_storage::note::Note> {
459        let store = self.notes(token)?;
460        let mut note = store
461            .get_note(id)
462            .await?
463            .ok_or_else(|| RuntimeError::NotFound(format!("note {id}")))?;
464
465        Self::ensure_namespace(&note.namespace, token.namespace().as_str())?;
466
467        let mut text_changed = false;
468
469        if let Some(name_patch) = patch.name {
470            text_changed |= note.name != name_patch;
471            note.name = name_patch;
472        }
473        if let Some(content) = patch.content {
474            text_changed |= note.content != content;
475            note.content = content;
476        }
477        if let Some(salience_patch) = patch.salience {
478            // Reject non-finite or out-of-range salience at the runtime boundary
479            // rather than silently clamping invalid caller input (coding-standards §608-622).
480            if let Some(s) = salience_patch {
481                if !s.is_finite() || !(0.0..=1.0).contains(&s) {
482                    return Err(crate::RuntimeError::InvalidInput(format!(
483                        "salience must be a finite value in [0.0, 1.0]; got {s}"
484                    )));
485                }
486            }
487            note.salience = salience_patch;
488        }
489        if let Some(decay_patch) = patch.decay_factor {
490            // Reject non-finite or negative decay_factor at the runtime boundary.
491            if let Some(d) = decay_patch {
492                if !d.is_finite() || d < 0.0 {
493                    return Err(crate::RuntimeError::InvalidInput(format!(
494                        "decay_factor must be a finite value >= 0.0; got {d}"
495                    )));
496                }
497            }
498            note.decay_factor = decay_patch;
499        }
500        if let Some(props) = patch.properties {
501            let (merged, _) = merge_properties(
502                &note.properties,
503                &Some(props),
504                EntityDedupMergePolicy::PreferFrom,
505            );
506            note.properties = merged;
507        }
508        if let Some(status) = patch.kind_status {
509            note.status = status;
510        }
511
512        note.updated_at = chrono::Utc::now().timestamp_micros();
513        store.upsert_note(note.clone()).await?;
514
515        if text_changed {
516            self.reindex_note(token, &note).await?;
517        }
518
519        Ok(note)
520    }
521
522    /// Merge `from_id` note into `into_id` note.
523    ///
524    /// Both notes must exist in the namespace and have the same `kind`. Content is merged
525    /// per `content_strategy`. Properties are merged per `strategy`. `from_id` is
526    /// tombstoned (status='deleted', deleted_at set). Returns a summary.
527    ///
528    /// If `dry_run` is true, computes and returns the planned summary without mutating
529    /// any rows, edges, or indexes.
530    pub async fn merge_note(
531        &self,
532        token: &NamespaceToken,
533        into_id: Uuid,
534        from_id: Uuid,
535        strategy: EntityDedupMergePolicy,
536        content_strategy: ContentMergeStrategy,
537        dry_run: bool,
538    ) -> RuntimeResult<MergeSummary> {
539        if into_id == from_id {
540            return Err(RuntimeError::InvalidInput(
541                "cannot merge a note into itself".into(),
542            ));
543        }
544        let ns = token.namespace().as_str().to_string();
545        let sanitized_ns: String = ns
546            .chars()
547            .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
548            .collect();
549        let fts_table = format!("fts_notes_{}", sanitized_ns);
550        let vec_table = self.config().embedding_model.map(|model| {
551            let key: String = model
552                .to_string()
553                .chars()
554                .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
555                .collect();
556            format!("vec_{}", key)
557        });
558
559        let note_store = self.notes(token)?;
560        let into_note = note_store
561            .get_note(into_id)
562            .await?
563            .ok_or_else(|| RuntimeError::NotFound("not found in this namespace".into()))?;
564        Self::ensure_namespace(&into_note.namespace, &ns)?;
565
566        let from_note = note_store
567            .get_note(from_id)
568            .await?
569            .ok_or_else(|| RuntimeError::NotFound("not found in this namespace".into()))?;
570        Self::ensure_namespace(&from_note.namespace, &ns)?;
571
572        let _ = self.graph(token)?;
573        let _ = self.text_for_notes(token)?;
574        if self.config().embedding_model.is_some() {
575            let _ = self.vectors(token)?;
576        }
577
578        let pool = self.backend().pool_arc();
579        let (summary, updated_note) = tokio::task::spawn_blocking(move || {
580            let guard = pool.writer()?;
581            guard.transaction(|conn| {
582                merge_note_sql(
583                    conn,
584                    ns,
585                    fts_table,
586                    vec_table,
587                    into_id,
588                    from_id,
589                    strategy,
590                    content_strategy,
591                    dry_run,
592                )
593            })
594        })
595        .await
596        .map_err(|e| RuntimeError::Internal(e.to_string()))??;
597
598        if !dry_run && self.config().embedding_model.is_some() {
599            self.reindex_note(token, &updated_note).await?;
600        }
601        Ok(summary)
602    }
603}
604
605// ---------------------------------------------------------------------------
606// Transactional merge SQL helpers
607// ---------------------------------------------------------------------------
608
609/// Read one entity row by ID within a namespace, returning `SqliteError` on missing/wrong-ns.
610fn read_merge_entity(
611    conn: &rusqlite::Connection,
612    id: Uuid,
613    namespace: &str,
614) -> Result<Entity, SqliteError> {
615    let id_str = id.to_string();
616    let mut stmt = conn.prepare(
617        "SELECT id, namespace, kind, entity_type, name, description, properties, tags, \
618         created_at, updated_at, deleted_at, merged_into, merge_event_id \
619         FROM entities WHERE id = ?1 AND deleted_at IS NULL",
620    )?;
621    let mut rows = stmt.query(rusqlite::params![id_str])?;
622    let row = rows
623        .next()?
624        .ok_or_else(|| SqliteError::InvalidData(format!("entity {id} not found")))?;
625
626    let id_s: String = row.get(0)?;
627    let ns: String = row.get(1)?;
628    let kind: String = row.get(2)?;
629    let entity_type: Option<String> = row.get(3)?;
630    let name: String = row.get(4)?;
631    let description: Option<String> = row.get(5)?;
632    let properties_str: Option<String> = row.get(6)?;
633    let tags_str: String = row.get(7)?;
634    let created_at: i64 = row.get(8)?;
635    let updated_at: i64 = row.get(9)?;
636    let deleted_at: Option<i64> = row.get(10)?;
637    let merged_into_str: Option<String> = row.get(11)?;
638    let merge_event_id_str: Option<String> = row.get(12)?;
639
640    if ns != namespace {
641        return Err(SqliteError::InvalidData(format!(
642            "entity {id} belongs to namespace '{ns}', not '{namespace}'"
643        )));
644    }
645
646    let entity_id = Uuid::parse_str(&id_s).map_err(|e| SqliteError::InvalidData(e.to_string()))?;
647    let properties: Option<Value> = properties_str
648        .map(|s| {
649            serde_json::from_str::<Value>(&s).map_err(|e| SqliteError::InvalidData(e.to_string()))
650        })
651        .transpose()?;
652    let tags: Vec<String> =
653        serde_json::from_str(&tags_str).map_err(|e| SqliteError::InvalidData(e.to_string()))?;
654    let merged_into = merged_into_str
655        .as_deref()
656        .map(Uuid::parse_str)
657        .transpose()
658        .map_err(|e| SqliteError::InvalidData(e.to_string()))?;
659    let merge_event_id = merge_event_id_str
660        .as_deref()
661        .map(Uuid::parse_str)
662        .transpose()
663        .map_err(|e| SqliteError::InvalidData(e.to_string()))?;
664
665    Ok(Entity {
666        id: entity_id,
667        namespace: ns,
668        kind,
669        entity_type,
670        name,
671        description,
672        properties,
673        tags,
674        created_at,
675        updated_at,
676        deleted_at,
677        merged_into,
678        merge_event_id,
679    })
680}
681
682/// All merge SQL on one connection inside an already-open `BEGIN IMMEDIATE` transaction.
683///
684/// Reads both entities, rewires/drops incident edges, merges entity fields, updates FTS,
685/// deletes the `from` vec entry (if `vec_table` is Some), and tombstones `from` with merge
686/// provenance.  Returns the updated `into` entity so the caller can do the async vec re-insert.
687///
688/// When `dry_run` is true, all reads and computations are performed but no writes are issued.
689#[allow(clippy::too_many_arguments)]
690fn merge_entity_sql(
691    conn: &rusqlite::Connection,
692    namespace: String,
693    fts_table: String,
694    vec_table: Option<String>,
695    into_id: Uuid,
696    from_id: Uuid,
697    strategy: EntityDedupMergePolicy,
698    dry_run: bool,
699) -> Result<(MergeSummary, Entity), SqliteError> {
700    let into_entity = read_merge_entity(conn, into_id, &namespace)?;
701    let from_entity = read_merge_entity(conn, from_id, &namespace)?;
702
703    // --- Collect edges incident to from_id ---
704    #[allow(dead_code)]
705    struct EdgeRow {
706        id: Uuid,
707        source_id: Uuid,
708        target_id: Uuid,
709        relation: String,
710        weight: f64,
711        created_at: i64,
712        updated_at: i64,
713        deleted_at: Option<i64>,
714        target_backend: Option<String>,
715        metadata: Option<String>,
716    }
717
718    let parse_id =
719        |s: String| Uuid::parse_str(&s).map_err(|e| SqliteError::InvalidData(e.to_string()));
720
721    let from_str = from_id.to_string();
722
723    let mut outbound: Vec<EdgeRow> = Vec::new();
724    {
725        let mut stmt = conn.prepare(
726            "SELECT id, source_id, target_id, relation, weight, created_at, \
727                    updated_at, deleted_at, target_backend, metadata \
728             FROM graph_edges WHERE namespace = ?1 AND source_id = ?2",
729        )?;
730        let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
731        while let Some(row) = rows.next()? {
732            outbound.push(EdgeRow {
733                id: parse_id(row.get(0)?)?,
734                source_id: parse_id(row.get(1)?)?,
735                target_id: parse_id(row.get(2)?)?,
736                relation: row.get(3)?,
737                weight: row.get(4)?,
738                created_at: row.get(5)?,
739                updated_at: row.get(6)?,
740                deleted_at: row.get(7)?,
741                target_backend: row.get(8)?,
742                metadata: row.get(9)?,
743            });
744        }
745    }
746
747    let mut inbound: Vec<EdgeRow> = Vec::new();
748    {
749        let mut stmt = conn.prepare(
750            "SELECT id, source_id, target_id, relation, weight, created_at, \
751                    updated_at, deleted_at, target_backend, metadata \
752             FROM graph_edges WHERE namespace = ?1 AND target_id = ?2",
753        )?;
754        let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
755        while let Some(row) = rows.next()? {
756            inbound.push(EdgeRow {
757                id: parse_id(row.get(0)?)?,
758                source_id: parse_id(row.get(1)?)?,
759                target_id: parse_id(row.get(2)?)?,
760                relation: row.get(3)?,
761                weight: row.get(4)?,
762                created_at: row.get(5)?,
763                updated_at: row.get(6)?,
764                deleted_at: row.get(7)?,
765                target_backend: row.get(8)?,
766                metadata: row.get(9)?,
767            });
768        }
769    }
770
771    // Deduplicate by edge ID (a self-edge from_id→from_id appears in both lists).
772    let mut seen: HashSet<Uuid> = HashSet::new();
773    let mut all_edges: Vec<EdgeRow> = Vec::new();
774    for edge in outbound.into_iter().chain(inbound) {
775        if seen.insert(edge.id) {
776            all_edges.push(edge);
777        }
778    }
779
780    // --- Merge entity fields ---
781    let (merged_props, properties_merged) =
782        merge_properties(&into_entity.properties, &from_entity.properties, strategy);
783    let merged_name = merge_string_field(&into_entity.name, &from_entity.name, strategy);
784    let merged_description =
785        merge_option_string_field(&into_entity.description, &from_entity.description, strategy);
786    let (merged_tags, tags_unioned) = union_tags(&into_entity.tags, &from_entity.tags);
787
788    let now = chrono::Utc::now().timestamp_micros();
789    let into_str = into_id.to_string();
790    let props_str = merged_props
791        .as_ref()
792        .map(|v| serde_json::to_string(v).unwrap_or_default());
793    let tags_json = serde_json::to_string(&merged_tags).unwrap_or_else(|_| "[]".to_string());
794
795    // --- Rewire edges ---
796    let mut edges_rewired = 0usize;
797    if !dry_run {
798        for edge in all_edges {
799            let raw_src = if edge.source_id == from_id {
800                into_id
801            } else {
802                edge.source_id
803            };
804            let raw_tgt = if edge.target_id == from_id {
805                into_id
806            } else {
807                edge.target_id
808            };
809            // Symmetric relations must be stored with source_uuid < target_uuid.
810            // Apply canonicalization so the conflict check and UPDATE both use the canonical form.
811            let (new_src, new_tgt) = match edge.relation.parse::<EdgeRelation>() {
812                Ok(rel) => canonical_edge_endpoints(rel, raw_src, raw_tgt),
813                Err(_) => (raw_src, raw_tgt),
814            };
815
816            if new_src == new_tgt {
817                conn.execute(
818                    "DELETE FROM graph_edges WHERE namespace = ?1 AND id = ?2",
819                    rusqlite::params![&namespace, edge.id.to_string()],
820                )?;
821                continue;
822            }
823
824            let now_ts = chrono::Utc::now().timestamp();
825            // H3 fix: preserve the original edge ID by updating
826            // source_id/target_id in-place when no conflict exists.
827            //
828            // Two-step approach to handle all cases while keeping the original ID:
829            //   (a) No conflict (new triple): UPDATE source_id/target_id in-place.
830            //       The edge retains its original UUID — callers can still get() it
831            //       by the ID they received from link().
832            //   (b) Conflict: into_id already has an edge with this (source,target,
833            //       relation). Delete the from-edge (it is superseded) and UPDATE
834            //       the existing into-edge to refresh weight/metadata/deleted_at.
835            //       The surviving edge is the into-entity's original edge (correct).
836            //
837            // Check for a conflict: does into_id already have this natural key?
838            let conflict_id: Option<String> = {
839                let conflict_src = new_src.to_string();
840                let conflict_tgt = new_tgt.to_string();
841                conn.query_row(
842                    "SELECT id FROM graph_edges \
843                     WHERE namespace = ?1 AND source_id = ?2 AND target_id = ?3 \
844                     AND relation = ?4 AND id != ?5",
845                    rusqlite::params![
846                        &namespace,
847                        &conflict_src,
848                        &conflict_tgt,
849                        &edge.relation,
850                        edge.id.to_string(),
851                    ],
852                    |row| row.get(0),
853                )
854                .optional()
855                .map_err(SqliteError::Rusqlite)?
856            };
857
858            let changed = if let Some(existing_id) = conflict_id {
859                // Case (b): a live or soft-deleted row already owns this natural key.
860                // Delete the from-edge and refresh the existing row.
861                conn.execute(
862                    "DELETE FROM graph_edges WHERE namespace = ?1 AND id = ?2",
863                    rusqlite::params![&namespace, edge.id.to_string()],
864                )?;
865                conn.execute(
866                    "UPDATE graph_edges SET \
867                     weight = ?1, updated_at = ?2, deleted_at = NULL, \
868                     target_backend = ?3, metadata = ?4 \
869                     WHERE namespace = ?5 AND id = ?6",
870                    rusqlite::params![
871                        edge.weight,
872                        now_ts,
873                        edge.target_backend,
874                        edge.metadata,
875                        &namespace,
876                        &existing_id,
877                    ],
878                )?
879            } else {
880                // Case (a): no conflict — update source_id/target_id in-place,
881                // preserving the original edge ID for callers.
882                conn.execute(
883                    "UPDATE graph_edges SET \
884                     source_id = ?1, target_id = ?2, updated_at = ?3 \
885                     WHERE namespace = ?4 AND id = ?5",
886                    rusqlite::params![
887                        new_src.to_string(),
888                        new_tgt.to_string(),
889                        now_ts,
890                        &namespace,
891                        edge.id.to_string(),
892                    ],
893                )?
894            };
895            if changed > 0 {
896                edges_rewired += 1;
897            }
898        }
899
900        // --- Upsert merged entity ---
901        conn.execute(
902            "INSERT OR REPLACE INTO entities \
903             (id, namespace, kind, name, description, properties, tags, \
904              created_at, updated_at, deleted_at, merged_into, merge_event_id) \
905             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
906            rusqlite::params![
907                &into_str,
908                &namespace,
909                &into_entity.kind,
910                &merged_name,
911                &merged_description,
912                &props_str,
913                &tags_json,
914                into_entity.created_at,
915                now,
916                into_entity.deleted_at,
917                Option::<String>::None,
918                Option::<String>::None,
919            ],
920        )?;
921
922        // --- Reindex into_id in FTS (delete existing, insert updated) ---
923        let fts_body = match &merged_description {
924            Some(d) if !d.is_empty() => format!("{} {}", merged_name, d),
925            _ => merged_name.clone(),
926        };
927        let kind_str = SubstrateKind::Entity.to_string();
928
929        conn.execute(
930            &format!(
931                "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
932                fts_table
933            ),
934            rusqlite::params![&namespace, &into_str],
935        )?;
936        conn.execute(
937            &format!(
938                "INSERT INTO {} \
939                 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
940                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
941                fts_table
942            ),
943            rusqlite::params![
944                &into_str,
945                &kind_str,
946                &merged_name,
947                &fts_body,
948                &tags_json,
949                &namespace,
950                &props_str,
951                now,
952            ],
953        )?;
954
955        // --- Delete from_id from FTS ---
956        conn.execute(
957            &format!(
958                "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
959                fts_table
960            ),
961            rusqlite::params![&namespace, &from_str],
962        )?;
963
964        // --- Delete from_id from vector index if configured ---
965        if let Some(ref vec_tbl) = vec_table {
966            conn.execute(
967                &format!(
968                    "DELETE FROM {} WHERE subject_id = ?1 AND namespace = ?2",
969                    vec_tbl
970                ),
971                rusqlite::params![&from_str, &namespace],
972            )?;
973        }
974
975        // --- Tombstone from entity (soft-delete with provenance) ---
976        let merge_event_id = Uuid::new_v4();
977        conn.execute(
978            "UPDATE entities \
979             SET deleted_at = ?1, merged_into = ?2, merge_event_id = ?3, updated_at = ?1 \
980             WHERE namespace = ?4 AND id = ?5 AND deleted_at IS NULL",
981            rusqlite::params![
982                now,
983                into_str,
984                merge_event_id.to_string(),
985                &namespace,
986                &from_str,
987            ],
988        )?;
989    }
990
991    let updated_entity = Entity {
992        id: into_id,
993        namespace,
994        kind: into_entity.kind,
995        entity_type: into_entity.entity_type,
996        name: merged_name,
997        description: merged_description,
998        properties: merged_props,
999        tags: merged_tags,
1000        created_at: into_entity.created_at,
1001        updated_at: now,
1002        deleted_at: into_entity.deleted_at,
1003        merged_into: None,
1004        merge_event_id: None,
1005    };
1006
1007    Ok((
1008        MergeSummary {
1009            kept_id: into_id,
1010            removed_id: from_id,
1011            edges_rewired,
1012            properties_merged,
1013            tags_unioned,
1014            content_appended: false,
1015            dry_run,
1016        },
1017        updated_entity,
1018    ))
1019}
1020
1021// ---------------------------------------------------------------------------
1022// Note merge SQL helpers
1023// ---------------------------------------------------------------------------
1024
1025/// Read one note row by ID within a namespace, returning `SqliteError` on missing/wrong-ns.
1026fn read_merge_note(
1027    conn: &rusqlite::Connection,
1028    id: Uuid,
1029    namespace: &str,
1030) -> Result<khive_storage::note::Note, SqliteError> {
1031    use khive_storage::note::Note;
1032    let id_str = id.to_string();
1033    let mut stmt = conn.prepare(
1034        "SELECT id, namespace, kind, status, name, content, salience, decay_factor, \
1035         expires_at, properties, created_at, updated_at, deleted_at \
1036         FROM notes WHERE id = ?1 AND deleted_at IS NULL",
1037    )?;
1038    let mut rows = stmt.query(rusqlite::params![id_str])?;
1039    let row = rows
1040        .next()?
1041        .ok_or_else(|| SqliteError::InvalidData(format!("note {id} not found")))?;
1042
1043    let id_s: String = row.get(0)?;
1044    let ns: String = row.get(1)?;
1045    let kind: String = row.get(2)?;
1046    let status: String = row.get(3)?;
1047    let name: Option<String> = row.get(4)?;
1048    let content: String = row.get(5)?;
1049    let salience: Option<f64> = row.get(6)?;
1050    let decay_factor: Option<f64> = row.get(7)?;
1051    let expires_at: Option<i64> = row.get(8)?;
1052    let properties_str: Option<String> = row.get(9)?;
1053    let created_at: i64 = row.get(10)?;
1054    let updated_at: i64 = row.get(11)?;
1055    let deleted_at: Option<i64> = row.get(12)?;
1056
1057    if ns != namespace {
1058        return Err(SqliteError::InvalidData(format!(
1059            "note {id} belongs to namespace '{ns}', not '{namespace}'"
1060        )));
1061    }
1062
1063    let note_id = Uuid::parse_str(&id_s).map_err(|e| SqliteError::InvalidData(e.to_string()))?;
1064    let properties: Option<serde_json::Value> = properties_str
1065        .map(|s| serde_json::from_str(&s).map_err(|e| SqliteError::InvalidData(e.to_string())))
1066        .transpose()?;
1067
1068    Ok(Note {
1069        id: note_id,
1070        namespace: ns,
1071        kind,
1072        status,
1073        name,
1074        content,
1075        salience,
1076        decay_factor,
1077        expires_at,
1078        properties,
1079        created_at,
1080        updated_at,
1081        deleted_at,
1082    })
1083}
1084
/// Larger of two optional floats.
///
/// When both are present the result is `x.max(y)`; when only one is present it is
/// returned as-is; when neither is present the result is `None`.
fn max_option_f64(a: Option<f64>, b: Option<f64>) -> Option<f64> {
    match (a, b) {
        (Some(x), Some(y)) => Some(x.max(y)),
        // At most one side is populated here: hand back whichever it is (or `None`).
        (only, None) | (None, only) => only,
    }
}
1093
1094fn append_merge_history(props: Option<Value>, entry: Value) -> Result<Option<Value>, SqliteError> {
1095    use serde_json::{json, Map};
1096    let mut obj: Map<String, Value> = match props {
1097        Some(Value::Object(m)) => m,
1098        Some(other) => {
1099            let mut m = Map::new();
1100            m.insert("_value".into(), other);
1101            m
1102        }
1103        None => Map::new(),
1104    };
1105    let history = obj
1106        .entry("_merge_history".to_string())
1107        .or_insert_with(|| json!([]));
1108    if let Value::Array(arr) = history {
1109        arr.push(entry);
1110    }
1111    Ok(Some(Value::Object(obj)))
1112}
1113
1114/// All note merge SQL on one connection inside a `BEGIN IMMEDIATE` transaction.
1115///
1116/// Reads both notes (must have same `kind`), rewires/drops incident edges, merges content
1117/// per `content_strategy`, tombstones `from`. Returns the updated `into` note for async
1118/// re-embedding.
1119///
1120/// When `dry_run` is true, all reads and computations are performed but no writes are issued.
1121#[allow(clippy::too_many_arguments)]
1122fn merge_note_sql(
1123    conn: &rusqlite::Connection,
1124    namespace: String,
1125    fts_table: String,
1126    vec_table: Option<String>,
1127    into_id: Uuid,
1128    from_id: Uuid,
1129    strategy: EntityDedupMergePolicy,
1130    content_strategy: ContentMergeStrategy,
1131    dry_run: bool,
1132) -> Result<(MergeSummary, khive_storage::note::Note), SqliteError> {
1133    let into_note = read_merge_note(conn, into_id, &namespace)?;
1134    let from_note = read_merge_note(conn, from_id, &namespace)?;
1135
1136    if into_note.kind != from_note.kind {
1137        return Err(SqliteError::InvalidData(format!(
1138            "cannot merge notes of different kinds: {} vs {}",
1139            into_note.kind, from_note.kind
1140        )));
1141    }
1142
1143    let now = chrono::Utc::now().timestamp_micros();
1144    let into_str = into_id.to_string();
1145    let from_str = from_id.to_string();
1146
1147    // Collect edges incident to from_id.
1148    #[allow(dead_code)]
1149    struct EdgeRow {
1150        id: Uuid,
1151        source_id: Uuid,
1152        target_id: Uuid,
1153        relation: String,
1154        weight: f64,
1155        created_at: i64,
1156        updated_at: i64,
1157        deleted_at: Option<i64>,
1158        target_backend: Option<String>,
1159        metadata: Option<String>,
1160    }
1161    let parse_id =
1162        |s: String| Uuid::parse_str(&s).map_err(|e| SqliteError::InvalidData(e.to_string()));
1163
1164    let mut outbound: Vec<EdgeRow> = Vec::new();
1165    {
1166        let mut stmt = conn.prepare(
1167            "SELECT id, source_id, target_id, relation, weight, created_at, updated_at, deleted_at, target_backend, metadata \
1168             FROM graph_edges WHERE namespace = ?1 AND source_id = ?2",
1169        )?;
1170        let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
1171        while let Some(row) = rows.next()? {
1172            outbound.push(EdgeRow {
1173                id: parse_id(row.get(0)?)?,
1174                source_id: parse_id(row.get(1)?)?,
1175                target_id: parse_id(row.get(2)?)?,
1176                relation: row.get(3)?,
1177                weight: row.get(4)?,
1178                created_at: row.get(5)?,
1179                updated_at: row.get(6)?,
1180                deleted_at: row.get(7)?,
1181                target_backend: row.get(8)?,
1182                metadata: row.get(9)?,
1183            });
1184        }
1185    }
1186    let mut inbound: Vec<EdgeRow> = Vec::new();
1187    {
1188        let mut stmt = conn.prepare(
1189            "SELECT id, source_id, target_id, relation, weight, created_at, updated_at, deleted_at, target_backend, metadata \
1190             FROM graph_edges WHERE namespace = ?1 AND target_id = ?2",
1191        )?;
1192        let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
1193        while let Some(row) = rows.next()? {
1194            inbound.push(EdgeRow {
1195                id: parse_id(row.get(0)?)?,
1196                source_id: parse_id(row.get(1)?)?,
1197                target_id: parse_id(row.get(2)?)?,
1198                relation: row.get(3)?,
1199                weight: row.get(4)?,
1200                created_at: row.get(5)?,
1201                updated_at: row.get(6)?,
1202                deleted_at: row.get(7)?,
1203                target_backend: row.get(8)?,
1204                metadata: row.get(9)?,
1205            });
1206        }
1207    }
1208    let mut seen: HashSet<Uuid> = HashSet::new();
1209    let mut all_edges: Vec<EdgeRow> = Vec::new();
1210    for edge in outbound.into_iter().chain(inbound) {
1211        if seen.insert(edge.id) {
1212            all_edges.push(edge);
1213        }
1214    }
1215
1216    // Merge note fields.
1217    let (merged_content, content_appended) = match content_strategy {
1218        ContentMergeStrategy::Append => {
1219            if from_note.content.is_empty() {
1220                (into_note.content.clone(), false)
1221            } else {
1222                (
1223                    format!("{}\n\n---\n\n{}", into_note.content, from_note.content),
1224                    true,
1225                )
1226            }
1227        }
1228        ContentMergeStrategy::PreferInto => (into_note.content.clone(), false),
1229        ContentMergeStrategy::PreferFrom => (from_note.content.clone(), false),
1230    };
1231
1232    let merged_name = match strategy {
1233        EntityDedupMergePolicy::PreferFrom => from_note.name.clone().or(into_note.name.clone()),
1234        _ => into_note.name.clone().or(from_note.name.clone()),
1235    };
1236
1237    let (merged_props, properties_merged) =
1238        merge_properties(&into_note.properties, &from_note.properties, strategy);
1239
1240    // Append merge history to properties.
1241    let merge_history_entry = serde_json::json!({
1242        "merged_from": from_id.to_string(),
1243        "merged_at": now,
1244        "strategy": format!("{:?}", strategy),
1245        "content_strategy": format!("{:?}", content_strategy),
1246    });
1247    let merged_props = append_merge_history(merged_props, merge_history_entry)?;
1248
1249    let merged_salience = max_option_f64(into_note.salience, from_note.salience);
1250    let merged_expires_at = match (into_note.expires_at, from_note.expires_at) {
1251        (Some(a), Some(b)) => Some(a.max(b)),
1252        (Some(a), None) => Some(a),
1253        (None, Some(b)) => Some(b),
1254        (None, None) => None,
1255    };
1256
1257    let props_str = merged_props
1258        .as_ref()
1259        .map(|v| serde_json::to_string(v).unwrap_or_default());
1260
1261    let mut edges_rewired = 0usize;
1262    if !dry_run {
1263        // Rewire and upsert.
1264        for edge in all_edges {
1265            let raw_src = if edge.source_id == from_id {
1266                into_id
1267            } else {
1268                edge.source_id
1269            };
1270            let raw_tgt = if edge.target_id == from_id {
1271                into_id
1272            } else {
1273                edge.target_id
1274            };
1275            // Canonicalize symmetric relations before conflict check + UPDATE.
1276            let (new_src, new_tgt) = match edge.relation.parse::<EdgeRelation>() {
1277                Ok(rel) => canonical_edge_endpoints(rel, raw_src, raw_tgt),
1278                Err(_) => (raw_src, raw_tgt),
1279            };
1280            if new_src == new_tgt {
1281                conn.execute(
1282                    "DELETE FROM graph_edges WHERE namespace = ?1 AND id = ?2",
1283                    rusqlite::params![&namespace, edge.id.to_string()],
1284                )?;
1285                continue;
1286            }
1287            let now_ts = chrono::Utc::now().timestamp();
1288            // Same two-step approach as entity merge rewire: preserve original edge ID
1289            // when no conflict, merge into existing row when conflict exists.
1290            let conflict_id: Option<String> = {
1291                let conflict_src = new_src.to_string();
1292                let conflict_tgt = new_tgt.to_string();
1293                conn.query_row(
1294                    "SELECT id FROM graph_edges \
1295                     WHERE namespace = ?1 AND source_id = ?2 AND target_id = ?3 \
1296                     AND relation = ?4 AND id != ?5",
1297                    rusqlite::params![
1298                        &namespace,
1299                        &conflict_src,
1300                        &conflict_tgt,
1301                        &edge.relation,
1302                        edge.id.to_string(),
1303                    ],
1304                    |row| row.get(0),
1305                )
1306                .optional()
1307                .map_err(SqliteError::Rusqlite)?
1308            };
1309
1310            let changed = if let Some(existing_id) = conflict_id {
1311                conn.execute(
1312                    "DELETE FROM graph_edges WHERE namespace = ?1 AND id = ?2",
1313                    rusqlite::params![&namespace, edge.id.to_string()],
1314                )?;
1315                conn.execute(
1316                    "UPDATE graph_edges SET \
1317                     weight = ?1, updated_at = ?2, deleted_at = NULL, \
1318                     target_backend = ?3, metadata = ?4 \
1319                     WHERE namespace = ?5 AND id = ?6",
1320                    rusqlite::params![
1321                        edge.weight,
1322                        now_ts,
1323                        edge.target_backend,
1324                        edge.metadata,
1325                        &namespace,
1326                        &existing_id,
1327                    ],
1328                )?
1329            } else {
1330                conn.execute(
1331                    "UPDATE graph_edges SET \
1332                     source_id = ?1, target_id = ?2, updated_at = ?3 \
1333                     WHERE namespace = ?4 AND id = ?5",
1334                    rusqlite::params![
1335                        new_src.to_string(),
1336                        new_tgt.to_string(),
1337                        now_ts,
1338                        &namespace,
1339                        edge.id.to_string(),
1340                    ],
1341                )?
1342            };
1343            if changed > 0 {
1344                edges_rewired += 1;
1345            }
1346        }
1347
1348        // Upsert merged into-note.
1349        conn.execute(
1350            "INSERT OR REPLACE INTO notes \
1351             (id, namespace, kind, status, name, content, salience, decay_factor, \
1352              expires_at, properties, created_at, updated_at, deleted_at) \
1353             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
1354            rusqlite::params![
1355                &into_str,
1356                &namespace,
1357                &into_note.kind,
1358                &into_note.status,
1359                &merged_name,
1360                &merged_content,
1361                merged_salience,
1362                into_note.decay_factor,
1363                merged_expires_at,
1364                &props_str,
1365                into_note.created_at,
1366                now,
1367                into_note.deleted_at,
1368            ],
1369        )?;
1370
1371        // Update FTS for into-note.
1372        conn.execute(
1373            &format!(
1374                "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
1375                fts_table
1376            ),
1377            rusqlite::params![&namespace, &into_str],
1378        )?;
1379        conn.execute(
1380            &format!(
1381                "INSERT INTO {} \
1382                 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
1383                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1384                fts_table
1385            ),
1386            rusqlite::params![
1387                &into_str,
1388                SubstrateKind::Note.to_string(),
1389                &merged_name,
1390                &merged_content,
1391                "[]",
1392                &namespace,
1393                &props_str,
1394                now,
1395            ],
1396        )?;
1397
1398        // Delete from-note from FTS.
1399        conn.execute(
1400            &format!(
1401                "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
1402                fts_table
1403            ),
1404            rusqlite::params![&namespace, &from_str],
1405        )?;
1406
1407        // Delete from-note from vector index if configured.
1408        if let Some(ref vec_tbl) = vec_table {
1409            conn.execute(
1410                &format!(
1411                    "DELETE FROM {} WHERE subject_id = ?1 AND namespace = ?2",
1412                    vec_tbl
1413                ),
1414                rusqlite::params![&from_str, &namespace],
1415            )?;
1416        }
1417
1418        // Tombstone the from-note.
1419        conn.execute(
1420            "UPDATE notes SET status = 'deleted', deleted_at = ?1, updated_at = ?1 \
1421             WHERE namespace = ?2 AND id = ?3 AND deleted_at IS NULL",
1422            rusqlite::params![now, &namespace, &from_str],
1423        )?;
1424    }
1425
1426    let updated_note = khive_storage::note::Note {
1427        id: into_id,
1428        namespace: namespace.clone(),
1429        kind: into_note.kind.clone(),
1430        status: into_note.status.clone(),
1431        name: merged_name,
1432        content: merged_content,
1433        salience: merged_salience,
1434        decay_factor: into_note.decay_factor,
1435        expires_at: merged_expires_at,
1436        properties: merged_props,
1437        created_at: into_note.created_at,
1438        updated_at: now,
1439        deleted_at: into_note.deleted_at,
1440    };
1441
1442    Ok((
1443        MergeSummary {
1444            kept_id: into_id,
1445            removed_id: from_id,
1446            edges_rewired,
1447            properties_merged,
1448            tags_unioned: 0,
1449            content_appended,
1450            dry_run,
1451        },
1452        updated_note,
1453    ))
1454}
1455
1456// ---------------------------------------------------------------------------
1457// Merge helpers (pure functions — easier to unit test)
1458// ---------------------------------------------------------------------------
1459
1460fn merge_string_field(into: &str, from: &str, strategy: EntityDedupMergePolicy) -> String {
1461    match strategy {
1462        EntityDedupMergePolicy::PreferInto | EntityDedupMergePolicy::Union => into.to_string(),
1463        EntityDedupMergePolicy::PreferFrom => from.to_string(),
1464    }
1465}
1466
1467fn merge_option_string_field(
1468    into: &Option<String>,
1469    from: &Option<String>,
1470    strategy: EntityDedupMergePolicy,
1471) -> Option<String> {
1472    match strategy {
1473        EntityDedupMergePolicy::PreferInto => {
1474            if into.is_some() {
1475                into.clone()
1476            } else {
1477                from.clone()
1478            }
1479        }
1480        EntityDedupMergePolicy::PreferFrom => {
1481            if from.is_some() {
1482                from.clone()
1483            } else {
1484                into.clone()
1485            }
1486        }
1487        EntityDedupMergePolicy::Union => {
1488            // Keep into's description; if empty, append from's.
1489            match (into, from) {
1490                (Some(a), _) if !a.is_empty() => Some(a.clone()),
1491                (_, Some(b)) => Some(b.clone()),
1492                _ => None,
1493            }
1494        }
1495    }
1496}
1497
1498/// Merge two property objects. Returns (merged, count_of_fields_from_from_that_were_added).
1499fn merge_properties(
1500    into: &Option<Value>,
1501    from: &Option<Value>,
1502    strategy: EntityDedupMergePolicy,
1503) -> (Option<Value>, usize) {
1504    match (into, from) {
1505        (None, None) => (None, 0),
1506        (Some(a), None) => (Some(a.clone()), 0),
1507        (None, Some(b)) => {
1508            let count = if let Value::Object(m) = b { m.len() } else { 1 };
1509            (Some(b.clone()), count)
1510        }
1511        (Some(into_val), Some(from_val)) => {
1512            let (merged, added) = merge_json(into_val, from_val, strategy);
1513            (Some(merged), added)
1514        }
1515    }
1516}
1517
1518/// Deep-merge two JSON values per strategy. Returns (merged, keys_contributed_by_from).
1519fn merge_json(into: &Value, from: &Value, strategy: EntityDedupMergePolicy) -> (Value, usize) {
1520    match (into, from, strategy) {
1521        (Value::Object(a), Value::Object(b), EntityDedupMergePolicy::Union) => {
1522            let mut result = a.clone();
1523            let mut added = 0usize;
1524            for (k, v_from) in b {
1525                if let Some(v_into) = a.get(k) {
1526                    let (merged, sub_added) =
1527                        merge_json(v_into, v_from, EntityDedupMergePolicy::Union);
1528                    result.insert(k.clone(), merged);
1529                    added += sub_added;
1530                } else {
1531                    result.insert(k.clone(), v_from.clone());
1532                    added += 1;
1533                }
1534            }
1535            (Value::Object(result), added)
1536        }
1537        (Value::Object(a), Value::Object(b), EntityDedupMergePolicy::PreferInto) => {
1538            let mut result = a.clone();
1539            let mut added = 0usize;
1540            for (k, v) in b {
1541                if !a.contains_key(k) {
1542                    result.insert(k.clone(), v.clone());
1543                    added += 1;
1544                }
1545            }
1546            (Value::Object(result), added)
1547        }
1548        (Value::Object(a), Value::Object(b), EntityDedupMergePolicy::PreferFrom) => {
1549            let mut result = a.clone();
1550            let mut added = 0usize;
1551            for (k, v) in b {
1552                result.insert(k.clone(), v.clone());
1553                if !a.contains_key(k) {
1554                    added += 1;
1555                }
1556            }
1557            (Value::Object(result), added)
1558        }
1559        // Non-object scalars: apply strategy directly.
1560        (_into_val, from_val, EntityDedupMergePolicy::PreferFrom) => (from_val.clone(), 1),
1561        _ => (into.clone(), 0),
1562    }
1563}
1564
/// Append to `into` every tag of `from` that has not been seen yet.
///
/// Order is preserved: all of `into` first (including any duplicates it already
/// contains), then the new tags in the order they first appear in `from`.
/// Returns the combined list and the number of tags that were appended.
fn union_tags(into: &[String], from: &[String]) -> (Vec<String>, usize) {
    let mut known: HashSet<&str> = into.iter().map(String::as_str).collect();
    let mut combined = into.to_vec();
    let original_len = combined.len();
    // `insert` returns true only the first time a tag is encountered.
    combined.extend(
        from.iter()
            .filter(|&tag| known.insert(tag.as_str()))
            .cloned(),
    );
    let appended = combined.len() - original_len;
    (combined, appended)
}
1577
1578// ---------------------------------------------------------------------------
1579// INLINE TEST JUSTIFICATION: tests here exercise patch/merge helpers and the
1580// update_note/update_entity paths that share private merge_properties logic.
1581// Moving them to tests/ would require pub-exporting merge_properties, which is
1582// an internal invariant not suitable for the public API surface. Broad
1583// behavioral curation tests live in tests/integration.rs.
1584// ---------------------------------------------------------------------------
1585
1586#[cfg(test)]
1587mod tests {
1588    use super::*;
1589    use crate::runtime::{KhiveRuntime, NamespaceToken};
1590    use khive_storage::types::{Direction, TextFilter, TextQueryMode, TextSearchRequest};
1591
    /// Build the runtime used by the tests in this module via `KhiveRuntime::memory()`,
    /// panicking if construction fails.
    fn rt() -> KhiveRuntime {
        KhiveRuntime::memory().unwrap()
    }
1595
1596    // Helper: search FTS5 for `query` in a runtime namespace.
1597    async fn fts_hit(rt: &KhiveRuntime, token: &NamespaceToken, query: &str) -> Vec<Uuid> {
1598        let ns = token.namespace().as_str().to_string();
1599        rt.text(token)
1600            .unwrap()
1601            .search(TextSearchRequest {
1602                query: query.to_string(),
1603                mode: TextQueryMode::Plain,
1604                filter: Some(TextFilter {
1605                    namespaces: vec![ns],
1606                    ..Default::default()
1607                }),
1608                top_k: 50,
1609                snippet_chars: 100,
1610            })
1611            .await
1612            .unwrap()
1613            .into_iter()
1614            .map(|h| h.subject_id)
1615            .collect()
1616    }
1617
1618    #[tokio::test]
1619    async fn update_entity_patch_changes_only_specified_fields() {
1620        let rt = rt();
1621        let tok = NamespaceToken::local();
1622        let entity = rt
1623            .create_entity(
1624                &tok,
1625                "concept",
1626                None,
1627                "OriginalName",
1628                Some("orig desc"),
1629                Some(serde_json::json!({"k":"v"})),
1630                vec![],
1631            )
1632            .await
1633            .unwrap();
1634
1635        let updated = rt
1636            .update_entity(
1637                &tok,
1638                entity.id,
1639                EntityPatch {
1640                    description: Some(Some("new desc".to_string())),
1641                    ..Default::default()
1642                },
1643            )
1644            .await
1645            .unwrap();
1646
1647        assert_eq!(updated.name, "OriginalName");
1648        assert_eq!(updated.description.as_deref(), Some("new desc"));
1649        assert_eq!(updated.properties, Some(serde_json::json!({"k":"v"})));
1650    }
1651
1652    #[tokio::test]
1653    async fn update_entity_clear_description_with_some_none() {
1654        let rt = rt();
1655        let tok = NamespaceToken::local();
1656        let entity = rt
1657            .create_entity(
1658                &tok,
1659                "concept",
1660                None,
1661                "ClearDesc",
1662                Some("has description"),
1663                None,
1664                vec![],
1665            )
1666            .await
1667            .unwrap();
1668
1669        let updated = rt
1670            .update_entity(
1671                &tok,
1672                entity.id,
1673                EntityPatch {
1674                    description: Some(None),
1675                    ..Default::default()
1676                },
1677            )
1678            .await
1679            .unwrap();
1680
1681        assert!(
1682            updated.description.is_none(),
1683            "description should be cleared"
1684        );
1685    }
1686
1687    #[tokio::test]
1688    async fn update_entity_reindexes_when_name_changes() {
1689        let rt = rt();
1690        let tok = NamespaceToken::local();
1691        let entity = rt
1692            .create_entity(&tok, "concept", None, "OldName", None, None, vec![])
1693            .await
1694            .unwrap();
1695
1696        // Old name is findable.
1697        let hits_before = fts_hit(&rt, &tok, "OldName").await;
1698        assert!(
1699            hits_before.contains(&entity.id),
1700            "entity should be findable by old name"
1701        );
1702
1703        rt.update_entity(
1704            &tok,
1705            entity.id,
1706            EntityPatch {
1707                name: Some("NewName".to_string()),
1708                ..Default::default()
1709            },
1710        )
1711        .await
1712        .unwrap();
1713
1714        let hits_old = fts_hit(&rt, &tok, "OldName").await;
1715        let hits_new = fts_hit(&rt, &tok, "NewName").await;
1716
1717        // After rename, old name no longer matches this entity (FTS index updated).
1718        assert!(
1719            !hits_old.contains(&entity.id),
1720            "old name should no longer match after rename"
1721        );
1722        assert!(
1723            hits_new.contains(&entity.id),
1724            "new name should be findable after rename"
1725        );
1726    }
1727
1728    #[tokio::test]
1729    async fn update_entity_properties_merges_preserving_existing_keys() {
1730        let rt = rt();
1731        let tok = NamespaceToken::local();
1732        let entity = rt
1733            .create_entity(
1734                &tok,
1735                "concept",
1736                None,
1737                "MergeProps",
1738                None,
1739                Some(serde_json::json!({
1740                    "domain": "inference",
1741                    "repo": "lattice",
1742                    "status": "researched",
1743                })),
1744                vec![],
1745            )
1746            .await
1747            .unwrap();
1748
1749        let updated = rt
1750            .update_entity(
1751                &tok,
1752                entity.id,
1753                EntityPatch {
1754                    properties: Some(serde_json::json!({"status": "implemented"})),
1755                    ..Default::default()
1756                },
1757            )
1758            .await
1759            .unwrap();
1760
1761        let props = updated.properties.expect("properties should remain set");
1762        assert_eq!(props["domain"], "inference", "domain key must be preserved");
1763        assert_eq!(props["repo"], "lattice", "repo key must be preserved");
1764        assert_eq!(
1765            props["status"], "implemented",
1766            "status key must be updated by patch"
1767        );
1768    }
1769
1770    #[tokio::test]
1771    async fn update_entity_skips_reindex_when_only_properties_change() {
1772        let rt = rt();
1773        let tok = NamespaceToken::local();
1774        let entity = rt
1775            .create_entity(&tok, "concept", None, "StableIndexed", None, None, vec![])
1776            .await
1777            .unwrap();
1778
1779        // Verify it's in the index before.
1780        let hits_before = fts_hit(&rt, &tok, "StableIndexed").await;
1781        assert!(hits_before.contains(&entity.id));
1782
1783        // Only patch properties — text index should be untouched (still findable).
1784        rt.update_entity(
1785            &tok,
1786            entity.id,
1787            EntityPatch {
1788                properties: Some(serde_json::json!({"new": "prop"})),
1789                ..Default::default()
1790            },
1791        )
1792        .await
1793        .unwrap();
1794
1795        let hits_after = fts_hit(&rt, &tok, "StableIndexed").await;
1796        assert!(
1797            hits_after.contains(&entity.id),
1798            "still findable after props-only patch"
1799        );
1800    }
1801
1802    #[tokio::test]
1803    async fn merge_entity_rewires_edges() {
1804        let rt = rt();
1805        let tok = NamespaceToken::local();
1806        let a = rt
1807            .create_entity(&tok, "concept", None, "A", None, None, vec![])
1808            .await
1809            .unwrap();
1810        let b = rt
1811            .create_entity(&tok, "concept", None, "B", None, None, vec![])
1812            .await
1813            .unwrap();
1814        let c = rt
1815            .create_entity(&tok, "concept", None, "C", None, None, vec![])
1816            .await
1817            .unwrap();
1818        let d = rt
1819            .create_entity(&tok, "concept", None, "D", None, None, vec![])
1820            .await
1821            .unwrap();
1822
1823        // A→B and C→B; merge B into D → should become A→D and C→D.
1824        rt.link(&tok, a.id, b.id, EdgeRelation::Extends, 1.0, None)
1825            .await
1826            .unwrap();
1827        rt.link(&tok, c.id, b.id, EdgeRelation::Extends, 1.0, None)
1828            .await
1829            .unwrap();
1830
1831        let summary = rt
1832            .merge_entity(&tok, d.id, b.id, EntityDedupMergePolicy::PreferInto, false)
1833            .await
1834            .unwrap();
1835
1836        assert_eq!(summary.kept_id, d.id);
1837        assert_eq!(summary.removed_id, b.id);
1838        assert_eq!(summary.edges_rewired, 2);
1839
1840        // Verify edges now point to D.
1841        let a_neighbors = rt
1842            .neighbors(&tok, a.id, Direction::Out, None, None)
1843            .await
1844            .unwrap();
1845        assert_eq!(a_neighbors.len(), 1);
1846        assert_eq!(a_neighbors[0].node_id, d.id);
1847
1848        let c_neighbors = rt
1849            .neighbors(&tok, c.id, Direction::Out, None, None)
1850            .await
1851            .unwrap();
1852        assert_eq!(c_neighbors.len(), 1);
1853        assert_eq!(c_neighbors[0].node_id, d.id);
1854    }
1855
1856    #[tokio::test]
1857    async fn merge_entity_self_merge_rejected() {
1858        let rt = rt();
1859        let tok = NamespaceToken::local();
1860        let a = rt
1861            .create_entity(&tok, "concept", None, "A", None, None, vec![])
1862            .await
1863            .unwrap();
1864        let err = rt
1865            .merge_entity(&tok, a.id, a.id, EntityDedupMergePolicy::PreferInto, false)
1866            .await
1867            .unwrap_err();
1868        assert!(
1869            format!("{err:?}").contains("cannot merge an entity into itself"),
1870            "expected self-merge rejection, got: {err:?}"
1871        );
1872    }
1873
1874    #[tokio::test]
1875    async fn merge_entity_prefer_into_strategy() {
1876        let rt = rt();
1877        let tok = NamespaceToken::local();
1878        let into = rt
1879            .create_entity(
1880                &tok,
1881                "concept",
1882                None,
1883                "Into",
1884                None,
1885                Some(serde_json::json!({"a": 1})),
1886                vec![],
1887            )
1888            .await
1889            .unwrap();
1890        let from = rt
1891            .create_entity(
1892                &tok,
1893                "concept",
1894                None,
1895                "From",
1896                None,
1897                Some(serde_json::json!({"a": 2, "b": 3})),
1898                vec![],
1899            )
1900            .await
1901            .unwrap();
1902
1903        rt.merge_entity(
1904            &tok,
1905            into.id,
1906            from.id,
1907            EntityDedupMergePolicy::PreferInto,
1908            false,
1909        )
1910        .await
1911        .unwrap();
1912
1913        let kept = rt.get_entity(&tok, into.id).await.unwrap();
1914        let props = kept.properties.unwrap();
1915        // a stays as 1 (into wins), b is added from from.
1916        assert_eq!(props["a"], 1);
1917        assert_eq!(props["b"], 3);
1918    }
1919
1920    #[tokio::test]
1921    async fn merge_entity_prefer_from_strategy() {
1922        let rt = rt();
1923        let tok = NamespaceToken::local();
1924        let into = rt
1925            .create_entity(
1926                &tok,
1927                "concept",
1928                None,
1929                "Into",
1930                None,
1931                Some(serde_json::json!({"a": 1})),
1932                vec![],
1933            )
1934            .await
1935            .unwrap();
1936        let from = rt
1937            .create_entity(
1938                &tok,
1939                "concept",
1940                None,
1941                "From",
1942                None,
1943                Some(serde_json::json!({"a": 2, "b": 3})),
1944                vec![],
1945            )
1946            .await
1947            .unwrap();
1948
1949        rt.merge_entity(
1950            &tok,
1951            into.id,
1952            from.id,
1953            EntityDedupMergePolicy::PreferFrom,
1954            false,
1955        )
1956        .await
1957        .unwrap();
1958
1959        let kept = rt.get_entity(&tok, into.id).await.unwrap();
1960        let props = kept.properties.unwrap();
1961        // from wins on a, b also from from.
1962        assert_eq!(props["a"], 2);
1963        assert_eq!(props["b"], 3);
1964    }
1965
1966    #[tokio::test]
1967    async fn merge_entity_union_strategy() {
1968        let rt = rt();
1969        let tok = NamespaceToken::local();
1970        let into = rt
1971            .create_entity(
1972                &tok,
1973                "concept",
1974                None,
1975                "Into",
1976                None,
1977                Some(serde_json::json!({"a": 1})),
1978                vec![],
1979            )
1980            .await
1981            .unwrap();
1982        let from = rt
1983            .create_entity(
1984                &tok,
1985                "concept",
1986                None,
1987                "From",
1988                None,
1989                Some(serde_json::json!({"a": 2, "b": 3})),
1990                vec![],
1991            )
1992            .await
1993            .unwrap();
1994
1995        rt.merge_entity(&tok, into.id, from.id, EntityDedupMergePolicy::Union, false)
1996            .await
1997            .unwrap();
1998
1999        let kept = rt.get_entity(&tok, into.id).await.unwrap();
2000        let props = kept.properties.unwrap();
2001        // Scalar conflict: into wins → a=1. b added from from.
2002        assert_eq!(props["a"], 1);
2003        assert_eq!(props["b"], 3);
2004    }
2005
2006    #[tokio::test]
2007    async fn merge_entity_unions_tags() {
2008        let rt = rt();
2009        let tok = NamespaceToken::local();
2010        let into = rt
2011            .create_entity(
2012                &tok,
2013                "concept",
2014                None,
2015                "Into",
2016                None,
2017                None,
2018                vec!["x".to_string(), "y".to_string()],
2019            )
2020            .await
2021            .unwrap();
2022        let from = rt
2023            .create_entity(
2024                &tok,
2025                "concept",
2026                None,
2027                "From",
2028                None,
2029                None,
2030                vec!["y".to_string(), "z".to_string()],
2031            )
2032            .await
2033            .unwrap();
2034
2035        rt.merge_entity(
2036            &tok,
2037            into.id,
2038            from.id,
2039            EntityDedupMergePolicy::PreferInto,
2040            false,
2041        )
2042        .await
2043        .unwrap();
2044
2045        let kept = rt.get_entity(&tok, into.id).await.unwrap();
2046        let mut tags = kept.tags.clone();
2047        tags.sort();
2048        assert_eq!(tags, vec!["x", "y", "z"]);
2049    }
2050
2051    #[tokio::test]
2052    async fn merge_entity_drops_self_loops() {
2053        let rt = rt();
2054        let tok = NamespaceToken::local();
2055        let a = rt
2056            .create_entity(&tok, "concept", None, "A", None, None, vec![])
2057            .await
2058            .unwrap();
2059        let b = rt
2060            .create_entity(&tok, "concept", None, "B", None, None, vec![])
2061            .await
2062            .unwrap();
2063
2064        // A `extends` B — merging B into A would produce A `extends` A → drop it.
2065        rt.link(&tok, a.id, b.id, EdgeRelation::Extends, 1.0, None)
2066            .await
2067            .unwrap();
2068
2069        let summary = rt
2070            .merge_entity(&tok, a.id, b.id, EntityDedupMergePolicy::PreferInto, false)
2071            .await
2072            .unwrap();
2073
2074        assert_eq!(
2075            summary.edges_rewired, 0,
2076            "self-loop should be dropped, not rewired"
2077        );
2078
2079        let a_out = rt
2080            .neighbors(&tok, a.id, Direction::Out, None, None)
2081            .await
2082            .unwrap();
2083        assert!(a_out.is_empty(), "no self-loop should remain");
2084    }
2085
2086    // ---- merge helper unit tests ----
2087
2088    #[test]
2089    fn union_tags_deduplicates() {
2090        let (tags, added) = union_tags(
2091            &["x".to_string(), "y".to_string()],
2092            &["y".to_string(), "z".to_string()],
2093        );
2094        let mut sorted = tags.clone();
2095        sorted.sort();
2096        assert_eq!(sorted, vec!["x", "y", "z"]);
2097        assert_eq!(added, 1);
2098    }
2099
2100    #[test]
2101    fn merge_properties_prefer_into_fills_missing_keys() {
2102        let a = serde_json::json!({"a": 1});
2103        let b = serde_json::json!({"a": 99, "b": 2});
2104        let (merged, added) =
2105            merge_properties(&Some(a), &Some(b), EntityDedupMergePolicy::PreferInto);
2106        let m = merged.unwrap();
2107        assert_eq!(m["a"], 1);
2108        assert_eq!(m["b"], 2);
2109        assert_eq!(added, 1);
2110    }
2111
2112    // ---- tombstone and note merge tests ----
2113
2114    #[tokio::test]
2115    async fn merge_entity_tombstones_source_with_provenance() {
2116        let rt = rt();
2117        let tok = NamespaceToken::local();
2118        let into = rt
2119            .create_entity(&tok, "concept", None, "Into", None, None, vec![])
2120            .await
2121            .unwrap();
2122        let from = rt
2123            .create_entity(&tok, "concept", None, "From", None, None, vec![])
2124            .await
2125            .unwrap();
2126        let from_id = from.id;
2127
2128        rt.merge_entity(
2129            &tok,
2130            into.id,
2131            from_id,
2132            EntityDedupMergePolicy::PreferInto,
2133            false,
2134        )
2135        .await
2136        .unwrap();
2137
2138        // After merge, get_entity returns an error (soft-deleted rows are excluded).
2139        assert!(
2140            rt.get_entity(&tok, from_id).await.is_err(),
2141            "tombstoned source should not be returned by get_entity"
2142        );
2143
2144        // Verify the source row still exists in SQL with provenance.
2145        let pool = rt.backend().pool_arc();
2146        let (deleted_at, merged_into): (Option<i64>, Option<String>) =
2147            tokio::task::spawn_blocking(move || {
2148                let guard = pool.writer().unwrap();
2149                guard
2150                    .conn()
2151                    .query_row(
2152                        "SELECT deleted_at, merged_into FROM entities WHERE id = ?1",
2153                        [from_id.to_string()],
2154                        |row| Ok((row.get(0)?, row.get(1)?)),
2155                    )
2156                    .unwrap()
2157            })
2158            .await
2159            .unwrap();
2160        assert!(
2161            deleted_at.is_some(),
2162            "tombstoned entity must have deleted_at set"
2163        );
2164        assert_eq!(
2165            merged_into.as_deref(),
2166            Some(into.id.to_string().as_str()),
2167            "merged_into must point to into_id"
2168        );
2169    }
2170
2171    #[tokio::test]
2172    async fn merge_note_same_kind_appends_content() {
2173        let rt = rt();
2174        let tok = NamespaceToken::local();
2175        let into = rt
2176            .create_note(
2177                &tok,
2178                "observation",
2179                None,
2180                "Into content",
2181                None,
2182                None,
2183                vec![],
2184            )
2185            .await
2186            .unwrap();
2187        let from = rt
2188            .create_note(
2189                &tok,
2190                "observation",
2191                None,
2192                "From content",
2193                None,
2194                None,
2195                vec![],
2196            )
2197            .await
2198            .unwrap();
2199        let from_id = from.id;
2200
2201        let summary = rt
2202            .merge_note(
2203                &tok,
2204                into.id,
2205                from_id,
2206                EntityDedupMergePolicy::PreferInto,
2207                ContentMergeStrategy::Append,
2208                false,
2209            )
2210            .await
2211            .unwrap();
2212
2213        assert_eq!(summary.kept_id, into.id);
2214        assert_eq!(summary.removed_id, from_id);
2215        assert!(summary.content_appended);
2216        assert!(!summary.dry_run);
2217
2218        // Source is no longer findable.
2219        let from_store = rt.notes(&tok).unwrap();
2220        assert!(
2221            from_store.get_note(from_id).await.unwrap().is_none(),
2222            "merged-from note should be soft-deleted"
2223        );
2224    }
2225
2226    #[tokio::test]
2227    async fn merge_note_different_kinds_rejected() {
2228        let rt = rt();
2229        let tok = NamespaceToken::local();
2230        let into = rt
2231            .create_note(&tok, "observation", None, "Into", None, None, vec![])
2232            .await
2233            .unwrap();
2234        let from = rt
2235            .create_note(&tok, "decision", None, "From", None, None, vec![])
2236            .await
2237            .unwrap();
2238
2239        let result = rt
2240            .merge_note(
2241                &tok,
2242                into.id,
2243                from.id,
2244                EntityDedupMergePolicy::PreferInto,
2245                ContentMergeStrategy::Append,
2246                false,
2247            )
2248            .await;
2249        assert!(result.is_err(), "merging different note kinds must fail");
2250    }
2251
2252    #[tokio::test]
2253    async fn merge_note_dry_run_leaves_notes_unchanged() {
2254        let rt = rt();
2255        let tok = NamespaceToken::local();
2256        let into = rt
2257            .create_note(
2258                &tok,
2259                "observation",
2260                None,
2261                "Into content",
2262                None,
2263                None,
2264                vec![],
2265            )
2266            .await
2267            .unwrap();
2268        let from = rt
2269            .create_note(
2270                &tok,
2271                "observation",
2272                None,
2273                "From content",
2274                None,
2275                None,
2276                vec![],
2277            )
2278            .await
2279            .unwrap();
2280        let into_id = into.id;
2281        let from_id = from.id;
2282
2283        let summary = rt
2284            .merge_note(
2285                &tok,
2286                into_id,
2287                from_id,
2288                EntityDedupMergePolicy::PreferInto,
2289                ContentMergeStrategy::Append,
2290                true,
2291            )
2292            .await
2293            .unwrap();
2294
2295        assert!(summary.dry_run);
2296
2297        // Both notes still exist unchanged.
2298        let store = rt.notes(&tok).unwrap();
2299        let into_after = store.get_note(into_id).await.unwrap().unwrap();
2300        let from_after = store.get_note(from_id).await.unwrap().unwrap();
2301        assert_eq!(
2302            into_after.content, "Into content",
2303            "dry_run must not mutate into-note"
2304        );
2305        assert_eq!(
2306            from_after.content, "From content",
2307            "dry_run must not mutate from-note"
2308        );
2309    }
2310
2311    #[tokio::test]
2312    async fn update_edge_updates_properties() {
2313        use khive_storage::EdgeRelation;
2314        let rt = rt();
2315        let tok = NamespaceToken::local();
2316        let a = rt
2317            .create_entity(&tok, "concept", None, "A", None, None, vec![])
2318            .await
2319            .unwrap();
2320        let b = rt
2321            .create_entity(&tok, "concept", None, "B", None, None, vec![])
2322            .await
2323            .unwrap();
2324        let edge = rt
2325            .link(&tok, a.id, b.id, EdgeRelation::Extends, 0.5, None)
2326            .await
2327            .unwrap();
2328        let edge_id: Uuid = edge.id.into();
2329
2330        let updated = rt
2331            .update_edge(
2332                &tok,
2333                edge_id,
2334                EdgePatch {
2335                    properties: Some(serde_json::json!({"source": "manual"})),
2336                    ..Default::default()
2337                },
2338            )
2339            .await
2340            .unwrap();
2341
2342        assert_eq!(updated.metadata.as_ref().unwrap()["source"], "manual");
2343        assert!((updated.weight - 0.5).abs() < 0.001, "weight unchanged");
2344    }
2345
2346    // scenario-kg-maintenance C1 regression: merge must not crash when both
2347    // entities share a common third-party edge (duplicate triple after rewire).
2348    // Before the fix, the double-ON-CONFLICT INSERT raised a UNIQUE constraint
2349    // error at the SQLite layer and the merge aborted mid-transaction.
2350    #[tokio::test]
2351    async fn merge_entity_survives_shared_edge_to_third_party() {
2352        use khive_storage::EdgeRelation;
2353        let rt = rt();
2354        let tok = NamespaceToken::local();
2355
2356        // Create three entities: A and B will be merged; shared is the common target.
2357        // Use `extends` (concept→concept) which is a valid endpoint combination.
2358        let a = rt
2359            .create_entity(&tok, "concept", None, "A", None, None, vec![])
2360            .await
2361            .unwrap();
2362        let b = rt
2363            .create_entity(&tok, "concept", None, "B", None, None, vec![])
2364            .await
2365            .unwrap();
2366        let shared = rt
2367            .create_entity(&tok, "concept", None, "Shared", None, None, vec![])
2368            .await
2369            .unwrap();
2370
2371        // Both A and B extend the same shared concept — this creates a duplicate
2372        // triple (A/B → shared, extends) that triggers the crash on rewire.
2373        rt.link(&tok, a.id, shared.id, EdgeRelation::Extends, 1.0, None)
2374            .await
2375            .unwrap();
2376        rt.link(&tok, b.id, shared.id, EdgeRelation::Extends, 1.0, None)
2377            .await
2378            .unwrap();
2379
2380        // Before the fix this would return Err with "UNIQUE constraint failed".
2381        let summary = rt
2382            .merge_entity(
2383                &tok,
2384                a.id,
2385                b.id,
2386                crate::EntityDedupMergePolicy::PreferInto,
2387                false,
2388            )
2389            .await
2390            .expect(
2391                "C1: merge must succeed even when both entities share an edge to a third party",
2392            );
2393
2394        assert_eq!(summary.kept_id, a.id);
2395        assert_eq!(summary.removed_id, b.id);
2396        // A already had the Extends edge to shared; when B→shared is rewired to
2397        // A→shared, the ON CONFLICT DO UPDATE refreshes the existing row (clears
2398        // deleted_at, updates weight). rusqlite reports this as 1 change, so
2399        // edges_rewired will be >= 0. The important invariant is that the merge
2400        // did NOT crash and exactly one live edge A→shared remains.
2401
2402        // One live edge A→shared must exist after merge.
2403        let a_edges = rt
2404            .list_edges(
2405                &tok,
2406                crate::EdgeListFilter {
2407                    source_id: Some(a.id),
2408                    target_id: Some(shared.id),
2409                    relations: vec![EdgeRelation::Extends],
2410                    ..Default::default()
2411                },
2412                10,
2413            )
2414            .await
2415            .unwrap();
2416        assert_eq!(
2417            a_edges.len(),
2418            1,
2419            "C1: exactly one live A→shared Extends edge must exist after merge; got: {a_edges:?}"
2420        );
2421
2422        // Tombstone check: B must be soft-deleted after successful merge (C3).
2423        // get_entity filters deleted_at IS NULL, so a tombstoned entity returns None.
2424        let b_after = rt.entities(&tok).unwrap().get_entity(b.id).await.unwrap();
2425        assert!(
2426            b_after.is_none(),
2427            "C3: from_entity must be tombstoned (get_entity returns None for deleted) after merge; got: {b_after:?}"
2428        );
2429    }
2430
2431    // H2 regression: merge_entity at the runtime level must reject cross-kind merges.
2432    // Before the H2 fix, only the pack handler had this guard; a direct runtime caller
2433    // could still merge concept+project, silently tombstoning the source entity.
2434    #[tokio::test]
2435    async fn merge_entity_cross_kind_rejected_at_runtime() {
2436        let rt = rt();
2437        let tok = NamespaceToken::local();
2438
2439        let concept = rt
2440            .create_entity(&tok, "concept", None, "H2Concept", None, None, vec![])
2441            .await
2442            .unwrap();
2443        let project = rt
2444            .create_entity(&tok, "project", None, "H2Project", None, None, vec![])
2445            .await
2446            .unwrap();
2447
2448        // Cross-kind merge must return InvalidInput at the runtime level.
2449        let err = rt
2450            .merge_entity(
2451                &tok,
2452                concept.id,
2453                project.id,
2454                crate::EntityDedupMergePolicy::PreferInto,
2455                false,
2456            )
2457            .await
2458            .expect_err("H2: cross-kind merge must be rejected by runtime");
2459        assert!(
2460            matches!(err, crate::RuntimeError::InvalidInput(_)),
2461            "H2: expected InvalidInput, got: {err:?}"
2462        );
2463
2464        // Both entities must survive the failed merge attempt with no tombstone.
2465        let concept_after = rt.get_entity(&tok, concept.id).await;
2466        let project_after = rt.get_entity(&tok, project.id).await;
2467        assert!(
2468            concept_after.is_ok(),
2469            "H2: concept must remain live after rejected merge; got: {concept_after:?}"
2470        );
2471        assert!(
2472            project_after.is_ok(),
2473            "H2: project must remain live after rejected merge; got: {project_after:?}"
2474        );
2475    }
2476
2477    // scenario-kg-maintenance C2 regression: same-kind merge must succeed.
2478    #[tokio::test]
2479    async fn merge_entity_same_kind_succeeds() {
2480        let rt = rt();
2481        let tok = NamespaceToken::local();
2482
2483        let c1 = rt
2484            .create_entity(&tok, "concept", None, "Concept1", None, None, vec![])
2485            .await
2486            .unwrap();
2487        let c2 = rt
2488            .create_entity(&tok, "concept", None, "Concept2", None, None, vec![])
2489            .await
2490            .unwrap();
2491
2492        let summary = rt
2493            .merge_entity(
2494                &tok,
2495                c1.id,
2496                c2.id,
2497                crate::EntityDedupMergePolicy::PreferInto,
2498                false,
2499            )
2500            .await
2501            .expect("same-kind merge must succeed");
2502        assert_eq!(summary.kept_id, c1.id);
2503        assert_eq!(summary.removed_id, c2.id);
2504
2505        // c2 must be tombstoned.
2506        let c2_after = rt.entities(&tok).unwrap().get_entity(c2.id).await.unwrap();
2507        assert!(c2_after.is_none(), "from_entity must be tombstoned");
2508    }
2509
2510    // ── #567 regression: cross-namespace merge_note must be denied on either ID ──
2511
2512    #[tokio::test]
2513    async fn merge_note_cross_namespace_either_id_returns_not_found() {
2514        use crate::error::RuntimeError;
2515        use crate::Namespace;
2516
2517        let rt = rt();
2518        let ns_a = NamespaceToken::for_namespace(Namespace::parse("ns-a").unwrap());
2519        let ns_b = NamespaceToken::for_namespace(Namespace::parse("ns-b").unwrap());
2520
2521        let into_a = rt
2522            .create_note(&ns_a, "observation", None, "Into A", None, None, vec![])
2523            .await
2524            .unwrap();
2525        let from_a = rt
2526            .create_note(&ns_a, "observation", None, "From A", None, None, vec![])
2527            .await
2528            .unwrap();
2529        let note_b = rt
2530            .create_note(&ns_b, "observation", None, "Note B", None, None, vec![])
2531            .await
2532            .unwrap();
2533
2534        // foreign into_id: note_b belongs to ns_b, caller token is ns_a
2535        let foreign_into = rt
2536            .merge_note(
2537                &ns_a,
2538                note_b.id,
2539                from_a.id,
2540                EntityDedupMergePolicy::PreferInto,
2541                ContentMergeStrategy::Append,
2542                false,
2543            )
2544            .await;
2545        assert!(
2546            matches!(foreign_into, Err(RuntimeError::NotFound(_))),
2547            "foreign into_id must be denied before merge, got {foreign_into:?}"
2548        );
2549
2550        // foreign from_id: note_b belongs to ns_b, caller token is ns_a
2551        let foreign_from = rt
2552            .merge_note(
2553                &ns_a,
2554                into_a.id,
2555                note_b.id,
2556                EntityDedupMergePolicy::PreferInto,
2557                ContentMergeStrategy::Append,
2558                false,
2559            )
2560            .await;
2561        assert!(
2562            matches!(foreign_from, Err(RuntimeError::NotFound(_))),
2563            "foreign from_id must be denied before merge, got {foreign_from:?}"
2564        );
2565    }
2566
2567    // ── #hardening Item 5: cross-namespace entity update/merge must be denied ──
2568
2569    #[tokio::test]
2570    async fn update_entity_cross_namespace_returns_not_found_and_preserves_source() {
2571        use crate::error::RuntimeError;
2572        use crate::Namespace;
2573
2574        let rt = rt();
2575        let ns_a = NamespaceToken::for_namespace(Namespace::parse("ns-a").unwrap());
2576        let ns_b = NamespaceToken::for_namespace(Namespace::parse("ns-b").unwrap());
2577
2578        let entity = rt
2579            .create_entity(
2580                &ns_a,
2581                "concept",
2582                None,
2583                "Alpha",
2584                Some("original"),
2585                None,
2586                vec![],
2587            )
2588            .await
2589            .unwrap();
2590
2591        let err = rt
2592            .update_entity(
2593                &ns_b,
2594                entity.id,
2595                EntityPatch {
2596                    name: Some("Compromised".into()),
2597                    ..Default::default()
2598                },
2599            )
2600            .await;
2601
2602        assert!(
2603            matches!(err, Err(RuntimeError::NotFound(_))),
2604            "cross-namespace update must return opaque NotFound, got {err:?}"
2605        );
2606
2607        let after = rt.get_entity(&ns_a, entity.id).await.unwrap();
2608        assert_eq!(
2609            after.name, "Alpha",
2610            "foreign update must not mutate source row"
2611        );
2612        assert_eq!(after.description.as_deref(), Some("original"));
2613    }
2614
2615    #[tokio::test]
2616    async fn merge_entity_cross_namespace_either_id_returns_not_found() {
2617        use crate::error::RuntimeError;
2618        use crate::Namespace;
2619
2620        let rt = rt();
2621        let ns_a = NamespaceToken::for_namespace(Namespace::parse("ns-a").unwrap());
2622        let ns_b = NamespaceToken::for_namespace(Namespace::parse("ns-b").unwrap());
2623
2624        let into_a = rt
2625            .create_entity(&ns_a, "concept", None, "Into A", None, None, vec![])
2626            .await
2627            .unwrap();
2628        let from_a = rt
2629            .create_entity(&ns_a, "concept", None, "From A", None, None, vec![])
2630            .await
2631            .unwrap();
2632        let foreign_b = rt
2633            .create_entity(&ns_b, "concept", None, "Foreign B", None, None, vec![])
2634            .await
2635            .unwrap();
2636
2637        // foreign into_id: foreign_b belongs to ns_b, caller token is ns_a
2638        let foreign_into = rt
2639            .merge_entity(
2640                &ns_a,
2641                foreign_b.id,
2642                from_a.id,
2643                EntityDedupMergePolicy::PreferInto,
2644                false,
2645            )
2646            .await;
2647        assert!(
2648            matches!(foreign_into, Err(RuntimeError::NotFound(_))),
2649            "foreign into_id must be denied before merge, got {foreign_into:?}"
2650        );
2651
2652        // foreign from_id: foreign_b belongs to ns_b, caller token is ns_a
2653        let foreign_from = rt
2654            .merge_entity(
2655                &ns_a,
2656                into_a.id,
2657                foreign_b.id,
2658                EntityDedupMergePolicy::PreferInto,
2659                false,
2660            )
2661            .await;
2662        assert!(
2663            matches!(foreign_from, Err(RuntimeError::NotFound(_))),
2664            "foreign from_id must be denied before merge, got {foreign_from:?}"
2665        );
2666
2667        // All three entities survive the failed merges.
2668        assert!(rt.get_entity(&ns_a, into_a.id).await.is_ok());
2669        assert!(rt.get_entity(&ns_a, from_a.id).await.is_ok());
2670        assert!(rt.get_entity(&ns_b, foreign_b.id).await.is_ok());
2671    }
2672}