Skip to main content

khive_runtime/
curation.rs

1// Licensed under the Apache License, Version 2.0.
2
3//! Curation operations: entity update/merge and edge-list filter type.
4//!
5//! See ADR-014 for the full specification and semantics.
6
7use std::collections::HashSet;
8
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use uuid::Uuid;
12
13use khive_db::SqliteError;
14use khive_storage::types::{EdgeFilter, TextDocument};
15use khive_storage::{EdgeRelation, Entity, SubstrateKind};
16
17use crate::error::{RuntimeError, RuntimeResult};
18use crate::runtime::KhiveRuntime;
19
20// ---------------------------------------------------------------------------
21// Public types
22// ---------------------------------------------------------------------------
23
24/// Patch for `update_entity`. Only `Some(_)` fields are applied; `None` means "leave unchanged".
25///
26/// For `description`:
27/// - `None` (outer) — leave the current description as-is
28/// - `Some(None)` — clear the description (set to NULL)
29/// - `Some(Some(s))` — set the description to `s`
30///
31/// For `properties` (deep-merge semantics):
32/// - `None` — leave properties as-is
33/// - `Some(value)` — deep-merge `value` into existing properties. Keys present in
34///   the patch overwrite existing keys; keys absent from the patch are preserved.
35///   Removing a key requires explicit replacement of the parent object (or a future
36///   `unset`/`null-marker` extension).
37///
38/// For `tags` — replace semantics: `Some(vec)` sets tags to exactly `vec`. To add
39/// a tag without losing existing tags, read the entity first, push the new tag,
40/// and pass the full list back.
41#[derive(Clone, Debug, Default)]
42pub struct EntityPatch {
43    pub name: Option<String>,
44    pub description: Option<Option<String>>,
45    pub properties: Option<Value>,
46    pub tags: Option<Vec<String>>,
47}
48
49/// Strategy used when merging two entities.
50#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
51#[serde(rename_all = "snake_case")]
52pub enum MergeStrategy {
53    /// `into` values win on conflict. Tags are unioned. Properties from `from` fill in
54    /// keys that `into` doesn't have. This is the default.
55    #[default]
56    PreferInto,
57    /// `from` values win on conflict.
58    PreferFrom,
59    /// Deep-merge: object properties merge recursively. Scalar conflicts go to `into`.
60    Union,
61}
62
63/// Result returned by `merge_entity`.
64#[derive(Clone, Debug, Serialize, Deserialize)]
65pub struct MergeSummary {
66    pub kept_id: Uuid,
67    pub removed_id: Uuid,
68    pub edges_rewired: usize,
69    pub properties_merged: usize,
70    pub tags_unioned: usize,
71}
72
73/// Filter for `list_edges` / `count_edges`.
74#[derive(Clone, Debug, Default)]
75pub struct EdgeListFilter {
76    pub source_id: Option<Uuid>,
77    pub target_id: Option<Uuid>,
78    /// Empty = any relation.
79    pub relations: Vec<EdgeRelation>,
80    pub min_weight: Option<f64>,
81    pub max_weight: Option<f64>,
82}
83
84impl From<EdgeListFilter> for EdgeFilter {
85    fn from(f: EdgeListFilter) -> Self {
86        EdgeFilter {
87            source_ids: f.source_id.into_iter().collect(),
88            target_ids: f.target_id.into_iter().collect(),
89            relations: f.relations,
90            min_weight: f.min_weight,
91            max_weight: f.max_weight,
92            ..Default::default()
93        }
94    }
95}
96
97// ---------------------------------------------------------------------------
98// Implementation
99// ---------------------------------------------------------------------------
100
101impl KhiveRuntime {
102    /// Patch-style entity update.
103    ///
104    /// Only fields set to `Some(_)` are changed. Re-indexes FTS5 (and vectors if configured)
105    /// when `name` or `description` changes; skips re-indexing for property/tag-only patches.
106    ///
107    /// Returns `RuntimeError::NotFound` if the entity does not exist or belongs to a different
108    /// namespace. This enforces ADR-007 namespace isolation at the runtime layer.
109    pub async fn update_entity(
110        &self,
111        namespace: Option<&str>,
112        id: Uuid,
113        patch: EntityPatch,
114    ) -> RuntimeResult<Entity> {
115        let store = self.entities(namespace)?;
116        let mut entity = store
117            .get_entity(id)
118            .await?
119            .ok_or_else(|| RuntimeError::NotFound(format!("entity {id}")))?;
120
121        if entity.namespace != self.ns(namespace) {
122            return Err(RuntimeError::NotFound(format!("entity {id}")));
123        }
124
125        let mut text_changed = false;
126
127        if let Some(name) = patch.name {
128            text_changed |= entity.name != name;
129            entity.name = name;
130        }
131        if let Some(desc_patch) = patch.description {
132            text_changed |= entity.description != desc_patch;
133            entity.description = desc_patch;
134        }
135        if let Some(props) = patch.properties {
136            let (merged, _) =
137                merge_properties(&entity.properties, &Some(props), MergeStrategy::PreferFrom);
138            entity.properties = merged;
139        }
140        if let Some(tags) = patch.tags {
141            entity.tags = tags;
142        }
143
144        entity.updated_at = chrono::Utc::now().timestamp_micros();
145        store.upsert_entity(entity.clone()).await?;
146
147        if text_changed {
148            self.reindex_entity(namespace, &entity).await?;
149        }
150
151        Ok(entity)
152    }
153
154    /// Merge `from_id` into `into_id`.
155    ///
156    /// All edges incident to `from_id` are rewired to `into_id`. Self-loops that would
157    /// result from the rewire are dropped. Properties and tags are merged per `strategy`.
158    /// `from_id` is hard-deleted and removed from indexes. Returns a summary.
159    ///
160    /// Atomic: all SQL (entity reads/writes, edge rewires, FTS updates, vec-index delete)
161    /// runs on a single pool connection inside one `BEGIN IMMEDIATE` transaction via
162    /// `merge_entity_sql`. If embedding vectors are configured, the vector re-insert for
163    /// `into_id` is performed after the transaction (requires async embedding computation).
164    pub async fn merge_entity(
165        &self,
166        namespace: Option<&str>,
167        into_id: Uuid,
168        from_id: Uuid,
169        strategy: MergeStrategy,
170    ) -> RuntimeResult<MergeSummary> {
171        let ns = self.ns(namespace).to_string();
172        let sanitized_ns: String = ns
173            .chars()
174            .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
175            .collect();
176        let fts_table = format!("fts_entities_{}", sanitized_ns);
177        let vec_table = self.config().embedding_model.map(|model| {
178            let key: String = model
179                .to_string()
180                .chars()
181                .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
182                .collect();
183            format!("vec_{}", key)
184        });
185
186        // Ensure all required tables exist before entering the transaction.
187        // Each accessor applies its DDL idempotently via `CREATE TABLE IF NOT EXISTS`.
188        let _ = self.entities(namespace)?;
189        let _ = self.graph(namespace)?;
190        let _ = self.text(namespace)?;
191        if self.config().embedding_model.is_some() {
192            let _ = self.vectors(namespace)?;
193        }
194
195        let pool = self.backend().pool_arc();
196
197        let (summary, updated_entity) = tokio::task::spawn_blocking(move || {
198            let guard = pool.writer()?;
199            guard.transaction(|conn| {
200                merge_entity_sql(conn, ns, fts_table, vec_table, into_id, from_id, strategy)
201            })
202        })
203        .await
204        .map_err(|e| RuntimeError::Internal(e.to_string()))??;
205
206        // If vectors are configured, reindex into_entity (requires async embedding).
207        // FTS and vec-delete were already committed inside the transaction above.
208        if self.config().embedding_model.is_some() {
209            self.reindex_entity(namespace, &updated_entity).await?;
210        }
211
212        Ok(summary)
213    }
214
215    // ---- Internal helpers ----
216
217    /// Re-upsert FTS5 document (and vector if model configured) for the entity.
218    ///
219    /// Uses `entity.namespace` — the authoritative namespace stored on the record — rather
220    /// than the caller-supplied `namespace` parameter. This prevents a cross-namespace
221    /// reindex from writing the search document into the wrong namespace's FTS index.
222    pub(crate) async fn reindex_entity(
223        &self,
224        namespace: Option<&str>,
225        entity: &Entity,
226    ) -> RuntimeResult<()> {
227        let body = match &entity.description {
228            Some(d) if !d.is_empty() => format!("{} {}", entity.name, d),
229            _ => entity.name.clone(),
230        };
231        // Use entity.namespace (authoritative) rather than self.ns(namespace) (caller claim).
232        let ns = entity.namespace.clone();
233        self.text(namespace)?
234            .upsert_document(TextDocument {
235                subject_id: entity.id,
236                kind: SubstrateKind::Entity,
237                title: Some(entity.name.clone()),
238                body: body.clone(),
239                tags: entity.tags.clone(),
240                namespace: ns.clone(),
241                metadata: entity.properties.clone(),
242                updated_at: chrono::Utc::now(),
243            })
244            .await?;
245
246        if self.config().embedding_model.is_some() {
247            let vector = self.embed(&body).await?;
248            self.vectors(namespace)?
249                .insert(entity.id, SubstrateKind::Entity, &ns, vector)
250                .await?;
251        }
252
253        Ok(())
254    }
255
256    /// Remove an entity from FTS5 and (if configured) vector indexes.
257    pub(crate) async fn remove_from_indexes(
258        &self,
259        namespace: Option<&str>,
260        id: Uuid,
261    ) -> RuntimeResult<()> {
262        let ns = self.ns(namespace).to_string();
263        self.text(namespace)?.delete_document(&ns, id).await?;
264        if self.config().embedding_model.is_some() {
265            self.vectors(namespace)?.delete(id).await?;
266        }
267        Ok(())
268    }
269}
270
271// ---------------------------------------------------------------------------
272// Transactional merge SQL helpers
273// ---------------------------------------------------------------------------
274
275/// Read one entity row by ID within a namespace, returning `SqliteError` on missing/wrong-ns.
276fn read_merge_entity(
277    conn: &rusqlite::Connection,
278    id: Uuid,
279    namespace: &str,
280) -> Result<Entity, SqliteError> {
281    let id_str = id.to_string();
282    let mut stmt = conn.prepare(
283        "SELECT id, namespace, kind, name, description, properties, tags, \
284         created_at, updated_at, deleted_at \
285         FROM entities WHERE id = ?1 AND deleted_at IS NULL",
286    )?;
287    let mut rows = stmt.query(rusqlite::params![id_str])?;
288    let row = rows
289        .next()?
290        .ok_or_else(|| SqliteError::InvalidData(format!("entity {id} not found")))?;
291
292    let id_s: String = row.get(0)?;
293    let ns: String = row.get(1)?;
294    let kind: String = row.get(2)?;
295    let name: String = row.get(3)?;
296    let description: Option<String> = row.get(4)?;
297    let properties_str: Option<String> = row.get(5)?;
298    let tags_str: String = row.get(6)?;
299    let created_at: i64 = row.get(7)?;
300    let updated_at: i64 = row.get(8)?;
301    let deleted_at: Option<i64> = row.get(9)?;
302
303    if ns != namespace {
304        return Err(SqliteError::InvalidData(format!(
305            "entity {id} belongs to namespace '{ns}', not '{namespace}'"
306        )));
307    }
308
309    let entity_id = Uuid::parse_str(&id_s).map_err(|e| SqliteError::InvalidData(e.to_string()))?;
310    let properties: Option<Value> = properties_str
311        .map(|s| {
312            serde_json::from_str::<Value>(&s).map_err(|e| SqliteError::InvalidData(e.to_string()))
313        })
314        .transpose()?;
315    let tags: Vec<String> =
316        serde_json::from_str(&tags_str).map_err(|e| SqliteError::InvalidData(e.to_string()))?;
317
318    Ok(Entity {
319        id: entity_id,
320        namespace: ns,
321        kind,
322        name,
323        description,
324        properties,
325        tags,
326        created_at,
327        updated_at,
328        deleted_at,
329    })
330}
331
332/// All merge SQL on one connection inside an already-open `BEGIN IMMEDIATE` transaction.
333///
334/// Reads both entities, rewires/drops incident edges, merges entity fields, updates FTS,
335/// deletes the `from` vec entry (if `vec_table` is Some), and hard-deletes `from` from
336/// entities.  Returns the updated `into` entity so the caller can do the async vec re-insert.
337fn merge_entity_sql(
338    conn: &rusqlite::Connection,
339    namespace: String,
340    fts_table: String,
341    vec_table: Option<String>,
342    into_id: Uuid,
343    from_id: Uuid,
344    strategy: MergeStrategy,
345) -> Result<(MergeSummary, Entity), SqliteError> {
346    let into_entity = read_merge_entity(conn, into_id, &namespace)?;
347    let from_entity = read_merge_entity(conn, from_id, &namespace)?;
348
349    // --- Collect edges incident to from_id ---
350    struct EdgeRow {
351        id: Uuid,
352        source_id: Uuid,
353        target_id: Uuid,
354        relation: String,
355        weight: f64,
356        created_at: i64,
357        metadata: Option<String>,
358    }
359
360    let parse_id =
361        |s: String| Uuid::parse_str(&s).map_err(|e| SqliteError::InvalidData(e.to_string()));
362
363    let from_str = from_id.to_string();
364
365    let mut outbound: Vec<EdgeRow> = Vec::new();
366    {
367        let mut stmt = conn.prepare(
368            "SELECT id, source_id, target_id, relation, weight, created_at, metadata \
369             FROM graph_edges WHERE namespace = ?1 AND source_id = ?2",
370        )?;
371        let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
372        while let Some(row) = rows.next()? {
373            outbound.push(EdgeRow {
374                id: parse_id(row.get(0)?)?,
375                source_id: parse_id(row.get(1)?)?,
376                target_id: parse_id(row.get(2)?)?,
377                relation: row.get(3)?,
378                weight: row.get(4)?,
379                created_at: row.get(5)?,
380                metadata: row.get(6)?,
381            });
382        }
383    }
384
385    let mut inbound: Vec<EdgeRow> = Vec::new();
386    {
387        let mut stmt = conn.prepare(
388            "SELECT id, source_id, target_id, relation, weight, created_at, metadata \
389             FROM graph_edges WHERE namespace = ?1 AND target_id = ?2",
390        )?;
391        let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
392        while let Some(row) = rows.next()? {
393            inbound.push(EdgeRow {
394                id: parse_id(row.get(0)?)?,
395                source_id: parse_id(row.get(1)?)?,
396                target_id: parse_id(row.get(2)?)?,
397                relation: row.get(3)?,
398                weight: row.get(4)?,
399                created_at: row.get(5)?,
400                metadata: row.get(6)?,
401            });
402        }
403    }
404
405    // Deduplicate by edge ID (a self-edge from_id→from_id appears in both lists).
406    let mut seen: HashSet<Uuid> = HashSet::new();
407    let mut all_edges: Vec<EdgeRow> = Vec::new();
408    for edge in outbound.into_iter().chain(inbound) {
409        if seen.insert(edge.id) {
410            all_edges.push(edge);
411        }
412    }
413
414    // --- Rewire edges ---
415    let mut edges_rewired = 0usize;
416    for edge in all_edges {
417        let new_src = if edge.source_id == from_id {
418            into_id
419        } else {
420            edge.source_id
421        };
422        let new_tgt = if edge.target_id == from_id {
423            into_id
424        } else {
425            edge.target_id
426        };
427
428        if new_src == new_tgt {
429            conn.execute(
430                "DELETE FROM graph_edges WHERE namespace = ?1 AND id = ?2",
431                rusqlite::params![&namespace, edge.id.to_string()],
432            )?;
433            continue;
434        }
435
436        conn.execute(
437            "INSERT INTO graph_edges \
438             (namespace, id, source_id, target_id, relation, weight, created_at, metadata) \
439             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8) \
440             ON CONFLICT(namespace, id) DO UPDATE SET \
441                 source_id = excluded.source_id, \
442                 target_id = excluded.target_id, \
443                 relation = excluded.relation, \
444                 weight = excluded.weight, \
445                 created_at = excluded.created_at, \
446                 metadata = excluded.metadata \
447             ON CONFLICT(namespace, source_id, target_id, relation) DO NOTHING",
448            rusqlite::params![
449                &namespace,
450                edge.id.to_string(),
451                new_src.to_string(),
452                new_tgt.to_string(),
453                &edge.relation,
454                edge.weight,
455                edge.created_at,
456                edge.metadata,
457            ],
458        )?;
459        edges_rewired += 1;
460    }
461
462    // --- Merge entity fields ---
463    let (merged_props, properties_merged) =
464        merge_properties(&into_entity.properties, &from_entity.properties, strategy);
465    let merged_name = merge_string_field(&into_entity.name, &from_entity.name, strategy);
466    let merged_description =
467        merge_option_string_field(&into_entity.description, &from_entity.description, strategy);
468    let (merged_tags, tags_unioned) = union_tags(&into_entity.tags, &from_entity.tags);
469
470    let now = chrono::Utc::now().timestamp_micros();
471    let into_str = into_id.to_string();
472    let props_str = merged_props
473        .as_ref()
474        .map(|v| serde_json::to_string(v).unwrap_or_default());
475    let tags_json = serde_json::to_string(&merged_tags).unwrap_or_else(|_| "[]".to_string());
476
477    // --- Upsert merged entity ---
478    conn.execute(
479        "INSERT OR REPLACE INTO entities \
480         (id, namespace, kind, name, description, properties, tags, \
481          created_at, updated_at, deleted_at) \
482         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
483        rusqlite::params![
484            &into_str,
485            &namespace,
486            &into_entity.kind,
487            &merged_name,
488            &merged_description,
489            &props_str,
490            &tags_json,
491            into_entity.created_at,
492            now,
493            into_entity.deleted_at,
494        ],
495    )?;
496
497    // --- Reindex into_id in FTS (delete existing, insert updated) ---
498    let fts_body = match &merged_description {
499        Some(d) if !d.is_empty() => format!("{} {}", merged_name, d),
500        _ => merged_name.clone(),
501    };
502    let kind_str = SubstrateKind::Entity.to_string();
503
504    conn.execute(
505        &format!(
506            "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
507            fts_table
508        ),
509        rusqlite::params![&namespace, &into_str],
510    )?;
511    conn.execute(
512        &format!(
513            "INSERT INTO {} \
514             (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
515             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
516            fts_table
517        ),
518        rusqlite::params![
519            &into_str,
520            &kind_str,
521            &merged_name,
522            &fts_body,
523            &tags_json,
524            &namespace,
525            &props_str,
526            now,
527        ],
528    )?;
529
530    // --- Delete from_id from FTS ---
531    conn.execute(
532        &format!(
533            "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
534            fts_table
535        ),
536        rusqlite::params![&namespace, &from_str],
537    )?;
538
539    // --- Delete from_id from vector index if configured ---
540    if let Some(ref vec_tbl) = vec_table {
541        conn.execute(
542            &format!(
543                "DELETE FROM {} WHERE subject_id = ?1 AND namespace = ?2",
544                vec_tbl
545            ),
546            rusqlite::params![&from_str, &namespace],
547        )?;
548    }
549
550    // --- Hard-delete from entity ---
551    conn.execute(
552        "DELETE FROM entities WHERE id = ?1",
553        rusqlite::params![&from_str],
554    )?;
555
556    let updated_entity = Entity {
557        id: into_id,
558        namespace,
559        kind: into_entity.kind,
560        name: merged_name,
561        description: merged_description,
562        properties: merged_props,
563        tags: merged_tags,
564        created_at: into_entity.created_at,
565        updated_at: now,
566        deleted_at: into_entity.deleted_at,
567    };
568
569    Ok((
570        MergeSummary {
571            kept_id: into_id,
572            removed_id: from_id,
573            edges_rewired,
574            properties_merged,
575            tags_unioned,
576        },
577        updated_entity,
578    ))
579}
580
581// ---------------------------------------------------------------------------
582// Merge helpers (pure functions — easier to unit test)
583// ---------------------------------------------------------------------------
584
585fn merge_string_field(into: &str, from: &str, strategy: MergeStrategy) -> String {
586    match strategy {
587        MergeStrategy::PreferInto | MergeStrategy::Union => into.to_string(),
588        MergeStrategy::PreferFrom => from.to_string(),
589    }
590}
591
592fn merge_option_string_field(
593    into: &Option<String>,
594    from: &Option<String>,
595    strategy: MergeStrategy,
596) -> Option<String> {
597    match strategy {
598        MergeStrategy::PreferInto => {
599            if into.is_some() {
600                into.clone()
601            } else {
602                from.clone()
603            }
604        }
605        MergeStrategy::PreferFrom => {
606            if from.is_some() {
607                from.clone()
608            } else {
609                into.clone()
610            }
611        }
612        MergeStrategy::Union => {
613            // Keep into's description; if empty, append from's.
614            match (into, from) {
615                (Some(a), _) if !a.is_empty() => Some(a.clone()),
616                (_, Some(b)) => Some(b.clone()),
617                _ => None,
618            }
619        }
620    }
621}
622
623/// Merge two property objects. Returns (merged, count_of_fields_from_from_that_were_added).
624fn merge_properties(
625    into: &Option<Value>,
626    from: &Option<Value>,
627    strategy: MergeStrategy,
628) -> (Option<Value>, usize) {
629    match (into, from) {
630        (None, None) => (None, 0),
631        (Some(a), None) => (Some(a.clone()), 0),
632        (None, Some(b)) => {
633            let count = if let Value::Object(m) = b { m.len() } else { 1 };
634            (Some(b.clone()), count)
635        }
636        (Some(into_val), Some(from_val)) => {
637            let (merged, added) = merge_json(into_val, from_val, strategy);
638            (Some(merged), added)
639        }
640    }
641}
642
643/// Deep-merge two JSON values per strategy. Returns (merged, keys_contributed_by_from).
644fn merge_json(into: &Value, from: &Value, strategy: MergeStrategy) -> (Value, usize) {
645    match (into, from, strategy) {
646        (Value::Object(a), Value::Object(b), MergeStrategy::Union) => {
647            let mut result = a.clone();
648            let mut added = 0usize;
649            for (k, v_from) in b {
650                if let Some(v_into) = a.get(k) {
651                    let (merged, sub_added) = merge_json(v_into, v_from, MergeStrategy::Union);
652                    result.insert(k.clone(), merged);
653                    added += sub_added;
654                } else {
655                    result.insert(k.clone(), v_from.clone());
656                    added += 1;
657                }
658            }
659            (Value::Object(result), added)
660        }
661        (Value::Object(a), Value::Object(b), MergeStrategy::PreferInto) => {
662            let mut result = a.clone();
663            let mut added = 0usize;
664            for (k, v) in b {
665                if !a.contains_key(k) {
666                    result.insert(k.clone(), v.clone());
667                    added += 1;
668                }
669            }
670            (Value::Object(result), added)
671        }
672        (Value::Object(a), Value::Object(b), MergeStrategy::PreferFrom) => {
673            let mut result = a.clone();
674            let mut added = 0usize;
675            for (k, v) in b {
676                result.insert(k.clone(), v.clone());
677                if !a.contains_key(k) {
678                    added += 1;
679                }
680            }
681            (Value::Object(result), added)
682        }
683        // Non-object scalars: apply strategy directly.
684        (_into_val, from_val, MergeStrategy::PreferFrom) => (from_val.clone(), 1),
685        _ => (into.clone(), 0),
686    }
687}
688
/// Append to `into` every tag of `from` not already present, preserving order.
///
/// Duplicates already inside `into` are kept as-is; duplicates within `from` are added
/// only once. Returns the merged list and how many tags came from `from`.
fn union_tags(into: &[String], from: &[String]) -> (Vec<String>, usize) {
    let mut present: HashSet<&str> = HashSet::with_capacity(into.len() + from.len());
    present.extend(into.iter().map(String::as_str));

    let additions: Vec<String> = from
        .iter()
        .filter(|tag| present.insert(tag.as_str()))
        .cloned()
        .collect();
    let added = additions.len();

    let mut merged = into.to_vec();
    merged.extend(additions);
    (merged, added)
}
701
702// ---------------------------------------------------------------------------
703// Unit tests
704// ---------------------------------------------------------------------------
705
706#[cfg(test)]
707mod tests {
708    use super::*;
709    use crate::runtime::KhiveRuntime;
710    use khive_storage::types::{Direction, TextFilter, TextQueryMode, TextSearchRequest};
711
712    fn rt() -> KhiveRuntime {
713        KhiveRuntime::memory().unwrap()
714    }
715
716    // Helper: search FTS5 for `query` in a runtime namespace.
717    async fn fts_hit(rt: &KhiveRuntime, namespace: Option<&str>, query: &str) -> Vec<Uuid> {
718        let ns = rt.ns(namespace).to_string();
719        rt.text(namespace)
720            .unwrap()
721            .search(TextSearchRequest {
722                query: query.to_string(),
723                mode: TextQueryMode::Plain,
724                filter: Some(TextFilter {
725                    namespaces: vec![ns],
726                    ..Default::default()
727                }),
728                top_k: 50,
729                snippet_chars: 100,
730            })
731            .await
732            .unwrap()
733            .into_iter()
734            .map(|h| h.subject_id)
735            .collect()
736    }
737
738    #[tokio::test]
739    async fn update_entity_patch_changes_only_specified_fields() {
740        let rt = rt();
741        let entity = rt
742            .create_entity(
743                None,
744                "concept",
745                "OriginalName",
746                Some("orig desc"),
747                Some(serde_json::json!({"k":"v"})),
748                vec![],
749            )
750            .await
751            .unwrap();
752
753        let updated = rt
754            .update_entity(
755                None,
756                entity.id,
757                EntityPatch {
758                    description: Some(Some("new desc".to_string())),
759                    ..Default::default()
760                },
761            )
762            .await
763            .unwrap();
764
765        assert_eq!(updated.name, "OriginalName");
766        assert_eq!(updated.description.as_deref(), Some("new desc"));
767        assert_eq!(updated.properties, Some(serde_json::json!({"k":"v"})));
768    }
769
770    #[tokio::test]
771    async fn update_entity_clear_description_with_some_none() {
772        let rt = rt();
773        let entity = rt
774            .create_entity(
775                None,
776                "concept",
777                "ClearDesc",
778                Some("has description"),
779                None,
780                vec![],
781            )
782            .await
783            .unwrap();
784
785        let updated = rt
786            .update_entity(
787                None,
788                entity.id,
789                EntityPatch {
790                    description: Some(None),
791                    ..Default::default()
792                },
793            )
794            .await
795            .unwrap();
796
797        assert!(
798            updated.description.is_none(),
799            "description should be cleared"
800        );
801    }
802
803    #[tokio::test]
804    async fn update_entity_reindexes_when_name_changes() {
805        let rt = rt();
806        let entity = rt
807            .create_entity(None, "concept", "OldName", None, None, vec![])
808            .await
809            .unwrap();
810
811        // Old name is findable.
812        let hits_before = fts_hit(&rt, None, "OldName").await;
813        assert!(
814            hits_before.contains(&entity.id),
815            "entity should be findable by old name"
816        );
817
818        rt.update_entity(
819            None,
820            entity.id,
821            EntityPatch {
822                name: Some("NewName".to_string()),
823                ..Default::default()
824            },
825        )
826        .await
827        .unwrap();
828
829        let hits_old = fts_hit(&rt, None, "OldName").await;
830        let hits_new = fts_hit(&rt, None, "NewName").await;
831
832        // After rename, old name no longer matches this entity (FTS index updated).
833        assert!(
834            !hits_old.contains(&entity.id),
835            "old name should no longer match after rename"
836        );
837        assert!(
838            hits_new.contains(&entity.id),
839            "new name should be findable after rename"
840        );
841    }
842
843    #[tokio::test]
844    async fn update_entity_properties_merges_preserving_existing_keys() {
845        let rt = rt();
846        let entity = rt
847            .create_entity(
848                None,
849                "concept",
850                "MergeProps",
851                None,
852                Some(serde_json::json!({
853                    "domain": "inference",
854                    "repo": "lattice",
855                    "status": "researched",
856                })),
857                vec![],
858            )
859            .await
860            .unwrap();
861
862        let updated = rt
863            .update_entity(
864                None,
865                entity.id,
866                EntityPatch {
867                    properties: Some(serde_json::json!({"status": "implemented"})),
868                    ..Default::default()
869                },
870            )
871            .await
872            .unwrap();
873
874        let props = updated.properties.expect("properties should remain set");
875        assert_eq!(props["domain"], "inference", "domain key must be preserved");
876        assert_eq!(props["repo"], "lattice", "repo key must be preserved");
877        assert_eq!(
878            props["status"], "implemented",
879            "status key must be updated by patch"
880        );
881    }
882
883    #[tokio::test]
884    async fn update_entity_skips_reindex_when_only_properties_change() {
885        let rt = rt();
886        let entity = rt
887            .create_entity(None, "concept", "StableIndexed", None, None, vec![])
888            .await
889            .unwrap();
890
891        // Verify it's in the index before.
892        let hits_before = fts_hit(&rt, None, "StableIndexed").await;
893        assert!(hits_before.contains(&entity.id));
894
895        // Only patch properties — text index should be untouched (still findable).
896        rt.update_entity(
897            None,
898            entity.id,
899            EntityPatch {
900                properties: Some(serde_json::json!({"new": "prop"})),
901                ..Default::default()
902            },
903        )
904        .await
905        .unwrap();
906
907        let hits_after = fts_hit(&rt, None, "StableIndexed").await;
908        assert!(
909            hits_after.contains(&entity.id),
910            "still findable after props-only patch"
911        );
912    }
913
914    #[tokio::test]
915    async fn merge_entity_rewires_edges() {
916        let rt = rt();
917        let a = rt
918            .create_entity(None, "concept", "A", None, None, vec![])
919            .await
920            .unwrap();
921        let b = rt
922            .create_entity(None, "concept", "B", None, None, vec![])
923            .await
924            .unwrap();
925        let c = rt
926            .create_entity(None, "concept", "C", None, None, vec![])
927            .await
928            .unwrap();
929        let d = rt
930            .create_entity(None, "concept", "D", None, None, vec![])
931            .await
932            .unwrap();
933
934        // A→B and C→B; merge B into D → should become A→D and C→D.
935        rt.link(None, a.id, b.id, EdgeRelation::Extends, 1.0)
936            .await
937            .unwrap();
938        rt.link(None, c.id, b.id, EdgeRelation::Extends, 1.0)
939            .await
940            .unwrap();
941
942        let summary = rt
943            .merge_entity(None, d.id, b.id, MergeStrategy::PreferInto)
944            .await
945            .unwrap();
946
947        assert_eq!(summary.kept_id, d.id);
948        assert_eq!(summary.removed_id, b.id);
949        assert_eq!(summary.edges_rewired, 2);
950
951        // Verify edges now point to D.
952        let a_neighbors = rt
953            .neighbors(None, a.id, Direction::Out, None, None)
954            .await
955            .unwrap();
956        assert_eq!(a_neighbors.len(), 1);
957        assert_eq!(a_neighbors[0].node_id, d.id);
958
959        let c_neighbors = rt
960            .neighbors(None, c.id, Direction::Out, None, None)
961            .await
962            .unwrap();
963        assert_eq!(c_neighbors.len(), 1);
964        assert_eq!(c_neighbors[0].node_id, d.id);
965    }
966
967    #[tokio::test]
968    async fn merge_entity_prefer_into_strategy() {
969        let rt = rt();
970        let into = rt
971            .create_entity(
972                None,
973                "concept",
974                "Into",
975                None,
976                Some(serde_json::json!({"a": 1})),
977                vec![],
978            )
979            .await
980            .unwrap();
981        let from = rt
982            .create_entity(
983                None,
984                "concept",
985                "From",
986                None,
987                Some(serde_json::json!({"a": 2, "b": 3})),
988                vec![],
989            )
990            .await
991            .unwrap();
992
993        rt.merge_entity(None, into.id, from.id, MergeStrategy::PreferInto)
994            .await
995            .unwrap();
996
997        let kept = rt.get_entity(None, into.id).await.unwrap().unwrap();
998        let props = kept.properties.unwrap();
999        // a stays as 1 (into wins), b is added from from.
1000        assert_eq!(props["a"], 1);
1001        assert_eq!(props["b"], 3);
1002    }
1003
1004    #[tokio::test]
1005    async fn merge_entity_prefer_from_strategy() {
1006        let rt = rt();
1007        let into = rt
1008            .create_entity(
1009                None,
1010                "concept",
1011                "Into",
1012                None,
1013                Some(serde_json::json!({"a": 1})),
1014                vec![],
1015            )
1016            .await
1017            .unwrap();
1018        let from = rt
1019            .create_entity(
1020                None,
1021                "concept",
1022                "From",
1023                None,
1024                Some(serde_json::json!({"a": 2, "b": 3})),
1025                vec![],
1026            )
1027            .await
1028            .unwrap();
1029
1030        rt.merge_entity(None, into.id, from.id, MergeStrategy::PreferFrom)
1031            .await
1032            .unwrap();
1033
1034        let kept = rt.get_entity(None, into.id).await.unwrap().unwrap();
1035        let props = kept.properties.unwrap();
1036        // from wins on a, b also from from.
1037        assert_eq!(props["a"], 2);
1038        assert_eq!(props["b"], 3);
1039    }
1040
1041    #[tokio::test]
1042    async fn merge_entity_union_strategy() {
1043        let rt = rt();
1044        let into = rt
1045            .create_entity(
1046                None,
1047                "concept",
1048                "Into",
1049                None,
1050                Some(serde_json::json!({"a": 1})),
1051                vec![],
1052            )
1053            .await
1054            .unwrap();
1055        let from = rt
1056            .create_entity(
1057                None,
1058                "concept",
1059                "From",
1060                None,
1061                Some(serde_json::json!({"a": 2, "b": 3})),
1062                vec![],
1063            )
1064            .await
1065            .unwrap();
1066
1067        rt.merge_entity(None, into.id, from.id, MergeStrategy::Union)
1068            .await
1069            .unwrap();
1070
1071        let kept = rt.get_entity(None, into.id).await.unwrap().unwrap();
1072        let props = kept.properties.unwrap();
1073        // Scalar conflict: into wins → a=1. b added from from.
1074        assert_eq!(props["a"], 1);
1075        assert_eq!(props["b"], 3);
1076    }
1077
1078    #[tokio::test]
1079    async fn merge_entity_unions_tags() {
1080        let rt = rt();
1081        let into = rt
1082            .create_entity(
1083                None,
1084                "concept",
1085                "Into",
1086                None,
1087                None,
1088                vec!["x".to_string(), "y".to_string()],
1089            )
1090            .await
1091            .unwrap();
1092        let from = rt
1093            .create_entity(
1094                None,
1095                "concept",
1096                "From",
1097                None,
1098                None,
1099                vec!["y".to_string(), "z".to_string()],
1100            )
1101            .await
1102            .unwrap();
1103
1104        rt.merge_entity(None, into.id, from.id, MergeStrategy::PreferInto)
1105            .await
1106            .unwrap();
1107
1108        let kept = rt.get_entity(None, into.id).await.unwrap().unwrap();
1109        let mut tags = kept.tags.clone();
1110        tags.sort();
1111        assert_eq!(tags, vec!["x", "y", "z"]);
1112    }
1113
1114    #[tokio::test]
1115    async fn merge_entity_drops_self_loops() {
1116        let rt = rt();
1117        let a = rt
1118            .create_entity(None, "concept", "A", None, None, vec![])
1119            .await
1120            .unwrap();
1121        let b = rt
1122            .create_entity(None, "concept", "B", None, None, vec![])
1123            .await
1124            .unwrap();
1125
1126        // A `extends` B — merging B into A would produce A `extends` A → drop it.
1127        rt.link(None, a.id, b.id, EdgeRelation::Extends, 1.0)
1128            .await
1129            .unwrap();
1130
1131        let summary = rt
1132            .merge_entity(None, a.id, b.id, MergeStrategy::PreferInto)
1133            .await
1134            .unwrap();
1135
1136        assert_eq!(
1137            summary.edges_rewired, 0,
1138            "self-loop should be dropped, not rewired"
1139        );
1140
1141        let a_out = rt
1142            .neighbors(None, a.id, Direction::Out, None, None)
1143            .await
1144            .unwrap();
1145        assert!(a_out.is_empty(), "no self-loop should remain");
1146    }
1147
1148    // ---- merge helper unit tests ----
1149
1150    #[test]
1151    fn union_tags_deduplicates() {
1152        let (tags, added) = union_tags(
1153            &["x".to_string(), "y".to_string()],
1154            &["y".to_string(), "z".to_string()],
1155        );
1156        let mut sorted = tags.clone();
1157        sorted.sort();
1158        assert_eq!(sorted, vec!["x", "y", "z"]);
1159        assert_eq!(added, 1);
1160    }
1161
1162    #[test]
1163    fn merge_properties_prefer_into_fills_missing_keys() {
1164        let a = serde_json::json!({"a": 1});
1165        let b = serde_json::json!({"a": 99, "b": 2});
1166        let (merged, added) = merge_properties(&Some(a), &Some(b), MergeStrategy::PreferInto);
1167        let m = merged.unwrap();
1168        assert_eq!(m["a"], 1);
1169        assert_eq!(m["b"], 2);
1170        assert_eq!(added, 1);
1171    }
1172}