1use std::collections::HashSet;
8
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use uuid::Uuid;
12
13use khive_db::SqliteError;
14use khive_storage::types::{EdgeFilter, TextDocument};
15use khive_storage::{EdgeRelation, Entity, SubstrateKind};
16use khive_types::EventKind;
17
18use crate::error::{RuntimeError, RuntimeResult};
19use crate::runtime::{KhiveRuntime, NamespaceToken};
20
/// Partial update applied to an existing entity by `KhiveRuntime::update_entity`.
///
/// Every field is optional; a `None` field leaves the stored value untouched.
#[derive(Clone, Debug, Default)]
pub struct EntityPatch {
    /// Replacement display name.
    pub name: Option<String>,
    /// Outer `None` leaves the description alone; `Some(None)` clears it;
    /// `Some(Some(text))` replaces it.
    pub description: Option<Option<String>>,
    /// Combined with the stored properties via `merge_properties` using
    /// `EntityDedupMergePolicy::PreferFrom` (the patch is the "from" side).
    pub properties: Option<Value>,
    /// Replacement tag list; the stored tags are overwritten, not unioned.
    pub tags: Option<Vec<String>>,
}
49
/// Which side wins when the two records being merged disagree.
///
/// Serialized in `snake_case`: `prefer_into`, `prefer_from`, `union`.
#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum EntityDedupMergePolicy {
    /// Default. Keep the surviving (`into`) record's values; optional fields
    /// that `into` leaves unset fall back to the absorbed (`from`) record.
    #[default]
    PreferInto,
    /// Take the absorbed (`from`) record's values; optional fields that
    /// `from` leaves unset fall back to `into`.
    PreferFrom,
    /// Required strings keep `into`'s value; optional strings use `into` when
    /// it is non-empty and otherwise `from`. Property handling is defined by
    /// `merge_properties`.
    Union,
}
63
/// How note bodies are combined when one note is merged into another.
///
/// Serialized in `snake_case`: `append`, `prefer_into`, `prefer_from`.
#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ContentMergeStrategy {
    /// Default. `into`'s content, a `---` separator, then `from`'s content;
    /// nothing is appended when `from`'s content is empty.
    #[default]
    Append,
    /// Keep the surviving note's content unchanged.
    PreferInto,
    /// Replace the surviving note's content with the absorbed note's.
    PreferFrom,
}
73
/// Result of an entity or note merge (also returned for dry runs).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MergeSummary {
    /// Id of the surviving record (the merge's `into_id`).
    pub kept_id: Uuid,
    /// Id of the absorbed record (the merge's `from_id`); it is soft-deleted
    /// unless this was a dry run.
    pub removed_id: Uuid,
    /// Number of graph edges re-pointed from `removed_id` to `kept_id`.
    pub edges_rewired: usize,
    /// Count reported by `merge_properties` for this merge.
    pub properties_merged: usize,
    /// Count reported by `union_tags` for entity merges; always 0 for notes.
    pub tags_unioned: usize,
    /// `true` only for note merges whose content was appended; always `false`
    /// for entity merges.
    pub content_appended: bool,
    /// Whether the merge was only a preview and nothing was written.
    pub dry_run: bool,
}
85
/// Partial update for a graph edge.
///
/// NOTE(review): applied by code outside this section; presumably `None`
/// fields leave the stored value unchanged — confirm against the caller.
#[derive(Clone, Debug, Default)]
pub struct EdgePatch {
    /// Replacement relation.
    pub relation: Option<EdgeRelation>,
    /// Replacement weight.
    pub weight: Option<f64>,
    /// Properties update for the edge.
    pub properties: Option<Value>,
}
96
/// Partial update applied to a note by `KhiveRuntime::update_note`.
///
/// A `None` field leaves the stored value untouched.
#[derive(Clone, Debug, Default)]
pub struct NotePatch {
    /// `Some(None)` clears the name; `Some(Some(name))` sets it.
    pub name: Option<Option<String>>,
    /// Replacement note body.
    pub content: Option<String>,
    /// `Some(None)` clears salience; a provided value is clamped to `0.0..=1.0`.
    pub salience: Option<Option<f64>>,
    /// `Some(None)` clears the decay factor; a provided value is floored at `0.0`.
    pub decay_factor: Option<Option<f64>>,
    /// Combined with the stored properties via `merge_properties` using
    /// `EntityDedupMergePolicy::PreferFrom`.
    pub properties: Option<Value>,
    /// Crate-internal override written to the note's `status`; `NotePatch::new`
    /// always leaves it unset.
    pub(crate) kind_status: Option<String>,
}
112
113impl NotePatch {
114 pub fn new(
117 name: Option<Option<String>>,
118 content: Option<String>,
119 salience: Option<Option<f64>>,
120 decay_factor: Option<Option<f64>>,
121 properties: Option<Value>,
122 ) -> Self {
123 Self {
124 name,
125 content,
126 salience,
127 decay_factor,
128 properties,
129 kind_status: None,
130 }
131 }
132}
133
/// Runtime-facing filter for listing graph edges; converted into the storage
/// layer's `EdgeFilter` through the `From` impl below it.
#[derive(Clone, Debug, Default)]
pub struct EdgeListFilter {
    /// Only edges whose source is this node (becomes a one-element `source_ids`).
    pub source_id: Option<Uuid>,
    /// Only edges whose target is this node (becomes a one-element `target_ids`).
    pub target_id: Option<Uuid>,
    /// Relations to match, passed through unchanged.
    /// NOTE(review): presumably an empty list means "any relation" — confirm
    /// against `EdgeFilter`.
    pub relations: Vec<EdgeRelation>,
    /// Lower weight bound, passed through to `EdgeFilter::min_weight`.
    pub min_weight: Option<f64>,
    /// Upper weight bound, passed through to `EdgeFilter::max_weight`.
    pub max_weight: Option<f64>,
}
144
145impl From<EdgeListFilter> for EdgeFilter {
146 fn from(f: EdgeListFilter) -> Self {
147 EdgeFilter {
148 source_ids: f.source_id.into_iter().collect(),
149 target_ids: f.target_id.into_iter().collect(),
150 relations: f.relations,
151 min_weight: f.min_weight,
152 max_weight: f.max_weight,
153 ..Default::default()
154 }
155 }
156}
157
158impl KhiveRuntime {
163 pub async fn update_entity(
171 &self,
172 token: &NamespaceToken,
173 id: Uuid,
174 patch: EntityPatch,
175 ) -> RuntimeResult<Entity> {
176 let store = self.entities(token)?;
177 let mut entity = store
178 .get_entity(id)
179 .await?
180 .ok_or_else(|| RuntimeError::NotFound(format!("entity {id}")))?;
181
182 self.ensure_namespace(&entity.namespace, token, id)?;
183
184 let mut text_changed = false;
185 let mut changed_fields: Vec<&'static str> = Vec::new();
186
187 if let Some(name) = patch.name {
188 text_changed |= entity.name != name;
189 entity.name = name;
190 changed_fields.push("name");
191 }
192 if let Some(desc_patch) = patch.description {
193 text_changed |= entity.description != desc_patch;
194 entity.description = desc_patch;
195 changed_fields.push("description");
196 }
197 if let Some(props) = patch.properties {
198 let (merged, _) = merge_properties(
199 &entity.properties,
200 &Some(props),
201 EntityDedupMergePolicy::PreferFrom,
202 );
203 entity.properties = merged;
204 changed_fields.push("properties");
205 }
206 if let Some(tags) = patch.tags {
207 entity.tags = tags;
208 changed_fields.push("tags");
209 }
210
211 entity.updated_at = chrono::Utc::now().timestamp_micros();
212 store.upsert_entity(entity.clone()).await?;
213
214 if text_changed {
215 self.reindex_entity(token, &entity).await?;
216 }
217
218 let event_store = self.events(token)?;
219 let event = khive_storage::event::Event::new(
220 entity.namespace.clone(),
221 "update",
222 EventKind::EntityUpdated,
223 SubstrateKind::Entity,
224 "",
225 )
226 .with_target(entity.id)
227 .with_payload(serde_json::json!({
228 "id": entity.id,
229 "namespace": entity.namespace,
230 "changed_fields": changed_fields,
231 }));
232 event_store.append_event(event).await.map_err(|e| {
233 RuntimeError::Internal(format!("update_entity: event store write failed: {e}"))
234 })?;
235
236 Ok(entity)
237 }
238
239 pub async fn merge_entity(
252 &self,
253 token: &NamespaceToken,
254 into_id: Uuid,
255 from_id: Uuid,
256 strategy: EntityDedupMergePolicy,
257 dry_run: bool,
258 ) -> RuntimeResult<MergeSummary> {
259 if into_id == from_id {
260 return Err(RuntimeError::InvalidInput(
261 "cannot merge an entity into itself".into(),
262 ));
263 }
264 let ns = token.namespace().as_str().to_owned();
265 let sanitized_ns: String = ns
266 .chars()
267 .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
268 .collect();
269 let fts_table = format!("fts_entities_{}", sanitized_ns);
270 let vec_table = self.config().embedding_model.map(|model| {
271 let key: String = model
272 .to_string()
273 .chars()
274 .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
275 .collect();
276 format!("vec_{}", key)
277 });
278
279 let _ = self.entities(token)?;
282 let _ = self.graph(token)?;
283 let _ = self.text(token)?;
284 if self.config().embedding_model.is_some() {
285 let _ = self.vectors(token)?;
286 }
287
288 let pool = self.backend().pool_arc();
289
290 let (summary, updated_entity) = tokio::task::spawn_blocking(move || {
291 let guard = pool.writer()?;
292 guard.transaction(|conn| {
293 merge_entity_sql(
294 conn, ns, fts_table, vec_table, into_id, from_id, strategy, dry_run,
295 )
296 })
297 })
298 .await
299 .map_err(|e| RuntimeError::Internal(e.to_string()))??;
300
301 if !dry_run && self.config().embedding_model.is_some() {
304 self.reindex_entity(token, &updated_entity).await?;
305 }
306
307 let event_store = self.events(token)?;
308 let policy_str = match strategy {
311 EntityDedupMergePolicy::PreferInto => "prefer_into",
312 EntityDedupMergePolicy::PreferFrom => "prefer_from",
313 EntityDedupMergePolicy::Union => "union",
314 };
315 let event = khive_storage::event::Event::new(
316 updated_entity.namespace.clone(),
317 "merge",
318 EventKind::EntityMerged,
319 SubstrateKind::Entity,
320 "",
321 )
322 .with_target(summary.kept_id)
323 .with_payload(serde_json::json!({
324 "into_id": summary.kept_id,
325 "from_id": summary.removed_id,
326 "policy": policy_str,
327 "edges_rewired": summary.edges_rewired,
328 }));
329 event_store.append_event(event).await.map_err(|e| {
330 RuntimeError::Internal(format!("merge_entity: event store write failed: {e}"))
331 })?;
332
333 Ok(summary)
334 }
335
336 pub(crate) async fn reindex_entity(
344 &self,
345 token: &NamespaceToken,
346 entity: &Entity,
347 ) -> RuntimeResult<()> {
348 let body = match &entity.description {
349 Some(d) if !d.is_empty() => format!("{} {}", entity.name, d),
350 _ => entity.name.clone(),
351 };
352 let ns = entity.namespace.clone();
354 self.text(token)?
355 .upsert_document(TextDocument {
356 subject_id: entity.id,
357 kind: SubstrateKind::Entity,
358 title: Some(entity.name.clone()),
359 body: body.clone(),
360 tags: entity.tags.clone(),
361 namespace: ns.clone(),
362 metadata: entity.properties.clone(),
363 updated_at: chrono::Utc::now(),
364 })
365 .await?;
366
367 if self.config().embedding_model.is_some() {
368 let vector = self.embed(&body).await?;
369 self.vectors(token)?
370 .insert(
371 entity.id,
372 SubstrateKind::Entity,
373 &ns,
374 "entity.body",
375 vec![vector],
376 )
377 .await?;
378 }
379
380 Ok(())
381 }
382
383 pub(crate) async fn remove_from_indexes(
385 &self,
386 token: &NamespaceToken,
387 id: Uuid,
388 ) -> RuntimeResult<()> {
389 let ns = token.namespace().as_str().to_owned();
390 self.text(token)?.delete_document(&ns, id).await?;
391 if self.config().embedding_model.is_some() {
392 self.vectors(token)?.delete(id).await?;
393 }
394 Ok(())
395 }
396
397 pub(crate) async fn reindex_note(
399 &self,
400 token: &NamespaceToken,
401 note: &khive_storage::note::Note,
402 ) -> RuntimeResult<()> {
403 let ns = note.namespace.clone();
404 self.text_for_notes(token)?
405 .upsert_document(TextDocument {
406 subject_id: note.id,
407 kind: SubstrateKind::Note,
408 title: note.name.clone(),
409 body: note.content.clone(),
410 tags: Vec::new(),
411 namespace: ns.clone(),
412 metadata: note.properties.clone(),
413 updated_at: chrono::Utc::now(),
414 })
415 .await?;
416
417 if self.config().embedding_model.is_some() {
418 let vector = self.embed(¬e.content).await?;
419 self.vectors(token)?
420 .insert(
421 note.id,
422 SubstrateKind::Note,
423 &ns,
424 "note.content",
425 vec![vector],
426 )
427 .await?;
428 }
429 Ok(())
430 }
431
432 pub async fn update_note(
434 &self,
435 token: &NamespaceToken,
436 id: Uuid,
437 patch: NotePatch,
438 ) -> RuntimeResult<khive_storage::note::Note> {
439 let store = self.notes(token)?;
440 let mut note = store
441 .get_note(id)
442 .await?
443 .ok_or_else(|| RuntimeError::NotFound(format!("note {id}")))?;
444
445 if note.namespace != token.namespace().as_str() {
446 return Err(RuntimeError::NotFound(format!("note {id}")));
447 }
448
449 let mut text_changed = false;
450
451 if let Some(name_patch) = patch.name {
452 text_changed |= note.name != name_patch;
453 note.name = name_patch;
454 }
455 if let Some(content) = patch.content {
456 text_changed |= note.content != content;
457 note.content = content;
458 }
459 if let Some(salience_patch) = patch.salience {
460 note.salience = salience_patch.map(|s| s.clamp(0.0, 1.0));
461 }
462 if let Some(decay_patch) = patch.decay_factor {
463 note.decay_factor = decay_patch.map(|d| d.max(0.0));
464 }
465 if let Some(props) = patch.properties {
466 let (merged, _) = merge_properties(
467 ¬e.properties,
468 &Some(props),
469 EntityDedupMergePolicy::PreferFrom,
470 );
471 note.properties = merged;
472 }
473 if let Some(status) = patch.kind_status {
474 note.status = status;
475 }
476
477 note.updated_at = chrono::Utc::now().timestamp_micros();
478 store.upsert_note(note.clone()).await?;
479
480 if text_changed {
481 self.reindex_note(token, ¬e).await?;
482 }
483
484 Ok(note)
485 }
486
487 pub async fn merge_note(
496 &self,
497 token: &NamespaceToken,
498 into_id: Uuid,
499 from_id: Uuid,
500 strategy: EntityDedupMergePolicy,
501 content_strategy: ContentMergeStrategy,
502 dry_run: bool,
503 ) -> RuntimeResult<MergeSummary> {
504 if into_id == from_id {
505 return Err(RuntimeError::InvalidInput(
506 "cannot merge a note into itself".into(),
507 ));
508 }
509 let ns = token.namespace().as_str().to_string();
510 let sanitized_ns: String = ns
511 .chars()
512 .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
513 .collect();
514 let fts_table = format!("fts_notes_{}", sanitized_ns);
515 let vec_table = self.config().embedding_model.map(|model| {
516 let key: String = model
517 .to_string()
518 .chars()
519 .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
520 .collect();
521 format!("vec_{}", key)
522 });
523
524 let _ = self.notes(token)?;
525 let _ = self.graph(token)?;
526 let _ = self.text_for_notes(token)?;
527 if self.config().embedding_model.is_some() {
528 let _ = self.vectors(token)?;
529 }
530
531 let pool = self.backend().pool_arc();
532 let (summary, updated_note) = tokio::task::spawn_blocking(move || {
533 let guard = pool.writer()?;
534 guard.transaction(|conn| {
535 merge_note_sql(
536 conn,
537 ns,
538 fts_table,
539 vec_table,
540 into_id,
541 from_id,
542 strategy,
543 content_strategy,
544 dry_run,
545 )
546 })
547 })
548 .await
549 .map_err(|e| RuntimeError::Internal(e.to_string()))??;
550
551 if !dry_run && self.config().embedding_model.is_some() {
552 self.reindex_note(token, &updated_note).await?;
553 }
554 Ok(summary)
555 }
556}
557
558fn read_merge_entity(
564 conn: &rusqlite::Connection,
565 id: Uuid,
566 namespace: &str,
567) -> Result<Entity, SqliteError> {
568 let id_str = id.to_string();
569 let mut stmt = conn.prepare(
570 "SELECT id, namespace, kind, entity_type, name, description, properties, tags, \
571 created_at, updated_at, deleted_at, merged_into, merge_event_id \
572 FROM entities WHERE id = ?1 AND deleted_at IS NULL",
573 )?;
574 let mut rows = stmt.query(rusqlite::params![id_str])?;
575 let row = rows
576 .next()?
577 .ok_or_else(|| SqliteError::InvalidData(format!("entity {id} not found")))?;
578
579 let id_s: String = row.get(0)?;
580 let ns: String = row.get(1)?;
581 let kind: String = row.get(2)?;
582 let entity_type: Option<String> = row.get(3)?;
583 let name: String = row.get(4)?;
584 let description: Option<String> = row.get(5)?;
585 let properties_str: Option<String> = row.get(6)?;
586 let tags_str: String = row.get(7)?;
587 let created_at: i64 = row.get(8)?;
588 let updated_at: i64 = row.get(9)?;
589 let deleted_at: Option<i64> = row.get(10)?;
590 let merged_into_str: Option<String> = row.get(11)?;
591 let merge_event_id_str: Option<String> = row.get(12)?;
592
593 if ns != namespace {
594 return Err(SqliteError::InvalidData(format!(
595 "entity {id} belongs to namespace '{ns}', not '{namespace}'"
596 )));
597 }
598
599 let entity_id = Uuid::parse_str(&id_s).map_err(|e| SqliteError::InvalidData(e.to_string()))?;
600 let properties: Option<Value> = properties_str
601 .map(|s| {
602 serde_json::from_str::<Value>(&s).map_err(|e| SqliteError::InvalidData(e.to_string()))
603 })
604 .transpose()?;
605 let tags: Vec<String> =
606 serde_json::from_str(&tags_str).map_err(|e| SqliteError::InvalidData(e.to_string()))?;
607 let merged_into = merged_into_str
608 .as_deref()
609 .map(Uuid::parse_str)
610 .transpose()
611 .map_err(|e| SqliteError::InvalidData(e.to_string()))?;
612 let merge_event_id = merge_event_id_str
613 .as_deref()
614 .map(Uuid::parse_str)
615 .transpose()
616 .map_err(|e| SqliteError::InvalidData(e.to_string()))?;
617
618 Ok(Entity {
619 id: entity_id,
620 namespace: ns,
621 kind,
622 entity_type,
623 name,
624 description,
625 properties,
626 tags,
627 created_at,
628 updated_at,
629 deleted_at,
630 merged_into,
631 merge_event_id,
632 })
633}
634
635#[allow(clippy::too_many_arguments)]
643fn merge_entity_sql(
644 conn: &rusqlite::Connection,
645 namespace: String,
646 fts_table: String,
647 vec_table: Option<String>,
648 into_id: Uuid,
649 from_id: Uuid,
650 strategy: EntityDedupMergePolicy,
651 dry_run: bool,
652) -> Result<(MergeSummary, Entity), SqliteError> {
653 let into_entity = read_merge_entity(conn, into_id, &namespace)?;
654 let from_entity = read_merge_entity(conn, from_id, &namespace)?;
655
656 #[allow(dead_code)]
658 struct EdgeRow {
659 id: Uuid,
660 source_id: Uuid,
661 target_id: Uuid,
662 relation: String,
663 weight: f64,
664 created_at: i64,
665 updated_at: i64,
666 deleted_at: Option<i64>,
667 target_backend: Option<String>,
668 metadata: Option<String>,
669 }
670
671 let parse_id =
672 |s: String| Uuid::parse_str(&s).map_err(|e| SqliteError::InvalidData(e.to_string()));
673
674 let from_str = from_id.to_string();
675
676 let mut outbound: Vec<EdgeRow> = Vec::new();
677 {
678 let mut stmt = conn.prepare(
679 "SELECT id, source_id, target_id, relation, weight, created_at, \
680 updated_at, deleted_at, target_backend, metadata \
681 FROM graph_edges WHERE namespace = ?1 AND source_id = ?2",
682 )?;
683 let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
684 while let Some(row) = rows.next()? {
685 outbound.push(EdgeRow {
686 id: parse_id(row.get(0)?)?,
687 source_id: parse_id(row.get(1)?)?,
688 target_id: parse_id(row.get(2)?)?,
689 relation: row.get(3)?,
690 weight: row.get(4)?,
691 created_at: row.get(5)?,
692 updated_at: row.get(6)?,
693 deleted_at: row.get(7)?,
694 target_backend: row.get(8)?,
695 metadata: row.get(9)?,
696 });
697 }
698 }
699
700 let mut inbound: Vec<EdgeRow> = Vec::new();
701 {
702 let mut stmt = conn.prepare(
703 "SELECT id, source_id, target_id, relation, weight, created_at, \
704 updated_at, deleted_at, target_backend, metadata \
705 FROM graph_edges WHERE namespace = ?1 AND target_id = ?2",
706 )?;
707 let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
708 while let Some(row) = rows.next()? {
709 inbound.push(EdgeRow {
710 id: parse_id(row.get(0)?)?,
711 source_id: parse_id(row.get(1)?)?,
712 target_id: parse_id(row.get(2)?)?,
713 relation: row.get(3)?,
714 weight: row.get(4)?,
715 created_at: row.get(5)?,
716 updated_at: row.get(6)?,
717 deleted_at: row.get(7)?,
718 target_backend: row.get(8)?,
719 metadata: row.get(9)?,
720 });
721 }
722 }
723
724 let mut seen: HashSet<Uuid> = HashSet::new();
726 let mut all_edges: Vec<EdgeRow> = Vec::new();
727 for edge in outbound.into_iter().chain(inbound) {
728 if seen.insert(edge.id) {
729 all_edges.push(edge);
730 }
731 }
732
733 let (merged_props, properties_merged) =
735 merge_properties(&into_entity.properties, &from_entity.properties, strategy);
736 let merged_name = merge_string_field(&into_entity.name, &from_entity.name, strategy);
737 let merged_description =
738 merge_option_string_field(&into_entity.description, &from_entity.description, strategy);
739 let (merged_tags, tags_unioned) = union_tags(&into_entity.tags, &from_entity.tags);
740
741 let now = chrono::Utc::now().timestamp_micros();
742 let into_str = into_id.to_string();
743 let props_str = merged_props
744 .as_ref()
745 .map(|v| serde_json::to_string(v).unwrap_or_default());
746 let tags_json = serde_json::to_string(&merged_tags).unwrap_or_else(|_| "[]".to_string());
747
748 let mut edges_rewired = 0usize;
750 if !dry_run {
751 for edge in all_edges {
752 let new_src = if edge.source_id == from_id {
753 into_id
754 } else {
755 edge.source_id
756 };
757 let new_tgt = if edge.target_id == from_id {
758 into_id
759 } else {
760 edge.target_id
761 };
762
763 if new_src == new_tgt {
764 conn.execute(
765 "DELETE FROM graph_edges WHERE namespace = ?1 AND id = ?2",
766 rusqlite::params![&namespace, edge.id.to_string()],
767 )?;
768 continue;
769 }
770
771 let now_ts = chrono::Utc::now().timestamp();
772 conn.execute(
773 "INSERT INTO graph_edges \
774 (namespace, id, source_id, target_id, relation, weight, created_at, updated_at, deleted_at, target_backend, metadata) \
775 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11) \
776 ON CONFLICT(namespace, id) DO UPDATE SET \
777 source_id = excluded.source_id, \
778 target_id = excluded.target_id, \
779 relation = excluded.relation, \
780 weight = excluded.weight, \
781 updated_at = excluded.updated_at, \
782 metadata = excluded.metadata \
783 ON CONFLICT(namespace, source_id, target_id, relation) DO NOTHING",
784 rusqlite::params![
785 &namespace,
786 edge.id.to_string(),
787 new_src.to_string(),
788 new_tgt.to_string(),
789 &edge.relation,
790 edge.weight,
791 edge.created_at,
792 now_ts,
793 edge.deleted_at,
794 edge.target_backend,
795 edge.metadata,
796 ],
797 )?;
798 edges_rewired += 1;
799 }
800
801 conn.execute(
803 "INSERT OR REPLACE INTO entities \
804 (id, namespace, kind, name, description, properties, tags, \
805 created_at, updated_at, deleted_at, merged_into, merge_event_id) \
806 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
807 rusqlite::params![
808 &into_str,
809 &namespace,
810 &into_entity.kind,
811 &merged_name,
812 &merged_description,
813 &props_str,
814 &tags_json,
815 into_entity.created_at,
816 now,
817 into_entity.deleted_at,
818 Option::<String>::None,
819 Option::<String>::None,
820 ],
821 )?;
822
823 let fts_body = match &merged_description {
825 Some(d) if !d.is_empty() => format!("{} {}", merged_name, d),
826 _ => merged_name.clone(),
827 };
828 let kind_str = SubstrateKind::Entity.to_string();
829
830 conn.execute(
831 &format!(
832 "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
833 fts_table
834 ),
835 rusqlite::params![&namespace, &into_str],
836 )?;
837 conn.execute(
838 &format!(
839 "INSERT INTO {} \
840 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
841 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
842 fts_table
843 ),
844 rusqlite::params![
845 &into_str,
846 &kind_str,
847 &merged_name,
848 &fts_body,
849 &tags_json,
850 &namespace,
851 &props_str,
852 now,
853 ],
854 )?;
855
856 conn.execute(
858 &format!(
859 "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
860 fts_table
861 ),
862 rusqlite::params![&namespace, &from_str],
863 )?;
864
865 if let Some(ref vec_tbl) = vec_table {
867 conn.execute(
868 &format!(
869 "DELETE FROM {} WHERE subject_id = ?1 AND namespace = ?2",
870 vec_tbl
871 ),
872 rusqlite::params![&from_str, &namespace],
873 )?;
874 }
875
876 let merge_event_id = Uuid::new_v4();
878 conn.execute(
879 "UPDATE entities \
880 SET deleted_at = ?1, merged_into = ?2, merge_event_id = ?3, updated_at = ?1 \
881 WHERE namespace = ?4 AND id = ?5 AND deleted_at IS NULL",
882 rusqlite::params![
883 now,
884 into_str,
885 merge_event_id.to_string(),
886 &namespace,
887 &from_str,
888 ],
889 )?;
890 }
891
892 let updated_entity = Entity {
893 id: into_id,
894 namespace,
895 kind: into_entity.kind,
896 entity_type: into_entity.entity_type,
897 name: merged_name,
898 description: merged_description,
899 properties: merged_props,
900 tags: merged_tags,
901 created_at: into_entity.created_at,
902 updated_at: now,
903 deleted_at: into_entity.deleted_at,
904 merged_into: None,
905 merge_event_id: None,
906 };
907
908 Ok((
909 MergeSummary {
910 kept_id: into_id,
911 removed_id: from_id,
912 edges_rewired,
913 properties_merged,
914 tags_unioned,
915 content_appended: false,
916 dry_run,
917 },
918 updated_entity,
919 ))
920}
921
922fn read_merge_note(
928 conn: &rusqlite::Connection,
929 id: Uuid,
930 namespace: &str,
931) -> Result<khive_storage::note::Note, SqliteError> {
932 use khive_storage::note::Note;
933 let id_str = id.to_string();
934 let mut stmt = conn.prepare(
935 "SELECT id, namespace, kind, status, name, content, salience, decay_factor, \
936 expires_at, properties, created_at, updated_at, deleted_at \
937 FROM notes WHERE id = ?1 AND deleted_at IS NULL",
938 )?;
939 let mut rows = stmt.query(rusqlite::params![id_str])?;
940 let row = rows
941 .next()?
942 .ok_or_else(|| SqliteError::InvalidData(format!("note {id} not found")))?;
943
944 let id_s: String = row.get(0)?;
945 let ns: String = row.get(1)?;
946 let kind: String = row.get(2)?;
947 let status: String = row.get(3)?;
948 let name: Option<String> = row.get(4)?;
949 let content: String = row.get(5)?;
950 let salience: Option<f64> = row.get(6)?;
951 let decay_factor: Option<f64> = row.get(7)?;
952 let expires_at: Option<i64> = row.get(8)?;
953 let properties_str: Option<String> = row.get(9)?;
954 let created_at: i64 = row.get(10)?;
955 let updated_at: i64 = row.get(11)?;
956 let deleted_at: Option<i64> = row.get(12)?;
957
958 if ns != namespace {
959 return Err(SqliteError::InvalidData(format!(
960 "note {id} belongs to namespace '{ns}', not '{namespace}'"
961 )));
962 }
963
964 let note_id = Uuid::parse_str(&id_s).map_err(|e| SqliteError::InvalidData(e.to_string()))?;
965 let properties: Option<serde_json::Value> = properties_str
966 .map(|s| serde_json::from_str(&s).map_err(|e| SqliteError::InvalidData(e.to_string())))
967 .transpose()?;
968
969 Ok(Note {
970 id: note_id,
971 namespace: ns,
972 kind,
973 status,
974 name,
975 content,
976 salience,
977 decay_factor,
978 expires_at,
979 properties,
980 created_at,
981 updated_at,
982 deleted_at,
983 })
984}
985
/// Returns the larger of two optional floats.
///
/// When only one side is `Some`, that value wins; two `None`s give `None`.
/// NaN follows `f64::max`: the non-NaN operand is returned.
fn max_option_f64(a: Option<f64>, b: Option<f64>) -> Option<f64> {
    a.zip(b).map(|(left, right)| left.max(right)).or(a).or(b)
}
994
995fn append_merge_history(props: Option<Value>, entry: Value) -> Result<Option<Value>, SqliteError> {
996 use serde_json::{json, Map};
997 let mut obj: Map<String, Value> = match props {
998 Some(Value::Object(m)) => m,
999 Some(other) => {
1000 let mut m = Map::new();
1001 m.insert("_value".into(), other);
1002 m
1003 }
1004 None => Map::new(),
1005 };
1006 let history = obj
1007 .entry("_merge_history".to_string())
1008 .or_insert_with(|| json!([]));
1009 if let Value::Array(arr) = history {
1010 arr.push(entry);
1011 }
1012 Ok(Some(Value::Object(obj)))
1013}
1014
1015#[allow(clippy::too_many_arguments)]
1023fn merge_note_sql(
1024 conn: &rusqlite::Connection,
1025 namespace: String,
1026 fts_table: String,
1027 vec_table: Option<String>,
1028 into_id: Uuid,
1029 from_id: Uuid,
1030 strategy: EntityDedupMergePolicy,
1031 content_strategy: ContentMergeStrategy,
1032 dry_run: bool,
1033) -> Result<(MergeSummary, khive_storage::note::Note), SqliteError> {
1034 let into_note = read_merge_note(conn, into_id, &namespace)?;
1035 let from_note = read_merge_note(conn, from_id, &namespace)?;
1036
1037 if into_note.kind != from_note.kind {
1038 return Err(SqliteError::InvalidData(format!(
1039 "cannot merge notes of different kinds: {} vs {}",
1040 into_note.kind, from_note.kind
1041 )));
1042 }
1043
1044 let now = chrono::Utc::now().timestamp_micros();
1045 let into_str = into_id.to_string();
1046 let from_str = from_id.to_string();
1047
1048 #[allow(dead_code)]
1050 struct EdgeRow {
1051 id: Uuid,
1052 source_id: Uuid,
1053 target_id: Uuid,
1054 relation: String,
1055 weight: f64,
1056 created_at: i64,
1057 updated_at: i64,
1058 deleted_at: Option<i64>,
1059 target_backend: Option<String>,
1060 metadata: Option<String>,
1061 }
1062 let parse_id =
1063 |s: String| Uuid::parse_str(&s).map_err(|e| SqliteError::InvalidData(e.to_string()));
1064
1065 let mut outbound: Vec<EdgeRow> = Vec::new();
1066 {
1067 let mut stmt = conn.prepare(
1068 "SELECT id, source_id, target_id, relation, weight, created_at, updated_at, deleted_at, target_backend, metadata \
1069 FROM graph_edges WHERE namespace = ?1 AND source_id = ?2",
1070 )?;
1071 let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
1072 while let Some(row) = rows.next()? {
1073 outbound.push(EdgeRow {
1074 id: parse_id(row.get(0)?)?,
1075 source_id: parse_id(row.get(1)?)?,
1076 target_id: parse_id(row.get(2)?)?,
1077 relation: row.get(3)?,
1078 weight: row.get(4)?,
1079 created_at: row.get(5)?,
1080 updated_at: row.get(6)?,
1081 deleted_at: row.get(7)?,
1082 target_backend: row.get(8)?,
1083 metadata: row.get(9)?,
1084 });
1085 }
1086 }
1087 let mut inbound: Vec<EdgeRow> = Vec::new();
1088 {
1089 let mut stmt = conn.prepare(
1090 "SELECT id, source_id, target_id, relation, weight, created_at, updated_at, deleted_at, target_backend, metadata \
1091 FROM graph_edges WHERE namespace = ?1 AND target_id = ?2",
1092 )?;
1093 let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
1094 while let Some(row) = rows.next()? {
1095 inbound.push(EdgeRow {
1096 id: parse_id(row.get(0)?)?,
1097 source_id: parse_id(row.get(1)?)?,
1098 target_id: parse_id(row.get(2)?)?,
1099 relation: row.get(3)?,
1100 weight: row.get(4)?,
1101 created_at: row.get(5)?,
1102 updated_at: row.get(6)?,
1103 deleted_at: row.get(7)?,
1104 target_backend: row.get(8)?,
1105 metadata: row.get(9)?,
1106 });
1107 }
1108 }
1109 let mut seen: HashSet<Uuid> = HashSet::new();
1110 let mut all_edges: Vec<EdgeRow> = Vec::new();
1111 for edge in outbound.into_iter().chain(inbound) {
1112 if seen.insert(edge.id) {
1113 all_edges.push(edge);
1114 }
1115 }
1116
1117 let (merged_content, content_appended) = match content_strategy {
1119 ContentMergeStrategy::Append => {
1120 if from_note.content.is_empty() {
1121 (into_note.content.clone(), false)
1122 } else {
1123 (
1124 format!("{}\n\n---\n\n{}", into_note.content, from_note.content),
1125 true,
1126 )
1127 }
1128 }
1129 ContentMergeStrategy::PreferInto => (into_note.content.clone(), false),
1130 ContentMergeStrategy::PreferFrom => (from_note.content.clone(), false),
1131 };
1132
1133 let merged_name = match strategy {
1134 EntityDedupMergePolicy::PreferFrom => from_note.name.clone().or(into_note.name.clone()),
1135 _ => into_note.name.clone().or(from_note.name.clone()),
1136 };
1137
1138 let (merged_props, properties_merged) =
1139 merge_properties(&into_note.properties, &from_note.properties, strategy);
1140
1141 let merge_history_entry = serde_json::json!({
1143 "merged_from": from_id.to_string(),
1144 "merged_at": now,
1145 "strategy": format!("{:?}", strategy),
1146 "content_strategy": format!("{:?}", content_strategy),
1147 });
1148 let merged_props = append_merge_history(merged_props, merge_history_entry)?;
1149
1150 let merged_salience = max_option_f64(into_note.salience, from_note.salience);
1151 let merged_expires_at = match (into_note.expires_at, from_note.expires_at) {
1152 (Some(a), Some(b)) => Some(a.max(b)),
1153 (Some(a), None) => Some(a),
1154 (None, Some(b)) => Some(b),
1155 (None, None) => None,
1156 };
1157
1158 let props_str = merged_props
1159 .as_ref()
1160 .map(|v| serde_json::to_string(v).unwrap_or_default());
1161
1162 let mut edges_rewired = 0usize;
1163 if !dry_run {
1164 for edge in all_edges {
1166 let new_src = if edge.source_id == from_id {
1167 into_id
1168 } else {
1169 edge.source_id
1170 };
1171 let new_tgt = if edge.target_id == from_id {
1172 into_id
1173 } else {
1174 edge.target_id
1175 };
1176 if new_src == new_tgt {
1177 conn.execute(
1178 "DELETE FROM graph_edges WHERE namespace = ?1 AND id = ?2",
1179 rusqlite::params![&namespace, edge.id.to_string()],
1180 )?;
1181 continue;
1182 }
1183 let now_ts = chrono::Utc::now().timestamp();
1184 conn.execute(
1185 "INSERT INTO graph_edges \
1186 (namespace, id, source_id, target_id, relation, weight, created_at, updated_at, deleted_at, target_backend, metadata) \
1187 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11) \
1188 ON CONFLICT(namespace, id) DO UPDATE SET \
1189 source_id = excluded.source_id, \
1190 target_id = excluded.target_id, \
1191 relation = excluded.relation, \
1192 weight = excluded.weight, \
1193 updated_at = excluded.updated_at, \
1194 metadata = excluded.metadata \
1195 ON CONFLICT(namespace, source_id, target_id, relation) DO NOTHING",
1196 rusqlite::params![
1197 &namespace,
1198 edge.id.to_string(),
1199 new_src.to_string(),
1200 new_tgt.to_string(),
1201 &edge.relation,
1202 edge.weight,
1203 edge.created_at,
1204 now_ts,
1205 edge.deleted_at,
1206 edge.target_backend,
1207 edge.metadata,
1208 ],
1209 )?;
1210 edges_rewired += 1;
1211 }
1212
1213 conn.execute(
1215 "INSERT OR REPLACE INTO notes \
1216 (id, namespace, kind, status, name, content, salience, decay_factor, \
1217 expires_at, properties, created_at, updated_at, deleted_at) \
1218 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
1219 rusqlite::params![
1220 &into_str,
1221 &namespace,
1222 &into_note.kind,
1223 &into_note.status,
1224 &merged_name,
1225 &merged_content,
1226 merged_salience,
1227 into_note.decay_factor,
1228 merged_expires_at,
1229 &props_str,
1230 into_note.created_at,
1231 now,
1232 into_note.deleted_at,
1233 ],
1234 )?;
1235
1236 conn.execute(
1238 &format!(
1239 "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
1240 fts_table
1241 ),
1242 rusqlite::params![&namespace, &into_str],
1243 )?;
1244 conn.execute(
1245 &format!(
1246 "INSERT INTO {} \
1247 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
1248 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1249 fts_table
1250 ),
1251 rusqlite::params![
1252 &into_str,
1253 SubstrateKind::Note.to_string(),
1254 &merged_name,
1255 &merged_content,
1256 "[]",
1257 &namespace,
1258 &props_str,
1259 now,
1260 ],
1261 )?;
1262
1263 conn.execute(
1265 &format!(
1266 "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
1267 fts_table
1268 ),
1269 rusqlite::params![&namespace, &from_str],
1270 )?;
1271
1272 if let Some(ref vec_tbl) = vec_table {
1274 conn.execute(
1275 &format!(
1276 "DELETE FROM {} WHERE subject_id = ?1 AND namespace = ?2",
1277 vec_tbl
1278 ),
1279 rusqlite::params![&from_str, &namespace],
1280 )?;
1281 }
1282
1283 conn.execute(
1285 "UPDATE notes SET status = 'deleted', deleted_at = ?1, updated_at = ?1 \
1286 WHERE namespace = ?2 AND id = ?3 AND deleted_at IS NULL",
1287 rusqlite::params![now, &namespace, &from_str],
1288 )?;
1289 }
1290
1291 let updated_note = khive_storage::note::Note {
1292 id: into_id,
1293 namespace: namespace.clone(),
1294 kind: into_note.kind.clone(),
1295 status: into_note.status.clone(),
1296 name: merged_name,
1297 content: merged_content,
1298 salience: merged_salience,
1299 decay_factor: into_note.decay_factor,
1300 expires_at: merged_expires_at,
1301 properties: merged_props,
1302 created_at: into_note.created_at,
1303 updated_at: now,
1304 deleted_at: into_note.deleted_at,
1305 };
1306
1307 Ok((
1308 MergeSummary {
1309 kept_id: into_id,
1310 removed_id: from_id,
1311 edges_rewired,
1312 properties_merged,
1313 tags_unioned: 0,
1314 content_appended,
1315 dry_run,
1316 },
1317 updated_note,
1318 ))
1319}
1320
1321fn merge_string_field(into: &str, from: &str, strategy: EntityDedupMergePolicy) -> String {
1326 match strategy {
1327 EntityDedupMergePolicy::PreferInto | EntityDedupMergePolicy::Union => into.to_string(),
1328 EntityDedupMergePolicy::PreferFrom => from.to_string(),
1329 }
1330}
1331
1332fn merge_option_string_field(
1333 into: &Option<String>,
1334 from: &Option<String>,
1335 strategy: EntityDedupMergePolicy,
1336) -> Option<String> {
1337 match strategy {
1338 EntityDedupMergePolicy::PreferInto => {
1339 if into.is_some() {
1340 into.clone()
1341 } else {
1342 from.clone()
1343 }
1344 }
1345 EntityDedupMergePolicy::PreferFrom => {
1346 if from.is_some() {
1347 from.clone()
1348 } else {
1349 into.clone()
1350 }
1351 }
1352 EntityDedupMergePolicy::Union => {
1353 match (into, from) {
1355 (Some(a), _) if !a.is_empty() => Some(a.clone()),
1356 (_, Some(b)) => Some(b.clone()),
1357 _ => None,
1358 }
1359 }
1360 }
1361}
1362
1363fn merge_properties(
1365 into: &Option<Value>,
1366 from: &Option<Value>,
1367 strategy: EntityDedupMergePolicy,
1368) -> (Option<Value>, usize) {
1369 match (into, from) {
1370 (None, None) => (None, 0),
1371 (Some(a), None) => (Some(a.clone()), 0),
1372 (None, Some(b)) => {
1373 let count = if let Value::Object(m) = b { m.len() } else { 1 };
1374 (Some(b.clone()), count)
1375 }
1376 (Some(into_val), Some(from_val)) => {
1377 let (merged, added) = merge_json(into_val, from_val, strategy);
1378 (Some(merged), added)
1379 }
1380 }
1381}
1382
1383fn merge_json(into: &Value, from: &Value, strategy: EntityDedupMergePolicy) -> (Value, usize) {
1385 match (into, from, strategy) {
1386 (Value::Object(a), Value::Object(b), EntityDedupMergePolicy::Union) => {
1387 let mut result = a.clone();
1388 let mut added = 0usize;
1389 for (k, v_from) in b {
1390 if let Some(v_into) = a.get(k) {
1391 let (merged, sub_added) =
1392 merge_json(v_into, v_from, EntityDedupMergePolicy::Union);
1393 result.insert(k.clone(), merged);
1394 added += sub_added;
1395 } else {
1396 result.insert(k.clone(), v_from.clone());
1397 added += 1;
1398 }
1399 }
1400 (Value::Object(result), added)
1401 }
1402 (Value::Object(a), Value::Object(b), EntityDedupMergePolicy::PreferInto) => {
1403 let mut result = a.clone();
1404 let mut added = 0usize;
1405 for (k, v) in b {
1406 if !a.contains_key(k) {
1407 result.insert(k.clone(), v.clone());
1408 added += 1;
1409 }
1410 }
1411 (Value::Object(result), added)
1412 }
1413 (Value::Object(a), Value::Object(b), EntityDedupMergePolicy::PreferFrom) => {
1414 let mut result = a.clone();
1415 let mut added = 0usize;
1416 for (k, v) in b {
1417 result.insert(k.clone(), v.clone());
1418 if !a.contains_key(k) {
1419 added += 1;
1420 }
1421 }
1422 (Value::Object(result), added)
1423 }
1424 (_into_val, from_val, EntityDedupMergePolicy::PreferFrom) => (from_val.clone(), 1),
1426 _ => (into.clone(), 0),
1427 }
1428}
1429
/// Appends to a copy of `into` every tag from `from` that has not been seen yet.
///
/// `into` is copied verbatim, including any duplicates it already contains. Each tag of
/// `from` is appended in order, at most once, and only if it appears neither in `into`
/// nor earlier in `from`. Returns the merged list and the number of tags appended.
fn union_tags(into: &[String], from: &[String]) -> (Vec<String>, usize) {
    let mut known: HashSet<&str> = into.iter().map(String::as_str).collect();
    let mut merged = into.to_vec();
    let original_len = merged.len();

    // `HashSet::insert` returns true only for first sightings, which doubles as the filter.
    merged.extend(
        from.iter()
            .filter(|&tag| known.insert(tag.as_str()))
            .cloned(),
    );

    let appended = merged.len() - original_len;
    (merged, appended)
}
1442
// Tests for the update and merge operations defined in this file.
//
// Runtime-level tests build an in-memory runtime through `rt()` and operate in the
// `NamespaceToken::local()` namespace. `union_tags_deduplicates` and
// `merge_properties_prefer_into_fills_missing_keys` call the private helpers directly.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::{KhiveRuntime, NamespaceToken};
    use khive_storage::types::{Direction, TextFilter, TextQueryMode, TextSearchRequest};

    /// Builds an in-memory runtime for a single test.
    fn rt() -> KhiveRuntime {
        KhiveRuntime::memory().unwrap()
    }

    /// Runs a `Plain`-mode full-text search for `query`, filtered to `token`'s namespace,
    /// and returns the subject ids of at most 50 hits.
    async fn fts_hit(rt: &KhiveRuntime, token: &NamespaceToken, query: &str) -> Vec<Uuid> {
        let ns = token.namespace().as_str().to_string();
        rt.text(token)
            .unwrap()
            .search(TextSearchRequest {
                query: query.to_string(),
                mode: TextQueryMode::Plain,
                filter: Some(TextFilter {
                    namespaces: vec![ns],
                    ..Default::default()
                }),
                top_k: 50,
                snippet_chars: 100,
            })
            .await
            .unwrap()
            .into_iter()
            .map(|h| h.subject_id)
            .collect()
    }

    /// A patch that sets only `description` must leave `name` and `properties` untouched.
    #[tokio::test]
    async fn update_entity_patch_changes_only_specified_fields() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let entity = rt
            .create_entity(
                &tok,
                "concept",
                None,
                "OriginalName",
                Some("orig desc"),
                Some(serde_json::json!({"k":"v"})),
                vec![],
            )
            .await
            .unwrap();

        let updated = rt
            .update_entity(
                &tok,
                entity.id,
                EntityPatch {
                    description: Some(Some("new desc".to_string())),
                    ..Default::default()
                },
            )
            .await
            .unwrap();

        assert_eq!(updated.name, "OriginalName");
        assert_eq!(updated.description.as_deref(), Some("new desc"));
        assert_eq!(updated.properties, Some(serde_json::json!({"k":"v"})));
    }

    /// `description: Some(None)` explicitly clears the stored description
    /// (as opposed to `None`, which leaves it alone).
    #[tokio::test]
    async fn update_entity_clear_description_with_some_none() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let entity = rt
            .create_entity(
                &tok,
                "concept",
                None,
                "ClearDesc",
                Some("has description"),
                None,
                vec![],
            )
            .await
            .unwrap();

        let updated = rt
            .update_entity(
                &tok,
                entity.id,
                EntityPatch {
                    description: Some(None),
                    ..Default::default()
                },
            )
            .await
            .unwrap();

        assert!(
            updated.description.is_none(),
            "description should be cleared"
        );
    }

    /// Renaming an entity must refresh the full-text index: the old name stops matching
    /// and the new name starts matching.
    #[tokio::test]
    async fn update_entity_reindexes_when_name_changes() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let entity = rt
            .create_entity(&tok, "concept", None, "OldName", None, None, vec![])
            .await
            .unwrap();

        // Sanity check: the entity is indexed under its original name.
        let hits_before = fts_hit(&rt, &tok, "OldName").await;
        assert!(
            hits_before.contains(&entity.id),
            "entity should be findable by old name"
        );

        rt.update_entity(
            &tok,
            entity.id,
            EntityPatch {
                name: Some("NewName".to_string()),
                ..Default::default()
            },
        )
        .await
        .unwrap();

        let hits_old = fts_hit(&rt, &tok, "OldName").await;
        let hits_new = fts_hit(&rt, &tok, "NewName").await;

        // A stale index row for the old name would make this assertion fail.
        assert!(
            !hits_old.contains(&entity.id),
            "old name should no longer match after rename"
        );
        assert!(
            hits_new.contains(&entity.id),
            "new name should be findable after rename"
        );
    }

    /// A `properties` patch is merged into the existing object: keys absent from the
    /// patch survive, and keys present in the patch are overwritten.
    #[tokio::test]
    async fn update_entity_properties_merges_preserving_existing_keys() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let entity = rt
            .create_entity(
                &tok,
                "concept",
                None,
                "MergeProps",
                None,
                Some(serde_json::json!({
                    "domain": "inference",
                    "repo": "lattice",
                    "status": "researched",
                })),
                vec![],
            )
            .await
            .unwrap();

        let updated = rt
            .update_entity(
                &tok,
                entity.id,
                EntityPatch {
                    properties: Some(serde_json::json!({"status": "implemented"})),
                    ..Default::default()
                },
            )
            .await
            .unwrap();

        let props = updated.properties.expect("properties should remain set");
        assert_eq!(props["domain"], "inference", "domain key must be preserved");
        assert_eq!(props["repo"], "lattice", "repo key must be preserved");
        assert_eq!(
            props["status"], "implemented",
            "status key must be updated by patch"
        );
    }

    /// A properties-only patch must leave the entity searchable by name.
    ///
    /// NOTE(review): despite the test name, this only checks that the entity is still
    /// findable afterwards; it cannot observe whether a reindex was skipped.
    #[tokio::test]
    async fn update_entity_skips_reindex_when_only_properties_change() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let entity = rt
            .create_entity(&tok, "concept", None, "StableIndexed", None, None, vec![])
            .await
            .unwrap();

        let hits_before = fts_hit(&rt, &tok, "StableIndexed").await;
        assert!(hits_before.contains(&entity.id));

        rt.update_entity(
            &tok,
            entity.id,
            EntityPatch {
                properties: Some(serde_json::json!({"new": "prop"})),
                ..Default::default()
            },
        )
        .await
        .unwrap();

        let hits_after = fts_hit(&rt, &tok, "StableIndexed").await;
        assert!(
            hits_after.contains(&entity.id),
            "still findable after props-only patch"
        );
    }

    /// Merging `b` into `d` moves both incoming edges (`a -> b`, `c -> b`) onto `d`.
    #[tokio::test]
    async fn merge_entity_rewires_edges() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let a = rt
            .create_entity(&tok, "concept", None, "A", None, None, vec![])
            .await
            .unwrap();
        let b = rt
            .create_entity(&tok, "concept", None, "B", None, None, vec![])
            .await
            .unwrap();
        let c = rt
            .create_entity(&tok, "concept", None, "C", None, None, vec![])
            .await
            .unwrap();
        let d = rt
            .create_entity(&tok, "concept", None, "D", None, None, vec![])
            .await
            .unwrap();

        // Two edges point at `b`, which is about to be merged away.
        rt.link(&tok, a.id, b.id, EdgeRelation::Extends, 1.0, None)
            .await
            .unwrap();
        rt.link(&tok, c.id, b.id, EdgeRelation::Extends, 1.0, None)
            .await
            .unwrap();

        // Argument order: kept (`into`) id first, then the removed (`from`) id.
        let summary = rt
            .merge_entity(&tok, d.id, b.id, EntityDedupMergePolicy::PreferInto, false)
            .await
            .unwrap();

        assert_eq!(summary.kept_id, d.id);
        assert_eq!(summary.removed_id, b.id);
        assert_eq!(summary.edges_rewired, 2);

        // Both former sources now point at the surviving entity `d`.
        let a_neighbors = rt
            .neighbors(&tok, a.id, Direction::Out, None, None)
            .await
            .unwrap();
        assert_eq!(a_neighbors.len(), 1);
        assert_eq!(a_neighbors[0].node_id, d.id);

        let c_neighbors = rt
            .neighbors(&tok, c.id, Direction::Out, None, None)
            .await
            .unwrap();
        assert_eq!(c_neighbors.len(), 1);
        assert_eq!(c_neighbors[0].node_id, d.id);
    }

    /// Merging an entity into itself is rejected with a descriptive error.
    #[tokio::test]
    async fn merge_entity_self_merge_rejected() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let a = rt
            .create_entity(&tok, "concept", None, "A", None, None, vec![])
            .await
            .unwrap();
        let err = rt
            .merge_entity(&tok, a.id, a.id, EntityDedupMergePolicy::PreferInto, false)
            .await
            .unwrap_err();
        // Matched via the Debug output, so this depends on the error message text.
        assert!(
            format!("{err:?}").contains("cannot merge an entity into itself"),
            "expected self-merge rejection, got: {err:?}"
        );
    }

    /// `PreferInto`: on conflicting property keys the kept entity's value wins, and
    /// keys only present on the removed entity are still copied over.
    #[tokio::test]
    async fn merge_entity_prefer_into_strategy() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let into = rt
            .create_entity(
                &tok,
                "concept",
                None,
                "Into",
                None,
                Some(serde_json::json!({"a": 1})),
                vec![],
            )
            .await
            .unwrap();
        let from = rt
            .create_entity(
                &tok,
                "concept",
                None,
                "From",
                None,
                Some(serde_json::json!({"a": 2, "b": 3})),
                vec![],
            )
            .await
            .unwrap();

        rt.merge_entity(
            &tok,
            into.id,
            from.id,
            EntityDedupMergePolicy::PreferInto,
            false,
        )
        .await
        .unwrap();

        let kept = rt.get_entity(&tok, into.id).await.unwrap();
        let props = kept.properties.unwrap();
        // "a" exists on both sides: the kept entity's value wins.
        assert_eq!(props["a"], 1);
        // "b" only exists on the removed entity, so it is carried over.
        assert_eq!(props["b"], 3);
    }

    /// `PreferFrom`: on conflicting property keys the removed entity's value wins.
    #[tokio::test]
    async fn merge_entity_prefer_from_strategy() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let into = rt
            .create_entity(
                &tok,
                "concept",
                None,
                "Into",
                None,
                Some(serde_json::json!({"a": 1})),
                vec![],
            )
            .await
            .unwrap();
        let from = rt
            .create_entity(
                &tok,
                "concept",
                None,
                "From",
                None,
                Some(serde_json::json!({"a": 2, "b": 3})),
                vec![],
            )
            .await
            .unwrap();

        rt.merge_entity(
            &tok,
            into.id,
            from.id,
            EntityDedupMergePolicy::PreferFrom,
            false,
        )
        .await
        .unwrap();

        let kept = rt.get_entity(&tok, into.id).await.unwrap();
        let props = kept.properties.unwrap();
        // "a" exists on both sides: the removed entity's value overwrites it.
        assert_eq!(props["a"], 2);
        assert_eq!(props["b"], 3);
    }

    /// `Union`: scalar conflicts keep the kept entity's value; new keys are added.
    #[tokio::test]
    async fn merge_entity_union_strategy() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let into = rt
            .create_entity(
                &tok,
                "concept",
                None,
                "Into",
                None,
                Some(serde_json::json!({"a": 1})),
                vec![],
            )
            .await
            .unwrap();
        let from = rt
            .create_entity(
                &tok,
                "concept",
                None,
                "From",
                None,
                Some(serde_json::json!({"a": 2, "b": 3})),
                vec![],
            )
            .await
            .unwrap();

        rt.merge_entity(&tok, into.id, from.id, EntityDedupMergePolicy::Union, false)
            .await
            .unwrap();

        let kept = rt.get_entity(&tok, into.id).await.unwrap();
        let props = kept.properties.unwrap();
        // Scalar conflict under Union: the kept entity's value is retained.
        assert_eq!(props["a"], 1);
        assert_eq!(props["b"], 3);
    }

    /// Tags from both entities are unioned without duplicates, even under `PreferInto`.
    #[tokio::test]
    async fn merge_entity_unions_tags() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let into = rt
            .create_entity(
                &tok,
                "concept",
                None,
                "Into",
                None,
                None,
                vec!["x".to_string(), "y".to_string()],
            )
            .await
            .unwrap();
        let from = rt
            .create_entity(
                &tok,
                "concept",
                None,
                "From",
                None,
                None,
                vec!["y".to_string(), "z".to_string()],
            )
            .await
            .unwrap();

        rt.merge_entity(
            &tok,
            into.id,
            from.id,
            EntityDedupMergePolicy::PreferInto,
            false,
        )
        .await
        .unwrap();

        let kept = rt.get_entity(&tok, into.id).await.unwrap();
        // Sorted before comparing so the test does not depend on storage order.
        let mut tags = kept.tags.clone();
        tags.sort();
        assert_eq!(tags, vec!["x", "y", "z"]);
    }

    /// An edge `a -> b` would become `a -> a` after merging `b` into `a`; such
    /// self-loops are dropped rather than rewired.
    #[tokio::test]
    async fn merge_entity_drops_self_loops() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let a = rt
            .create_entity(&tok, "concept", None, "A", None, None, vec![])
            .await
            .unwrap();
        let b = rt
            .create_entity(&tok, "concept", None, "B", None, None, vec![])
            .await
            .unwrap();

        rt.link(&tok, a.id, b.id, EdgeRelation::Extends, 1.0, None)
            .await
            .unwrap();

        let summary = rt
            .merge_entity(&tok, a.id, b.id, EntityDedupMergePolicy::PreferInto, false)
            .await
            .unwrap();

        assert_eq!(
            summary.edges_rewired, 0,
            "self-loop should be dropped, not rewired"
        );

        let a_out = rt
            .neighbors(&tok, a.id, Direction::Out, None, None)
            .await
            .unwrap();
        assert!(a_out.is_empty(), "no self-loop should remain");
    }

    /// `union_tags` keeps each tag once and counts only tags new to `into` ("z").
    #[test]
    fn union_tags_deduplicates() {
        let (tags, added) = union_tags(
            &["x".to_string(), "y".to_string()],
            &["y".to_string(), "z".to_string()],
        );
        let mut sorted = tags.clone();
        sorted.sort();
        assert_eq!(sorted, vec!["x", "y", "z"]);
        assert_eq!(added, 1);
    }

    /// `merge_properties` with `PreferInto` keeps conflicting keys from `into`, copies
    /// missing keys from `from`, and counts only the copied keys.
    #[test]
    fn merge_properties_prefer_into_fills_missing_keys() {
        let a = serde_json::json!({"a": 1});
        let b = serde_json::json!({"a": 99, "b": 2});
        let (merged, added) =
            merge_properties(&Some(a), &Some(b), EntityDedupMergePolicy::PreferInto);
        let m = merged.unwrap();
        assert_eq!(m["a"], 1);
        assert_eq!(m["b"], 2);
        assert_eq!(added, 1);
    }

    /// The removed entity is hidden from `get_entity`, but its row persists with
    /// `deleted_at` set and `merged_into` pointing at the kept entity.
    #[tokio::test]
    async fn merge_entity_tombstones_source_with_provenance() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let into = rt
            .create_entity(&tok, "concept", None, "Into", None, None, vec![])
            .await
            .unwrap();
        let from = rt
            .create_entity(&tok, "concept", None, "From", None, None, vec![])
            .await
            .unwrap();
        let from_id = from.id;

        rt.merge_entity(
            &tok,
            into.id,
            from_id,
            EntityDedupMergePolicy::PreferInto,
            false,
        )
        .await
        .unwrap();

        assert!(
            rt.get_entity(&tok, from_id).await.is_err(),
            "tombstoned source should not be returned by get_entity"
        );

        // Read the raw row directly, since the public API no longer returns it. The
        // synchronous connection access runs inside `spawn_blocking`.
        let pool = rt.backend().pool_arc();
        let (deleted_at, merged_into): (Option<i64>, Option<String>) =
            tokio::task::spawn_blocking(move || {
                let guard = pool.writer().unwrap();
                guard
                    .conn()
                    .query_row(
                        "SELECT deleted_at, merged_into FROM entities WHERE id = ?1",
                        [from_id.to_string()],
                        |row| Ok((row.get(0)?, row.get(1)?)),
                    )
                    .unwrap()
            })
            .await
            .unwrap();
        assert!(
            deleted_at.is_some(),
            "tombstoned entity must have deleted_at set"
        );
        assert_eq!(
            merged_into.as_deref(),
            Some(into.id.to_string().as_str()),
            "merged_into must point to into_id"
        );
    }

    /// Merging two notes of the same kind with `ContentMergeStrategy::Append` reports
    /// appended content and soft-deletes the removed note.
    #[tokio::test]
    async fn merge_note_same_kind_appends_content() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let into = rt
            .create_note(
                &tok,
                "observation",
                None,
                "Into content",
                None,
                None,
                vec![],
            )
            .await
            .unwrap();
        let from = rt
            .create_note(
                &tok,
                "observation",
                None,
                "From content",
                None,
                None,
                vec![],
            )
            .await
            .unwrap();
        let from_id = from.id;

        let summary = rt
            .merge_note(
                &tok,
                into.id,
                from_id,
                EntityDedupMergePolicy::PreferInto,
                ContentMergeStrategy::Append,
                false,
            )
            .await
            .unwrap();

        assert_eq!(summary.kept_id, into.id);
        assert_eq!(summary.removed_id, from_id);
        assert!(summary.content_appended);
        assert!(!summary.dry_run);

        // `get_note` returns `None` for the removed note after the merge.
        let from_store = rt.notes(&tok).unwrap();
        assert!(
            from_store.get_note(from_id).await.unwrap().is_none(),
            "merged-from note should be soft-deleted"
        );
    }

    /// Notes of different kinds cannot be merged.
    #[tokio::test]
    async fn merge_note_different_kinds_rejected() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let into = rt
            .create_note(&tok, "observation", None, "Into", None, None, vec![])
            .await
            .unwrap();
        let from = rt
            .create_note(&tok, "decision", None, "From", None, None, vec![])
            .await
            .unwrap();

        let result = rt
            .merge_note(
                &tok,
                into.id,
                from.id,
                EntityDedupMergePolicy::PreferInto,
                ContentMergeStrategy::Append,
                false,
            )
            .await;
        assert!(result.is_err(), "merging different note kinds must fail");
    }

    /// With `dry_run = true`, the summary is flagged as a dry run and neither note's
    /// stored content changes.
    #[tokio::test]
    async fn merge_note_dry_run_leaves_notes_unchanged() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let into = rt
            .create_note(
                &tok,
                "observation",
                None,
                "Into content",
                None,
                None,
                vec![],
            )
            .await
            .unwrap();
        let from = rt
            .create_note(
                &tok,
                "observation",
                None,
                "From content",
                None,
                None,
                vec![],
            )
            .await
            .unwrap();
        let into_id = into.id;
        let from_id = from.id;

        let summary = rt
            .merge_note(
                &tok,
                into_id,
                from_id,
                EntityDedupMergePolicy::PreferInto,
                ContentMergeStrategy::Append,
                true,
            )
            .await
            .unwrap();

        assert!(summary.dry_run);

        // Both notes must still exist (the inner `unwrap`s) with their original content.
        let store = rt.notes(&tok).unwrap();
        let into_after = store.get_note(into_id).await.unwrap().unwrap();
        let from_after = store.get_note(from_id).await.unwrap().unwrap();
        assert_eq!(
            into_after.content, "Into content",
            "dry_run must not mutate into-note"
        );
        assert_eq!(
            from_after.content, "From content",
            "dry_run must not mutate from-note"
        );
    }

    /// An `EdgePatch` carrying only `properties` updates the edge's `metadata` and
    /// leaves its weight unchanged.
    #[tokio::test]
    async fn update_edge_updates_properties() {
        use khive_storage::EdgeRelation;
        let rt = rt();
        let tok = NamespaceToken::local();
        let a = rt
            .create_entity(&tok, "concept", None, "A", None, None, vec![])
            .await
            .unwrap();
        let b = rt
            .create_entity(&tok, "concept", None, "B", None, None, vec![])
            .await
            .unwrap();
        let edge = rt
            .link(&tok, a.id, b.id, EdgeRelation::Extends, 0.5, None)
            .await
            .unwrap();
        let edge_id: Uuid = edge.id.into();

        let updated = rt
            .update_edge(
                &tok,
                edge_id,
                EdgePatch {
                    properties: Some(serde_json::json!({"source": "manual"})),
                    ..Default::default()
                },
            )
            .await
            .unwrap();

        assert_eq!(updated.metadata.as_ref().unwrap()["source"], "manual");
        // Float weight compared with a tolerance rather than exact equality.
        assert!((updated.weight - 0.5).abs() < 0.001, "weight unchanged");
    }
}