Skip to main content

khive_runtime/
curation.rs

1// Licensed under the Apache License, Version 2.0.
2
3// FILE SIZE JUSTIFICATION: curation.rs holds entity/note/edge patch types alongside
4// their update and merge implementations. The implementations share private helpers
5// (merge_properties, namespace checks, dedup policy) that need pub(crate) access to
6// runtime internals. Inline tests cover merge semantics that require direct access to
7// those helpers. Split plan: extract patch types into `curation/patch.rs` and merge
8// logic into `curation/merge.rs` once the dedup policy API stabilises.
9//! Curation operations: entity update/merge and edge-list filter type.
10
11use std::collections::HashSet;
12
13use serde::{Deserialize, Serialize};
14use serde_json::Value;
15use uuid::Uuid;
16
17use khive_db::SqliteError;
18use khive_storage::note::Note;
19use khive_storage::types::{EdgeFilter, TextDocument};
20use khive_storage::{EdgeRelation, Entity, SubstrateKind};
21use khive_types::EventKind;
22use rusqlite::OptionalExtension;
23
24use crate::error::{RuntimeError, RuntimeResult};
25use crate::operations::canonical_edge_endpoints;
26use crate::runtime::{KhiveRuntime, NamespaceToken};
27
28// ---------------------------------------------------------------------------
29// Public types
30// ---------------------------------------------------------------------------
31
32/// Patch for `update_entity`. Only `Some(_)` fields are applied; `None` means "leave unchanged".
33///
34/// For `description`:
35/// - `None` (outer) — leave the current description as-is
36/// - `Some(None)` — clear the description (set to NULL)
37/// - `Some(Some(s))` — set the description to `s`
38///
39/// For `properties` (deep-merge semantics):
40/// - `None` — leave properties as-is
41/// - `Some(value)` — deep-merge `value` into existing properties. Keys present in
42///   the patch overwrite existing keys; keys absent from the patch are preserved.
43///   Removing a key requires explicit replacement of the parent object (or a future
44///   `unset`/`null-marker` extension).
45///
46/// For `tags` — replace semantics: `Some(vec)` sets tags to exactly `vec`. To add
47/// a tag without losing existing tags, read the entity first, push the new tag,
48/// and pass the full list back.
49#[derive(Clone, Debug, Default)]
50pub struct EntityPatch {
51    pub name: Option<String>,
52    pub description: Option<Option<String>>,
53    pub properties: Option<Value>,
54    pub tags: Option<Vec<String>>,
55}
56
57/// Policy used when deduplicating two entities.
58#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
59#[serde(rename_all = "snake_case")]
60pub enum EntityDedupMergePolicy {
61    /// `into` values win on conflict. Tags are unioned. Properties from `from` fill in
62    /// keys that `into` doesn't have. This is the default.
63    #[default]
64    PreferInto,
65    /// `from` values win on conflict.
66    PreferFrom,
67    /// Deep-merge: object properties merge recursively. Scalar conflicts go to `into`.
68    Union,
69}
70
71/// Strategy for merging note content when two notes are combined.
72#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
73#[serde(rename_all = "snake_case")]
74pub enum ContentMergeStrategy {
75    #[default]
76    Append,
77    PreferInto,
78    PreferFrom,
79}
80
81/// Result returned by `merge_entity` / `merge_note`.
82#[derive(Clone, Debug, Serialize, Deserialize)]
83pub struct MergeSummary {
84    pub kept_id: Uuid,
85    pub removed_id: Uuid,
86    pub edges_rewired: usize,
87    pub properties_merged: usize,
88    pub tags_unioned: usize,
89    pub content_appended: bool,
90    pub dry_run: bool,
91}
92
93/// Patch for `update_edge`. Only `Some(_)` fields are applied; `None` means "leave unchanged".
94///
95/// For `properties` — replacement semantics (not deep merge): `Some(value)` replaces
96/// the entire metadata object. `None` leaves metadata unchanged.
97#[derive(Clone, Debug, Default)]
98pub struct EdgePatch {
99    pub relation: Option<EdgeRelation>,
100    pub weight: Option<f64>,
101    pub properties: Option<Value>,
102}
103
104/// Patch for `update_note`. Only `Some(_)` fields are applied; `None` means "leave unchanged".
105///
106/// For `salience`/`decay_factor`:
107/// - `None` (outer) — leave unchanged
108/// - `Some(None)` — clear the value
109/// - `Some(Some(v))` — set to v
110#[derive(Clone, Debug, Default)]
111pub struct NotePatch {
112    pub name: Option<Option<String>>,
113    pub content: Option<String>,
114    pub salience: Option<Option<f64>>,
115    pub decay_factor: Option<Option<f64>>,
116    pub properties: Option<Value>,
117    pub(crate) kind_status: Option<String>,
118}
119
120impl NotePatch {
121    /// Construct a `NotePatch` from the public fields only.
122    /// Use this from external crates; `kind_status` is set to `None`.
123    pub fn new(
124        name: Option<Option<String>>,
125        content: Option<String>,
126        salience: Option<Option<f64>>,
127        decay_factor: Option<Option<f64>>,
128        properties: Option<Value>,
129    ) -> Self {
130        Self {
131            name,
132            content,
133            salience,
134            decay_factor,
135            properties,
136            kind_status: None,
137        }
138    }
139}
140
141/// Filter for `list_edges` / `count_edges`.
142#[derive(Clone, Debug, Default)]
143pub struct EdgeListFilter {
144    pub source_id: Option<Uuid>,
145    pub target_id: Option<Uuid>,
146    /// Empty = any relation.
147    pub relations: Vec<EdgeRelation>,
148    pub min_weight: Option<f64>,
149    pub max_weight: Option<f64>,
150}
151
152impl From<EdgeListFilter> for EdgeFilter {
153    fn from(f: EdgeListFilter) -> Self {
154        EdgeFilter {
155            source_ids: f.source_id.into_iter().collect(),
156            target_ids: f.target_id.into_iter().collect(),
157            relations: f.relations,
158            min_weight: f.min_weight,
159            max_weight: f.max_weight,
160            ..Default::default()
161        }
162    }
163}
164
165// ---------------------------------------------------------------------------
166// Implementation
167// ---------------------------------------------------------------------------
168
169impl KhiveRuntime {
170    /// Patch-style entity update.
171    ///
172    /// Only fields set to `Some(_)` are changed. Re-indexes FTS5 (and vectors if configured)
173    /// when `name` or `description` changes; skips re-indexing for property/tag-only patches.
174    ///
175    /// Returns `RuntimeError::NotFound` if the entity does not exist or belongs to a different
176    /// namespace. Namespace isolation is enforced at the runtime layer.
177    pub async fn update_entity(
178        &self,
179        token: &NamespaceToken,
180        id: Uuid,
181        patch: EntityPatch,
182    ) -> RuntimeResult<Entity> {
183        // Secret gate: scan incoming text fields, properties, and tags.
184        if let Some(ref name) = patch.name {
185            crate::secret_gate::check(name)?;
186        }
187        if let Some(Some(ref desc)) = patch.description {
188            crate::secret_gate::check(desc)?;
189        }
190        if let Some(ref props) = patch.properties {
191            crate::secret_gate::check_json(props)?;
192        }
193        if let Some(ref tags) = patch.tags {
194            crate::secret_gate::check_tags(tags)?;
195        }
196        let store = self.entities(token)?;
197        let mut entity = store
198            .get_entity(id)
199            .await?
200            .ok_or_else(|| RuntimeError::NotFound(format!("entity {id}")))?;
201
202        Self::ensure_namespace(&entity.namespace, token.namespace().as_str())?;
203
204        let mut text_changed = false;
205        let mut changed_fields: Vec<&'static str> = Vec::new();
206
207        if let Some(name) = patch.name {
208            text_changed |= entity.name != name;
209            entity.name = name;
210            changed_fields.push("name");
211        }
212        if let Some(desc_patch) = patch.description {
213            text_changed |= entity.description != desc_patch;
214            entity.description = desc_patch;
215            changed_fields.push("description");
216        }
217        if let Some(props) = patch.properties {
218            let (merged, _) = merge_properties(
219                &entity.properties,
220                &Some(props),
221                EntityDedupMergePolicy::PreferFrom,
222            );
223            entity.properties = merged;
224            changed_fields.push("properties");
225        }
226        if let Some(tags) = patch.tags {
227            entity.tags = tags;
228            changed_fields.push("tags");
229        }
230
231        entity.updated_at = chrono::Utc::now().timestamp_micros();
232        store.upsert_entity(entity.clone()).await?;
233
234        if text_changed {
235            self.reindex_entity(token, &entity).await?;
236        }
237
238        let event_store = self.events(token)?;
239        let event = khive_storage::event::Event::new(
240            entity.namespace.clone(),
241            "update",
242            EventKind::EntityUpdated,
243            SubstrateKind::Entity,
244            "",
245        )
246        .with_target(entity.id)
247        .with_payload(serde_json::json!({
248            "id": entity.id,
249            "namespace": entity.namespace,
250            "changed_fields": changed_fields,
251        }));
252        event_store.append_event(event).await.map_err(|e| {
253            RuntimeError::Internal(format!("update_entity: event store write failed: {e}"))
254        })?;
255
256        Ok(entity)
257    }
258
259    /// Merge `from_id` into `into_id`.
260    ///
261    /// All edges incident to `from_id` are rewired to `into_id`. Self-loops that would
262    /// result from the rewire are dropped. Properties and tags are merged per `strategy`.
263    /// `from_id` is tombstoned with merge provenance and removed from indexes. Returns a summary.
264    ///
265    /// If `dry_run` is true, computes and returns the planned summary without mutating any rows.
266    ///
267    /// Atomic: all SQL (entity reads/writes, edge rewires, FTS updates, vec-index delete)
268    /// runs on a single pool connection inside one `BEGIN IMMEDIATE` transaction via
269    /// `merge_entity_sql`. If embedding vectors are configured, the vector re-insert for
270    /// `into_id` is performed after the transaction (requires async embedding computation).
271    pub async fn merge_entity(
272        &self,
273        token: &NamespaceToken,
274        into_id: Uuid,
275        from_id: Uuid,
276        strategy: EntityDedupMergePolicy,
277        dry_run: bool,
278    ) -> RuntimeResult<MergeSummary> {
279        if into_id == from_id {
280            return Err(RuntimeError::InvalidInput(
281                "cannot merge an entity into itself".into(),
282            ));
283        }
284        // H2 fix: enforce same-kind constraint at the runtime layer.
285        // The handler also checks this, but any direct runtime caller (CLI, tests,
286        // future SDK) would bypass the handler guard without this check here.
287        {
288            let into_entity = self.get_entity(token, into_id).await?;
289            let from_entity = self.get_entity(token, from_id).await?;
290            if into_entity.kind != from_entity.kind {
291                return Err(RuntimeError::InvalidInput(format!(
292                    "cannot merge entities of different kinds: into={} ({}), from={} ({}); \
293                     merge requires both entities to share the same kind",
294                    into_id, into_entity.kind, from_id, from_entity.kind
295                )));
296            }
297        }
298        let ns = token.namespace().as_str().to_owned();
299        let sanitized_ns: String = ns
300            .chars()
301            .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
302            .collect();
303        let fts_table = format!("fts_entities_{}", sanitized_ns);
304        let vec_table = self.config().embedding_model.map(|model| {
305            let key: String = model
306                .to_string()
307                .chars()
308                .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
309                .collect();
310            format!("vec_{}", key)
311        });
312
313        // Ensure all required tables exist before entering the transaction.
314        // Each accessor applies its DDL idempotently via `CREATE TABLE IF NOT EXISTS`.
315        let _ = self.entities(token)?;
316        let _ = self.graph(token)?;
317        let _ = self.text(token)?;
318        if self.config().embedding_model.is_some() {
319            let _ = self.vectors(token)?;
320        }
321
322        let pool = self.backend().pool_arc();
323
324        let (summary, updated_entity) = tokio::task::spawn_blocking(move || {
325            let guard = pool.writer()?;
326            guard.transaction(|conn| {
327                merge_entity_sql(
328                    conn, ns, fts_table, vec_table, into_id, from_id, strategy, dry_run,
329                )
330            })
331        })
332        .await
333        .map_err(|e| RuntimeError::Internal(e.to_string()))??;
334
335        // If vectors are configured, reindex into_entity (requires async embedding).
336        // FTS and vec-delete were already committed inside the transaction above.
337        if !dry_run && self.config().embedding_model.is_some() {
338            self.reindex_entity(token, &updated_entity).await?;
339        }
340
341        let event_store = self.events(token)?;
342        // Mirror the wire-level strategy spelling from MergeParams so consumers
343        // can round-trip the policy string back into a request.
344        let policy_str = match strategy {
345            EntityDedupMergePolicy::PreferInto => "prefer_into",
346            EntityDedupMergePolicy::PreferFrom => "prefer_from",
347            EntityDedupMergePolicy::Union => "union",
348        };
349        let event = khive_storage::event::Event::new(
350            updated_entity.namespace.clone(),
351            "merge",
352            EventKind::EntityMerged,
353            SubstrateKind::Entity,
354            "",
355        )
356        .with_target(summary.kept_id)
357        .with_payload(serde_json::json!({
358            "into_id": summary.kept_id,
359            "from_id": summary.removed_id,
360            "policy": policy_str,
361            "edges_rewired": summary.edges_rewired,
362        }));
363        event_store.append_event(event).await.map_err(|e| {
364            RuntimeError::Internal(format!("merge_entity: event store write failed: {e}"))
365        })?;
366
367        Ok(summary)
368    }
369
370    // ---- Internal helpers ----
371
372    /// Re-upsert FTS5 document (and vector if model configured) for the entity.
373    ///
374    /// Uses `entity.namespace` — the authoritative namespace stored on the record — rather
375    /// than the caller-supplied `namespace` parameter. This prevents a cross-namespace
376    /// reindex from writing the search document into the wrong namespace's FTS index.
377    pub(crate) async fn reindex_entity(
378        &self,
379        token: &NamespaceToken,
380        entity: &Entity,
381    ) -> RuntimeResult<()> {
382        // Use entity.namespace (authoritative) rather than token.namespace().as_str() (caller claim).
383        let ns = entity.namespace.clone();
384        let doc = entity_fts_document(entity);
385        let embed_body = doc.body.clone();
386        self.text(token)?.upsert_document(doc).await?;
387
388        if self.config().embedding_model.is_some() {
389            let vector = self.embed_document(&embed_body).await?;
390            self.vectors(token)?
391                .insert(
392                    entity.id,
393                    SubstrateKind::Entity,
394                    &ns,
395                    "entity.body",
396                    vec![vector],
397                )
398                .await?;
399        }
400
401        Ok(())
402    }
403
404    /// Remove an entity from FTS5 and (if configured) vector indexes.
405    pub(crate) async fn remove_from_indexes(
406        &self,
407        token: &NamespaceToken,
408        id: Uuid,
409    ) -> RuntimeResult<()> {
410        let ns = token.namespace().as_str().to_owned();
411        self.text(token)?.delete_document(&ns, id).await?;
412        if self.config().embedding_model.is_some() {
413            self.vectors(token)?.delete(id).await?;
414        }
415        Ok(())
416    }
417
418    /// Re-upsert FTS5 document (and vector if model configured) for the note.
419    pub(crate) async fn reindex_note(
420        &self,
421        token: &NamespaceToken,
422        note: &khive_storage::note::Note,
423    ) -> RuntimeResult<()> {
424        self.text_for_notes(token)?
425            .upsert_document(note_fts_document(note))
426            .await?;
427
428        if self.config().embedding_model.is_some() {
429            let ns = note.namespace.clone();
430            let vector = self.embed_document(&note.content).await?;
431            self.vectors(token)?
432                .insert(
433                    note.id,
434                    SubstrateKind::Note,
435                    &ns,
436                    "note.content",
437                    vec![vector],
438                )
439                .await?;
440        }
441        Ok(())
442    }
443
444    /// Patch-style note update.
445    pub async fn update_note(
446        &self,
447        token: &NamespaceToken,
448        id: Uuid,
449        patch: NotePatch,
450    ) -> RuntimeResult<khive_storage::note::Note> {
451        // Secret gate: scan incoming text fields and structured properties.
452        if let Some(ref content) = patch.content {
453            crate::secret_gate::check(content)?;
454        }
455        if let Some(Some(ref name)) = patch.name {
456            crate::secret_gate::check(name)?;
457        }
458        if let Some(ref props) = patch.properties {
459            crate::secret_gate::check_json(props)?;
460        }
461        let store = self.notes(token)?;
462        let mut note = store
463            .get_note(id)
464            .await?
465            .ok_or_else(|| RuntimeError::NotFound(format!("note {id}")))?;
466
467        Self::ensure_namespace(&note.namespace, token.namespace().as_str())?;
468
469        let mut text_changed = false;
470
471        if let Some(name_patch) = patch.name {
472            text_changed |= note.name != name_patch;
473            note.name = name_patch;
474        }
475        if let Some(content) = patch.content {
476            text_changed |= note.content != content;
477            note.content = content;
478        }
479        if let Some(salience_patch) = patch.salience {
480            // Reject non-finite or out-of-range salience at the runtime boundary
481            // rather than silently clamping invalid caller input (coding-standards §608-622).
482            if let Some(s) = salience_patch {
483                if !s.is_finite() || !(0.0..=1.0).contains(&s) {
484                    return Err(crate::RuntimeError::InvalidInput(format!(
485                        "salience must be a finite value in [0.0, 1.0]; got {s}"
486                    )));
487                }
488            }
489            note.salience = salience_patch;
490        }
491        if let Some(decay_patch) = patch.decay_factor {
492            // Reject non-finite or negative decay_factor at the runtime boundary.
493            if let Some(d) = decay_patch {
494                if !d.is_finite() || d < 0.0 {
495                    return Err(crate::RuntimeError::InvalidInput(format!(
496                        "decay_factor must be a finite value >= 0.0; got {d}"
497                    )));
498                }
499            }
500            note.decay_factor = decay_patch;
501        }
502        if let Some(props) = patch.properties {
503            let (merged, _) = merge_properties(
504                &note.properties,
505                &Some(props),
506                EntityDedupMergePolicy::PreferFrom,
507            );
508            note.properties = merged;
509        }
510        if let Some(status) = patch.kind_status {
511            note.status = status;
512        }
513
514        note.updated_at = chrono::Utc::now().timestamp_micros();
515        store.upsert_note(note.clone()).await?;
516
517        if text_changed {
518            self.reindex_note(token, &note).await?;
519        }
520
521        Ok(note)
522    }
523
524    /// Merge `from_id` note into `into_id` note.
525    ///
526    /// Both notes must exist in the namespace and have the same `kind`. Content is merged
527    /// per `content_strategy`. Properties are merged per `strategy`. `from_id` is
528    /// tombstoned (status='deleted', deleted_at set). Returns a summary.
529    ///
530    /// If `dry_run` is true, computes and returns the planned summary without mutating
531    /// any rows, edges, or indexes.
532    pub async fn merge_note(
533        &self,
534        token: &NamespaceToken,
535        into_id: Uuid,
536        from_id: Uuid,
537        strategy: EntityDedupMergePolicy,
538        content_strategy: ContentMergeStrategy,
539        dry_run: bool,
540    ) -> RuntimeResult<MergeSummary> {
541        if into_id == from_id {
542            return Err(RuntimeError::InvalidInput(
543                "cannot merge a note into itself".into(),
544            ));
545        }
546        let ns = token.namespace().as_str().to_string();
547        let sanitized_ns: String = ns
548            .chars()
549            .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
550            .collect();
551        let fts_table = format!("fts_notes_{}", sanitized_ns);
552        let vec_table = self.config().embedding_model.map(|model| {
553            let key: String = model
554                .to_string()
555                .chars()
556                .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
557                .collect();
558            format!("vec_{}", key)
559        });
560
561        let note_store = self.notes(token)?;
562        let into_note = note_store
563            .get_note(into_id)
564            .await?
565            .ok_or_else(|| RuntimeError::NotFound("not found in this namespace".into()))?;
566        Self::ensure_namespace(&into_note.namespace, &ns)?;
567
568        let from_note = note_store
569            .get_note(from_id)
570            .await?
571            .ok_or_else(|| RuntimeError::NotFound("not found in this namespace".into()))?;
572        Self::ensure_namespace(&from_note.namespace, &ns)?;
573
574        let _ = self.graph(token)?;
575        let _ = self.text_for_notes(token)?;
576        if self.config().embedding_model.is_some() {
577            let _ = self.vectors(token)?;
578        }
579
580        let pool = self.backend().pool_arc();
581        let (summary, updated_note) = tokio::task::spawn_blocking(move || {
582            let guard = pool.writer()?;
583            guard.transaction(|conn| {
584                merge_note_sql(
585                    conn,
586                    ns,
587                    fts_table,
588                    vec_table,
589                    into_id,
590                    from_id,
591                    strategy,
592                    content_strategy,
593                    dry_run,
594                )
595            })
596        })
597        .await
598        .map_err(|e| RuntimeError::Internal(e.to_string()))??;
599
600        if !dry_run && self.config().embedding_model.is_some() {
601            self.reindex_note(token, &updated_note).await?;
602        }
603        Ok(summary)
604    }
605}
606
607// ---------------------------------------------------------------------------
608// FTS document construction
609// ---------------------------------------------------------------------------
610
611/// Build the `TextDocument` for an entity. This is the single source of truth for
612/// entity FTS document shape; all write paths (create, update, merge, reindex, backfill)
613/// must go through this function so search parity is guaranteed.
614///
615/// Body rule: when the entity has a non-empty description, prepend the name
616/// (`"<name> <description>"`). Otherwise the body is just the name. This
617/// matches the FTS index contract: `title` and `body` are the ranked columns;
618/// `tags`, `metadata`, and `namespace` are UNINDEXED.
619///
620/// `updated_at` is taken from the entity's own timestamp so that backfill and
621/// reindex runs record the entity's actual mutation time rather than the
622/// reindex execution time.
623pub fn entity_fts_document(entity: &Entity) -> TextDocument {
624    let body = match &entity.description {
625        Some(d) if !d.is_empty() => format!("{} {}", entity.name, d),
626        _ => entity.name.clone(),
627    };
628    let updated_at =
629        chrono::DateTime::from_timestamp_micros(entity.updated_at).unwrap_or_else(chrono::Utc::now);
630    TextDocument {
631        subject_id: entity.id,
632        kind: SubstrateKind::Entity,
633        title: Some(entity.name.clone()),
634        body,
635        tags: entity.tags.clone(),
636        namespace: entity.namespace.clone(),
637        metadata: entity.properties.clone(),
638        updated_at,
639    }
640}
641
642/// Build the `TextDocument` for a note. This is the single source of truth for
643/// note FTS document shape; all write paths (create, update, reindex) must go
644/// through this function so recall parity is guaranteed. Changes here apply to
645/// every caller automatically.
646///
647/// Body rule: when the note has a `name`, prepend it to the content
648/// (`"<name> <content>"`). This matches the FTS index contract: title and body
649/// both contribute to ranking, and the name is the most salient signal.
650///
651/// `updated_at` is taken from the note's own timestamp (not `Utc::now()`) so
652/// that backfill and reindex runs record the note's actual mutation time rather
653/// than the reindex execution time.
654pub fn note_fts_document(note: &Note) -> TextDocument {
655    let body = match &note.name {
656        Some(n) => format!("{n} {}", note.content),
657        None => note.content.clone(),
658    };
659    let updated_at =
660        chrono::DateTime::from_timestamp_micros(note.updated_at).unwrap_or_else(chrono::Utc::now);
661    TextDocument {
662        subject_id: note.id,
663        kind: SubstrateKind::Note,
664        title: note.name.clone(),
665        body,
666        tags: vec![],
667        namespace: note.namespace.clone(),
668        metadata: note.properties.clone(),
669        updated_at,
670    }
671}
672
673/// SQL-bind–ready scalars derived from [`note_fts_document`].
674///
675/// Used by `merge_note_sql` to guarantee that the raw SQL FTS INSERT stores
676/// exactly what [`Fts5TextSearch::upsert_document`] would write, preventing
677/// null/empty-string divergence on the `title` column for nameless notes.
678pub(crate) struct NoteFtsScalars {
679    /// Empty string when `note.name` is `None` — matches the `unwrap_or("")` in
680    /// `Fts5TextSearch::upsert_document`.
681    pub title: String,
682    pub body: String,
683    /// Always the JSON array `"[]"`.
684    pub tags: String,
685    /// Serialised `note.properties`, or `None` when properties are absent.
686    pub metadata: Option<String>,
687    /// `note.updated_at` converted to `DateTime<Utc>` timestamp_micros.
688    pub updated_at_micros: i64,
689}
690
691/// Derive [`NoteFtsScalars`] from a [`Note`].
692///
693/// All values match the encoding that [`Fts5TextSearch::upsert_document`]
694/// applies when given the output of [`note_fts_document`].
695pub(crate) fn note_fts_scalars(note: &Note) -> NoteFtsScalars {
696    let doc = note_fts_document(note);
697    NoteFtsScalars {
698        title: doc.title.unwrap_or_default(),
699        body: doc.body,
700        tags: "[]".to_string(),
701        metadata: doc
702            .metadata
703            .as_ref()
704            .map(|v| serde_json::to_string(v).unwrap_or_default()),
705        updated_at_micros: doc.updated_at.timestamp_micros(),
706    }
707}
708
709// ---------------------------------------------------------------------------
710// Transactional merge SQL helpers
711// ---------------------------------------------------------------------------
712
713/// Read one entity row by ID within a namespace, returning `SqliteError` on missing/wrong-ns.
714fn read_merge_entity(
715    conn: &rusqlite::Connection,
716    id: Uuid,
717    namespace: &str,
718) -> Result<Entity, SqliteError> {
719    let id_str = id.to_string();
720    let mut stmt = conn.prepare(
721        "SELECT id, namespace, kind, entity_type, name, description, properties, tags, \
722         created_at, updated_at, deleted_at, merged_into, merge_event_id \
723         FROM entities WHERE id = ?1 AND deleted_at IS NULL",
724    )?;
725    let mut rows = stmt.query(rusqlite::params![id_str])?;
726    let row = rows
727        .next()?
728        .ok_or_else(|| SqliteError::InvalidData(format!("entity {id} not found")))?;
729
730    let id_s: String = row.get(0)?;
731    let ns: String = row.get(1)?;
732    let kind: String = row.get(2)?;
733    let entity_type: Option<String> = row.get(3)?;
734    let name: String = row.get(4)?;
735    let description: Option<String> = row.get(5)?;
736    let properties_str: Option<String> = row.get(6)?;
737    let tags_str: String = row.get(7)?;
738    let created_at: i64 = row.get(8)?;
739    let updated_at: i64 = row.get(9)?;
740    let deleted_at: Option<i64> = row.get(10)?;
741    let merged_into_str: Option<String> = row.get(11)?;
742    let merge_event_id_str: Option<String> = row.get(12)?;
743
744    if ns != namespace {
745        return Err(SqliteError::InvalidData(format!(
746            "entity {id} belongs to namespace '{ns}', not '{namespace}'"
747        )));
748    }
749
750    let entity_id = Uuid::parse_str(&id_s).map_err(|e| SqliteError::InvalidData(e.to_string()))?;
751    let properties: Option<Value> = properties_str
752        .map(|s| {
753            serde_json::from_str::<Value>(&s).map_err(|e| SqliteError::InvalidData(e.to_string()))
754        })
755        .transpose()?;
756    let tags: Vec<String> =
757        serde_json::from_str(&tags_str).map_err(|e| SqliteError::InvalidData(e.to_string()))?;
758    let merged_into = merged_into_str
759        .as_deref()
760        .map(Uuid::parse_str)
761        .transpose()
762        .map_err(|e| SqliteError::InvalidData(e.to_string()))?;
763    let merge_event_id = merge_event_id_str
764        .as_deref()
765        .map(Uuid::parse_str)
766        .transpose()
767        .map_err(|e| SqliteError::InvalidData(e.to_string()))?;
768
769    Ok(Entity {
770        id: entity_id,
771        namespace: ns,
772        kind,
773        entity_type,
774        name,
775        description,
776        properties,
777        tags,
778        created_at,
779        updated_at,
780        deleted_at,
781        merged_into,
782        merge_event_id,
783    })
784}
785
786/// All merge SQL on one connection inside an already-open `BEGIN IMMEDIATE` transaction.
787///
788/// Reads both entities, rewires/drops incident edges, merges entity fields, updates FTS,
789/// deletes the `from` vec entry (if `vec_table` is Some), and tombstones `from` with merge
790/// provenance.  Returns the updated `into` entity so the caller can do the async vec re-insert.
791///
792/// When `dry_run` is true, all reads and computations are performed but no writes are issued.
793// REASON: merge requires both entity IDs, the namespace, FTS and vec table names, merge
794// policy, and dry-run flag — all are load-bearing; reducing to a struct would obscure
795// the sync/async boundary split that keeps this function off the async runtime.
796#[allow(clippy::too_many_arguments)]
797fn merge_entity_sql(
798    conn: &rusqlite::Connection,
799    namespace: String,
800    fts_table: String,
801    vec_table: Option<String>,
802    into_id: Uuid,
803    from_id: Uuid,
804    strategy: EntityDedupMergePolicy,
805    dry_run: bool,
806) -> Result<(MergeSummary, Entity), SqliteError> {
807    let into_entity = read_merge_entity(conn, into_id, &namespace)?;
808    let from_entity = read_merge_entity(conn, from_id, &namespace)?;
809
810    // --- Collect edges incident to from_id ---
811    // REASON: EdgeRow fields are populated via rusqlite row mapping; the struct is fully
812    // constructed even though not all fields are read back after construction — the
813    // complete mapping guards against column-order bugs when the schema changes.
814    #[allow(dead_code)]
815    struct EdgeRow {
816        id: Uuid,
817        source_id: Uuid,
818        target_id: Uuid,
819        relation: String,
820        weight: f64,
821        created_at: i64,
822        updated_at: i64,
823        deleted_at: Option<i64>,
824        target_backend: Option<String>,
825        metadata: Option<String>,
826    }
827
828    let parse_id =
829        |s: String| Uuid::parse_str(&s).map_err(|e| SqliteError::InvalidData(e.to_string()));
830
831    let from_str = from_id.to_string();
832
833    let mut outbound: Vec<EdgeRow> = Vec::new();
834    {
835        let mut stmt = conn.prepare(
836            "SELECT id, source_id, target_id, relation, weight, created_at, \
837                    updated_at, deleted_at, target_backend, metadata \
838             FROM graph_edges WHERE namespace = ?1 AND source_id = ?2",
839        )?;
840        let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
841        while let Some(row) = rows.next()? {
842            outbound.push(EdgeRow {
843                id: parse_id(row.get(0)?)?,
844                source_id: parse_id(row.get(1)?)?,
845                target_id: parse_id(row.get(2)?)?,
846                relation: row.get(3)?,
847                weight: row.get(4)?,
848                created_at: row.get(5)?,
849                updated_at: row.get(6)?,
850                deleted_at: row.get(7)?,
851                target_backend: row.get(8)?,
852                metadata: row.get(9)?,
853            });
854        }
855    }
856
857    let mut inbound: Vec<EdgeRow> = Vec::new();
858    {
859        let mut stmt = conn.prepare(
860            "SELECT id, source_id, target_id, relation, weight, created_at, \
861                    updated_at, deleted_at, target_backend, metadata \
862             FROM graph_edges WHERE namespace = ?1 AND target_id = ?2",
863        )?;
864        let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
865        while let Some(row) = rows.next()? {
866            inbound.push(EdgeRow {
867                id: parse_id(row.get(0)?)?,
868                source_id: parse_id(row.get(1)?)?,
869                target_id: parse_id(row.get(2)?)?,
870                relation: row.get(3)?,
871                weight: row.get(4)?,
872                created_at: row.get(5)?,
873                updated_at: row.get(6)?,
874                deleted_at: row.get(7)?,
875                target_backend: row.get(8)?,
876                metadata: row.get(9)?,
877            });
878        }
879    }
880
881    // Deduplicate by edge ID (a self-edge from_id→from_id appears in both lists).
882    let mut seen: HashSet<Uuid> = HashSet::new();
883    let mut all_edges: Vec<EdgeRow> = Vec::new();
884    for edge in outbound.into_iter().chain(inbound) {
885        if seen.insert(edge.id) {
886            all_edges.push(edge);
887        }
888    }
889
890    // --- Merge entity fields ---
891    let (merged_props, properties_merged) =
892        merge_properties(&into_entity.properties, &from_entity.properties, strategy);
893    let merged_name = merge_string_field(&into_entity.name, &from_entity.name, strategy);
894    let merged_description =
895        merge_option_string_field(&into_entity.description, &from_entity.description, strategy);
896    let (merged_tags, tags_unioned) = union_tags(&into_entity.tags, &from_entity.tags);
897
898    let now = chrono::Utc::now().timestamp_micros();
899    let into_str = into_id.to_string();
900    let props_str = merged_props
901        .as_ref()
902        .map(|v| serde_json::to_string(v).unwrap_or_default());
903    let tags_json = serde_json::to_string(&merged_tags).unwrap_or_else(|_| "[]".to_string());
904
905    // --- Rewire edges ---
906    let mut edges_rewired = 0usize;
907    if !dry_run {
908        for edge in all_edges {
909            let raw_src = if edge.source_id == from_id {
910                into_id
911            } else {
912                edge.source_id
913            };
914            let raw_tgt = if edge.target_id == from_id {
915                into_id
916            } else {
917                edge.target_id
918            };
919            // Symmetric relations must be stored with source_uuid < target_uuid.
920            // Apply canonicalization so the conflict check and UPDATE both use the canonical form.
921            let (new_src, new_tgt) = match edge.relation.parse::<EdgeRelation>() {
922                Ok(rel) => canonical_edge_endpoints(rel, raw_src, raw_tgt),
923                Err(_) => (raw_src, raw_tgt),
924            };
925
926            if new_src == new_tgt {
927                conn.execute(
928                    "DELETE FROM graph_edges WHERE namespace = ?1 AND id = ?2",
929                    rusqlite::params![&namespace, edge.id.to_string()],
930                )?;
931                continue;
932            }
933
934            let now_ts = chrono::Utc::now().timestamp();
935            // H3 fix: preserve the original edge ID by updating
936            // source_id/target_id in-place when no conflict exists.
937            //
938            // Two-step approach to handle all cases while keeping the original ID:
939            //   (a) No conflict (new triple): UPDATE source_id/target_id in-place.
940            //       The edge retains its original UUID — callers can still get() it
941            //       by the ID they received from link().
942            //   (b) Conflict: into_id already has an edge with this (source,target,
943            //       relation). Delete the from-edge (it is superseded) and UPDATE
944            //       the existing into-edge to refresh weight/metadata/deleted_at.
945            //       The surviving edge is the into-entity's original edge (correct).
946            //
947            // Check for a conflict: does into_id already have this natural key?
948            let conflict_id: Option<String> = {
949                let conflict_src = new_src.to_string();
950                let conflict_tgt = new_tgt.to_string();
951                conn.query_row(
952                    "SELECT id FROM graph_edges \
953                     WHERE namespace = ?1 AND source_id = ?2 AND target_id = ?3 \
954                     AND relation = ?4 AND id != ?5",
955                    rusqlite::params![
956                        &namespace,
957                        &conflict_src,
958                        &conflict_tgt,
959                        &edge.relation,
960                        edge.id.to_string(),
961                    ],
962                    |row| row.get(0),
963                )
964                .optional()
965                .map_err(SqliteError::Rusqlite)?
966            };
967
968            let changed = if let Some(existing_id) = conflict_id {
969                // Case (b): a live or soft-deleted row already owns this natural key.
970                // Delete the from-edge and refresh the existing row.
971                conn.execute(
972                    "DELETE FROM graph_edges WHERE namespace = ?1 AND id = ?2",
973                    rusqlite::params![&namespace, edge.id.to_string()],
974                )?;
975                conn.execute(
976                    "UPDATE graph_edges SET \
977                     weight = ?1, updated_at = ?2, deleted_at = NULL, \
978                     target_backend = ?3, metadata = ?4 \
979                     WHERE namespace = ?5 AND id = ?6",
980                    rusqlite::params![
981                        edge.weight,
982                        now_ts,
983                        edge.target_backend,
984                        edge.metadata,
985                        &namespace,
986                        &existing_id,
987                    ],
988                )?
989            } else {
990                // Case (a): no conflict — update source_id/target_id in-place,
991                // preserving the original edge ID for callers.
992                conn.execute(
993                    "UPDATE graph_edges SET \
994                     source_id = ?1, target_id = ?2, updated_at = ?3 \
995                     WHERE namespace = ?4 AND id = ?5",
996                    rusqlite::params![
997                        new_src.to_string(),
998                        new_tgt.to_string(),
999                        now_ts,
1000                        &namespace,
1001                        edge.id.to_string(),
1002                    ],
1003                )?
1004            };
1005            if changed > 0 {
1006                edges_rewired += 1;
1007            }
1008        }
1009
1010        // --- Upsert merged entity ---
1011        conn.execute(
1012            "INSERT OR REPLACE INTO entities \
1013             (id, namespace, kind, name, description, properties, tags, \
1014              created_at, updated_at, deleted_at, merged_into, merge_event_id) \
1015             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
1016            rusqlite::params![
1017                &into_str,
1018                &namespace,
1019                &into_entity.kind,
1020                &merged_name,
1021                &merged_description,
1022                &props_str,
1023                &tags_json,
1024                into_entity.created_at,
1025                now,
1026                into_entity.deleted_at,
1027                Option::<String>::None,
1028                Option::<String>::None,
1029            ],
1030        )?;
1031
1032        // --- Reindex into_id in FTS (delete existing, insert updated) ---
1033        // Body formula mirrors entity_fts_document (the canonical constructor) —
1034        // this path is sync/spawn_blocking so it cannot call entity_fts_document
1035        // directly, but must stay field-identical.
1036        let fts_body = match &merged_description {
1037            Some(d) if !d.is_empty() => format!("{} {}", merged_name, d),
1038            _ => merged_name.clone(),
1039        };
1040        let kind_str = SubstrateKind::Entity.to_string();
1041
1042        conn.execute(
1043            &format!(
1044                "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
1045                fts_table
1046            ),
1047            rusqlite::params![&namespace, &into_str],
1048        )?;
1049        conn.execute(
1050            &format!(
1051                "INSERT INTO {} \
1052                 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
1053                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1054                fts_table
1055            ),
1056            rusqlite::params![
1057                &into_str,
1058                &kind_str,
1059                &merged_name,
1060                &fts_body,
1061                &tags_json,
1062                &namespace,
1063                &props_str,
1064                now,
1065            ],
1066        )?;
1067
1068        // --- Delete from_id from FTS ---
1069        conn.execute(
1070            &format!(
1071                "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
1072                fts_table
1073            ),
1074            rusqlite::params![&namespace, &from_str],
1075        )?;
1076
1077        // --- Delete from_id from vector index if configured ---
1078        if let Some(ref vec_tbl) = vec_table {
1079            conn.execute(
1080                &format!(
1081                    "DELETE FROM {} WHERE subject_id = ?1 AND namespace = ?2",
1082                    vec_tbl
1083                ),
1084                rusqlite::params![&from_str, &namespace],
1085            )?;
1086        }
1087
1088        // --- Tombstone from entity (soft-delete with provenance) ---
1089        let merge_event_id = Uuid::new_v4();
1090        conn.execute(
1091            "UPDATE entities \
1092             SET deleted_at = ?1, merged_into = ?2, merge_event_id = ?3, updated_at = ?1 \
1093             WHERE namespace = ?4 AND id = ?5 AND deleted_at IS NULL",
1094            rusqlite::params![
1095                now,
1096                into_str,
1097                merge_event_id.to_string(),
1098                &namespace,
1099                &from_str,
1100            ],
1101        )?;
1102    }
1103
1104    let updated_entity = Entity {
1105        id: into_id,
1106        namespace,
1107        kind: into_entity.kind,
1108        entity_type: into_entity.entity_type,
1109        name: merged_name,
1110        description: merged_description,
1111        properties: merged_props,
1112        tags: merged_tags,
1113        created_at: into_entity.created_at,
1114        updated_at: now,
1115        deleted_at: into_entity.deleted_at,
1116        merged_into: None,
1117        merge_event_id: None,
1118    };
1119
1120    Ok((
1121        MergeSummary {
1122            kept_id: into_id,
1123            removed_id: from_id,
1124            edges_rewired,
1125            properties_merged,
1126            tags_unioned,
1127            content_appended: false,
1128            dry_run,
1129        },
1130        updated_entity,
1131    ))
1132}
1133
1134// ---------------------------------------------------------------------------
1135// Note merge SQL helpers
1136// ---------------------------------------------------------------------------
1137
1138/// Read one note row by ID within a namespace, returning `SqliteError` on missing/wrong-ns.
1139fn read_merge_note(
1140    conn: &rusqlite::Connection,
1141    id: Uuid,
1142    namespace: &str,
1143) -> Result<khive_storage::note::Note, SqliteError> {
1144    use khive_storage::note::Note;
1145    let id_str = id.to_string();
1146    let mut stmt = conn.prepare(
1147        "SELECT id, namespace, kind, status, name, content, salience, decay_factor, \
1148         expires_at, properties, created_at, updated_at, deleted_at \
1149         FROM notes WHERE id = ?1 AND deleted_at IS NULL",
1150    )?;
1151    let mut rows = stmt.query(rusqlite::params![id_str])?;
1152    let row = rows
1153        .next()?
1154        .ok_or_else(|| SqliteError::InvalidData(format!("note {id} not found")))?;
1155
1156    let id_s: String = row.get(0)?;
1157    let ns: String = row.get(1)?;
1158    let kind: String = row.get(2)?;
1159    let status: String = row.get(3)?;
1160    let name: Option<String> = row.get(4)?;
1161    let content: String = row.get(5)?;
1162    let salience: Option<f64> = row.get(6)?;
1163    let decay_factor: Option<f64> = row.get(7)?;
1164    let expires_at: Option<i64> = row.get(8)?;
1165    let properties_str: Option<String> = row.get(9)?;
1166    let created_at: i64 = row.get(10)?;
1167    let updated_at: i64 = row.get(11)?;
1168    let deleted_at: Option<i64> = row.get(12)?;
1169
1170    if ns != namespace {
1171        return Err(SqliteError::InvalidData(format!(
1172            "note {id} belongs to namespace '{ns}', not '{namespace}'"
1173        )));
1174    }
1175
1176    let note_id = Uuid::parse_str(&id_s).map_err(|e| SqliteError::InvalidData(e.to_string()))?;
1177    let properties: Option<serde_json::Value> = properties_str
1178        .map(|s| serde_json::from_str(&s).map_err(|e| SqliteError::InvalidData(e.to_string())))
1179        .transpose()?;
1180
1181    Ok(Note {
1182        id: note_id,
1183        namespace: ns,
1184        kind,
1185        status,
1186        name,
1187        content,
1188        salience,
1189        decay_factor,
1190        expires_at,
1191        properties,
1192        created_at,
1193        updated_at,
1194        deleted_at,
1195    })
1196}
1197
1198fn max_option_f64(a: Option<f64>, b: Option<f64>) -> Option<f64> {
1199    match (a, b) {
1200        (Some(x), Some(y)) => Some(x.max(y)),
1201        (Some(x), None) => Some(x),
1202        (None, Some(y)) => Some(y),
1203        (None, None) => None,
1204    }
1205}
1206
1207fn append_merge_history(props: Option<Value>, entry: Value) -> Result<Option<Value>, SqliteError> {
1208    use serde_json::{json, Map};
1209    let mut obj: Map<String, Value> = match props {
1210        Some(Value::Object(m)) => m,
1211        Some(other) => {
1212            let mut m = Map::new();
1213            m.insert("_value".into(), other);
1214            m
1215        }
1216        None => Map::new(),
1217    };
1218    let history = obj
1219        .entry("_merge_history".to_string())
1220        .or_insert_with(|| json!([]));
1221    if let Value::Array(arr) = history {
1222        arr.push(entry);
1223    }
1224    Ok(Some(Value::Object(obj)))
1225}
1226
1227/// All note merge SQL on one connection inside a `BEGIN IMMEDIATE` transaction.
1228///
1229/// Reads both notes (must have same `kind`), rewires/drops incident edges, merges content
1230/// per `content_strategy`, tombstones `from`. Returns the updated `into` note for async
1231/// re-embedding.
1232///
1233/// When `dry_run` is true, all reads and computations are performed but no writes are issued.
1234// REASON: note merge additionally requires a content_strategy parameter versus entity merge;
1235// same sync/async boundary rationale as merge_entity_sql applies here.
1236#[allow(clippy::too_many_arguments)]
1237fn merge_note_sql(
1238    conn: &rusqlite::Connection,
1239    namespace: String,
1240    fts_table: String,
1241    vec_table: Option<String>,
1242    into_id: Uuid,
1243    from_id: Uuid,
1244    strategy: EntityDedupMergePolicy,
1245    content_strategy: ContentMergeStrategy,
1246    dry_run: bool,
1247) -> Result<(MergeSummary, khive_storage::note::Note), SqliteError> {
1248    let into_note = read_merge_note(conn, into_id, &namespace)?;
1249    let from_note = read_merge_note(conn, from_id, &namespace)?;
1250
1251    if into_note.kind != from_note.kind {
1252        return Err(SqliteError::InvalidData(format!(
1253            "cannot merge notes of different kinds: {} vs {}",
1254            into_note.kind, from_note.kind
1255        )));
1256    }
1257
1258    let now = chrono::Utc::now().timestamp_micros();
1259    let into_str = into_id.to_string();
1260    let from_str = from_id.to_string();
1261
1262    // Collect edges incident to from_id.
1263    // REASON: same as merge_entity_sql — full field mapping prevents column-order bugs.
1264    #[allow(dead_code)]
1265    struct EdgeRow {
1266        id: Uuid,
1267        source_id: Uuid,
1268        target_id: Uuid,
1269        relation: String,
1270        weight: f64,
1271        created_at: i64,
1272        updated_at: i64,
1273        deleted_at: Option<i64>,
1274        target_backend: Option<String>,
1275        metadata: Option<String>,
1276    }
1277    let parse_id =
1278        |s: String| Uuid::parse_str(&s).map_err(|e| SqliteError::InvalidData(e.to_string()));
1279
1280    let mut outbound: Vec<EdgeRow> = Vec::new();
1281    {
1282        let mut stmt = conn.prepare(
1283            "SELECT id, source_id, target_id, relation, weight, created_at, updated_at, deleted_at, target_backend, metadata \
1284             FROM graph_edges WHERE namespace = ?1 AND source_id = ?2",
1285        )?;
1286        let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
1287        while let Some(row) = rows.next()? {
1288            outbound.push(EdgeRow {
1289                id: parse_id(row.get(0)?)?,
1290                source_id: parse_id(row.get(1)?)?,
1291                target_id: parse_id(row.get(2)?)?,
1292                relation: row.get(3)?,
1293                weight: row.get(4)?,
1294                created_at: row.get(5)?,
1295                updated_at: row.get(6)?,
1296                deleted_at: row.get(7)?,
1297                target_backend: row.get(8)?,
1298                metadata: row.get(9)?,
1299            });
1300        }
1301    }
1302    let mut inbound: Vec<EdgeRow> = Vec::new();
1303    {
1304        let mut stmt = conn.prepare(
1305            "SELECT id, source_id, target_id, relation, weight, created_at, updated_at, deleted_at, target_backend, metadata \
1306             FROM graph_edges WHERE namespace = ?1 AND target_id = ?2",
1307        )?;
1308        let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
1309        while let Some(row) = rows.next()? {
1310            inbound.push(EdgeRow {
1311                id: parse_id(row.get(0)?)?,
1312                source_id: parse_id(row.get(1)?)?,
1313                target_id: parse_id(row.get(2)?)?,
1314                relation: row.get(3)?,
1315                weight: row.get(4)?,
1316                created_at: row.get(5)?,
1317                updated_at: row.get(6)?,
1318                deleted_at: row.get(7)?,
1319                target_backend: row.get(8)?,
1320                metadata: row.get(9)?,
1321            });
1322        }
1323    }
1324    let mut seen: HashSet<Uuid> = HashSet::new();
1325    let mut all_edges: Vec<EdgeRow> = Vec::new();
1326    for edge in outbound.into_iter().chain(inbound) {
1327        if seen.insert(edge.id) {
1328            all_edges.push(edge);
1329        }
1330    }
1331
1332    // Merge note fields.
1333    let (merged_content, content_appended) = match content_strategy {
1334        ContentMergeStrategy::Append => {
1335            if from_note.content.is_empty() {
1336                (into_note.content.clone(), false)
1337            } else {
1338                (
1339                    format!("{}\n\n---\n\n{}", into_note.content, from_note.content),
1340                    true,
1341                )
1342            }
1343        }
1344        ContentMergeStrategy::PreferInto => (into_note.content.clone(), false),
1345        ContentMergeStrategy::PreferFrom => (from_note.content.clone(), false),
1346    };
1347
1348    let merged_name = match strategy {
1349        EntityDedupMergePolicy::PreferFrom => from_note.name.clone().or(into_note.name.clone()),
1350        _ => into_note.name.clone().or(from_note.name.clone()),
1351    };
1352
1353    let (merged_props, properties_merged) =
1354        merge_properties(&into_note.properties, &from_note.properties, strategy);
1355
1356    // Append merge history to properties.
1357    let merge_history_entry = serde_json::json!({
1358        "merged_from": from_id.to_string(),
1359        "merged_at": now,
1360        "strategy": format!("{:?}", strategy),
1361        "content_strategy": format!("{:?}", content_strategy),
1362    });
1363    let merged_props = append_merge_history(merged_props, merge_history_entry)?;
1364
1365    let merged_salience = max_option_f64(into_note.salience, from_note.salience);
1366    let merged_expires_at = match (into_note.expires_at, from_note.expires_at) {
1367        (Some(a), Some(b)) => Some(a.max(b)),
1368        (Some(a), None) => Some(a),
1369        (None, Some(b)) => Some(b),
1370        (None, None) => None,
1371    };
1372
1373    let props_str = merged_props
1374        .as_ref()
1375        .map(|v| serde_json::to_string(v).unwrap_or_default());
1376
1377    let mut edges_rewired = 0usize;
1378    if !dry_run {
1379        // Rewire and upsert.
1380        for edge in all_edges {
1381            let raw_src = if edge.source_id == from_id {
1382                into_id
1383            } else {
1384                edge.source_id
1385            };
1386            let raw_tgt = if edge.target_id == from_id {
1387                into_id
1388            } else {
1389                edge.target_id
1390            };
1391            // Canonicalize symmetric relations before conflict check + UPDATE.
1392            let (new_src, new_tgt) = match edge.relation.parse::<EdgeRelation>() {
1393                Ok(rel) => canonical_edge_endpoints(rel, raw_src, raw_tgt),
1394                Err(_) => (raw_src, raw_tgt),
1395            };
1396            if new_src == new_tgt {
1397                conn.execute(
1398                    "DELETE FROM graph_edges WHERE namespace = ?1 AND id = ?2",
1399                    rusqlite::params![&namespace, edge.id.to_string()],
1400                )?;
1401                continue;
1402            }
1403            let now_ts = chrono::Utc::now().timestamp();
1404            // Same two-step approach as entity merge rewire: preserve original edge ID
1405            // when no conflict, merge into existing row when conflict exists.
1406            let conflict_id: Option<String> = {
1407                let conflict_src = new_src.to_string();
1408                let conflict_tgt = new_tgt.to_string();
1409                conn.query_row(
1410                    "SELECT id FROM graph_edges \
1411                     WHERE namespace = ?1 AND source_id = ?2 AND target_id = ?3 \
1412                     AND relation = ?4 AND id != ?5",
1413                    rusqlite::params![
1414                        &namespace,
1415                        &conflict_src,
1416                        &conflict_tgt,
1417                        &edge.relation,
1418                        edge.id.to_string(),
1419                    ],
1420                    |row| row.get(0),
1421                )
1422                .optional()
1423                .map_err(SqliteError::Rusqlite)?
1424            };
1425
1426            let changed = if let Some(existing_id) = conflict_id {
1427                conn.execute(
1428                    "DELETE FROM graph_edges WHERE namespace = ?1 AND id = ?2",
1429                    rusqlite::params![&namespace, edge.id.to_string()],
1430                )?;
1431                conn.execute(
1432                    "UPDATE graph_edges SET \
1433                     weight = ?1, updated_at = ?2, deleted_at = NULL, \
1434                     target_backend = ?3, metadata = ?4 \
1435                     WHERE namespace = ?5 AND id = ?6",
1436                    rusqlite::params![
1437                        edge.weight,
1438                        now_ts,
1439                        edge.target_backend,
1440                        edge.metadata,
1441                        &namespace,
1442                        &existing_id,
1443                    ],
1444                )?
1445            } else {
1446                conn.execute(
1447                    "UPDATE graph_edges SET \
1448                     source_id = ?1, target_id = ?2, updated_at = ?3 \
1449                     WHERE namespace = ?4 AND id = ?5",
1450                    rusqlite::params![
1451                        new_src.to_string(),
1452                        new_tgt.to_string(),
1453                        now_ts,
1454                        &namespace,
1455                        edge.id.to_string(),
1456                    ],
1457                )?
1458            };
1459            if changed > 0 {
1460                edges_rewired += 1;
1461            }
1462        }
1463
1464        // Upsert merged into-note.
1465        conn.execute(
1466            "INSERT OR REPLACE INTO notes \
1467             (id, namespace, kind, status, name, content, salience, decay_factor, \
1468              expires_at, properties, created_at, updated_at, deleted_at) \
1469             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
1470            rusqlite::params![
1471                &into_str,
1472                &namespace,
1473                &into_note.kind,
1474                &into_note.status,
1475                &merged_name,
1476                &merged_content,
1477                merged_salience,
1478                into_note.decay_factor,
1479                merged_expires_at,
1480                &props_str,
1481                into_note.created_at,
1482                now,
1483                into_note.deleted_at,
1484            ],
1485        )?;
1486
1487        // Update FTS for into-note.
1488        conn.execute(
1489            &format!(
1490                "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
1491                fts_table
1492            ),
1493            rusqlite::params![&namespace, &into_str],
1494        )?;
1495        // Derive FTS scalars through the shared constructor so this raw SQL
1496        // path is field-identical to TextSearch::upsert_document.  Critically,
1497        // `title` is an empty string (not SQL NULL) for nameless notes —
1498        // matching the unwrap_or("") in Fts5TextSearch::upsert_document and
1499        // allowing get_document to round-trip None ↔ "" correctly.
1500        let fts_merged = {
1501            let mut merged_note = Note::new(&namespace, &*into_note.kind, &*merged_content);
1502            merged_note.id = into_id;
1503            merged_note.name = merged_name.clone();
1504            merged_note.properties = merged_props.clone();
1505            merged_note.updated_at = now;
1506            note_fts_scalars(&merged_note)
1507        };
1508        conn.execute(
1509            &format!(
1510                "INSERT INTO {} \
1511                 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
1512                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1513                fts_table
1514            ),
1515            rusqlite::params![
1516                &into_str,
1517                SubstrateKind::Note.to_string(),
1518                &fts_merged.title,
1519                &fts_merged.body,
1520                &fts_merged.tags,
1521                &namespace,
1522                &fts_merged.metadata,
1523                fts_merged.updated_at_micros,
1524            ],
1525        )?;
1526
1527        // Delete from-note from FTS.
1528        conn.execute(
1529            &format!(
1530                "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
1531                fts_table
1532            ),
1533            rusqlite::params![&namespace, &from_str],
1534        )?;
1535
1536        // Delete from-note from vector index if configured.
1537        if let Some(ref vec_tbl) = vec_table {
1538            conn.execute(
1539                &format!(
1540                    "DELETE FROM {} WHERE subject_id = ?1 AND namespace = ?2",
1541                    vec_tbl
1542                ),
1543                rusqlite::params![&from_str, &namespace],
1544            )?;
1545        }
1546
1547        // Tombstone the from-note.
1548        conn.execute(
1549            "UPDATE notes SET status = 'deleted', deleted_at = ?1, updated_at = ?1 \
1550             WHERE namespace = ?2 AND id = ?3 AND deleted_at IS NULL",
1551            rusqlite::params![now, &namespace, &from_str],
1552        )?;
1553    }
1554
1555    let updated_note = khive_storage::note::Note {
1556        id: into_id,
1557        namespace: namespace.clone(),
1558        kind: into_note.kind.clone(),
1559        status: into_note.status.clone(),
1560        name: merged_name,
1561        content: merged_content,
1562        salience: merged_salience,
1563        decay_factor: into_note.decay_factor,
1564        expires_at: merged_expires_at,
1565        properties: merged_props,
1566        created_at: into_note.created_at,
1567        updated_at: now,
1568        deleted_at: into_note.deleted_at,
1569    };
1570
1571    Ok((
1572        MergeSummary {
1573            kept_id: into_id,
1574            removed_id: from_id,
1575            edges_rewired,
1576            properties_merged,
1577            tags_unioned: 0,
1578            content_appended,
1579            dry_run,
1580        },
1581        updated_note,
1582    ))
1583}
1584
1585// ---------------------------------------------------------------------------
1586// Merge helpers (pure functions — easier to unit test)
1587// ---------------------------------------------------------------------------
1588
1589fn merge_string_field(into: &str, from: &str, strategy: EntityDedupMergePolicy) -> String {
1590    match strategy {
1591        EntityDedupMergePolicy::PreferInto | EntityDedupMergePolicy::Union => into.to_string(),
1592        EntityDedupMergePolicy::PreferFrom => from.to_string(),
1593    }
1594}
1595
1596fn merge_option_string_field(
1597    into: &Option<String>,
1598    from: &Option<String>,
1599    strategy: EntityDedupMergePolicy,
1600) -> Option<String> {
1601    match strategy {
1602        EntityDedupMergePolicy::PreferInto => {
1603            if into.is_some() {
1604                into.clone()
1605            } else {
1606                from.clone()
1607            }
1608        }
1609        EntityDedupMergePolicy::PreferFrom => {
1610            if from.is_some() {
1611                from.clone()
1612            } else {
1613                into.clone()
1614            }
1615        }
1616        EntityDedupMergePolicy::Union => {
1617            // Keep into's description; if empty, append from's.
1618            match (into, from) {
1619                (Some(a), _) if !a.is_empty() => Some(a.clone()),
1620                (_, Some(b)) => Some(b.clone()),
1621                _ => None,
1622            }
1623        }
1624    }
1625}
1626
1627/// Merge two property objects. Returns (merged, count_of_fields_from_from_that_were_added).
1628fn merge_properties(
1629    into: &Option<Value>,
1630    from: &Option<Value>,
1631    strategy: EntityDedupMergePolicy,
1632) -> (Option<Value>, usize) {
1633    match (into, from) {
1634        (None, None) => (None, 0),
1635        (Some(a), None) => (Some(a.clone()), 0),
1636        (None, Some(b)) => {
1637            let count = if let Value::Object(m) = b { m.len() } else { 1 };
1638            (Some(b.clone()), count)
1639        }
1640        (Some(into_val), Some(from_val)) => {
1641            let (merged, added) = merge_json(into_val, from_val, strategy);
1642            (Some(merged), added)
1643        }
1644    }
1645}
1646
1647/// Deep-merge two JSON values per strategy. Returns (merged, keys_contributed_by_from).
1648fn merge_json(into: &Value, from: &Value, strategy: EntityDedupMergePolicy) -> (Value, usize) {
1649    match (into, from, strategy) {
1650        (Value::Object(a), Value::Object(b), EntityDedupMergePolicy::Union) => {
1651            let mut result = a.clone();
1652            let mut added = 0usize;
1653            for (k, v_from) in b {
1654                if let Some(v_into) = a.get(k) {
1655                    let (merged, sub_added) =
1656                        merge_json(v_into, v_from, EntityDedupMergePolicy::Union);
1657                    result.insert(k.clone(), merged);
1658                    added += sub_added;
1659                } else {
1660                    result.insert(k.clone(), v_from.clone());
1661                    added += 1;
1662                }
1663            }
1664            (Value::Object(result), added)
1665        }
1666        (Value::Object(a), Value::Object(b), EntityDedupMergePolicy::PreferInto) => {
1667            let mut result = a.clone();
1668            let mut added = 0usize;
1669            for (k, v) in b {
1670                if !a.contains_key(k) {
1671                    result.insert(k.clone(), v.clone());
1672                    added += 1;
1673                }
1674            }
1675            (Value::Object(result), added)
1676        }
1677        (Value::Object(a), Value::Object(b), EntityDedupMergePolicy::PreferFrom) => {
1678            let mut result = a.clone();
1679            let mut added = 0usize;
1680            for (k, v) in b {
1681                result.insert(k.clone(), v.clone());
1682                if !a.contains_key(k) {
1683                    added += 1;
1684                }
1685            }
1686            (Value::Object(result), added)
1687        }
1688        // Non-object scalars: apply strategy directly.
1689        (_into_val, from_val, EntityDedupMergePolicy::PreferFrom) => (from_val.clone(), 1),
1690        _ => (into.clone(), 0),
1691    }
1692}
1693
1694fn union_tags(into: &[String], from: &[String]) -> (Vec<String>, usize) {
1695    let mut seen: HashSet<&str> = into.iter().map(|s| s.as_str()).collect();
1696    let mut result: Vec<String> = into.to_vec();
1697    let mut added = 0usize;
1698    for tag in from {
1699        if seen.insert(tag.as_str()) {
1700            result.push(tag.clone());
1701            added += 1;
1702        }
1703    }
1704    (result, added)
1705}
1706
1707// ---------------------------------------------------------------------------
1708// INLINE TEST JUSTIFICATION: tests here exercise patch/merge helpers and the
1709// update_note/update_entity paths that share private merge_properties logic.
1710// Moving them to tests/ would require pub-exporting merge_properties, which is
1711// an internal invariant not suitable for the public API surface. Broad
1712// behavioral curation tests live in tests/integration.rs.
1713// ---------------------------------------------------------------------------
1714
1715#[cfg(test)]
1716mod tests {
1717    use super::*;
1718    use crate::runtime::{KhiveRuntime, NamespaceToken};
1719    use khive_storage::types::{Direction, TextFilter, TextQueryMode, TextSearchRequest};
1720
1721    fn rt() -> KhiveRuntime {
1722        KhiveRuntime::memory().unwrap()
1723    }
1724
1725    // Helper: search FTS5 for `query` in a runtime namespace.
1726    async fn fts_hit(rt: &KhiveRuntime, token: &NamespaceToken, query: &str) -> Vec<Uuid> {
1727        let ns = token.namespace().as_str().to_string();
1728        rt.text(token)
1729            .unwrap()
1730            .search(TextSearchRequest {
1731                query: query.to_string(),
1732                mode: TextQueryMode::Plain,
1733                filter: Some(TextFilter {
1734                    namespaces: vec![ns],
1735                    ..Default::default()
1736                }),
1737                top_k: 50,
1738                snippet_chars: 100,
1739            })
1740            .await
1741            .unwrap()
1742            .into_iter()
1743            .map(|h| h.subject_id)
1744            .collect()
1745    }
1746
1747    #[tokio::test]
1748    async fn update_entity_patch_changes_only_specified_fields() {
1749        let rt = rt();
1750        let tok = NamespaceToken::local();
1751        let entity = rt
1752            .create_entity(
1753                &tok,
1754                "concept",
1755                None,
1756                "OriginalName",
1757                Some("orig desc"),
1758                Some(serde_json::json!({"k":"v"})),
1759                vec![],
1760            )
1761            .await
1762            .unwrap();
1763
1764        let updated = rt
1765            .update_entity(
1766                &tok,
1767                entity.id,
1768                EntityPatch {
1769                    description: Some(Some("new desc".to_string())),
1770                    ..Default::default()
1771                },
1772            )
1773            .await
1774            .unwrap();
1775
1776        assert_eq!(updated.name, "OriginalName");
1777        assert_eq!(updated.description.as_deref(), Some("new desc"));
1778        assert_eq!(updated.properties, Some(serde_json::json!({"k":"v"})));
1779    }
1780
1781    #[tokio::test]
1782    async fn update_entity_clear_description_with_some_none() {
1783        let rt = rt();
1784        let tok = NamespaceToken::local();
1785        let entity = rt
1786            .create_entity(
1787                &tok,
1788                "concept",
1789                None,
1790                "ClearDesc",
1791                Some("has description"),
1792                None,
1793                vec![],
1794            )
1795            .await
1796            .unwrap();
1797
1798        let updated = rt
1799            .update_entity(
1800                &tok,
1801                entity.id,
1802                EntityPatch {
1803                    description: Some(None),
1804                    ..Default::default()
1805                },
1806            )
1807            .await
1808            .unwrap();
1809
1810        assert!(
1811            updated.description.is_none(),
1812            "description should be cleared"
1813        );
1814    }
1815
1816    #[tokio::test]
1817    async fn update_entity_reindexes_when_name_changes() {
1818        let rt = rt();
1819        let tok = NamespaceToken::local();
1820        let entity = rt
1821            .create_entity(&tok, "concept", None, "OldName", None, None, vec![])
1822            .await
1823            .unwrap();
1824
1825        // Old name is findable.
1826        let hits_before = fts_hit(&rt, &tok, "OldName").await;
1827        assert!(
1828            hits_before.contains(&entity.id),
1829            "entity should be findable by old name"
1830        );
1831
1832        rt.update_entity(
1833            &tok,
1834            entity.id,
1835            EntityPatch {
1836                name: Some("NewName".to_string()),
1837                ..Default::default()
1838            },
1839        )
1840        .await
1841        .unwrap();
1842
1843        let hits_old = fts_hit(&rt, &tok, "OldName").await;
1844        let hits_new = fts_hit(&rt, &tok, "NewName").await;
1845
1846        // After rename, old name no longer matches this entity (FTS index updated).
1847        assert!(
1848            !hits_old.contains(&entity.id),
1849            "old name should no longer match after rename"
1850        );
1851        assert!(
1852            hits_new.contains(&entity.id),
1853            "new name should be findable after rename"
1854        );
1855    }
1856
1857    #[tokio::test]
1858    async fn update_entity_properties_merges_preserving_existing_keys() {
1859        let rt = rt();
1860        let tok = NamespaceToken::local();
1861        let entity = rt
1862            .create_entity(
1863                &tok,
1864                "concept",
1865                None,
1866                "MergeProps",
1867                None,
1868                Some(serde_json::json!({
1869                    "domain": "inference",
1870                    "repo": "lattice",
1871                    "status": "researched",
1872                })),
1873                vec![],
1874            )
1875            .await
1876            .unwrap();
1877
1878        let updated = rt
1879            .update_entity(
1880                &tok,
1881                entity.id,
1882                EntityPatch {
1883                    properties: Some(serde_json::json!({"status": "implemented"})),
1884                    ..Default::default()
1885                },
1886            )
1887            .await
1888            .unwrap();
1889
1890        let props = updated.properties.expect("properties should remain set");
1891        assert_eq!(props["domain"], "inference", "domain key must be preserved");
1892        assert_eq!(props["repo"], "lattice", "repo key must be preserved");
1893        assert_eq!(
1894            props["status"], "implemented",
1895            "status key must be updated by patch"
1896        );
1897    }
1898
1899    #[tokio::test]
1900    async fn update_entity_skips_reindex_when_only_properties_change() {
1901        let rt = rt();
1902        let tok = NamespaceToken::local();
1903        let entity = rt
1904            .create_entity(&tok, "concept", None, "StableIndexed", None, None, vec![])
1905            .await
1906            .unwrap();
1907
1908        // Verify it's in the index before.
1909        let hits_before = fts_hit(&rt, &tok, "StableIndexed").await;
1910        assert!(hits_before.contains(&entity.id));
1911
1912        // Only patch properties — text index should be untouched (still findable).
1913        rt.update_entity(
1914            &tok,
1915            entity.id,
1916            EntityPatch {
1917                properties: Some(serde_json::json!({"new": "prop"})),
1918                ..Default::default()
1919            },
1920        )
1921        .await
1922        .unwrap();
1923
1924        let hits_after = fts_hit(&rt, &tok, "StableIndexed").await;
1925        assert!(
1926            hits_after.contains(&entity.id),
1927            "still findable after props-only patch"
1928        );
1929    }
1930
1931    #[tokio::test]
1932    async fn merge_entity_rewires_edges() {
1933        let rt = rt();
1934        let tok = NamespaceToken::local();
1935        let a = rt
1936            .create_entity(&tok, "concept", None, "A", None, None, vec![])
1937            .await
1938            .unwrap();
1939        let b = rt
1940            .create_entity(&tok, "concept", None, "B", None, None, vec![])
1941            .await
1942            .unwrap();
1943        let c = rt
1944            .create_entity(&tok, "concept", None, "C", None, None, vec![])
1945            .await
1946            .unwrap();
1947        let d = rt
1948            .create_entity(&tok, "concept", None, "D", None, None, vec![])
1949            .await
1950            .unwrap();
1951
1952        // A→B and C→B; merge B into D → should become A→D and C→D.
1953        rt.link(&tok, a.id, b.id, EdgeRelation::Extends, 1.0, None)
1954            .await
1955            .unwrap();
1956        rt.link(&tok, c.id, b.id, EdgeRelation::Extends, 1.0, None)
1957            .await
1958            .unwrap();
1959
1960        let summary = rt
1961            .merge_entity(&tok, d.id, b.id, EntityDedupMergePolicy::PreferInto, false)
1962            .await
1963            .unwrap();
1964
1965        assert_eq!(summary.kept_id, d.id);
1966        assert_eq!(summary.removed_id, b.id);
1967        assert_eq!(summary.edges_rewired, 2);
1968
1969        // Verify edges now point to D.
1970        let a_neighbors = rt
1971            .neighbors(&tok, a.id, Direction::Out, None, None)
1972            .await
1973            .unwrap();
1974        assert_eq!(a_neighbors.len(), 1);
1975        assert_eq!(a_neighbors[0].node_id, d.id);
1976
1977        let c_neighbors = rt
1978            .neighbors(&tok, c.id, Direction::Out, None, None)
1979            .await
1980            .unwrap();
1981        assert_eq!(c_neighbors.len(), 1);
1982        assert_eq!(c_neighbors[0].node_id, d.id);
1983    }
1984
1985    #[tokio::test]
1986    async fn merge_entity_self_merge_rejected() {
1987        let rt = rt();
1988        let tok = NamespaceToken::local();
1989        let a = rt
1990            .create_entity(&tok, "concept", None, "A", None, None, vec![])
1991            .await
1992            .unwrap();
1993        let err = rt
1994            .merge_entity(&tok, a.id, a.id, EntityDedupMergePolicy::PreferInto, false)
1995            .await
1996            .unwrap_err();
1997        assert!(
1998            format!("{err:?}").contains("cannot merge an entity into itself"),
1999            "expected self-merge rejection, got: {err:?}"
2000        );
2001    }
2002
2003    #[tokio::test]
2004    async fn merge_entity_prefer_into_strategy() {
2005        let rt = rt();
2006        let tok = NamespaceToken::local();
2007        let into = rt
2008            .create_entity(
2009                &tok,
2010                "concept",
2011                None,
2012                "Into",
2013                None,
2014                Some(serde_json::json!({"a": 1})),
2015                vec![],
2016            )
2017            .await
2018            .unwrap();
2019        let from = rt
2020            .create_entity(
2021                &tok,
2022                "concept",
2023                None,
2024                "From",
2025                None,
2026                Some(serde_json::json!({"a": 2, "b": 3})),
2027                vec![],
2028            )
2029            .await
2030            .unwrap();
2031
2032        rt.merge_entity(
2033            &tok,
2034            into.id,
2035            from.id,
2036            EntityDedupMergePolicy::PreferInto,
2037            false,
2038        )
2039        .await
2040        .unwrap();
2041
2042        let kept = rt.get_entity(&tok, into.id).await.unwrap();
2043        let props = kept.properties.unwrap();
2044        // a stays as 1 (into wins), b is added from from.
2045        assert_eq!(props["a"], 1);
2046        assert_eq!(props["b"], 3);
2047    }
2048
2049    #[tokio::test]
2050    async fn merge_entity_prefer_from_strategy() {
2051        let rt = rt();
2052        let tok = NamespaceToken::local();
2053        let into = rt
2054            .create_entity(
2055                &tok,
2056                "concept",
2057                None,
2058                "Into",
2059                None,
2060                Some(serde_json::json!({"a": 1})),
2061                vec![],
2062            )
2063            .await
2064            .unwrap();
2065        let from = rt
2066            .create_entity(
2067                &tok,
2068                "concept",
2069                None,
2070                "From",
2071                None,
2072                Some(serde_json::json!({"a": 2, "b": 3})),
2073                vec![],
2074            )
2075            .await
2076            .unwrap();
2077
2078        rt.merge_entity(
2079            &tok,
2080            into.id,
2081            from.id,
2082            EntityDedupMergePolicy::PreferFrom,
2083            false,
2084        )
2085        .await
2086        .unwrap();
2087
2088        let kept = rt.get_entity(&tok, into.id).await.unwrap();
2089        let props = kept.properties.unwrap();
2090        // from wins on a, b also from from.
2091        assert_eq!(props["a"], 2);
2092        assert_eq!(props["b"], 3);
2093    }
2094
2095    #[tokio::test]
2096    async fn merge_entity_union_strategy() {
2097        let rt = rt();
2098        let tok = NamespaceToken::local();
2099        let into = rt
2100            .create_entity(
2101                &tok,
2102                "concept",
2103                None,
2104                "Into",
2105                None,
2106                Some(serde_json::json!({"a": 1})),
2107                vec![],
2108            )
2109            .await
2110            .unwrap();
2111        let from = rt
2112            .create_entity(
2113                &tok,
2114                "concept",
2115                None,
2116                "From",
2117                None,
2118                Some(serde_json::json!({"a": 2, "b": 3})),
2119                vec![],
2120            )
2121            .await
2122            .unwrap();
2123
2124        rt.merge_entity(&tok, into.id, from.id, EntityDedupMergePolicy::Union, false)
2125            .await
2126            .unwrap();
2127
2128        let kept = rt.get_entity(&tok, into.id).await.unwrap();
2129        let props = kept.properties.unwrap();
2130        // Scalar conflict: into wins → a=1. b added from from.
2131        assert_eq!(props["a"], 1);
2132        assert_eq!(props["b"], 3);
2133    }
2134
2135    #[tokio::test]
2136    async fn merge_entity_unions_tags() {
2137        let rt = rt();
2138        let tok = NamespaceToken::local();
2139        let into = rt
2140            .create_entity(
2141                &tok,
2142                "concept",
2143                None,
2144                "Into",
2145                None,
2146                None,
2147                vec!["x".to_string(), "y".to_string()],
2148            )
2149            .await
2150            .unwrap();
2151        let from = rt
2152            .create_entity(
2153                &tok,
2154                "concept",
2155                None,
2156                "From",
2157                None,
2158                None,
2159                vec!["y".to_string(), "z".to_string()],
2160            )
2161            .await
2162            .unwrap();
2163
2164        rt.merge_entity(
2165            &tok,
2166            into.id,
2167            from.id,
2168            EntityDedupMergePolicy::PreferInto,
2169            false,
2170        )
2171        .await
2172        .unwrap();
2173
2174        let kept = rt.get_entity(&tok, into.id).await.unwrap();
2175        let mut tags = kept.tags.clone();
2176        tags.sort();
2177        assert_eq!(tags, vec!["x", "y", "z"]);
2178    }
2179
2180    #[tokio::test]
2181    async fn merge_entity_drops_self_loops() {
2182        let rt = rt();
2183        let tok = NamespaceToken::local();
2184        let a = rt
2185            .create_entity(&tok, "concept", None, "A", None, None, vec![])
2186            .await
2187            .unwrap();
2188        let b = rt
2189            .create_entity(&tok, "concept", None, "B", None, None, vec![])
2190            .await
2191            .unwrap();
2192
2193        // A `extends` B — merging B into A would produce A `extends` A → drop it.
2194        rt.link(&tok, a.id, b.id, EdgeRelation::Extends, 1.0, None)
2195            .await
2196            .unwrap();
2197
2198        let summary = rt
2199            .merge_entity(&tok, a.id, b.id, EntityDedupMergePolicy::PreferInto, false)
2200            .await
2201            .unwrap();
2202
2203        assert_eq!(
2204            summary.edges_rewired, 0,
2205            "self-loop should be dropped, not rewired"
2206        );
2207
2208        let a_out = rt
2209            .neighbors(&tok, a.id, Direction::Out, None, None)
2210            .await
2211            .unwrap();
2212        assert!(a_out.is_empty(), "no self-loop should remain");
2213    }
2214
2215    // ---- merge helper unit tests ----
2216
2217    #[test]
2218    fn union_tags_deduplicates() {
2219        let (tags, added) = union_tags(
2220            &["x".to_string(), "y".to_string()],
2221            &["y".to_string(), "z".to_string()],
2222        );
2223        let mut sorted = tags.clone();
2224        sorted.sort();
2225        assert_eq!(sorted, vec!["x", "y", "z"]);
2226        assert_eq!(added, 1);
2227    }
2228
2229    #[test]
2230    fn merge_properties_prefer_into_fills_missing_keys() {
2231        let a = serde_json::json!({"a": 1});
2232        let b = serde_json::json!({"a": 99, "b": 2});
2233        let (merged, added) =
2234            merge_properties(&Some(a), &Some(b), EntityDedupMergePolicy::PreferInto);
2235        let m = merged.unwrap();
2236        assert_eq!(m["a"], 1);
2237        assert_eq!(m["b"], 2);
2238        assert_eq!(added, 1);
2239    }
2240
2241    // ---- tombstone and note merge tests ----
2242
2243    #[tokio::test]
2244    async fn merge_entity_tombstones_source_with_provenance() {
2245        let rt = rt();
2246        let tok = NamespaceToken::local();
2247        let into = rt
2248            .create_entity(&tok, "concept", None, "Into", None, None, vec![])
2249            .await
2250            .unwrap();
2251        let from = rt
2252            .create_entity(&tok, "concept", None, "From", None, None, vec![])
2253            .await
2254            .unwrap();
2255        let from_id = from.id;
2256
2257        rt.merge_entity(
2258            &tok,
2259            into.id,
2260            from_id,
2261            EntityDedupMergePolicy::PreferInto,
2262            false,
2263        )
2264        .await
2265        .unwrap();
2266
2267        // After merge, get_entity returns an error (soft-deleted rows are excluded).
2268        assert!(
2269            rt.get_entity(&tok, from_id).await.is_err(),
2270            "tombstoned source should not be returned by get_entity"
2271        );
2272
2273        // Verify the source row still exists in SQL with provenance.
2274        let pool = rt.backend().pool_arc();
2275        let (deleted_at, merged_into): (Option<i64>, Option<String>) =
2276            tokio::task::spawn_blocking(move || {
2277                let guard = pool.writer().unwrap();
2278                guard
2279                    .conn()
2280                    .query_row(
2281                        "SELECT deleted_at, merged_into FROM entities WHERE id = ?1",
2282                        [from_id.to_string()],
2283                        |row| Ok((row.get(0)?, row.get(1)?)),
2284                    )
2285                    .unwrap()
2286            })
2287            .await
2288            .unwrap();
2289        assert!(
2290            deleted_at.is_some(),
2291            "tombstoned entity must have deleted_at set"
2292        );
2293        assert_eq!(
2294            merged_into.as_deref(),
2295            Some(into.id.to_string().as_str()),
2296            "merged_into must point to into_id"
2297        );
2298    }
2299
2300    #[tokio::test]
2301    async fn merge_note_same_kind_appends_content() {
2302        let rt = rt();
2303        let tok = NamespaceToken::local();
2304        let into = rt
2305            .create_note(
2306                &tok,
2307                "observation",
2308                None,
2309                "Into content",
2310                None,
2311                None,
2312                vec![],
2313            )
2314            .await
2315            .unwrap();
2316        let from = rt
2317            .create_note(
2318                &tok,
2319                "observation",
2320                None,
2321                "From content",
2322                None,
2323                None,
2324                vec![],
2325            )
2326            .await
2327            .unwrap();
2328        let from_id = from.id;
2329
2330        let summary = rt
2331            .merge_note(
2332                &tok,
2333                into.id,
2334                from_id,
2335                EntityDedupMergePolicy::PreferInto,
2336                ContentMergeStrategy::Append,
2337                false,
2338            )
2339            .await
2340            .unwrap();
2341
2342        assert_eq!(summary.kept_id, into.id);
2343        assert_eq!(summary.removed_id, from_id);
2344        assert!(summary.content_appended);
2345        assert!(!summary.dry_run);
2346
2347        // Source is no longer findable.
2348        let from_store = rt.notes(&tok).unwrap();
2349        assert!(
2350            from_store.get_note(from_id).await.unwrap().is_none(),
2351            "merged-from note should be soft-deleted"
2352        );
2353    }
2354
2355    #[tokio::test]
2356    async fn merge_note_different_kinds_rejected() {
2357        let rt = rt();
2358        let tok = NamespaceToken::local();
2359        let into = rt
2360            .create_note(&tok, "observation", None, "Into", None, None, vec![])
2361            .await
2362            .unwrap();
2363        let from = rt
2364            .create_note(&tok, "decision", None, "From", None, None, vec![])
2365            .await
2366            .unwrap();
2367
2368        let result = rt
2369            .merge_note(
2370                &tok,
2371                into.id,
2372                from.id,
2373                EntityDedupMergePolicy::PreferInto,
2374                ContentMergeStrategy::Append,
2375                false,
2376            )
2377            .await;
2378        assert!(result.is_err(), "merging different note kinds must fail");
2379    }
2380
2381    #[tokio::test]
2382    async fn merge_note_dry_run_leaves_notes_unchanged() {
2383        let rt = rt();
2384        let tok = NamespaceToken::local();
2385        let into = rt
2386            .create_note(
2387                &tok,
2388                "observation",
2389                None,
2390                "Into content",
2391                None,
2392                None,
2393                vec![],
2394            )
2395            .await
2396            .unwrap();
2397        let from = rt
2398            .create_note(
2399                &tok,
2400                "observation",
2401                None,
2402                "From content",
2403                None,
2404                None,
2405                vec![],
2406            )
2407            .await
2408            .unwrap();
2409        let into_id = into.id;
2410        let from_id = from.id;
2411
2412        let summary = rt
2413            .merge_note(
2414                &tok,
2415                into_id,
2416                from_id,
2417                EntityDedupMergePolicy::PreferInto,
2418                ContentMergeStrategy::Append,
2419                true,
2420            )
2421            .await
2422            .unwrap();
2423
2424        assert!(summary.dry_run);
2425
2426        // Both notes still exist unchanged.
2427        let store = rt.notes(&tok).unwrap();
2428        let into_after = store.get_note(into_id).await.unwrap().unwrap();
2429        let from_after = store.get_note(from_id).await.unwrap().unwrap();
2430        assert_eq!(
2431            into_after.content, "Into content",
2432            "dry_run must not mutate into-note"
2433        );
2434        assert_eq!(
2435            from_after.content, "From content",
2436            "dry_run must not mutate from-note"
2437        );
2438    }
2439
2440    // Regression: merge two NAMELESS notes with no embedding model configured.
2441    // Before this fix, the raw SQL FTS INSERT bound &merged_name directly — for a
2442    // nameless note that is SQL NULL, while Fts5TextSearch::upsert_document stores
2443    // an empty string.  The mismatch caused get_document to diverge (or fail) for
2444    // nameless merged notes.  After the fix, note_fts_scalars drives every scalar
2445    // and the round-trip is field-identical.
2446    #[tokio::test]
2447    async fn merge_nameless_notes_fts_document_is_parity_correct() {
2448        use khive_storage::types::TextSearchRequest;
2449
2450        let rt = rt(); // in-memory runtime — no embedding model configured
2451        let tok = NamespaceToken::local();
2452
2453        let into = rt
2454            .create_note(
2455                &tok,
2456                "observation",
2457                None,
2458                "intosentinelzxq body",
2459                None,
2460                Some(serde_json::json!({"src": "into"})),
2461                vec![],
2462            )
2463            .await
2464            .expect("create into-note");
2465        let from = rt
2466            .create_note(
2467                &tok,
2468                "observation",
2469                None,
2470                "fromsentinelzxq body",
2471                None,
2472                None,
2473                vec![],
2474            )
2475            .await
2476            .expect("create from-note");
2477
2478        let into_id = into.id;
2479        let from_id = from.id;
2480
2481        rt.merge_note(
2482            &tok,
2483            into_id,
2484            from_id,
2485            EntityDedupMergePolicy::PreferInto,
2486            ContentMergeStrategy::Append,
2487            false,
2488        )
2489        .await
2490        .expect("merge_note must succeed");
2491
2492        // Fetch the merged note from the note store to get its current state.
2493        let note_store = rt.notes(&tok).expect("note store");
2494        let merged_note = note_store
2495            .get_note(into_id)
2496            .await
2497            .expect("get_note")
2498            .expect("merged note must exist");
2499
2500        // Compute the expected FTS document via the shared constructor.
2501        let expected = note_fts_document(&merged_note);
2502
2503        // Fetch the stored FTS document and verify field parity.
2504        let fts = rt.text_for_notes(&tok).expect("FTS store");
2505        let stored = fts
2506            .get_document("local", into_id)
2507            .await
2508            .expect("get_document must not error")
2509            .expect("FTS document must exist after merge");
2510
2511        // Core parity: subject_id, title (must be None, not Some("")), body.
2512        assert_eq!(stored.subject_id, expected.subject_id, "subject_id");
2513        assert_eq!(
2514            stored.title, expected.title,
2515            "title (None for nameless note)"
2516        );
2517        assert_eq!(stored.body, expected.body, "body");
2518        assert_eq!(stored.namespace, expected.namespace, "namespace");
2519        assert_eq!(stored.kind, expected.kind, "kind");
2520
2521        // The merged note is nameless — title must be None, matching the shared path.
2522        assert!(
2523            stored.title.is_none(),
2524            "nameless merged note must have title=None in FTS (was NULL before fix)"
2525        );
2526
2527        // The merged note must be searchable by a unique token from the into-note body.
2528        let hits = fts
2529            .search(TextSearchRequest {
2530                query: "intosentinelzxq".to_string(),
2531                mode: khive_storage::types::TextQueryMode::Plain,
2532                filter: None,
2533                top_k: 10,
2534                snippet_chars: 0,
2535            })
2536            .await
2537            .expect("search");
2538        assert!(
2539            hits.iter().any(|h| h.subject_id == into_id),
2540            "merged note must be searchable by into-note content"
2541        );
2542    }
2543
2544    #[tokio::test]
2545    async fn update_edge_updates_properties() {
2546        use khive_storage::EdgeRelation;
2547        let rt = rt();
2548        let tok = NamespaceToken::local();
2549        let a = rt
2550            .create_entity(&tok, "concept", None, "A", None, None, vec![])
2551            .await
2552            .unwrap();
2553        let b = rt
2554            .create_entity(&tok, "concept", None, "B", None, None, vec![])
2555            .await
2556            .unwrap();
2557        let edge = rt
2558            .link(&tok, a.id, b.id, EdgeRelation::Extends, 0.5, None)
2559            .await
2560            .unwrap();
2561        let edge_id: Uuid = edge.id.into();
2562
2563        let updated = rt
2564            .update_edge(
2565                &tok,
2566                edge_id,
2567                EdgePatch {
2568                    properties: Some(serde_json::json!({"source": "manual"})),
2569                    ..Default::default()
2570                },
2571            )
2572            .await
2573            .unwrap();
2574
2575        assert_eq!(updated.metadata.as_ref().unwrap()["source"], "manual");
2576        assert!((updated.weight - 0.5).abs() < 0.001, "weight unchanged");
2577    }
2578
2579    // scenario-kg-maintenance C1 regression: merge must not crash when both
2580    // entities share a common third-party edge (duplicate triple after rewire).
2581    // Before the fix, the double-ON-CONFLICT INSERT raised a UNIQUE constraint
2582    // error at the SQLite layer and the merge aborted mid-transaction.
2583    #[tokio::test]
2584    async fn merge_entity_survives_shared_edge_to_third_party() {
2585        use khive_storage::EdgeRelation;
2586        let rt = rt();
2587        let tok = NamespaceToken::local();
2588
2589        // Create three entities: A and B will be merged; shared is the common target.
2590        // Use `extends` (concept→concept) which is a valid endpoint combination.
2591        let a = rt
2592            .create_entity(&tok, "concept", None, "A", None, None, vec![])
2593            .await
2594            .unwrap();
2595        let b = rt
2596            .create_entity(&tok, "concept", None, "B", None, None, vec![])
2597            .await
2598            .unwrap();
2599        let shared = rt
2600            .create_entity(&tok, "concept", None, "Shared", None, None, vec![])
2601            .await
2602            .unwrap();
2603
2604        // Both A and B extend the same shared concept — this creates a duplicate
2605        // triple (A/B → shared, extends) that triggers the crash on rewire.
2606        rt.link(&tok, a.id, shared.id, EdgeRelation::Extends, 1.0, None)
2607            .await
2608            .unwrap();
2609        rt.link(&tok, b.id, shared.id, EdgeRelation::Extends, 1.0, None)
2610            .await
2611            .unwrap();
2612
2613        // Before the fix this would return Err with "UNIQUE constraint failed".
2614        let summary = rt
2615            .merge_entity(
2616                &tok,
2617                a.id,
2618                b.id,
2619                crate::EntityDedupMergePolicy::PreferInto,
2620                false,
2621            )
2622            .await
2623            .expect(
2624                "C1: merge must succeed even when both entities share an edge to a third party",
2625            );
2626
2627        assert_eq!(summary.kept_id, a.id);
2628        assert_eq!(summary.removed_id, b.id);
2629        // A already had the Extends edge to shared; when B→shared is rewired to
2630        // A→shared, the ON CONFLICT DO UPDATE refreshes the existing row (clears
2631        // deleted_at, updates weight). rusqlite reports this as 1 change, so
2632        // edges_rewired will be >= 0. The important invariant is that the merge
2633        // did NOT crash and exactly one live edge A→shared remains.
2634
2635        // One live edge A→shared must exist after merge.
2636        let a_edges = rt
2637            .list_edges(
2638                &tok,
2639                crate::EdgeListFilter {
2640                    source_id: Some(a.id),
2641                    target_id: Some(shared.id),
2642                    relations: vec![EdgeRelation::Extends],
2643                    ..Default::default()
2644                },
2645                10,
2646            )
2647            .await
2648            .unwrap();
2649        assert_eq!(
2650            a_edges.len(),
2651            1,
2652            "C1: exactly one live A→shared Extends edge must exist after merge; got: {a_edges:?}"
2653        );
2654
2655        // Tombstone check: B must be soft-deleted after successful merge (C3).
2656        // get_entity filters deleted_at IS NULL, so a tombstoned entity returns None.
2657        let b_after = rt.entities(&tok).unwrap().get_entity(b.id).await.unwrap();
2658        assert!(
2659            b_after.is_none(),
2660            "C3: from_entity must be tombstoned (get_entity returns None for deleted) after merge; got: {b_after:?}"
2661        );
2662    }
2663
2664    // H2 regression: merge_entity at the runtime level must reject cross-kind merges.
2665    // Before the H2 fix, only the pack handler had this guard; a direct runtime caller
2666    // could still merge concept+project, silently tombstoning the source entity.
2667    #[tokio::test]
2668    async fn merge_entity_cross_kind_rejected_at_runtime() {
2669        let rt = rt();
2670        let tok = NamespaceToken::local();
2671
2672        let concept = rt
2673            .create_entity(&tok, "concept", None, "H2Concept", None, None, vec![])
2674            .await
2675            .unwrap();
2676        let project = rt
2677            .create_entity(&tok, "project", None, "H2Project", None, None, vec![])
2678            .await
2679            .unwrap();
2680
2681        // Cross-kind merge must return InvalidInput at the runtime level.
2682        let err = rt
2683            .merge_entity(
2684                &tok,
2685                concept.id,
2686                project.id,
2687                crate::EntityDedupMergePolicy::PreferInto,
2688                false,
2689            )
2690            .await
2691            .expect_err("H2: cross-kind merge must be rejected by runtime");
2692        assert!(
2693            matches!(err, crate::RuntimeError::InvalidInput(_)),
2694            "H2: expected InvalidInput, got: {err:?}"
2695        );
2696
2697        // Both entities must survive the failed merge attempt with no tombstone.
2698        let concept_after = rt.get_entity(&tok, concept.id).await;
2699        let project_after = rt.get_entity(&tok, project.id).await;
2700        assert!(
2701            concept_after.is_ok(),
2702            "H2: concept must remain live after rejected merge; got: {concept_after:?}"
2703        );
2704        assert!(
2705            project_after.is_ok(),
2706            "H2: project must remain live after rejected merge; got: {project_after:?}"
2707        );
2708    }
2709
2710    // scenario-kg-maintenance C2 regression: same-kind merge must succeed.
2711    #[tokio::test]
2712    async fn merge_entity_same_kind_succeeds() {
2713        let rt = rt();
2714        let tok = NamespaceToken::local();
2715
2716        let c1 = rt
2717            .create_entity(&tok, "concept", None, "Concept1", None, None, vec![])
2718            .await
2719            .unwrap();
2720        let c2 = rt
2721            .create_entity(&tok, "concept", None, "Concept2", None, None, vec![])
2722            .await
2723            .unwrap();
2724
2725        let summary = rt
2726            .merge_entity(
2727                &tok,
2728                c1.id,
2729                c2.id,
2730                crate::EntityDedupMergePolicy::PreferInto,
2731                false,
2732            )
2733            .await
2734            .expect("same-kind merge must succeed");
2735        assert_eq!(summary.kept_id, c1.id);
2736        assert_eq!(summary.removed_id, c2.id);
2737
2738        // c2 must be tombstoned.
2739        let c2_after = rt.entities(&tok).unwrap().get_entity(c2.id).await.unwrap();
2740        assert!(c2_after.is_none(), "from_entity must be tombstoned");
2741    }
2742
2743    // ── #567 regression: cross-namespace merge_note must be denied on either ID ──
2744
2745    #[tokio::test]
2746    async fn merge_note_cross_namespace_either_id_returns_not_found() {
2747        use crate::error::RuntimeError;
2748        use crate::Namespace;
2749
2750        let rt = rt();
2751        let ns_a = NamespaceToken::for_namespace(Namespace::parse("ns-a").unwrap());
2752        let ns_b = NamespaceToken::for_namespace(Namespace::parse("ns-b").unwrap());
2753
2754        let into_a = rt
2755            .create_note(&ns_a, "observation", None, "Into A", None, None, vec![])
2756            .await
2757            .unwrap();
2758        let from_a = rt
2759            .create_note(&ns_a, "observation", None, "From A", None, None, vec![])
2760            .await
2761            .unwrap();
2762        let note_b = rt
2763            .create_note(&ns_b, "observation", None, "Note B", None, None, vec![])
2764            .await
2765            .unwrap();
2766
2767        // foreign into_id: note_b belongs to ns_b, caller token is ns_a
2768        let foreign_into = rt
2769            .merge_note(
2770                &ns_a,
2771                note_b.id,
2772                from_a.id,
2773                EntityDedupMergePolicy::PreferInto,
2774                ContentMergeStrategy::Append,
2775                false,
2776            )
2777            .await;
2778        assert!(
2779            matches!(foreign_into, Err(RuntimeError::NotFound(_))),
2780            "foreign into_id must be denied before merge, got {foreign_into:?}"
2781        );
2782
2783        // foreign from_id: note_b belongs to ns_b, caller token is ns_a
2784        let foreign_from = rt
2785            .merge_note(
2786                &ns_a,
2787                into_a.id,
2788                note_b.id,
2789                EntityDedupMergePolicy::PreferInto,
2790                ContentMergeStrategy::Append,
2791                false,
2792            )
2793            .await;
2794        assert!(
2795            matches!(foreign_from, Err(RuntimeError::NotFound(_))),
2796            "foreign from_id must be denied before merge, got {foreign_from:?}"
2797        );
2798    }
2799
2800    // ── #hardening Item 5: cross-namespace entity update/merge must be denied ──
2801
2802    #[tokio::test]
2803    async fn update_entity_cross_namespace_returns_not_found_and_preserves_source() {
2804        use crate::error::RuntimeError;
2805        use crate::Namespace;
2806
2807        let rt = rt();
2808        let ns_a = NamespaceToken::for_namespace(Namespace::parse("ns-a").unwrap());
2809        let ns_b = NamespaceToken::for_namespace(Namespace::parse("ns-b").unwrap());
2810
2811        let entity = rt
2812            .create_entity(
2813                &ns_a,
2814                "concept",
2815                None,
2816                "Alpha",
2817                Some("original"),
2818                None,
2819                vec![],
2820            )
2821            .await
2822            .unwrap();
2823
2824        let err = rt
2825            .update_entity(
2826                &ns_b,
2827                entity.id,
2828                EntityPatch {
2829                    name: Some("Compromised".into()),
2830                    ..Default::default()
2831                },
2832            )
2833            .await;
2834
2835        assert!(
2836            matches!(err, Err(RuntimeError::NotFound(_))),
2837            "cross-namespace update must return opaque NotFound, got {err:?}"
2838        );
2839
2840        let after = rt.get_entity(&ns_a, entity.id).await.unwrap();
2841        assert_eq!(
2842            after.name, "Alpha",
2843            "foreign update must not mutate source row"
2844        );
2845        assert_eq!(after.description.as_deref(), Some("original"));
2846    }
2847
2848    #[tokio::test]
2849    async fn merge_entity_cross_namespace_either_id_returns_not_found() {
2850        use crate::error::RuntimeError;
2851        use crate::Namespace;
2852
2853        let rt = rt();
2854        let ns_a = NamespaceToken::for_namespace(Namespace::parse("ns-a").unwrap());
2855        let ns_b = NamespaceToken::for_namespace(Namespace::parse("ns-b").unwrap());
2856
2857        let into_a = rt
2858            .create_entity(&ns_a, "concept", None, "Into A", None, None, vec![])
2859            .await
2860            .unwrap();
2861        let from_a = rt
2862            .create_entity(&ns_a, "concept", None, "From A", None, None, vec![])
2863            .await
2864            .unwrap();
2865        let foreign_b = rt
2866            .create_entity(&ns_b, "concept", None, "Foreign B", None, None, vec![])
2867            .await
2868            .unwrap();
2869
2870        // foreign into_id: foreign_b belongs to ns_b, caller token is ns_a
2871        let foreign_into = rt
2872            .merge_entity(
2873                &ns_a,
2874                foreign_b.id,
2875                from_a.id,
2876                EntityDedupMergePolicy::PreferInto,
2877                false,
2878            )
2879            .await;
2880        assert!(
2881            matches!(foreign_into, Err(RuntimeError::NotFound(_))),
2882            "foreign into_id must be denied before merge, got {foreign_into:?}"
2883        );
2884
2885        // foreign from_id: foreign_b belongs to ns_b, caller token is ns_a
2886        let foreign_from = rt
2887            .merge_entity(
2888                &ns_a,
2889                into_a.id,
2890                foreign_b.id,
2891                EntityDedupMergePolicy::PreferInto,
2892                false,
2893            )
2894            .await;
2895        assert!(
2896            matches!(foreign_from, Err(RuntimeError::NotFound(_))),
2897            "foreign from_id must be denied before merge, got {foreign_from:?}"
2898        );
2899
2900        // All three entities survive the failed merges.
2901        assert!(rt.get_entity(&ns_a, into_a.id).await.is_ok());
2902        assert!(rt.get_entity(&ns_a, from_a.id).await.is_ok());
2903        assert!(rt.get_entity(&ns_b, foreign_b.id).await.is_ok());
2904    }
2905
2906    // Parity: entity_fts_document must produce the same body/title as the
2907    // create_entity and update_entity FTS write paths.
2908    #[test]
2909    fn entity_fts_document_with_description() {
2910        let mut entity = Entity::new("local", "concept", "MyEntity");
2911        entity = entity.with_description("some description text");
2912        let doc = entity_fts_document(&entity);
2913        assert_eq!(doc.subject_id, entity.id);
2914        assert_eq!(doc.namespace, "local");
2915        assert_eq!(doc.title.as_deref(), Some("MyEntity"));
2916        assert_eq!(doc.body, "MyEntity some description text");
2917        assert_eq!(doc.kind, khive_types::SubstrateKind::Entity);
2918    }
2919
2920    #[test]
2921    fn entity_fts_document_without_description() {
2922        let entity = Entity::new("local", "concept", "NameOnly");
2923        let doc = entity_fts_document(&entity);
2924        assert_eq!(doc.title.as_deref(), Some("NameOnly"));
2925        assert_eq!(doc.body, "NameOnly");
2926    }
2927
2928    #[test]
2929    fn entity_fts_document_empty_description_uses_name_only() {
2930        let mut entity = Entity::new("local", "concept", "TitleOnly");
2931        entity = entity.with_description("");
2932        let doc = entity_fts_document(&entity);
2933        assert_eq!(
2934            doc.body, "TitleOnly",
2935            "empty description must not be appended"
2936        );
2937    }
2938
2939    // Cross-path equality: an entity created through the runtime (operations.rs
2940    // create_entity path) must produce a stored FTS document field-identical to
2941    // entity_fts_document() called on the same Entity.
2942    #[tokio::test]
2943    async fn entity_fts_document_matches_runtime_create_path() {
2944        let rt = rt();
2945        let tok = NamespaceToken::local();
2946
2947        let entity = rt
2948            .create_entity(
2949                &tok,
2950                "concept",
2951                None,
2952                "CrossPathTitle",
2953                Some("cross path description body"),
2954                Some(serde_json::json!({"key": "val"})),
2955                vec!["tag1".to_string()],
2956            )
2957            .await
2958            .expect("create_entity");
2959
2960        let fts = rt.text(&tok).expect("FTS store");
2961        let stored = fts
2962            .get_document("local", entity.id)
2963            .await
2964            .expect("get_document")
2965            .expect("document must exist after create_entity");
2966
2967        let expected = entity_fts_document(&entity);
2968
2969        assert_eq!(stored.subject_id, expected.subject_id, "subject_id");
2970        assert_eq!(stored.kind, expected.kind, "kind");
2971        assert_eq!(stored.title, expected.title, "title");
2972        assert_eq!(stored.body, expected.body, "body");
2973        assert_eq!(stored.namespace, expected.namespace, "namespace");
2974    }
2975
2976    // Cross-path equality: update_entity must produce a stored FTS document
2977    // field-identical to entity_fts_document() on the updated Entity.
2978    #[tokio::test]
2979    async fn entity_fts_document_matches_runtime_update_path() {
2980        let rt = rt();
2981        let tok = NamespaceToken::local();
2982
2983        let entity = rt
2984            .create_entity(
2985                &tok,
2986                "concept",
2987                None,
2988                "OldName",
2989                Some("old desc"),
2990                None,
2991                vec![],
2992            )
2993            .await
2994            .expect("create_entity");
2995
2996        let updated = rt
2997            .update_entity(
2998                &tok,
2999                entity.id,
3000                EntityPatch {
3001                    name: Some("NewName".to_string()),
3002                    description: Some(Some("new desc".to_string())),
3003                    ..Default::default()
3004                },
3005            )
3006            .await
3007            .expect("update_entity");
3008
3009        let fts = rt.text(&tok).expect("FTS store");
3010        let stored = fts
3011            .get_document("local", updated.id)
3012            .await
3013            .expect("get_document")
3014            .expect("document must exist after update_entity");
3015
3016        let expected = entity_fts_document(&updated);
3017
3018        assert_eq!(stored.title, expected.title, "title after update");
3019        assert_eq!(stored.body, expected.body, "body after update");
3020    }
3021
3022    // Cross-path equality: merge_entity must produce a stored FTS document for
3023    // the kept entity that is field-identical to entity_fts_document().
3024    #[tokio::test]
3025    async fn entity_fts_document_matches_runtime_merge_path() {
3026        let rt = rt();
3027        let tok = NamespaceToken::local();
3028
3029        let into_e = rt
3030            .create_entity(
3031                &tok,
3032                "concept",
3033                None,
3034                "IntoEntity",
3035                Some("into desc"),
3036                None,
3037                vec![],
3038            )
3039            .await
3040            .expect("create into");
3041        let from_e = rt
3042            .create_entity(
3043                &tok,
3044                "concept",
3045                None,
3046                "FromEntity",
3047                Some("from desc"),
3048                None,
3049                vec![],
3050            )
3051            .await
3052            .expect("create from");
3053
3054        let summary = rt
3055            .merge_entity(
3056                &tok,
3057                into_e.id,
3058                from_e.id,
3059                EntityDedupMergePolicy::PreferInto,
3060                false,
3061            )
3062            .await
3063            .expect("merge_entity");
3064
3065        let kept = rt
3066            .get_entity(&tok, summary.kept_id)
3067            .await
3068            .expect("get kept");
3069
3070        let fts = rt.text(&tok).expect("FTS store");
3071        let stored = fts
3072            .get_document("local", kept.id)
3073            .await
3074            .expect("get_document")
3075            .expect("FTS document must exist for kept entity after merge");
3076
3077        let expected = entity_fts_document(&kept);
3078
3079        assert_eq!(stored.title, expected.title, "title after merge");
3080        assert_eq!(stored.body, expected.body, "body after merge");
3081    }
3082}