Skip to main content

khive_runtime/
curation.rs

1// Licensed under the Apache License, Version 2.0.
2
3//! Curation operations: entity update/merge and edge-list filter type.
4//!
5//! See ADR-014 for the full specification and semantics.
6
7use std::collections::HashSet;
8
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use uuid::Uuid;
12
13use khive_db::SqliteError;
14use khive_storage::types::{EdgeFilter, TextDocument};
15use khive_storage::{EdgeRelation, Entity, SubstrateKind};
16use khive_types::EventKind;
17
18use crate::error::{RuntimeError, RuntimeResult};
19use crate::runtime::{KhiveRuntime, NamespaceToken};
20
21// ---------------------------------------------------------------------------
22// Public types
23// ---------------------------------------------------------------------------
24
25/// Patch for `update_entity`. Only `Some(_)` fields are applied; `None` means "leave unchanged".
26///
27/// For `description`:
28/// - `None` (outer) — leave the current description as-is
29/// - `Some(None)` — clear the description (set to NULL)
30/// - `Some(Some(s))` — set the description to `s`
31///
32/// For `properties` (deep-merge semantics):
33/// - `None` — leave properties as-is
34/// - `Some(value)` — deep-merge `value` into existing properties. Keys present in
35///   the patch overwrite existing keys; keys absent from the patch are preserved.
36///   Removing a key requires explicit replacement of the parent object (or a future
37///   `unset`/`null-marker` extension).
38///
39/// For `tags` — replace semantics: `Some(vec)` sets tags to exactly `vec`. To add
40/// a tag without losing existing tags, read the entity first, push the new tag,
41/// and pass the full list back.
42#[derive(Clone, Debug, Default)]
43pub struct EntityPatch {
44    pub name: Option<String>,
45    pub description: Option<Option<String>>,
46    pub properties: Option<Value>,
47    pub tags: Option<Vec<String>>,
48}
49
50/// Policy used when deduplicating two entities.
51#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
52#[serde(rename_all = "snake_case")]
53pub enum EntityDedupMergePolicy {
54    /// `into` values win on conflict. Tags are unioned. Properties from `from` fill in
55    /// keys that `into` doesn't have. This is the default.
56    #[default]
57    PreferInto,
58    /// `from` values win on conflict.
59    PreferFrom,
60    /// Deep-merge: object properties merge recursively. Scalar conflicts go to `into`.
61    Union,
62}
63
64/// Strategy for merging note content when two notes are combined.
65#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
66#[serde(rename_all = "snake_case")]
67pub enum ContentMergeStrategy {
68    #[default]
69    Append,
70    PreferInto,
71    PreferFrom,
72}
73
74/// Result returned by `merge_entity` / `merge_note`.
75#[derive(Clone, Debug, Serialize, Deserialize)]
76pub struct MergeSummary {
77    pub kept_id: Uuid,
78    pub removed_id: Uuid,
79    pub edges_rewired: usize,
80    pub properties_merged: usize,
81    pub tags_unioned: usize,
82    pub content_appended: bool,
83    pub dry_run: bool,
84}
85
86/// Patch for `update_edge`. Only `Some(_)` fields are applied; `None` means "leave unchanged".
87///
88/// For `properties` — replacement semantics (not deep merge): `Some(value)` replaces
89/// the entire metadata object. `None` leaves metadata unchanged.
90#[derive(Clone, Debug, Default)]
91pub struct EdgePatch {
92    pub relation: Option<EdgeRelation>,
93    pub weight: Option<f64>,
94    pub properties: Option<Value>,
95}
96
97/// Patch for `update_note`. Only `Some(_)` fields are applied; `None` means "leave unchanged".
98///
99/// For `salience`/`decay_factor`:
100/// - `None` (outer) — leave unchanged
101/// - `Some(None)` — clear the value
102/// - `Some(Some(v))` — set to v
103#[derive(Clone, Debug, Default)]
104pub struct NotePatch {
105    pub name: Option<Option<String>>,
106    pub content: Option<String>,
107    pub salience: Option<Option<f64>>,
108    pub decay_factor: Option<Option<f64>>,
109    pub properties: Option<Value>,
110    pub(crate) kind_status: Option<String>,
111}
112
113impl NotePatch {
114    /// Construct a `NotePatch` from the public fields only.
115    /// Use this from external crates; `kind_status` is set to `None`.
116    pub fn new(
117        name: Option<Option<String>>,
118        content: Option<String>,
119        salience: Option<Option<f64>>,
120        decay_factor: Option<Option<f64>>,
121        properties: Option<Value>,
122    ) -> Self {
123        Self {
124            name,
125            content,
126            salience,
127            decay_factor,
128            properties,
129            kind_status: None,
130        }
131    }
132}
133
134/// Filter for `list_edges` / `count_edges`.
135#[derive(Clone, Debug, Default)]
136pub struct EdgeListFilter {
137    pub source_id: Option<Uuid>,
138    pub target_id: Option<Uuid>,
139    /// Empty = any relation.
140    pub relations: Vec<EdgeRelation>,
141    pub min_weight: Option<f64>,
142    pub max_weight: Option<f64>,
143}
144
145impl From<EdgeListFilter> for EdgeFilter {
146    fn from(f: EdgeListFilter) -> Self {
147        EdgeFilter {
148            source_ids: f.source_id.into_iter().collect(),
149            target_ids: f.target_id.into_iter().collect(),
150            relations: f.relations,
151            min_weight: f.min_weight,
152            max_weight: f.max_weight,
153            ..Default::default()
154        }
155    }
156}
157
158// ---------------------------------------------------------------------------
159// Implementation
160// ---------------------------------------------------------------------------
161
162impl KhiveRuntime {
163    /// Patch-style entity update.
164    ///
165    /// Only fields set to `Some(_)` are changed. Re-indexes FTS5 (and vectors if configured)
166    /// when `name` or `description` changes; skips re-indexing for property/tag-only patches.
167    ///
168    /// Returns `RuntimeError::NotFound` if the entity does not exist or belongs to a different
169    /// namespace. This enforces ADR-007 namespace isolation at the runtime layer.
170    pub async fn update_entity(
171        &self,
172        token: &NamespaceToken,
173        id: Uuid,
174        patch: EntityPatch,
175    ) -> RuntimeResult<Entity> {
176        let store = self.entities(token)?;
177        let mut entity = store
178            .get_entity(id)
179            .await?
180            .ok_or_else(|| RuntimeError::NotFound(format!("entity {id}")))?;
181
182        self.ensure_namespace(&entity.namespace, token, id)?;
183
184        let mut text_changed = false;
185        let mut changed_fields: Vec<&'static str> = Vec::new();
186
187        if let Some(name) = patch.name {
188            text_changed |= entity.name != name;
189            entity.name = name;
190            changed_fields.push("name");
191        }
192        if let Some(desc_patch) = patch.description {
193            text_changed |= entity.description != desc_patch;
194            entity.description = desc_patch;
195            changed_fields.push("description");
196        }
197        if let Some(props) = patch.properties {
198            let (merged, _) = merge_properties(
199                &entity.properties,
200                &Some(props),
201                EntityDedupMergePolicy::PreferFrom,
202            );
203            entity.properties = merged;
204            changed_fields.push("properties");
205        }
206        if let Some(tags) = patch.tags {
207            entity.tags = tags;
208            changed_fields.push("tags");
209        }
210
211        entity.updated_at = chrono::Utc::now().timestamp_micros();
212        store.upsert_entity(entity.clone()).await?;
213
214        if text_changed {
215            self.reindex_entity(token, &entity).await?;
216        }
217
218        let event_store = self.events(token)?;
219        let event = khive_storage::event::Event::new(
220            entity.namespace.clone(),
221            "update",
222            EventKind::EntityUpdated,
223            SubstrateKind::Entity,
224            "",
225        )
226        .with_target(entity.id)
227        .with_payload(serde_json::json!({
228            "id": entity.id,
229            "namespace": entity.namespace,
230            "changed_fields": changed_fields,
231        }));
232        event_store.append_event(event).await.map_err(|e| {
233            RuntimeError::Internal(format!("update_entity: event store write failed: {e}"))
234        })?;
235
236        Ok(entity)
237    }
238
239    /// Merge `from_id` into `into_id`.
240    ///
241    /// All edges incident to `from_id` are rewired to `into_id`. Self-loops that would
242    /// result from the rewire are dropped. Properties and tags are merged per `strategy`.
243    /// `from_id` is tombstoned with merge provenance and removed from indexes. Returns a summary.
244    ///
245    /// If `dry_run` is true, computes and returns the planned summary without mutating any rows.
246    ///
247    /// Atomic: all SQL (entity reads/writes, edge rewires, FTS updates, vec-index delete)
248    /// runs on a single pool connection inside one `BEGIN IMMEDIATE` transaction via
249    /// `merge_entity_sql`. If embedding vectors are configured, the vector re-insert for
250    /// `into_id` is performed after the transaction (requires async embedding computation).
251    pub async fn merge_entity(
252        &self,
253        token: &NamespaceToken,
254        into_id: Uuid,
255        from_id: Uuid,
256        strategy: EntityDedupMergePolicy,
257        dry_run: bool,
258    ) -> RuntimeResult<MergeSummary> {
259        if into_id == from_id {
260            return Err(RuntimeError::InvalidInput(
261                "cannot merge an entity into itself".into(),
262            ));
263        }
264        let ns = token.namespace().as_str().to_owned();
265        let sanitized_ns: String = ns
266            .chars()
267            .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
268            .collect();
269        let fts_table = format!("fts_entities_{}", sanitized_ns);
270        let vec_table = self.config().embedding_model.map(|model| {
271            let key: String = model
272                .to_string()
273                .chars()
274                .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
275                .collect();
276            format!("vec_{}", key)
277        });
278
279        // Ensure all required tables exist before entering the transaction.
280        // Each accessor applies its DDL idempotently via `CREATE TABLE IF NOT EXISTS`.
281        let _ = self.entities(token)?;
282        let _ = self.graph(token)?;
283        let _ = self.text(token)?;
284        if self.config().embedding_model.is_some() {
285            let _ = self.vectors(token)?;
286        }
287
288        let pool = self.backend().pool_arc();
289
290        let (summary, updated_entity) = tokio::task::spawn_blocking(move || {
291            let guard = pool.writer()?;
292            guard.transaction(|conn| {
293                merge_entity_sql(
294                    conn, ns, fts_table, vec_table, into_id, from_id, strategy, dry_run,
295                )
296            })
297        })
298        .await
299        .map_err(|e| RuntimeError::Internal(e.to_string()))??;
300
301        // If vectors are configured, reindex into_entity (requires async embedding).
302        // FTS and vec-delete were already committed inside the transaction above.
303        if !dry_run && self.config().embedding_model.is_some() {
304            self.reindex_entity(token, &updated_entity).await?;
305        }
306
307        let event_store = self.events(token)?;
308        // Mirror the wire-level strategy spelling from MergeParams so consumers
309        // can round-trip the policy string back into a request.
310        let policy_str = match strategy {
311            EntityDedupMergePolicy::PreferInto => "prefer_into",
312            EntityDedupMergePolicy::PreferFrom => "prefer_from",
313            EntityDedupMergePolicy::Union => "union",
314        };
315        let event = khive_storage::event::Event::new(
316            updated_entity.namespace.clone(),
317            "merge",
318            EventKind::EntityMerged,
319            SubstrateKind::Entity,
320            "",
321        )
322        .with_target(summary.kept_id)
323        .with_payload(serde_json::json!({
324            "into_id": summary.kept_id,
325            "from_id": summary.removed_id,
326            "policy": policy_str,
327            "edges_rewired": summary.edges_rewired,
328        }));
329        event_store.append_event(event).await.map_err(|e| {
330            RuntimeError::Internal(format!("merge_entity: event store write failed: {e}"))
331        })?;
332
333        Ok(summary)
334    }
335
336    // ---- Internal helpers ----
337
338    /// Re-upsert FTS5 document (and vector if model configured) for the entity.
339    ///
340    /// Uses `entity.namespace` — the authoritative namespace stored on the record — rather
341    /// than the caller-supplied `namespace` parameter. This prevents a cross-namespace
342    /// reindex from writing the search document into the wrong namespace's FTS index.
343    pub(crate) async fn reindex_entity(
344        &self,
345        token: &NamespaceToken,
346        entity: &Entity,
347    ) -> RuntimeResult<()> {
348        let body = match &entity.description {
349            Some(d) if !d.is_empty() => format!("{} {}", entity.name, d),
350            _ => entity.name.clone(),
351        };
352        // Use entity.namespace (authoritative) rather than token.namespace().as_str() (caller claim).
353        let ns = entity.namespace.clone();
354        self.text(token)?
355            .upsert_document(TextDocument {
356                subject_id: entity.id,
357                kind: SubstrateKind::Entity,
358                title: Some(entity.name.clone()),
359                body: body.clone(),
360                tags: entity.tags.clone(),
361                namespace: ns.clone(),
362                metadata: entity.properties.clone(),
363                updated_at: chrono::Utc::now(),
364            })
365            .await?;
366
367        if self.config().embedding_model.is_some() {
368            let vector = self.embed(&body).await?;
369            self.vectors(token)?
370                .insert(
371                    entity.id,
372                    SubstrateKind::Entity,
373                    &ns,
374                    "entity.body",
375                    vec![vector],
376                )
377                .await?;
378        }
379
380        Ok(())
381    }
382
383    /// Remove an entity from FTS5 and (if configured) vector indexes.
384    pub(crate) async fn remove_from_indexes(
385        &self,
386        token: &NamespaceToken,
387        id: Uuid,
388    ) -> RuntimeResult<()> {
389        let ns = token.namespace().as_str().to_owned();
390        self.text(token)?.delete_document(&ns, id).await?;
391        if self.config().embedding_model.is_some() {
392            self.vectors(token)?.delete(id).await?;
393        }
394        Ok(())
395    }
396
397    /// Re-upsert FTS5 document (and vector if model configured) for the note.
398    pub(crate) async fn reindex_note(
399        &self,
400        token: &NamespaceToken,
401        note: &khive_storage::note::Note,
402    ) -> RuntimeResult<()> {
403        let ns = note.namespace.clone();
404        self.text_for_notes(token)?
405            .upsert_document(TextDocument {
406                subject_id: note.id,
407                kind: SubstrateKind::Note,
408                title: note.name.clone(),
409                body: note.content.clone(),
410                tags: Vec::new(),
411                namespace: ns.clone(),
412                metadata: note.properties.clone(),
413                updated_at: chrono::Utc::now(),
414            })
415            .await?;
416
417        if self.config().embedding_model.is_some() {
418            let vector = self.embed(&note.content).await?;
419            self.vectors(token)?
420                .insert(
421                    note.id,
422                    SubstrateKind::Note,
423                    &ns,
424                    "note.content",
425                    vec![vector],
426                )
427                .await?;
428        }
429        Ok(())
430    }
431
432    /// Patch-style note update.
433    pub async fn update_note(
434        &self,
435        token: &NamespaceToken,
436        id: Uuid,
437        patch: NotePatch,
438    ) -> RuntimeResult<khive_storage::note::Note> {
439        let store = self.notes(token)?;
440        let mut note = store
441            .get_note(id)
442            .await?
443            .ok_or_else(|| RuntimeError::NotFound(format!("note {id}")))?;
444
445        if note.namespace != token.namespace().as_str() {
446            return Err(RuntimeError::NotFound(format!("note {id}")));
447        }
448
449        let mut text_changed = false;
450
451        if let Some(name_patch) = patch.name {
452            text_changed |= note.name != name_patch;
453            note.name = name_patch;
454        }
455        if let Some(content) = patch.content {
456            text_changed |= note.content != content;
457            note.content = content;
458        }
459        if let Some(salience_patch) = patch.salience {
460            note.salience = salience_patch.map(|s| s.clamp(0.0, 1.0));
461        }
462        if let Some(decay_patch) = patch.decay_factor {
463            note.decay_factor = decay_patch.map(|d| d.max(0.0));
464        }
465        if let Some(props) = patch.properties {
466            let (merged, _) = merge_properties(
467                &note.properties,
468                &Some(props),
469                EntityDedupMergePolicy::PreferFrom,
470            );
471            note.properties = merged;
472        }
473        if let Some(status) = patch.kind_status {
474            note.status = status;
475        }
476
477        note.updated_at = chrono::Utc::now().timestamp_micros();
478        store.upsert_note(note.clone()).await?;
479
480        if text_changed {
481            self.reindex_note(token, &note).await?;
482        }
483
484        Ok(note)
485    }
486
487    /// Merge `from_id` note into `into_id` note.
488    ///
489    /// Both notes must exist in the namespace and have the same `kind`. Content is merged
490    /// per `content_strategy`. Properties are merged per `strategy`. `from_id` is
491    /// tombstoned (status='deleted', deleted_at set). Returns a summary.
492    ///
493    /// If `dry_run` is true, computes and returns the planned summary without mutating
494    /// any rows, edges, or indexes.
495    pub async fn merge_note(
496        &self,
497        token: &NamespaceToken,
498        into_id: Uuid,
499        from_id: Uuid,
500        strategy: EntityDedupMergePolicy,
501        content_strategy: ContentMergeStrategy,
502        dry_run: bool,
503    ) -> RuntimeResult<MergeSummary> {
504        if into_id == from_id {
505            return Err(RuntimeError::InvalidInput(
506                "cannot merge a note into itself".into(),
507            ));
508        }
509        let ns = token.namespace().as_str().to_string();
510        let sanitized_ns: String = ns
511            .chars()
512            .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
513            .collect();
514        let fts_table = format!("fts_notes_{}", sanitized_ns);
515        let vec_table = self.config().embedding_model.map(|model| {
516            let key: String = model
517                .to_string()
518                .chars()
519                .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
520                .collect();
521            format!("vec_{}", key)
522        });
523
524        let _ = self.notes(token)?;
525        let _ = self.graph(token)?;
526        let _ = self.text_for_notes(token)?;
527        if self.config().embedding_model.is_some() {
528            let _ = self.vectors(token)?;
529        }
530
531        let pool = self.backend().pool_arc();
532        let (summary, updated_note) = tokio::task::spawn_blocking(move || {
533            let guard = pool.writer()?;
534            guard.transaction(|conn| {
535                merge_note_sql(
536                    conn,
537                    ns,
538                    fts_table,
539                    vec_table,
540                    into_id,
541                    from_id,
542                    strategy,
543                    content_strategy,
544                    dry_run,
545                )
546            })
547        })
548        .await
549        .map_err(|e| RuntimeError::Internal(e.to_string()))??;
550
551        if !dry_run && self.config().embedding_model.is_some() {
552            self.reindex_note(token, &updated_note).await?;
553        }
554        Ok(summary)
555    }
556}
557
558// ---------------------------------------------------------------------------
559// Transactional merge SQL helpers
560// ---------------------------------------------------------------------------
561
562/// Read one entity row by ID within a namespace, returning `SqliteError` on missing/wrong-ns.
563fn read_merge_entity(
564    conn: &rusqlite::Connection,
565    id: Uuid,
566    namespace: &str,
567) -> Result<Entity, SqliteError> {
568    let id_str = id.to_string();
569    let mut stmt = conn.prepare(
570        "SELECT id, namespace, kind, entity_type, name, description, properties, tags, \
571         created_at, updated_at, deleted_at, merged_into, merge_event_id \
572         FROM entities WHERE id = ?1 AND deleted_at IS NULL",
573    )?;
574    let mut rows = stmt.query(rusqlite::params![id_str])?;
575    let row = rows
576        .next()?
577        .ok_or_else(|| SqliteError::InvalidData(format!("entity {id} not found")))?;
578
579    let id_s: String = row.get(0)?;
580    let ns: String = row.get(1)?;
581    let kind: String = row.get(2)?;
582    let entity_type: Option<String> = row.get(3)?;
583    let name: String = row.get(4)?;
584    let description: Option<String> = row.get(5)?;
585    let properties_str: Option<String> = row.get(6)?;
586    let tags_str: String = row.get(7)?;
587    let created_at: i64 = row.get(8)?;
588    let updated_at: i64 = row.get(9)?;
589    let deleted_at: Option<i64> = row.get(10)?;
590    let merged_into_str: Option<String> = row.get(11)?;
591    let merge_event_id_str: Option<String> = row.get(12)?;
592
593    if ns != namespace {
594        return Err(SqliteError::InvalidData(format!(
595            "entity {id} belongs to namespace '{ns}', not '{namespace}'"
596        )));
597    }
598
599    let entity_id = Uuid::parse_str(&id_s).map_err(|e| SqliteError::InvalidData(e.to_string()))?;
600    let properties: Option<Value> = properties_str
601        .map(|s| {
602            serde_json::from_str::<Value>(&s).map_err(|e| SqliteError::InvalidData(e.to_string()))
603        })
604        .transpose()?;
605    let tags: Vec<String> =
606        serde_json::from_str(&tags_str).map_err(|e| SqliteError::InvalidData(e.to_string()))?;
607    let merged_into = merged_into_str
608        .as_deref()
609        .map(Uuid::parse_str)
610        .transpose()
611        .map_err(|e| SqliteError::InvalidData(e.to_string()))?;
612    let merge_event_id = merge_event_id_str
613        .as_deref()
614        .map(Uuid::parse_str)
615        .transpose()
616        .map_err(|e| SqliteError::InvalidData(e.to_string()))?;
617
618    Ok(Entity {
619        id: entity_id,
620        namespace: ns,
621        kind,
622        entity_type,
623        name,
624        description,
625        properties,
626        tags,
627        created_at,
628        updated_at,
629        deleted_at,
630        merged_into,
631        merge_event_id,
632    })
633}
634
635/// All merge SQL on one connection inside an already-open `BEGIN IMMEDIATE` transaction.
636///
637/// Reads both entities, rewires/drops incident edges, merges entity fields, updates FTS,
638/// deletes the `from` vec entry (if `vec_table` is Some), and tombstones `from` with merge
639/// provenance.  Returns the updated `into` entity so the caller can do the async vec re-insert.
640///
641/// When `dry_run` is true, all reads and computations are performed but no writes are issued.
642#[allow(clippy::too_many_arguments)]
643fn merge_entity_sql(
644    conn: &rusqlite::Connection,
645    namespace: String,
646    fts_table: String,
647    vec_table: Option<String>,
648    into_id: Uuid,
649    from_id: Uuid,
650    strategy: EntityDedupMergePolicy,
651    dry_run: bool,
652) -> Result<(MergeSummary, Entity), SqliteError> {
653    let into_entity = read_merge_entity(conn, into_id, &namespace)?;
654    let from_entity = read_merge_entity(conn, from_id, &namespace)?;
655
656    // --- Collect edges incident to from_id ---
657    #[allow(dead_code)]
658    struct EdgeRow {
659        id: Uuid,
660        source_id: Uuid,
661        target_id: Uuid,
662        relation: String,
663        weight: f64,
664        created_at: i64,
665        updated_at: i64,
666        deleted_at: Option<i64>,
667        target_backend: Option<String>,
668        metadata: Option<String>,
669    }
670
671    let parse_id =
672        |s: String| Uuid::parse_str(&s).map_err(|e| SqliteError::InvalidData(e.to_string()));
673
674    let from_str = from_id.to_string();
675
676    let mut outbound: Vec<EdgeRow> = Vec::new();
677    {
678        let mut stmt = conn.prepare(
679            "SELECT id, source_id, target_id, relation, weight, created_at, \
680                    updated_at, deleted_at, target_backend, metadata \
681             FROM graph_edges WHERE namespace = ?1 AND source_id = ?2",
682        )?;
683        let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
684        while let Some(row) = rows.next()? {
685            outbound.push(EdgeRow {
686                id: parse_id(row.get(0)?)?,
687                source_id: parse_id(row.get(1)?)?,
688                target_id: parse_id(row.get(2)?)?,
689                relation: row.get(3)?,
690                weight: row.get(4)?,
691                created_at: row.get(5)?,
692                updated_at: row.get(6)?,
693                deleted_at: row.get(7)?,
694                target_backend: row.get(8)?,
695                metadata: row.get(9)?,
696            });
697        }
698    }
699
700    let mut inbound: Vec<EdgeRow> = Vec::new();
701    {
702        let mut stmt = conn.prepare(
703            "SELECT id, source_id, target_id, relation, weight, created_at, \
704                    updated_at, deleted_at, target_backend, metadata \
705             FROM graph_edges WHERE namespace = ?1 AND target_id = ?2",
706        )?;
707        let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
708        while let Some(row) = rows.next()? {
709            inbound.push(EdgeRow {
710                id: parse_id(row.get(0)?)?,
711                source_id: parse_id(row.get(1)?)?,
712                target_id: parse_id(row.get(2)?)?,
713                relation: row.get(3)?,
714                weight: row.get(4)?,
715                created_at: row.get(5)?,
716                updated_at: row.get(6)?,
717                deleted_at: row.get(7)?,
718                target_backend: row.get(8)?,
719                metadata: row.get(9)?,
720            });
721        }
722    }
723
724    // Deduplicate by edge ID (a self-edge from_id→from_id appears in both lists).
725    let mut seen: HashSet<Uuid> = HashSet::new();
726    let mut all_edges: Vec<EdgeRow> = Vec::new();
727    for edge in outbound.into_iter().chain(inbound) {
728        if seen.insert(edge.id) {
729            all_edges.push(edge);
730        }
731    }
732
733    // --- Merge entity fields ---
734    let (merged_props, properties_merged) =
735        merge_properties(&into_entity.properties, &from_entity.properties, strategy);
736    let merged_name = merge_string_field(&into_entity.name, &from_entity.name, strategy);
737    let merged_description =
738        merge_option_string_field(&into_entity.description, &from_entity.description, strategy);
739    let (merged_tags, tags_unioned) = union_tags(&into_entity.tags, &from_entity.tags);
740
741    let now = chrono::Utc::now().timestamp_micros();
742    let into_str = into_id.to_string();
743    let props_str = merged_props
744        .as_ref()
745        .map(|v| serde_json::to_string(v).unwrap_or_default());
746    let tags_json = serde_json::to_string(&merged_tags).unwrap_or_else(|_| "[]".to_string());
747
748    // --- Rewire edges ---
749    let mut edges_rewired = 0usize;
750    if !dry_run {
751        for edge in all_edges {
752            let new_src = if edge.source_id == from_id {
753                into_id
754            } else {
755                edge.source_id
756            };
757            let new_tgt = if edge.target_id == from_id {
758                into_id
759            } else {
760                edge.target_id
761            };
762
763            if new_src == new_tgt {
764                conn.execute(
765                    "DELETE FROM graph_edges WHERE namespace = ?1 AND id = ?2",
766                    rusqlite::params![&namespace, edge.id.to_string()],
767                )?;
768                continue;
769            }
770
771            let now_ts = chrono::Utc::now().timestamp();
772            conn.execute(
773                "INSERT INTO graph_edges \
774                 (namespace, id, source_id, target_id, relation, weight, created_at, updated_at, deleted_at, target_backend, metadata) \
775                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11) \
776                 ON CONFLICT(namespace, id) DO UPDATE SET \
777                     source_id = excluded.source_id, \
778                     target_id = excluded.target_id, \
779                     relation = excluded.relation, \
780                     weight = excluded.weight, \
781                     updated_at = excluded.updated_at, \
782                     metadata = excluded.metadata \
783                 ON CONFLICT(namespace, source_id, target_id, relation) DO NOTHING",
784                rusqlite::params![
785                    &namespace,
786                    edge.id.to_string(),
787                    new_src.to_string(),
788                    new_tgt.to_string(),
789                    &edge.relation,
790                    edge.weight,
791                    edge.created_at,
792                    now_ts,
793                    edge.deleted_at,
794                    edge.target_backend,
795                    edge.metadata,
796                ],
797            )?;
798            edges_rewired += 1;
799        }
800
801        // --- Upsert merged entity ---
802        conn.execute(
803            "INSERT OR REPLACE INTO entities \
804             (id, namespace, kind, name, description, properties, tags, \
805              created_at, updated_at, deleted_at, merged_into, merge_event_id) \
806             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
807            rusqlite::params![
808                &into_str,
809                &namespace,
810                &into_entity.kind,
811                &merged_name,
812                &merged_description,
813                &props_str,
814                &tags_json,
815                into_entity.created_at,
816                now,
817                into_entity.deleted_at,
818                Option::<String>::None,
819                Option::<String>::None,
820            ],
821        )?;
822
823        // --- Reindex into_id in FTS (delete existing, insert updated) ---
824        let fts_body = match &merged_description {
825            Some(d) if !d.is_empty() => format!("{} {}", merged_name, d),
826            _ => merged_name.clone(),
827        };
828        let kind_str = SubstrateKind::Entity.to_string();
829
830        conn.execute(
831            &format!(
832                "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
833                fts_table
834            ),
835            rusqlite::params![&namespace, &into_str],
836        )?;
837        conn.execute(
838            &format!(
839                "INSERT INTO {} \
840                 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
841                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
842                fts_table
843            ),
844            rusqlite::params![
845                &into_str,
846                &kind_str,
847                &merged_name,
848                &fts_body,
849                &tags_json,
850                &namespace,
851                &props_str,
852                now,
853            ],
854        )?;
855
856        // --- Delete from_id from FTS ---
857        conn.execute(
858            &format!(
859                "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
860                fts_table
861            ),
862            rusqlite::params![&namespace, &from_str],
863        )?;
864
865        // --- Delete from_id from vector index if configured ---
866        if let Some(ref vec_tbl) = vec_table {
867            conn.execute(
868                &format!(
869                    "DELETE FROM {} WHERE subject_id = ?1 AND namespace = ?2",
870                    vec_tbl
871                ),
872                rusqlite::params![&from_str, &namespace],
873            )?;
874        }
875
876        // --- Tombstone from entity (ADR-014: soft-delete with provenance) ---
877        let merge_event_id = Uuid::new_v4();
878        conn.execute(
879            "UPDATE entities \
880             SET deleted_at = ?1, merged_into = ?2, merge_event_id = ?3, updated_at = ?1 \
881             WHERE namespace = ?4 AND id = ?5 AND deleted_at IS NULL",
882            rusqlite::params![
883                now,
884                into_str,
885                merge_event_id.to_string(),
886                &namespace,
887                &from_str,
888            ],
889        )?;
890    }
891
892    let updated_entity = Entity {
893        id: into_id,
894        namespace,
895        kind: into_entity.kind,
896        entity_type: into_entity.entity_type,
897        name: merged_name,
898        description: merged_description,
899        properties: merged_props,
900        tags: merged_tags,
901        created_at: into_entity.created_at,
902        updated_at: now,
903        deleted_at: into_entity.deleted_at,
904        merged_into: None,
905        merge_event_id: None,
906    };
907
908    Ok((
909        MergeSummary {
910            kept_id: into_id,
911            removed_id: from_id,
912            edges_rewired,
913            properties_merged,
914            tags_unioned,
915            content_appended: false,
916            dry_run,
917        },
918        updated_entity,
919    ))
920}
921
922// ---------------------------------------------------------------------------
923// Note merge SQL helpers
924// ---------------------------------------------------------------------------
925
926/// Read one note row by ID within a namespace, returning `SqliteError` on missing/wrong-ns.
927fn read_merge_note(
928    conn: &rusqlite::Connection,
929    id: Uuid,
930    namespace: &str,
931) -> Result<khive_storage::note::Note, SqliteError> {
932    use khive_storage::note::Note;
933    let id_str = id.to_string();
934    let mut stmt = conn.prepare(
935        "SELECT id, namespace, kind, status, name, content, salience, decay_factor, \
936         expires_at, properties, created_at, updated_at, deleted_at \
937         FROM notes WHERE id = ?1 AND deleted_at IS NULL",
938    )?;
939    let mut rows = stmt.query(rusqlite::params![id_str])?;
940    let row = rows
941        .next()?
942        .ok_or_else(|| SqliteError::InvalidData(format!("note {id} not found")))?;
943
944    let id_s: String = row.get(0)?;
945    let ns: String = row.get(1)?;
946    let kind: String = row.get(2)?;
947    let status: String = row.get(3)?;
948    let name: Option<String> = row.get(4)?;
949    let content: String = row.get(5)?;
950    let salience: Option<f64> = row.get(6)?;
951    let decay_factor: Option<f64> = row.get(7)?;
952    let expires_at: Option<i64> = row.get(8)?;
953    let properties_str: Option<String> = row.get(9)?;
954    let created_at: i64 = row.get(10)?;
955    let updated_at: i64 = row.get(11)?;
956    let deleted_at: Option<i64> = row.get(12)?;
957
958    if ns != namespace {
959        return Err(SqliteError::InvalidData(format!(
960            "note {id} belongs to namespace '{ns}', not '{namespace}'"
961        )));
962    }
963
964    let note_id = Uuid::parse_str(&id_s).map_err(|e| SqliteError::InvalidData(e.to_string()))?;
965    let properties: Option<serde_json::Value> = properties_str
966        .map(|s| serde_json::from_str(&s).map_err(|e| SqliteError::InvalidData(e.to_string())))
967        .transpose()?;
968
969    Ok(Note {
970        id: note_id,
971        namespace: ns,
972        kind,
973        status,
974        name,
975        content,
976        salience,
977        decay_factor,
978        expires_at,
979        properties,
980        created_at,
981        updated_at,
982        deleted_at,
983    })
984}
985
/// Larger of two optional floats: the maximum when both are present, the lone value when
/// only one is, and `None` when neither is.
fn max_option_f64(a: Option<f64>, b: Option<f64>) -> Option<f64> {
    // Chaining the two options yields 0, 1 or 2 values; `reduce` folds them with `f64::max`.
    a.into_iter().chain(b).reduce(f64::max)
}
994
995fn append_merge_history(props: Option<Value>, entry: Value) -> Result<Option<Value>, SqliteError> {
996    use serde_json::{json, Map};
997    let mut obj: Map<String, Value> = match props {
998        Some(Value::Object(m)) => m,
999        Some(other) => {
1000            let mut m = Map::new();
1001            m.insert("_value".into(), other);
1002            m
1003        }
1004        None => Map::new(),
1005    };
1006    let history = obj
1007        .entry("_merge_history".to_string())
1008        .or_insert_with(|| json!([]));
1009    if let Value::Array(arr) = history {
1010        arr.push(entry);
1011    }
1012    Ok(Some(Value::Object(obj)))
1013}
1014
1015/// All note merge SQL on one connection inside a `BEGIN IMMEDIATE` transaction.
1016///
1017/// Reads both notes (must have same `kind`), rewires/drops incident edges, merges content
1018/// per `content_strategy`, tombstones `from`. Returns the updated `into` note for async
1019/// re-embedding.
1020///
1021/// When `dry_run` is true, all reads and computations are performed but no writes are issued.
1022#[allow(clippy::too_many_arguments)]
1023fn merge_note_sql(
1024    conn: &rusqlite::Connection,
1025    namespace: String,
1026    fts_table: String,
1027    vec_table: Option<String>,
1028    into_id: Uuid,
1029    from_id: Uuid,
1030    strategy: EntityDedupMergePolicy,
1031    content_strategy: ContentMergeStrategy,
1032    dry_run: bool,
1033) -> Result<(MergeSummary, khive_storage::note::Note), SqliteError> {
1034    let into_note = read_merge_note(conn, into_id, &namespace)?;
1035    let from_note = read_merge_note(conn, from_id, &namespace)?;
1036
1037    if into_note.kind != from_note.kind {
1038        return Err(SqliteError::InvalidData(format!(
1039            "cannot merge notes of different kinds: {} vs {}",
1040            into_note.kind, from_note.kind
1041        )));
1042    }
1043
1044    let now = chrono::Utc::now().timestamp_micros();
1045    let into_str = into_id.to_string();
1046    let from_str = from_id.to_string();
1047
1048    // Collect edges incident to from_id.
1049    #[allow(dead_code)]
1050    struct EdgeRow {
1051        id: Uuid,
1052        source_id: Uuid,
1053        target_id: Uuid,
1054        relation: String,
1055        weight: f64,
1056        created_at: i64,
1057        updated_at: i64,
1058        deleted_at: Option<i64>,
1059        target_backend: Option<String>,
1060        metadata: Option<String>,
1061    }
1062    let parse_id =
1063        |s: String| Uuid::parse_str(&s).map_err(|e| SqliteError::InvalidData(e.to_string()));
1064
1065    let mut outbound: Vec<EdgeRow> = Vec::new();
1066    {
1067        let mut stmt = conn.prepare(
1068            "SELECT id, source_id, target_id, relation, weight, created_at, updated_at, deleted_at, target_backend, metadata \
1069             FROM graph_edges WHERE namespace = ?1 AND source_id = ?2",
1070        )?;
1071        let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
1072        while let Some(row) = rows.next()? {
1073            outbound.push(EdgeRow {
1074                id: parse_id(row.get(0)?)?,
1075                source_id: parse_id(row.get(1)?)?,
1076                target_id: parse_id(row.get(2)?)?,
1077                relation: row.get(3)?,
1078                weight: row.get(4)?,
1079                created_at: row.get(5)?,
1080                updated_at: row.get(6)?,
1081                deleted_at: row.get(7)?,
1082                target_backend: row.get(8)?,
1083                metadata: row.get(9)?,
1084            });
1085        }
1086    }
1087    let mut inbound: Vec<EdgeRow> = Vec::new();
1088    {
1089        let mut stmt = conn.prepare(
1090            "SELECT id, source_id, target_id, relation, weight, created_at, updated_at, deleted_at, target_backend, metadata \
1091             FROM graph_edges WHERE namespace = ?1 AND target_id = ?2",
1092        )?;
1093        let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
1094        while let Some(row) = rows.next()? {
1095            inbound.push(EdgeRow {
1096                id: parse_id(row.get(0)?)?,
1097                source_id: parse_id(row.get(1)?)?,
1098                target_id: parse_id(row.get(2)?)?,
1099                relation: row.get(3)?,
1100                weight: row.get(4)?,
1101                created_at: row.get(5)?,
1102                updated_at: row.get(6)?,
1103                deleted_at: row.get(7)?,
1104                target_backend: row.get(8)?,
1105                metadata: row.get(9)?,
1106            });
1107        }
1108    }
1109    let mut seen: HashSet<Uuid> = HashSet::new();
1110    let mut all_edges: Vec<EdgeRow> = Vec::new();
1111    for edge in outbound.into_iter().chain(inbound) {
1112        if seen.insert(edge.id) {
1113            all_edges.push(edge);
1114        }
1115    }
1116
1117    // Merge note fields.
1118    let (merged_content, content_appended) = match content_strategy {
1119        ContentMergeStrategy::Append => {
1120            if from_note.content.is_empty() {
1121                (into_note.content.clone(), false)
1122            } else {
1123                (
1124                    format!("{}\n\n---\n\n{}", into_note.content, from_note.content),
1125                    true,
1126                )
1127            }
1128        }
1129        ContentMergeStrategy::PreferInto => (into_note.content.clone(), false),
1130        ContentMergeStrategy::PreferFrom => (from_note.content.clone(), false),
1131    };
1132
1133    let merged_name = match strategy {
1134        EntityDedupMergePolicy::PreferFrom => from_note.name.clone().or(into_note.name.clone()),
1135        _ => into_note.name.clone().or(from_note.name.clone()),
1136    };
1137
1138    let (merged_props, properties_merged) =
1139        merge_properties(&into_note.properties, &from_note.properties, strategy);
1140
1141    // Append merge history to properties.
1142    let merge_history_entry = serde_json::json!({
1143        "merged_from": from_id.to_string(),
1144        "merged_at": now,
1145        "strategy": format!("{:?}", strategy),
1146        "content_strategy": format!("{:?}", content_strategy),
1147    });
1148    let merged_props = append_merge_history(merged_props, merge_history_entry)?;
1149
1150    let merged_salience = max_option_f64(into_note.salience, from_note.salience);
1151    let merged_expires_at = match (into_note.expires_at, from_note.expires_at) {
1152        (Some(a), Some(b)) => Some(a.max(b)),
1153        (Some(a), None) => Some(a),
1154        (None, Some(b)) => Some(b),
1155        (None, None) => None,
1156    };
1157
1158    let props_str = merged_props
1159        .as_ref()
1160        .map(|v| serde_json::to_string(v).unwrap_or_default());
1161
1162    let mut edges_rewired = 0usize;
1163    if !dry_run {
1164        // Rewire and upsert.
1165        for edge in all_edges {
1166            let new_src = if edge.source_id == from_id {
1167                into_id
1168            } else {
1169                edge.source_id
1170            };
1171            let new_tgt = if edge.target_id == from_id {
1172                into_id
1173            } else {
1174                edge.target_id
1175            };
1176            if new_src == new_tgt {
1177                conn.execute(
1178                    "DELETE FROM graph_edges WHERE namespace = ?1 AND id = ?2",
1179                    rusqlite::params![&namespace, edge.id.to_string()],
1180                )?;
1181                continue;
1182            }
1183            let now_ts = chrono::Utc::now().timestamp();
1184            conn.execute(
1185                "INSERT INTO graph_edges \
1186                 (namespace, id, source_id, target_id, relation, weight, created_at, updated_at, deleted_at, target_backend, metadata) \
1187                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11) \
1188                 ON CONFLICT(namespace, id) DO UPDATE SET \
1189                     source_id = excluded.source_id, \
1190                     target_id = excluded.target_id, \
1191                     relation = excluded.relation, \
1192                     weight = excluded.weight, \
1193                     updated_at = excluded.updated_at, \
1194                     metadata = excluded.metadata \
1195                 ON CONFLICT(namespace, source_id, target_id, relation) DO NOTHING",
1196                rusqlite::params![
1197                    &namespace,
1198                    edge.id.to_string(),
1199                    new_src.to_string(),
1200                    new_tgt.to_string(),
1201                    &edge.relation,
1202                    edge.weight,
1203                    edge.created_at,
1204                    now_ts,
1205                    edge.deleted_at,
1206                    edge.target_backend,
1207                    edge.metadata,
1208                ],
1209            )?;
1210            edges_rewired += 1;
1211        }
1212
1213        // Upsert merged into-note.
1214        conn.execute(
1215            "INSERT OR REPLACE INTO notes \
1216             (id, namespace, kind, status, name, content, salience, decay_factor, \
1217              expires_at, properties, created_at, updated_at, deleted_at) \
1218             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
1219            rusqlite::params![
1220                &into_str,
1221                &namespace,
1222                &into_note.kind,
1223                &into_note.status,
1224                &merged_name,
1225                &merged_content,
1226                merged_salience,
1227                into_note.decay_factor,
1228                merged_expires_at,
1229                &props_str,
1230                into_note.created_at,
1231                now,
1232                into_note.deleted_at,
1233            ],
1234        )?;
1235
1236        // Update FTS for into-note.
1237        conn.execute(
1238            &format!(
1239                "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
1240                fts_table
1241            ),
1242            rusqlite::params![&namespace, &into_str],
1243        )?;
1244        conn.execute(
1245            &format!(
1246                "INSERT INTO {} \
1247                 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
1248                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1249                fts_table
1250            ),
1251            rusqlite::params![
1252                &into_str,
1253                SubstrateKind::Note.to_string(),
1254                &merged_name,
1255                &merged_content,
1256                "[]",
1257                &namespace,
1258                &props_str,
1259                now,
1260            ],
1261        )?;
1262
1263        // Delete from-note from FTS.
1264        conn.execute(
1265            &format!(
1266                "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
1267                fts_table
1268            ),
1269            rusqlite::params![&namespace, &from_str],
1270        )?;
1271
1272        // Delete from-note from vector index if configured.
1273        if let Some(ref vec_tbl) = vec_table {
1274            conn.execute(
1275                &format!(
1276                    "DELETE FROM {} WHERE subject_id = ?1 AND namespace = ?2",
1277                    vec_tbl
1278                ),
1279                rusqlite::params![&from_str, &namespace],
1280            )?;
1281        }
1282
1283        // Tombstone the from-note.
1284        conn.execute(
1285            "UPDATE notes SET status = 'deleted', deleted_at = ?1, updated_at = ?1 \
1286             WHERE namespace = ?2 AND id = ?3 AND deleted_at IS NULL",
1287            rusqlite::params![now, &namespace, &from_str],
1288        )?;
1289    }
1290
1291    let updated_note = khive_storage::note::Note {
1292        id: into_id,
1293        namespace: namespace.clone(),
1294        kind: into_note.kind.clone(),
1295        status: into_note.status.clone(),
1296        name: merged_name,
1297        content: merged_content,
1298        salience: merged_salience,
1299        decay_factor: into_note.decay_factor,
1300        expires_at: merged_expires_at,
1301        properties: merged_props,
1302        created_at: into_note.created_at,
1303        updated_at: now,
1304        deleted_at: into_note.deleted_at,
1305    };
1306
1307    Ok((
1308        MergeSummary {
1309            kept_id: into_id,
1310            removed_id: from_id,
1311            edges_rewired,
1312            properties_merged,
1313            tags_unioned: 0,
1314            content_appended,
1315            dry_run,
1316        },
1317        updated_note,
1318    ))
1319}
1320
1321// ---------------------------------------------------------------------------
1322// Merge helpers (pure functions — easier to unit test)
1323// ---------------------------------------------------------------------------
1324
1325fn merge_string_field(into: &str, from: &str, strategy: EntityDedupMergePolicy) -> String {
1326    match strategy {
1327        EntityDedupMergePolicy::PreferInto | EntityDedupMergePolicy::Union => into.to_string(),
1328        EntityDedupMergePolicy::PreferFrom => from.to_string(),
1329    }
1330}
1331
1332fn merge_option_string_field(
1333    into: &Option<String>,
1334    from: &Option<String>,
1335    strategy: EntityDedupMergePolicy,
1336) -> Option<String> {
1337    match strategy {
1338        EntityDedupMergePolicy::PreferInto => {
1339            if into.is_some() {
1340                into.clone()
1341            } else {
1342                from.clone()
1343            }
1344        }
1345        EntityDedupMergePolicy::PreferFrom => {
1346            if from.is_some() {
1347                from.clone()
1348            } else {
1349                into.clone()
1350            }
1351        }
1352        EntityDedupMergePolicy::Union => {
1353            // Keep into's description; if empty, append from's.
1354            match (into, from) {
1355                (Some(a), _) if !a.is_empty() => Some(a.clone()),
1356                (_, Some(b)) => Some(b.clone()),
1357                _ => None,
1358            }
1359        }
1360    }
1361}
1362
1363/// Merge two property objects. Returns (merged, count_of_fields_from_from_that_were_added).
1364fn merge_properties(
1365    into: &Option<Value>,
1366    from: &Option<Value>,
1367    strategy: EntityDedupMergePolicy,
1368) -> (Option<Value>, usize) {
1369    match (into, from) {
1370        (None, None) => (None, 0),
1371        (Some(a), None) => (Some(a.clone()), 0),
1372        (None, Some(b)) => {
1373            let count = if let Value::Object(m) = b { m.len() } else { 1 };
1374            (Some(b.clone()), count)
1375        }
1376        (Some(into_val), Some(from_val)) => {
1377            let (merged, added) = merge_json(into_val, from_val, strategy);
1378            (Some(merged), added)
1379        }
1380    }
1381}
1382
1383/// Deep-merge two JSON values per strategy. Returns (merged, keys_contributed_by_from).
1384fn merge_json(into: &Value, from: &Value, strategy: EntityDedupMergePolicy) -> (Value, usize) {
1385    match (into, from, strategy) {
1386        (Value::Object(a), Value::Object(b), EntityDedupMergePolicy::Union) => {
1387            let mut result = a.clone();
1388            let mut added = 0usize;
1389            for (k, v_from) in b {
1390                if let Some(v_into) = a.get(k) {
1391                    let (merged, sub_added) =
1392                        merge_json(v_into, v_from, EntityDedupMergePolicy::Union);
1393                    result.insert(k.clone(), merged);
1394                    added += sub_added;
1395                } else {
1396                    result.insert(k.clone(), v_from.clone());
1397                    added += 1;
1398                }
1399            }
1400            (Value::Object(result), added)
1401        }
1402        (Value::Object(a), Value::Object(b), EntityDedupMergePolicy::PreferInto) => {
1403            let mut result = a.clone();
1404            let mut added = 0usize;
1405            for (k, v) in b {
1406                if !a.contains_key(k) {
1407                    result.insert(k.clone(), v.clone());
1408                    added += 1;
1409                }
1410            }
1411            (Value::Object(result), added)
1412        }
1413        (Value::Object(a), Value::Object(b), EntityDedupMergePolicy::PreferFrom) => {
1414            let mut result = a.clone();
1415            let mut added = 0usize;
1416            for (k, v) in b {
1417                result.insert(k.clone(), v.clone());
1418                if !a.contains_key(k) {
1419                    added += 1;
1420                }
1421            }
1422            (Value::Object(result), added)
1423        }
1424        // Non-object scalars: apply strategy directly.
1425        (_into_val, from_val, EntityDedupMergePolicy::PreferFrom) => (from_val.clone(), 1),
1426        _ => (into.clone(), 0),
1427    }
1428}
1429
/// Order-preserving union of two tag lists.
///
/// The result is `into` (kept as-is) followed by every tag of `from` that has not been
/// seen yet, in `from` order. The second tuple element is the number of tags appended.
fn union_tags(into: &[String], from: &[String]) -> (Vec<String>, usize) {
    let mut known: HashSet<&str> = into.iter().map(String::as_str).collect();
    let mut merged = into.to_vec();
    let original_len = merged.len();

    merged.extend(
        from.iter()
            .filter(|tag| known.insert(tag.as_str()))
            .cloned(),
    );

    let appended = merged.len() - original_len;
    (merged, appended)
}
1442
1443// ---------------------------------------------------------------------------
1444// Unit tests
1445// ---------------------------------------------------------------------------
1446
1447#[cfg(test)]
1448mod tests {
1449    use super::*;
1450    use crate::runtime::{KhiveRuntime, NamespaceToken};
1451    use khive_storage::types::{Direction, TextFilter, TextQueryMode, TextSearchRequest};
1452
1453    fn rt() -> KhiveRuntime {
1454        KhiveRuntime::memory().unwrap()
1455    }
1456
1457    // Helper: search FTS5 for `query` in a runtime namespace.
1458    async fn fts_hit(rt: &KhiveRuntime, token: &NamespaceToken, query: &str) -> Vec<Uuid> {
1459        let ns = token.namespace().as_str().to_string();
1460        rt.text(token)
1461            .unwrap()
1462            .search(TextSearchRequest {
1463                query: query.to_string(),
1464                mode: TextQueryMode::Plain,
1465                filter: Some(TextFilter {
1466                    namespaces: vec![ns],
1467                    ..Default::default()
1468                }),
1469                top_k: 50,
1470                snippet_chars: 100,
1471            })
1472            .await
1473            .unwrap()
1474            .into_iter()
1475            .map(|h| h.subject_id)
1476            .collect()
1477    }
1478
1479    #[tokio::test]
1480    async fn update_entity_patch_changes_only_specified_fields() {
1481        let rt = rt();
1482        let tok = NamespaceToken::local();
1483        let entity = rt
1484            .create_entity(
1485                &tok,
1486                "concept",
1487                None,
1488                "OriginalName",
1489                Some("orig desc"),
1490                Some(serde_json::json!({"k":"v"})),
1491                vec![],
1492            )
1493            .await
1494            .unwrap();
1495
1496        let updated = rt
1497            .update_entity(
1498                &tok,
1499                entity.id,
1500                EntityPatch {
1501                    description: Some(Some("new desc".to_string())),
1502                    ..Default::default()
1503                },
1504            )
1505            .await
1506            .unwrap();
1507
1508        assert_eq!(updated.name, "OriginalName");
1509        assert_eq!(updated.description.as_deref(), Some("new desc"));
1510        assert_eq!(updated.properties, Some(serde_json::json!({"k":"v"})));
1511    }
1512
1513    #[tokio::test]
1514    async fn update_entity_clear_description_with_some_none() {
1515        let rt = rt();
1516        let tok = NamespaceToken::local();
1517        let entity = rt
1518            .create_entity(
1519                &tok,
1520                "concept",
1521                None,
1522                "ClearDesc",
1523                Some("has description"),
1524                None,
1525                vec![],
1526            )
1527            .await
1528            .unwrap();
1529
1530        let updated = rt
1531            .update_entity(
1532                &tok,
1533                entity.id,
1534                EntityPatch {
1535                    description: Some(None),
1536                    ..Default::default()
1537                },
1538            )
1539            .await
1540            .unwrap();
1541
1542        assert!(
1543            updated.description.is_none(),
1544            "description should be cleared"
1545        );
1546    }
1547
1548    #[tokio::test]
1549    async fn update_entity_reindexes_when_name_changes() {
1550        let rt = rt();
1551        let tok = NamespaceToken::local();
1552        let entity = rt
1553            .create_entity(&tok, "concept", None, "OldName", None, None, vec![])
1554            .await
1555            .unwrap();
1556
1557        // Old name is findable.
1558        let hits_before = fts_hit(&rt, &tok, "OldName").await;
1559        assert!(
1560            hits_before.contains(&entity.id),
1561            "entity should be findable by old name"
1562        );
1563
1564        rt.update_entity(
1565            &tok,
1566            entity.id,
1567            EntityPatch {
1568                name: Some("NewName".to_string()),
1569                ..Default::default()
1570            },
1571        )
1572        .await
1573        .unwrap();
1574
1575        let hits_old = fts_hit(&rt, &tok, "OldName").await;
1576        let hits_new = fts_hit(&rt, &tok, "NewName").await;
1577
1578        // After rename, old name no longer matches this entity (FTS index updated).
1579        assert!(
1580            !hits_old.contains(&entity.id),
1581            "old name should no longer match after rename"
1582        );
1583        assert!(
1584            hits_new.contains(&entity.id),
1585            "new name should be findable after rename"
1586        );
1587    }
1588
1589    #[tokio::test]
1590    async fn update_entity_properties_merges_preserving_existing_keys() {
1591        let rt = rt();
1592        let tok = NamespaceToken::local();
1593        let entity = rt
1594            .create_entity(
1595                &tok,
1596                "concept",
1597                None,
1598                "MergeProps",
1599                None,
1600                Some(serde_json::json!({
1601                    "domain": "inference",
1602                    "repo": "lattice",
1603                    "status": "researched",
1604                })),
1605                vec![],
1606            )
1607            .await
1608            .unwrap();
1609
1610        let updated = rt
1611            .update_entity(
1612                &tok,
1613                entity.id,
1614                EntityPatch {
1615                    properties: Some(serde_json::json!({"status": "implemented"})),
1616                    ..Default::default()
1617                },
1618            )
1619            .await
1620            .unwrap();
1621
1622        let props = updated.properties.expect("properties should remain set");
1623        assert_eq!(props["domain"], "inference", "domain key must be preserved");
1624        assert_eq!(props["repo"], "lattice", "repo key must be preserved");
1625        assert_eq!(
1626            props["status"], "implemented",
1627            "status key must be updated by patch"
1628        );
1629    }
1630
1631    #[tokio::test]
1632    async fn update_entity_skips_reindex_when_only_properties_change() {
1633        let rt = rt();
1634        let tok = NamespaceToken::local();
1635        let entity = rt
1636            .create_entity(&tok, "concept", None, "StableIndexed", None, None, vec![])
1637            .await
1638            .unwrap();
1639
1640        // Verify it's in the index before.
1641        let hits_before = fts_hit(&rt, &tok, "StableIndexed").await;
1642        assert!(hits_before.contains(&entity.id));
1643
1644        // Only patch properties — text index should be untouched (still findable).
1645        rt.update_entity(
1646            &tok,
1647            entity.id,
1648            EntityPatch {
1649                properties: Some(serde_json::json!({"new": "prop"})),
1650                ..Default::default()
1651            },
1652        )
1653        .await
1654        .unwrap();
1655
1656        let hits_after = fts_hit(&rt, &tok, "StableIndexed").await;
1657        assert!(
1658            hits_after.contains(&entity.id),
1659            "still findable after props-only patch"
1660        );
1661    }
1662
1663    #[tokio::test]
1664    async fn merge_entity_rewires_edges() {
1665        let rt = rt();
1666        let tok = NamespaceToken::local();
1667        let a = rt
1668            .create_entity(&tok, "concept", None, "A", None, None, vec![])
1669            .await
1670            .unwrap();
1671        let b = rt
1672            .create_entity(&tok, "concept", None, "B", None, None, vec![])
1673            .await
1674            .unwrap();
1675        let c = rt
1676            .create_entity(&tok, "concept", None, "C", None, None, vec![])
1677            .await
1678            .unwrap();
1679        let d = rt
1680            .create_entity(&tok, "concept", None, "D", None, None, vec![])
1681            .await
1682            .unwrap();
1683
1684        // A→B and C→B; merge B into D → should become A→D and C→D.
1685        rt.link(&tok, a.id, b.id, EdgeRelation::Extends, 1.0, None)
1686            .await
1687            .unwrap();
1688        rt.link(&tok, c.id, b.id, EdgeRelation::Extends, 1.0, None)
1689            .await
1690            .unwrap();
1691
1692        let summary = rt
1693            .merge_entity(&tok, d.id, b.id, EntityDedupMergePolicy::PreferInto, false)
1694            .await
1695            .unwrap();
1696
1697        assert_eq!(summary.kept_id, d.id);
1698        assert_eq!(summary.removed_id, b.id);
1699        assert_eq!(summary.edges_rewired, 2);
1700
1701        // Verify edges now point to D.
1702        let a_neighbors = rt
1703            .neighbors(&tok, a.id, Direction::Out, None, None)
1704            .await
1705            .unwrap();
1706        assert_eq!(a_neighbors.len(), 1);
1707        assert_eq!(a_neighbors[0].node_id, d.id);
1708
1709        let c_neighbors = rt
1710            .neighbors(&tok, c.id, Direction::Out, None, None)
1711            .await
1712            .unwrap();
1713        assert_eq!(c_neighbors.len(), 1);
1714        assert_eq!(c_neighbors[0].node_id, d.id);
1715    }
1716
1717    #[tokio::test]
1718    async fn merge_entity_self_merge_rejected() {
1719        let rt = rt();
1720        let tok = NamespaceToken::local();
1721        let a = rt
1722            .create_entity(&tok, "concept", None, "A", None, None, vec![])
1723            .await
1724            .unwrap();
1725        let err = rt
1726            .merge_entity(&tok, a.id, a.id, EntityDedupMergePolicy::PreferInto, false)
1727            .await
1728            .unwrap_err();
1729        assert!(
1730            format!("{err:?}").contains("cannot merge an entity into itself"),
1731            "expected self-merge rejection, got: {err:?}"
1732        );
1733    }
1734
1735    #[tokio::test]
1736    async fn merge_entity_prefer_into_strategy() {
1737        let rt = rt();
1738        let tok = NamespaceToken::local();
1739        let into = rt
1740            .create_entity(
1741                &tok,
1742                "concept",
1743                None,
1744                "Into",
1745                None,
1746                Some(serde_json::json!({"a": 1})),
1747                vec![],
1748            )
1749            .await
1750            .unwrap();
1751        let from = rt
1752            .create_entity(
1753                &tok,
1754                "concept",
1755                None,
1756                "From",
1757                None,
1758                Some(serde_json::json!({"a": 2, "b": 3})),
1759                vec![],
1760            )
1761            .await
1762            .unwrap();
1763
1764        rt.merge_entity(
1765            &tok,
1766            into.id,
1767            from.id,
1768            EntityDedupMergePolicy::PreferInto,
1769            false,
1770        )
1771        .await
1772        .unwrap();
1773
1774        let kept = rt.get_entity(&tok, into.id).await.unwrap();
1775        let props = kept.properties.unwrap();
1776        // a stays as 1 (into wins), b is added from from.
1777        assert_eq!(props["a"], 1);
1778        assert_eq!(props["b"], 3);
1779    }
1780
1781    #[tokio::test]
1782    async fn merge_entity_prefer_from_strategy() {
1783        let rt = rt();
1784        let tok = NamespaceToken::local();
1785        let into = rt
1786            .create_entity(
1787                &tok,
1788                "concept",
1789                None,
1790                "Into",
1791                None,
1792                Some(serde_json::json!({"a": 1})),
1793                vec![],
1794            )
1795            .await
1796            .unwrap();
1797        let from = rt
1798            .create_entity(
1799                &tok,
1800                "concept",
1801                None,
1802                "From",
1803                None,
1804                Some(serde_json::json!({"a": 2, "b": 3})),
1805                vec![],
1806            )
1807            .await
1808            .unwrap();
1809
1810        rt.merge_entity(
1811            &tok,
1812            into.id,
1813            from.id,
1814            EntityDedupMergePolicy::PreferFrom,
1815            false,
1816        )
1817        .await
1818        .unwrap();
1819
1820        let kept = rt.get_entity(&tok, into.id).await.unwrap();
1821        let props = kept.properties.unwrap();
1822        // from wins on a, b also from from.
1823        assert_eq!(props["a"], 2);
1824        assert_eq!(props["b"], 3);
1825    }
1826
1827    #[tokio::test]
1828    async fn merge_entity_union_strategy() {
1829        let rt = rt();
1830        let tok = NamespaceToken::local();
1831        let into = rt
1832            .create_entity(
1833                &tok,
1834                "concept",
1835                None,
1836                "Into",
1837                None,
1838                Some(serde_json::json!({"a": 1})),
1839                vec![],
1840            )
1841            .await
1842            .unwrap();
1843        let from = rt
1844            .create_entity(
1845                &tok,
1846                "concept",
1847                None,
1848                "From",
1849                None,
1850                Some(serde_json::json!({"a": 2, "b": 3})),
1851                vec![],
1852            )
1853            .await
1854            .unwrap();
1855
1856        rt.merge_entity(&tok, into.id, from.id, EntityDedupMergePolicy::Union, false)
1857            .await
1858            .unwrap();
1859
1860        let kept = rt.get_entity(&tok, into.id).await.unwrap();
1861        let props = kept.properties.unwrap();
1862        // Scalar conflict: into wins → a=1. b added from from.
1863        assert_eq!(props["a"], 1);
1864        assert_eq!(props["b"], 3);
1865    }
1866
1867    #[tokio::test]
1868    async fn merge_entity_unions_tags() {
1869        let rt = rt();
1870        let tok = NamespaceToken::local();
1871        let into = rt
1872            .create_entity(
1873                &tok,
1874                "concept",
1875                None,
1876                "Into",
1877                None,
1878                None,
1879                vec!["x".to_string(), "y".to_string()],
1880            )
1881            .await
1882            .unwrap();
1883        let from = rt
1884            .create_entity(
1885                &tok,
1886                "concept",
1887                None,
1888                "From",
1889                None,
1890                None,
1891                vec!["y".to_string(), "z".to_string()],
1892            )
1893            .await
1894            .unwrap();
1895
1896        rt.merge_entity(
1897            &tok,
1898            into.id,
1899            from.id,
1900            EntityDedupMergePolicy::PreferInto,
1901            false,
1902        )
1903        .await
1904        .unwrap();
1905
1906        let kept = rt.get_entity(&tok, into.id).await.unwrap();
1907        let mut tags = kept.tags.clone();
1908        tags.sort();
1909        assert_eq!(tags, vec!["x", "y", "z"]);
1910    }
1911
1912    #[tokio::test]
1913    async fn merge_entity_drops_self_loops() {
1914        let rt = rt();
1915        let tok = NamespaceToken::local();
1916        let a = rt
1917            .create_entity(&tok, "concept", None, "A", None, None, vec![])
1918            .await
1919            .unwrap();
1920        let b = rt
1921            .create_entity(&tok, "concept", None, "B", None, None, vec![])
1922            .await
1923            .unwrap();
1924
1925        // A `extends` B — merging B into A would produce A `extends` A → drop it.
1926        rt.link(&tok, a.id, b.id, EdgeRelation::Extends, 1.0, None)
1927            .await
1928            .unwrap();
1929
1930        let summary = rt
1931            .merge_entity(&tok, a.id, b.id, EntityDedupMergePolicy::PreferInto, false)
1932            .await
1933            .unwrap();
1934
1935        assert_eq!(
1936            summary.edges_rewired, 0,
1937            "self-loop should be dropped, not rewired"
1938        );
1939
1940        let a_out = rt
1941            .neighbors(&tok, a.id, Direction::Out, None, None)
1942            .await
1943            .unwrap();
1944        assert!(a_out.is_empty(), "no self-loop should remain");
1945    }
1946
1947    // ---- merge helper unit tests ----
1948
1949    #[test]
1950    fn union_tags_deduplicates() {
1951        let (tags, added) = union_tags(
1952            &["x".to_string(), "y".to_string()],
1953            &["y".to_string(), "z".to_string()],
1954        );
1955        let mut sorted = tags.clone();
1956        sorted.sort();
1957        assert_eq!(sorted, vec!["x", "y", "z"]);
1958        assert_eq!(added, 1);
1959    }
1960
1961    #[test]
1962    fn merge_properties_prefer_into_fills_missing_keys() {
1963        let a = serde_json::json!({"a": 1});
1964        let b = serde_json::json!({"a": 99, "b": 2});
1965        let (merged, added) =
1966            merge_properties(&Some(a), &Some(b), EntityDedupMergePolicy::PreferInto);
1967        let m = merged.unwrap();
1968        assert_eq!(m["a"], 1);
1969        assert_eq!(m["b"], 2);
1970        assert_eq!(added, 1);
1971    }
1972
1973    // ---- tombstone and note merge tests ----
1974
1975    #[tokio::test]
1976    async fn merge_entity_tombstones_source_with_provenance() {
1977        let rt = rt();
1978        let tok = NamespaceToken::local();
1979        let into = rt
1980            .create_entity(&tok, "concept", None, "Into", None, None, vec![])
1981            .await
1982            .unwrap();
1983        let from = rt
1984            .create_entity(&tok, "concept", None, "From", None, None, vec![])
1985            .await
1986            .unwrap();
1987        let from_id = from.id;
1988
1989        rt.merge_entity(
1990            &tok,
1991            into.id,
1992            from_id,
1993            EntityDedupMergePolicy::PreferInto,
1994            false,
1995        )
1996        .await
1997        .unwrap();
1998
1999        // After merge, get_entity returns an error (soft-deleted rows are excluded).
2000        assert!(
2001            rt.get_entity(&tok, from_id).await.is_err(),
2002            "tombstoned source should not be returned by get_entity"
2003        );
2004
2005        // Verify the source row still exists in SQL with provenance.
2006        let pool = rt.backend().pool_arc();
2007        let (deleted_at, merged_into): (Option<i64>, Option<String>) =
2008            tokio::task::spawn_blocking(move || {
2009                let guard = pool.writer().unwrap();
2010                guard
2011                    .conn()
2012                    .query_row(
2013                        "SELECT deleted_at, merged_into FROM entities WHERE id = ?1",
2014                        [from_id.to_string()],
2015                        |row| Ok((row.get(0)?, row.get(1)?)),
2016                    )
2017                    .unwrap()
2018            })
2019            .await
2020            .unwrap();
2021        assert!(
2022            deleted_at.is_some(),
2023            "tombstoned entity must have deleted_at set"
2024        );
2025        assert_eq!(
2026            merged_into.as_deref(),
2027            Some(into.id.to_string().as_str()),
2028            "merged_into must point to into_id"
2029        );
2030    }
2031
2032    #[tokio::test]
2033    async fn merge_note_same_kind_appends_content() {
2034        let rt = rt();
2035        let tok = NamespaceToken::local();
2036        let into = rt
2037            .create_note(
2038                &tok,
2039                "observation",
2040                None,
2041                "Into content",
2042                None,
2043                None,
2044                vec![],
2045            )
2046            .await
2047            .unwrap();
2048        let from = rt
2049            .create_note(
2050                &tok,
2051                "observation",
2052                None,
2053                "From content",
2054                None,
2055                None,
2056                vec![],
2057            )
2058            .await
2059            .unwrap();
2060        let from_id = from.id;
2061
2062        let summary = rt
2063            .merge_note(
2064                &tok,
2065                into.id,
2066                from_id,
2067                EntityDedupMergePolicy::PreferInto,
2068                ContentMergeStrategy::Append,
2069                false,
2070            )
2071            .await
2072            .unwrap();
2073
2074        assert_eq!(summary.kept_id, into.id);
2075        assert_eq!(summary.removed_id, from_id);
2076        assert!(summary.content_appended);
2077        assert!(!summary.dry_run);
2078
2079        // Source is no longer findable.
2080        let from_store = rt.notes(&tok).unwrap();
2081        assert!(
2082            from_store.get_note(from_id).await.unwrap().is_none(),
2083            "merged-from note should be soft-deleted"
2084        );
2085    }
2086
2087    #[tokio::test]
2088    async fn merge_note_different_kinds_rejected() {
2089        let rt = rt();
2090        let tok = NamespaceToken::local();
2091        let into = rt
2092            .create_note(&tok, "observation", None, "Into", None, None, vec![])
2093            .await
2094            .unwrap();
2095        let from = rt
2096            .create_note(&tok, "decision", None, "From", None, None, vec![])
2097            .await
2098            .unwrap();
2099
2100        let result = rt
2101            .merge_note(
2102                &tok,
2103                into.id,
2104                from.id,
2105                EntityDedupMergePolicy::PreferInto,
2106                ContentMergeStrategy::Append,
2107                false,
2108            )
2109            .await;
2110        assert!(result.is_err(), "merging different note kinds must fail");
2111    }
2112
2113    #[tokio::test]
2114    async fn merge_note_dry_run_leaves_notes_unchanged() {
2115        let rt = rt();
2116        let tok = NamespaceToken::local();
2117        let into = rt
2118            .create_note(
2119                &tok,
2120                "observation",
2121                None,
2122                "Into content",
2123                None,
2124                None,
2125                vec![],
2126            )
2127            .await
2128            .unwrap();
2129        let from = rt
2130            .create_note(
2131                &tok,
2132                "observation",
2133                None,
2134                "From content",
2135                None,
2136                None,
2137                vec![],
2138            )
2139            .await
2140            .unwrap();
2141        let into_id = into.id;
2142        let from_id = from.id;
2143
2144        let summary = rt
2145            .merge_note(
2146                &tok,
2147                into_id,
2148                from_id,
2149                EntityDedupMergePolicy::PreferInto,
2150                ContentMergeStrategy::Append,
2151                true,
2152            )
2153            .await
2154            .unwrap();
2155
2156        assert!(summary.dry_run);
2157
2158        // Both notes still exist unchanged.
2159        let store = rt.notes(&tok).unwrap();
2160        let into_after = store.get_note(into_id).await.unwrap().unwrap();
2161        let from_after = store.get_note(from_id).await.unwrap().unwrap();
2162        assert_eq!(
2163            into_after.content, "Into content",
2164            "dry_run must not mutate into-note"
2165        );
2166        assert_eq!(
2167            from_after.content, "From content",
2168            "dry_run must not mutate from-note"
2169        );
2170    }
2171
2172    #[tokio::test]
2173    async fn update_edge_updates_properties() {
2174        use khive_storage::EdgeRelation;
2175        let rt = rt();
2176        let tok = NamespaceToken::local();
2177        let a = rt
2178            .create_entity(&tok, "concept", None, "A", None, None, vec![])
2179            .await
2180            .unwrap();
2181        let b = rt
2182            .create_entity(&tok, "concept", None, "B", None, None, vec![])
2183            .await
2184            .unwrap();
2185        let edge = rt
2186            .link(&tok, a.id, b.id, EdgeRelation::Extends, 0.5, None)
2187            .await
2188            .unwrap();
2189        let edge_id: Uuid = edge.id.into();
2190
2191        let updated = rt
2192            .update_edge(
2193                &tok,
2194                edge_id,
2195                EdgePatch {
2196                    properties: Some(serde_json::json!({"source": "manual"})),
2197                    ..Default::default()
2198                },
2199            )
2200            .await
2201            .unwrap();
2202
2203        assert_eq!(updated.metadata.as_ref().unwrap()["source"], "manual");
2204        assert!((updated.weight - 0.5).abs() < 0.001, "weight unchanged");
2205    }
2206}