1use std::collections::HashSet;
8
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use uuid::Uuid;
12
13use khive_db::SqliteError;
14use khive_storage::types::{EdgeFilter, TextDocument};
15use khive_storage::{EdgeRelation, Entity, SubstrateKind};
16use khive_types::EventKind;
17use rusqlite::OptionalExtension;
18
19use crate::error::{RuntimeError, RuntimeResult};
20use crate::operations::canonical_edge_endpoints;
21use crate::runtime::{KhiveRuntime, NamespaceToken};
22
23#[derive(Clone, Debug, Default)]
45pub struct EntityPatch {
46 pub name: Option<String>,
47 pub description: Option<Option<String>>,
48 pub properties: Option<Value>,
49 pub tags: Option<Vec<String>>,
50}
51
52#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
54#[serde(rename_all = "snake_case")]
55pub enum EntityDedupMergePolicy {
56 #[default]
59 PreferInto,
60 PreferFrom,
62 Union,
64}
65
66#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
68#[serde(rename_all = "snake_case")]
69pub enum ContentMergeStrategy {
70 #[default]
71 Append,
72 PreferInto,
73 PreferFrom,
74}
75
76#[derive(Clone, Debug, Serialize, Deserialize)]
78pub struct MergeSummary {
79 pub kept_id: Uuid,
80 pub removed_id: Uuid,
81 pub edges_rewired: usize,
82 pub properties_merged: usize,
83 pub tags_unioned: usize,
84 pub content_appended: bool,
85 pub dry_run: bool,
86}
87
88#[derive(Clone, Debug, Default)]
93pub struct EdgePatch {
94 pub relation: Option<EdgeRelation>,
95 pub weight: Option<f64>,
96 pub properties: Option<Value>,
97}
98
99#[derive(Clone, Debug, Default)]
106pub struct NotePatch {
107 pub name: Option<Option<String>>,
108 pub content: Option<String>,
109 pub salience: Option<Option<f64>>,
110 pub decay_factor: Option<Option<f64>>,
111 pub properties: Option<Value>,
112 pub(crate) kind_status: Option<String>,
113}
114
115impl NotePatch {
116 pub fn new(
119 name: Option<Option<String>>,
120 content: Option<String>,
121 salience: Option<Option<f64>>,
122 decay_factor: Option<Option<f64>>,
123 properties: Option<Value>,
124 ) -> Self {
125 Self {
126 name,
127 content,
128 salience,
129 decay_factor,
130 properties,
131 kind_status: None,
132 }
133 }
134}
135
136#[derive(Clone, Debug, Default)]
138pub struct EdgeListFilter {
139 pub source_id: Option<Uuid>,
140 pub target_id: Option<Uuid>,
141 pub relations: Vec<EdgeRelation>,
143 pub min_weight: Option<f64>,
144 pub max_weight: Option<f64>,
145}
146
147impl From<EdgeListFilter> for EdgeFilter {
148 fn from(f: EdgeListFilter) -> Self {
149 EdgeFilter {
150 source_ids: f.source_id.into_iter().collect(),
151 target_ids: f.target_id.into_iter().collect(),
152 relations: f.relations,
153 min_weight: f.min_weight,
154 max_weight: f.max_weight,
155 ..Default::default()
156 }
157 }
158}
159
160impl KhiveRuntime {
165 pub async fn update_entity(
173 &self,
174 token: &NamespaceToken,
175 id: Uuid,
176 patch: EntityPatch,
177 ) -> RuntimeResult<Entity> {
178 let store = self.entities(token)?;
179 let mut entity = store
180 .get_entity(id)
181 .await?
182 .ok_or_else(|| RuntimeError::NotFound(format!("entity {id}")))?;
183
184 Self::ensure_namespace(&entity.namespace, token.namespace().as_str())?;
185
186 let mut text_changed = false;
187 let mut changed_fields: Vec<&'static str> = Vec::new();
188
189 if let Some(name) = patch.name {
190 text_changed |= entity.name != name;
191 entity.name = name;
192 changed_fields.push("name");
193 }
194 if let Some(desc_patch) = patch.description {
195 text_changed |= entity.description != desc_patch;
196 entity.description = desc_patch;
197 changed_fields.push("description");
198 }
199 if let Some(props) = patch.properties {
200 let (merged, _) = merge_properties(
201 &entity.properties,
202 &Some(props),
203 EntityDedupMergePolicy::PreferFrom,
204 );
205 entity.properties = merged;
206 changed_fields.push("properties");
207 }
208 if let Some(tags) = patch.tags {
209 entity.tags = tags;
210 changed_fields.push("tags");
211 }
212
213 entity.updated_at = chrono::Utc::now().timestamp_micros();
214 store.upsert_entity(entity.clone()).await?;
215
216 if text_changed {
217 self.reindex_entity(token, &entity).await?;
218 }
219
220 let event_store = self.events(token)?;
221 let event = khive_storage::event::Event::new(
222 entity.namespace.clone(),
223 "update",
224 EventKind::EntityUpdated,
225 SubstrateKind::Entity,
226 "",
227 )
228 .with_target(entity.id)
229 .with_payload(serde_json::json!({
230 "id": entity.id,
231 "namespace": entity.namespace,
232 "changed_fields": changed_fields,
233 }));
234 event_store.append_event(event).await.map_err(|e| {
235 RuntimeError::Internal(format!("update_entity: event store write failed: {e}"))
236 })?;
237
238 Ok(entity)
239 }
240
241 pub async fn merge_entity(
254 &self,
255 token: &NamespaceToken,
256 into_id: Uuid,
257 from_id: Uuid,
258 strategy: EntityDedupMergePolicy,
259 dry_run: bool,
260 ) -> RuntimeResult<MergeSummary> {
261 if into_id == from_id {
262 return Err(RuntimeError::InvalidInput(
263 "cannot merge an entity into itself".into(),
264 ));
265 }
266 {
270 let into_entity = self.get_entity(token, into_id).await?;
271 let from_entity = self.get_entity(token, from_id).await?;
272 if into_entity.kind != from_entity.kind {
273 return Err(RuntimeError::InvalidInput(format!(
274 "cannot merge entities of different kinds: into={} ({}), from={} ({}); \
275 merge requires both entities to share the same kind",
276 into_id, into_entity.kind, from_id, from_entity.kind
277 )));
278 }
279 }
280 let ns = token.namespace().as_str().to_owned();
281 let sanitized_ns: String = ns
282 .chars()
283 .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
284 .collect();
285 let fts_table = format!("fts_entities_{}", sanitized_ns);
286 let vec_table = self.config().embedding_model.map(|model| {
287 let key: String = model
288 .to_string()
289 .chars()
290 .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
291 .collect();
292 format!("vec_{}", key)
293 });
294
295 let _ = self.entities(token)?;
298 let _ = self.graph(token)?;
299 let _ = self.text(token)?;
300 if self.config().embedding_model.is_some() {
301 let _ = self.vectors(token)?;
302 }
303
304 let pool = self.backend().pool_arc();
305
306 let (summary, updated_entity) = tokio::task::spawn_blocking(move || {
307 let guard = pool.writer()?;
308 guard.transaction(|conn| {
309 merge_entity_sql(
310 conn, ns, fts_table, vec_table, into_id, from_id, strategy, dry_run,
311 )
312 })
313 })
314 .await
315 .map_err(|e| RuntimeError::Internal(e.to_string()))??;
316
317 if !dry_run && self.config().embedding_model.is_some() {
320 self.reindex_entity(token, &updated_entity).await?;
321 }
322
323 let event_store = self.events(token)?;
324 let policy_str = match strategy {
327 EntityDedupMergePolicy::PreferInto => "prefer_into",
328 EntityDedupMergePolicy::PreferFrom => "prefer_from",
329 EntityDedupMergePolicy::Union => "union",
330 };
331 let event = khive_storage::event::Event::new(
332 updated_entity.namespace.clone(),
333 "merge",
334 EventKind::EntityMerged,
335 SubstrateKind::Entity,
336 "",
337 )
338 .with_target(summary.kept_id)
339 .with_payload(serde_json::json!({
340 "into_id": summary.kept_id,
341 "from_id": summary.removed_id,
342 "policy": policy_str,
343 "edges_rewired": summary.edges_rewired,
344 }));
345 event_store.append_event(event).await.map_err(|e| {
346 RuntimeError::Internal(format!("merge_entity: event store write failed: {e}"))
347 })?;
348
349 Ok(summary)
350 }
351
352 pub(crate) async fn reindex_entity(
360 &self,
361 token: &NamespaceToken,
362 entity: &Entity,
363 ) -> RuntimeResult<()> {
364 let body = match &entity.description {
365 Some(d) if !d.is_empty() => format!("{} {}", entity.name, d),
366 _ => entity.name.clone(),
367 };
368 let ns = entity.namespace.clone();
370 self.text(token)?
371 .upsert_document(TextDocument {
372 subject_id: entity.id,
373 kind: SubstrateKind::Entity,
374 title: Some(entity.name.clone()),
375 body: body.clone(),
376 tags: entity.tags.clone(),
377 namespace: ns.clone(),
378 metadata: entity.properties.clone(),
379 updated_at: chrono::Utc::now(),
380 })
381 .await?;
382
383 if self.config().embedding_model.is_some() {
384 let vector = self.embed(&body).await?;
385 self.vectors(token)?
386 .insert(
387 entity.id,
388 SubstrateKind::Entity,
389 &ns,
390 "entity.body",
391 vec![vector],
392 )
393 .await?;
394 }
395
396 Ok(())
397 }
398
399 pub(crate) async fn remove_from_indexes(
401 &self,
402 token: &NamespaceToken,
403 id: Uuid,
404 ) -> RuntimeResult<()> {
405 let ns = token.namespace().as_str().to_owned();
406 self.text(token)?.delete_document(&ns, id).await?;
407 if self.config().embedding_model.is_some() {
408 self.vectors(token)?.delete(id).await?;
409 }
410 Ok(())
411 }
412
413 pub(crate) async fn reindex_note(
415 &self,
416 token: &NamespaceToken,
417 note: &khive_storage::note::Note,
418 ) -> RuntimeResult<()> {
419 let ns = note.namespace.clone();
420 self.text_for_notes(token)?
421 .upsert_document(TextDocument {
422 subject_id: note.id,
423 kind: SubstrateKind::Note,
424 title: note.name.clone(),
425 body: note.content.clone(),
426 tags: Vec::new(),
427 namespace: ns.clone(),
428 metadata: note.properties.clone(),
429 updated_at: chrono::Utc::now(),
430 })
431 .await?;
432
433 if self.config().embedding_model.is_some() {
434 let vector = self.embed(¬e.content).await?;
435 self.vectors(token)?
436 .insert(
437 note.id,
438 SubstrateKind::Note,
439 &ns,
440 "note.content",
441 vec![vector],
442 )
443 .await?;
444 }
445 Ok(())
446 }
447
448 pub async fn update_note(
450 &self,
451 token: &NamespaceToken,
452 id: Uuid,
453 patch: NotePatch,
454 ) -> RuntimeResult<khive_storage::note::Note> {
455 let store = self.notes(token)?;
456 let mut note = store
457 .get_note(id)
458 .await?
459 .ok_or_else(|| RuntimeError::NotFound(format!("note {id}")))?;
460
461 Self::ensure_namespace(¬e.namespace, token.namespace().as_str())?;
462
463 let mut text_changed = false;
464
465 if let Some(name_patch) = patch.name {
466 text_changed |= note.name != name_patch;
467 note.name = name_patch;
468 }
469 if let Some(content) = patch.content {
470 text_changed |= note.content != content;
471 note.content = content;
472 }
473 if let Some(salience_patch) = patch.salience {
474 note.salience = salience_patch.map(|s| s.clamp(0.0, 1.0));
475 }
476 if let Some(decay_patch) = patch.decay_factor {
477 note.decay_factor = decay_patch.map(|d| d.max(0.0));
478 }
479 if let Some(props) = patch.properties {
480 let (merged, _) = merge_properties(
481 ¬e.properties,
482 &Some(props),
483 EntityDedupMergePolicy::PreferFrom,
484 );
485 note.properties = merged;
486 }
487 if let Some(status) = patch.kind_status {
488 note.status = status;
489 }
490
491 note.updated_at = chrono::Utc::now().timestamp_micros();
492 store.upsert_note(note.clone()).await?;
493
494 if text_changed {
495 self.reindex_note(token, ¬e).await?;
496 }
497
498 Ok(note)
499 }
500
501 pub async fn merge_note(
510 &self,
511 token: &NamespaceToken,
512 into_id: Uuid,
513 from_id: Uuid,
514 strategy: EntityDedupMergePolicy,
515 content_strategy: ContentMergeStrategy,
516 dry_run: bool,
517 ) -> RuntimeResult<MergeSummary> {
518 if into_id == from_id {
519 return Err(RuntimeError::InvalidInput(
520 "cannot merge a note into itself".into(),
521 ));
522 }
523 let ns = token.namespace().as_str().to_string();
524 let sanitized_ns: String = ns
525 .chars()
526 .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
527 .collect();
528 let fts_table = format!("fts_notes_{}", sanitized_ns);
529 let vec_table = self.config().embedding_model.map(|model| {
530 let key: String = model
531 .to_string()
532 .chars()
533 .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
534 .collect();
535 format!("vec_{}", key)
536 });
537
538 let note_store = self.notes(token)?;
539 let into_note = note_store
540 .get_note(into_id)
541 .await?
542 .ok_or_else(|| RuntimeError::NotFound("not found in this namespace".into()))?;
543 Self::ensure_namespace(&into_note.namespace, &ns)?;
544
545 let from_note = note_store
546 .get_note(from_id)
547 .await?
548 .ok_or_else(|| RuntimeError::NotFound("not found in this namespace".into()))?;
549 Self::ensure_namespace(&from_note.namespace, &ns)?;
550
551 let _ = self.graph(token)?;
552 let _ = self.text_for_notes(token)?;
553 if self.config().embedding_model.is_some() {
554 let _ = self.vectors(token)?;
555 }
556
557 let pool = self.backend().pool_arc();
558 let (summary, updated_note) = tokio::task::spawn_blocking(move || {
559 let guard = pool.writer()?;
560 guard.transaction(|conn| {
561 merge_note_sql(
562 conn,
563 ns,
564 fts_table,
565 vec_table,
566 into_id,
567 from_id,
568 strategy,
569 content_strategy,
570 dry_run,
571 )
572 })
573 })
574 .await
575 .map_err(|e| RuntimeError::Internal(e.to_string()))??;
576
577 if !dry_run && self.config().embedding_model.is_some() {
578 self.reindex_note(token, &updated_note).await?;
579 }
580 Ok(summary)
581 }
582}
583
584fn read_merge_entity(
590 conn: &rusqlite::Connection,
591 id: Uuid,
592 namespace: &str,
593) -> Result<Entity, SqliteError> {
594 let id_str = id.to_string();
595 let mut stmt = conn.prepare(
596 "SELECT id, namespace, kind, entity_type, name, description, properties, tags, \
597 created_at, updated_at, deleted_at, merged_into, merge_event_id \
598 FROM entities WHERE id = ?1 AND deleted_at IS NULL",
599 )?;
600 let mut rows = stmt.query(rusqlite::params![id_str])?;
601 let row = rows
602 .next()?
603 .ok_or_else(|| SqliteError::InvalidData(format!("entity {id} not found")))?;
604
605 let id_s: String = row.get(0)?;
606 let ns: String = row.get(1)?;
607 let kind: String = row.get(2)?;
608 let entity_type: Option<String> = row.get(3)?;
609 let name: String = row.get(4)?;
610 let description: Option<String> = row.get(5)?;
611 let properties_str: Option<String> = row.get(6)?;
612 let tags_str: String = row.get(7)?;
613 let created_at: i64 = row.get(8)?;
614 let updated_at: i64 = row.get(9)?;
615 let deleted_at: Option<i64> = row.get(10)?;
616 let merged_into_str: Option<String> = row.get(11)?;
617 let merge_event_id_str: Option<String> = row.get(12)?;
618
619 if ns != namespace {
620 return Err(SqliteError::InvalidData(format!(
621 "entity {id} belongs to namespace '{ns}', not '{namespace}'"
622 )));
623 }
624
625 let entity_id = Uuid::parse_str(&id_s).map_err(|e| SqliteError::InvalidData(e.to_string()))?;
626 let properties: Option<Value> = properties_str
627 .map(|s| {
628 serde_json::from_str::<Value>(&s).map_err(|e| SqliteError::InvalidData(e.to_string()))
629 })
630 .transpose()?;
631 let tags: Vec<String> =
632 serde_json::from_str(&tags_str).map_err(|e| SqliteError::InvalidData(e.to_string()))?;
633 let merged_into = merged_into_str
634 .as_deref()
635 .map(Uuid::parse_str)
636 .transpose()
637 .map_err(|e| SqliteError::InvalidData(e.to_string()))?;
638 let merge_event_id = merge_event_id_str
639 .as_deref()
640 .map(Uuid::parse_str)
641 .transpose()
642 .map_err(|e| SqliteError::InvalidData(e.to_string()))?;
643
644 Ok(Entity {
645 id: entity_id,
646 namespace: ns,
647 kind,
648 entity_type,
649 name,
650 description,
651 properties,
652 tags,
653 created_at,
654 updated_at,
655 deleted_at,
656 merged_into,
657 merge_event_id,
658 })
659}
660
661#[allow(clippy::too_many_arguments)]
669fn merge_entity_sql(
670 conn: &rusqlite::Connection,
671 namespace: String,
672 fts_table: String,
673 vec_table: Option<String>,
674 into_id: Uuid,
675 from_id: Uuid,
676 strategy: EntityDedupMergePolicy,
677 dry_run: bool,
678) -> Result<(MergeSummary, Entity), SqliteError> {
679 let into_entity = read_merge_entity(conn, into_id, &namespace)?;
680 let from_entity = read_merge_entity(conn, from_id, &namespace)?;
681
682 #[allow(dead_code)]
684 struct EdgeRow {
685 id: Uuid,
686 source_id: Uuid,
687 target_id: Uuid,
688 relation: String,
689 weight: f64,
690 created_at: i64,
691 updated_at: i64,
692 deleted_at: Option<i64>,
693 target_backend: Option<String>,
694 metadata: Option<String>,
695 }
696
697 let parse_id =
698 |s: String| Uuid::parse_str(&s).map_err(|e| SqliteError::InvalidData(e.to_string()));
699
700 let from_str = from_id.to_string();
701
702 let mut outbound: Vec<EdgeRow> = Vec::new();
703 {
704 let mut stmt = conn.prepare(
705 "SELECT id, source_id, target_id, relation, weight, created_at, \
706 updated_at, deleted_at, target_backend, metadata \
707 FROM graph_edges WHERE namespace = ?1 AND source_id = ?2",
708 )?;
709 let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
710 while let Some(row) = rows.next()? {
711 outbound.push(EdgeRow {
712 id: parse_id(row.get(0)?)?,
713 source_id: parse_id(row.get(1)?)?,
714 target_id: parse_id(row.get(2)?)?,
715 relation: row.get(3)?,
716 weight: row.get(4)?,
717 created_at: row.get(5)?,
718 updated_at: row.get(6)?,
719 deleted_at: row.get(7)?,
720 target_backend: row.get(8)?,
721 metadata: row.get(9)?,
722 });
723 }
724 }
725
726 let mut inbound: Vec<EdgeRow> = Vec::new();
727 {
728 let mut stmt = conn.prepare(
729 "SELECT id, source_id, target_id, relation, weight, created_at, \
730 updated_at, deleted_at, target_backend, metadata \
731 FROM graph_edges WHERE namespace = ?1 AND target_id = ?2",
732 )?;
733 let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
734 while let Some(row) = rows.next()? {
735 inbound.push(EdgeRow {
736 id: parse_id(row.get(0)?)?,
737 source_id: parse_id(row.get(1)?)?,
738 target_id: parse_id(row.get(2)?)?,
739 relation: row.get(3)?,
740 weight: row.get(4)?,
741 created_at: row.get(5)?,
742 updated_at: row.get(6)?,
743 deleted_at: row.get(7)?,
744 target_backend: row.get(8)?,
745 metadata: row.get(9)?,
746 });
747 }
748 }
749
750 let mut seen: HashSet<Uuid> = HashSet::new();
752 let mut all_edges: Vec<EdgeRow> = Vec::new();
753 for edge in outbound.into_iter().chain(inbound) {
754 if seen.insert(edge.id) {
755 all_edges.push(edge);
756 }
757 }
758
759 let (merged_props, properties_merged) =
761 merge_properties(&into_entity.properties, &from_entity.properties, strategy);
762 let merged_name = merge_string_field(&into_entity.name, &from_entity.name, strategy);
763 let merged_description =
764 merge_option_string_field(&into_entity.description, &from_entity.description, strategy);
765 let (merged_tags, tags_unioned) = union_tags(&into_entity.tags, &from_entity.tags);
766
767 let now = chrono::Utc::now().timestamp_micros();
768 let into_str = into_id.to_string();
769 let props_str = merged_props
770 .as_ref()
771 .map(|v| serde_json::to_string(v).unwrap_or_default());
772 let tags_json = serde_json::to_string(&merged_tags).unwrap_or_else(|_| "[]".to_string());
773
774 let mut edges_rewired = 0usize;
776 if !dry_run {
777 for edge in all_edges {
778 let raw_src = if edge.source_id == from_id {
779 into_id
780 } else {
781 edge.source_id
782 };
783 let raw_tgt = if edge.target_id == from_id {
784 into_id
785 } else {
786 edge.target_id
787 };
788 let (new_src, new_tgt) = match edge.relation.parse::<EdgeRelation>() {
791 Ok(rel) => canonical_edge_endpoints(rel, raw_src, raw_tgt),
792 Err(_) => (raw_src, raw_tgt),
793 };
794
795 if new_src == new_tgt {
796 conn.execute(
797 "DELETE FROM graph_edges WHERE namespace = ?1 AND id = ?2",
798 rusqlite::params![&namespace, edge.id.to_string()],
799 )?;
800 continue;
801 }
802
803 let now_ts = chrono::Utc::now().timestamp();
804 let conflict_id: Option<String> = {
818 let conflict_src = new_src.to_string();
819 let conflict_tgt = new_tgt.to_string();
820 conn.query_row(
821 "SELECT id FROM graph_edges \
822 WHERE namespace = ?1 AND source_id = ?2 AND target_id = ?3 \
823 AND relation = ?4 AND id != ?5",
824 rusqlite::params![
825 &namespace,
826 &conflict_src,
827 &conflict_tgt,
828 &edge.relation,
829 edge.id.to_string(),
830 ],
831 |row| row.get(0),
832 )
833 .optional()
834 .map_err(SqliteError::Rusqlite)?
835 };
836
837 let changed = if let Some(existing_id) = conflict_id {
838 conn.execute(
841 "DELETE FROM graph_edges WHERE namespace = ?1 AND id = ?2",
842 rusqlite::params![&namespace, edge.id.to_string()],
843 )?;
844 conn.execute(
845 "UPDATE graph_edges SET \
846 weight = ?1, updated_at = ?2, deleted_at = NULL, \
847 target_backend = ?3, metadata = ?4 \
848 WHERE namespace = ?5 AND id = ?6",
849 rusqlite::params![
850 edge.weight,
851 now_ts,
852 edge.target_backend,
853 edge.metadata,
854 &namespace,
855 &existing_id,
856 ],
857 )?
858 } else {
859 conn.execute(
862 "UPDATE graph_edges SET \
863 source_id = ?1, target_id = ?2, updated_at = ?3 \
864 WHERE namespace = ?4 AND id = ?5",
865 rusqlite::params![
866 new_src.to_string(),
867 new_tgt.to_string(),
868 now_ts,
869 &namespace,
870 edge.id.to_string(),
871 ],
872 )?
873 };
874 if changed > 0 {
875 edges_rewired += 1;
876 }
877 }
878
879 conn.execute(
881 "INSERT OR REPLACE INTO entities \
882 (id, namespace, kind, name, description, properties, tags, \
883 created_at, updated_at, deleted_at, merged_into, merge_event_id) \
884 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
885 rusqlite::params![
886 &into_str,
887 &namespace,
888 &into_entity.kind,
889 &merged_name,
890 &merged_description,
891 &props_str,
892 &tags_json,
893 into_entity.created_at,
894 now,
895 into_entity.deleted_at,
896 Option::<String>::None,
897 Option::<String>::None,
898 ],
899 )?;
900
901 let fts_body = match &merged_description {
903 Some(d) if !d.is_empty() => format!("{} {}", merged_name, d),
904 _ => merged_name.clone(),
905 };
906 let kind_str = SubstrateKind::Entity.to_string();
907
908 conn.execute(
909 &format!(
910 "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
911 fts_table
912 ),
913 rusqlite::params![&namespace, &into_str],
914 )?;
915 conn.execute(
916 &format!(
917 "INSERT INTO {} \
918 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
919 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
920 fts_table
921 ),
922 rusqlite::params![
923 &into_str,
924 &kind_str,
925 &merged_name,
926 &fts_body,
927 &tags_json,
928 &namespace,
929 &props_str,
930 now,
931 ],
932 )?;
933
934 conn.execute(
936 &format!(
937 "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
938 fts_table
939 ),
940 rusqlite::params![&namespace, &from_str],
941 )?;
942
943 if let Some(ref vec_tbl) = vec_table {
945 conn.execute(
946 &format!(
947 "DELETE FROM {} WHERE subject_id = ?1 AND namespace = ?2",
948 vec_tbl
949 ),
950 rusqlite::params![&from_str, &namespace],
951 )?;
952 }
953
954 let merge_event_id = Uuid::new_v4();
956 conn.execute(
957 "UPDATE entities \
958 SET deleted_at = ?1, merged_into = ?2, merge_event_id = ?3, updated_at = ?1 \
959 WHERE namespace = ?4 AND id = ?5 AND deleted_at IS NULL",
960 rusqlite::params![
961 now,
962 into_str,
963 merge_event_id.to_string(),
964 &namespace,
965 &from_str,
966 ],
967 )?;
968 }
969
970 let updated_entity = Entity {
971 id: into_id,
972 namespace,
973 kind: into_entity.kind,
974 entity_type: into_entity.entity_type,
975 name: merged_name,
976 description: merged_description,
977 properties: merged_props,
978 tags: merged_tags,
979 created_at: into_entity.created_at,
980 updated_at: now,
981 deleted_at: into_entity.deleted_at,
982 merged_into: None,
983 merge_event_id: None,
984 };
985
986 Ok((
987 MergeSummary {
988 kept_id: into_id,
989 removed_id: from_id,
990 edges_rewired,
991 properties_merged,
992 tags_unioned,
993 content_appended: false,
994 dry_run,
995 },
996 updated_entity,
997 ))
998}
999
1000fn read_merge_note(
1006 conn: &rusqlite::Connection,
1007 id: Uuid,
1008 namespace: &str,
1009) -> Result<khive_storage::note::Note, SqliteError> {
1010 use khive_storage::note::Note;
1011 let id_str = id.to_string();
1012 let mut stmt = conn.prepare(
1013 "SELECT id, namespace, kind, status, name, content, salience, decay_factor, \
1014 expires_at, properties, created_at, updated_at, deleted_at \
1015 FROM notes WHERE id = ?1 AND deleted_at IS NULL",
1016 )?;
1017 let mut rows = stmt.query(rusqlite::params![id_str])?;
1018 let row = rows
1019 .next()?
1020 .ok_or_else(|| SqliteError::InvalidData(format!("note {id} not found")))?;
1021
1022 let id_s: String = row.get(0)?;
1023 let ns: String = row.get(1)?;
1024 let kind: String = row.get(2)?;
1025 let status: String = row.get(3)?;
1026 let name: Option<String> = row.get(4)?;
1027 let content: String = row.get(5)?;
1028 let salience: Option<f64> = row.get(6)?;
1029 let decay_factor: Option<f64> = row.get(7)?;
1030 let expires_at: Option<i64> = row.get(8)?;
1031 let properties_str: Option<String> = row.get(9)?;
1032 let created_at: i64 = row.get(10)?;
1033 let updated_at: i64 = row.get(11)?;
1034 let deleted_at: Option<i64> = row.get(12)?;
1035
1036 if ns != namespace {
1037 return Err(SqliteError::InvalidData(format!(
1038 "note {id} belongs to namespace '{ns}', not '{namespace}'"
1039 )));
1040 }
1041
1042 let note_id = Uuid::parse_str(&id_s).map_err(|e| SqliteError::InvalidData(e.to_string()))?;
1043 let properties: Option<serde_json::Value> = properties_str
1044 .map(|s| serde_json::from_str(&s).map_err(|e| SqliteError::InvalidData(e.to_string())))
1045 .transpose()?;
1046
1047 Ok(Note {
1048 id: note_id,
1049 namespace: ns,
1050 kind,
1051 status,
1052 name,
1053 content,
1054 salience,
1055 decay_factor,
1056 expires_at,
1057 properties,
1058 created_at,
1059 updated_at,
1060 deleted_at,
1061 })
1062}
1063
/// Returns the larger of two optional values.
///
/// When only one side is present it is returned as-is; when both are absent
/// the result is `None`.
fn max_option_f64(a: Option<f64>, b: Option<f64>) -> Option<f64> {
    if let (Some(x), Some(y)) = (a, b) {
        return Some(x.max(y));
    }
    // At most one side is present here, so `or` picks whichever exists.
    a.or(b)
}
1072
1073fn append_merge_history(props: Option<Value>, entry: Value) -> Result<Option<Value>, SqliteError> {
1074 use serde_json::{json, Map};
1075 let mut obj: Map<String, Value> = match props {
1076 Some(Value::Object(m)) => m,
1077 Some(other) => {
1078 let mut m = Map::new();
1079 m.insert("_value".into(), other);
1080 m
1081 }
1082 None => Map::new(),
1083 };
1084 let history = obj
1085 .entry("_merge_history".to_string())
1086 .or_insert_with(|| json!([]));
1087 if let Value::Array(arr) = history {
1088 arr.push(entry);
1089 }
1090 Ok(Some(Value::Object(obj)))
1091}
1092
1093#[allow(clippy::too_many_arguments)]
1101fn merge_note_sql(
1102 conn: &rusqlite::Connection,
1103 namespace: String,
1104 fts_table: String,
1105 vec_table: Option<String>,
1106 into_id: Uuid,
1107 from_id: Uuid,
1108 strategy: EntityDedupMergePolicy,
1109 content_strategy: ContentMergeStrategy,
1110 dry_run: bool,
1111) -> Result<(MergeSummary, khive_storage::note::Note), SqliteError> {
1112 let into_note = read_merge_note(conn, into_id, &namespace)?;
1113 let from_note = read_merge_note(conn, from_id, &namespace)?;
1114
1115 if into_note.kind != from_note.kind {
1116 return Err(SqliteError::InvalidData(format!(
1117 "cannot merge notes of different kinds: {} vs {}",
1118 into_note.kind, from_note.kind
1119 )));
1120 }
1121
1122 let now = chrono::Utc::now().timestamp_micros();
1123 let into_str = into_id.to_string();
1124 let from_str = from_id.to_string();
1125
1126 #[allow(dead_code)]
1128 struct EdgeRow {
1129 id: Uuid,
1130 source_id: Uuid,
1131 target_id: Uuid,
1132 relation: String,
1133 weight: f64,
1134 created_at: i64,
1135 updated_at: i64,
1136 deleted_at: Option<i64>,
1137 target_backend: Option<String>,
1138 metadata: Option<String>,
1139 }
1140 let parse_id =
1141 |s: String| Uuid::parse_str(&s).map_err(|e| SqliteError::InvalidData(e.to_string()));
1142
1143 let mut outbound: Vec<EdgeRow> = Vec::new();
1144 {
1145 let mut stmt = conn.prepare(
1146 "SELECT id, source_id, target_id, relation, weight, created_at, updated_at, deleted_at, target_backend, metadata \
1147 FROM graph_edges WHERE namespace = ?1 AND source_id = ?2",
1148 )?;
1149 let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
1150 while let Some(row) = rows.next()? {
1151 outbound.push(EdgeRow {
1152 id: parse_id(row.get(0)?)?,
1153 source_id: parse_id(row.get(1)?)?,
1154 target_id: parse_id(row.get(2)?)?,
1155 relation: row.get(3)?,
1156 weight: row.get(4)?,
1157 created_at: row.get(5)?,
1158 updated_at: row.get(6)?,
1159 deleted_at: row.get(7)?,
1160 target_backend: row.get(8)?,
1161 metadata: row.get(9)?,
1162 });
1163 }
1164 }
1165 let mut inbound: Vec<EdgeRow> = Vec::new();
1166 {
1167 let mut stmt = conn.prepare(
1168 "SELECT id, source_id, target_id, relation, weight, created_at, updated_at, deleted_at, target_backend, metadata \
1169 FROM graph_edges WHERE namespace = ?1 AND target_id = ?2",
1170 )?;
1171 let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
1172 while let Some(row) = rows.next()? {
1173 inbound.push(EdgeRow {
1174 id: parse_id(row.get(0)?)?,
1175 source_id: parse_id(row.get(1)?)?,
1176 target_id: parse_id(row.get(2)?)?,
1177 relation: row.get(3)?,
1178 weight: row.get(4)?,
1179 created_at: row.get(5)?,
1180 updated_at: row.get(6)?,
1181 deleted_at: row.get(7)?,
1182 target_backend: row.get(8)?,
1183 metadata: row.get(9)?,
1184 });
1185 }
1186 }
1187 let mut seen: HashSet<Uuid> = HashSet::new();
1188 let mut all_edges: Vec<EdgeRow> = Vec::new();
1189 for edge in outbound.into_iter().chain(inbound) {
1190 if seen.insert(edge.id) {
1191 all_edges.push(edge);
1192 }
1193 }
1194
1195 let (merged_content, content_appended) = match content_strategy {
1197 ContentMergeStrategy::Append => {
1198 if from_note.content.is_empty() {
1199 (into_note.content.clone(), false)
1200 } else {
1201 (
1202 format!("{}\n\n---\n\n{}", into_note.content, from_note.content),
1203 true,
1204 )
1205 }
1206 }
1207 ContentMergeStrategy::PreferInto => (into_note.content.clone(), false),
1208 ContentMergeStrategy::PreferFrom => (from_note.content.clone(), false),
1209 };
1210
1211 let merged_name = match strategy {
1212 EntityDedupMergePolicy::PreferFrom => from_note.name.clone().or(into_note.name.clone()),
1213 _ => into_note.name.clone().or(from_note.name.clone()),
1214 };
1215
1216 let (merged_props, properties_merged) =
1217 merge_properties(&into_note.properties, &from_note.properties, strategy);
1218
1219 let merge_history_entry = serde_json::json!({
1221 "merged_from": from_id.to_string(),
1222 "merged_at": now,
1223 "strategy": format!("{:?}", strategy),
1224 "content_strategy": format!("{:?}", content_strategy),
1225 });
1226 let merged_props = append_merge_history(merged_props, merge_history_entry)?;
1227
1228 let merged_salience = max_option_f64(into_note.salience, from_note.salience);
1229 let merged_expires_at = match (into_note.expires_at, from_note.expires_at) {
1230 (Some(a), Some(b)) => Some(a.max(b)),
1231 (Some(a), None) => Some(a),
1232 (None, Some(b)) => Some(b),
1233 (None, None) => None,
1234 };
1235
1236 let props_str = merged_props
1237 .as_ref()
1238 .map(|v| serde_json::to_string(v).unwrap_or_default());
1239
1240 let mut edges_rewired = 0usize;
1241 if !dry_run {
1242 for edge in all_edges {
1244 let raw_src = if edge.source_id == from_id {
1245 into_id
1246 } else {
1247 edge.source_id
1248 };
1249 let raw_tgt = if edge.target_id == from_id {
1250 into_id
1251 } else {
1252 edge.target_id
1253 };
1254 let (new_src, new_tgt) = match edge.relation.parse::<EdgeRelation>() {
1256 Ok(rel) => canonical_edge_endpoints(rel, raw_src, raw_tgt),
1257 Err(_) => (raw_src, raw_tgt),
1258 };
1259 if new_src == new_tgt {
1260 conn.execute(
1261 "DELETE FROM graph_edges WHERE namespace = ?1 AND id = ?2",
1262 rusqlite::params![&namespace, edge.id.to_string()],
1263 )?;
1264 continue;
1265 }
1266 let now_ts = chrono::Utc::now().timestamp();
1267 let conflict_id: Option<String> = {
1270 let conflict_src = new_src.to_string();
1271 let conflict_tgt = new_tgt.to_string();
1272 conn.query_row(
1273 "SELECT id FROM graph_edges \
1274 WHERE namespace = ?1 AND source_id = ?2 AND target_id = ?3 \
1275 AND relation = ?4 AND id != ?5",
1276 rusqlite::params![
1277 &namespace,
1278 &conflict_src,
1279 &conflict_tgt,
1280 &edge.relation,
1281 edge.id.to_string(),
1282 ],
1283 |row| row.get(0),
1284 )
1285 .optional()
1286 .map_err(SqliteError::Rusqlite)?
1287 };
1288
1289 let changed = if let Some(existing_id) = conflict_id {
1290 conn.execute(
1291 "DELETE FROM graph_edges WHERE namespace = ?1 AND id = ?2",
1292 rusqlite::params![&namespace, edge.id.to_string()],
1293 )?;
1294 conn.execute(
1295 "UPDATE graph_edges SET \
1296 weight = ?1, updated_at = ?2, deleted_at = NULL, \
1297 target_backend = ?3, metadata = ?4 \
1298 WHERE namespace = ?5 AND id = ?6",
1299 rusqlite::params![
1300 edge.weight,
1301 now_ts,
1302 edge.target_backend,
1303 edge.metadata,
1304 &namespace,
1305 &existing_id,
1306 ],
1307 )?
1308 } else {
1309 conn.execute(
1310 "UPDATE graph_edges SET \
1311 source_id = ?1, target_id = ?2, updated_at = ?3 \
1312 WHERE namespace = ?4 AND id = ?5",
1313 rusqlite::params![
1314 new_src.to_string(),
1315 new_tgt.to_string(),
1316 now_ts,
1317 &namespace,
1318 edge.id.to_string(),
1319 ],
1320 )?
1321 };
1322 if changed > 0 {
1323 edges_rewired += 1;
1324 }
1325 }
1326
1327 conn.execute(
1329 "INSERT OR REPLACE INTO notes \
1330 (id, namespace, kind, status, name, content, salience, decay_factor, \
1331 expires_at, properties, created_at, updated_at, deleted_at) \
1332 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
1333 rusqlite::params![
1334 &into_str,
1335 &namespace,
1336 &into_note.kind,
1337 &into_note.status,
1338 &merged_name,
1339 &merged_content,
1340 merged_salience,
1341 into_note.decay_factor,
1342 merged_expires_at,
1343 &props_str,
1344 into_note.created_at,
1345 now,
1346 into_note.deleted_at,
1347 ],
1348 )?;
1349
1350 conn.execute(
1352 &format!(
1353 "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
1354 fts_table
1355 ),
1356 rusqlite::params![&namespace, &into_str],
1357 )?;
1358 conn.execute(
1359 &format!(
1360 "INSERT INTO {} \
1361 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
1362 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1363 fts_table
1364 ),
1365 rusqlite::params![
1366 &into_str,
1367 SubstrateKind::Note.to_string(),
1368 &merged_name,
1369 &merged_content,
1370 "[]",
1371 &namespace,
1372 &props_str,
1373 now,
1374 ],
1375 )?;
1376
1377 conn.execute(
1379 &format!(
1380 "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
1381 fts_table
1382 ),
1383 rusqlite::params![&namespace, &from_str],
1384 )?;
1385
1386 if let Some(ref vec_tbl) = vec_table {
1388 conn.execute(
1389 &format!(
1390 "DELETE FROM {} WHERE subject_id = ?1 AND namespace = ?2",
1391 vec_tbl
1392 ),
1393 rusqlite::params![&from_str, &namespace],
1394 )?;
1395 }
1396
1397 conn.execute(
1399 "UPDATE notes SET status = 'deleted', deleted_at = ?1, updated_at = ?1 \
1400 WHERE namespace = ?2 AND id = ?3 AND deleted_at IS NULL",
1401 rusqlite::params![now, &namespace, &from_str],
1402 )?;
1403 }
1404
1405 let updated_note = khive_storage::note::Note {
1406 id: into_id,
1407 namespace: namespace.clone(),
1408 kind: into_note.kind.clone(),
1409 status: into_note.status.clone(),
1410 name: merged_name,
1411 content: merged_content,
1412 salience: merged_salience,
1413 decay_factor: into_note.decay_factor,
1414 expires_at: merged_expires_at,
1415 properties: merged_props,
1416 created_at: into_note.created_at,
1417 updated_at: now,
1418 deleted_at: into_note.deleted_at,
1419 };
1420
1421 Ok((
1422 MergeSummary {
1423 kept_id: into_id,
1424 removed_id: from_id,
1425 edges_rewired,
1426 properties_merged,
1427 tags_unioned: 0,
1428 content_appended,
1429 dry_run,
1430 },
1431 updated_note,
1432 ))
1433}
1434
1435fn merge_string_field(into: &str, from: &str, strategy: EntityDedupMergePolicy) -> String {
1440 match strategy {
1441 EntityDedupMergePolicy::PreferInto | EntityDedupMergePolicy::Union => into.to_string(),
1442 EntityDedupMergePolicy::PreferFrom => from.to_string(),
1443 }
1444}
1445
1446fn merge_option_string_field(
1447 into: &Option<String>,
1448 from: &Option<String>,
1449 strategy: EntityDedupMergePolicy,
1450) -> Option<String> {
1451 match strategy {
1452 EntityDedupMergePolicy::PreferInto => {
1453 if into.is_some() {
1454 into.clone()
1455 } else {
1456 from.clone()
1457 }
1458 }
1459 EntityDedupMergePolicy::PreferFrom => {
1460 if from.is_some() {
1461 from.clone()
1462 } else {
1463 into.clone()
1464 }
1465 }
1466 EntityDedupMergePolicy::Union => {
1467 match (into, from) {
1469 (Some(a), _) if !a.is_empty() => Some(a.clone()),
1470 (_, Some(b)) => Some(b.clone()),
1471 _ => None,
1472 }
1473 }
1474 }
1475}
1476
1477fn merge_properties(
1479 into: &Option<Value>,
1480 from: &Option<Value>,
1481 strategy: EntityDedupMergePolicy,
1482) -> (Option<Value>, usize) {
1483 match (into, from) {
1484 (None, None) => (None, 0),
1485 (Some(a), None) => (Some(a.clone()), 0),
1486 (None, Some(b)) => {
1487 let count = if let Value::Object(m) = b { m.len() } else { 1 };
1488 (Some(b.clone()), count)
1489 }
1490 (Some(into_val), Some(from_val)) => {
1491 let (merged, added) = merge_json(into_val, from_val, strategy);
1492 (Some(merged), added)
1493 }
1494 }
1495}
1496
1497fn merge_json(into: &Value, from: &Value, strategy: EntityDedupMergePolicy) -> (Value, usize) {
1499 match (into, from, strategy) {
1500 (Value::Object(a), Value::Object(b), EntityDedupMergePolicy::Union) => {
1501 let mut result = a.clone();
1502 let mut added = 0usize;
1503 for (k, v_from) in b {
1504 if let Some(v_into) = a.get(k) {
1505 let (merged, sub_added) =
1506 merge_json(v_into, v_from, EntityDedupMergePolicy::Union);
1507 result.insert(k.clone(), merged);
1508 added += sub_added;
1509 } else {
1510 result.insert(k.clone(), v_from.clone());
1511 added += 1;
1512 }
1513 }
1514 (Value::Object(result), added)
1515 }
1516 (Value::Object(a), Value::Object(b), EntityDedupMergePolicy::PreferInto) => {
1517 let mut result = a.clone();
1518 let mut added = 0usize;
1519 for (k, v) in b {
1520 if !a.contains_key(k) {
1521 result.insert(k.clone(), v.clone());
1522 added += 1;
1523 }
1524 }
1525 (Value::Object(result), added)
1526 }
1527 (Value::Object(a), Value::Object(b), EntityDedupMergePolicy::PreferFrom) => {
1528 let mut result = a.clone();
1529 let mut added = 0usize;
1530 for (k, v) in b {
1531 result.insert(k.clone(), v.clone());
1532 if !a.contains_key(k) {
1533 added += 1;
1534 }
1535 }
1536 (Value::Object(result), added)
1537 }
1538 (_into_val, from_val, EntityDedupMergePolicy::PreferFrom) => (from_val.clone(), 1),
1540 _ => (into.clone(), 0),
1541 }
1542}
1543
/// Computes the ordered union of two tag lists.
///
/// Tags from `into` come first, followed by tags from `from` that were not
/// already present; first-occurrence order is preserved. The result never
/// contains a repeated tag, even if either input list does.
///
/// Returns the merged list and the number of tags that `from` contributed
/// (i.e. tags not present in `into`).
fn union_tags(into: &[String], from: &[String]) -> (Vec<String>, usize) {
    let upper_bound = into.len() + from.len();
    let mut seen: HashSet<&str> = HashSet::with_capacity(upper_bound);
    let mut result: Vec<String> = Vec::with_capacity(upper_bound);

    // Deduplicate the surviving side too, so a repeated tag already stored on
    // `into` does not leak into the "union".
    for tag in into {
        if seen.insert(tag.as_str()) {
            result.push(tag.clone());
        }
    }

    let mut added = 0usize;
    for tag in from {
        if seen.insert(tag.as_str()) {
            result.push(tag.clone());
            added += 1;
        }
    }
    (result, added)
}
1556
// Tests for entity/edge updates and entity/note merges, plus unit tests for
// the pure merge helpers defined in this file.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::{KhiveRuntime, NamespaceToken};
    use khive_storage::types::{Direction, TextFilter, TextQueryMode, TextSearchRequest};

    // Builds a fresh runtime via `KhiveRuntime::memory()`; panics if that fails.
    fn rt() -> KhiveRuntime {
        KhiveRuntime::memory().unwrap()
    }

    // Runs a plain-mode text search for `query`, filtered to the token's
    // namespace with `top_k` of 50, and returns the subject ids of the hits.
    async fn fts_hit(rt: &KhiveRuntime, token: &NamespaceToken, query: &str) -> Vec<Uuid> {
        let ns = token.namespace().as_str().to_string();
        rt.text(token)
            .unwrap()
            .search(TextSearchRequest {
                query: query.to_string(),
                mode: TextQueryMode::Plain,
                filter: Some(TextFilter {
                    namespaces: vec![ns],
                    ..Default::default()
                }),
                top_k: 50,
                snippet_chars: 100,
            })
            .await
            .unwrap()
            .into_iter()
            .map(|h| h.subject_id)
            .collect()
    }

    // A patch that sets only `description` must leave name and properties as created.
    #[tokio::test]
    async fn update_entity_patch_changes_only_specified_fields() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let entity = rt
            .create_entity(
                &tok,
                "concept",
                None,
                "OriginalName",
                Some("orig desc"),
                Some(serde_json::json!({"k":"v"})),
                vec![],
            )
            .await
            .unwrap();

        let updated = rt
            .update_entity(
                &tok,
                entity.id,
                EntityPatch {
                    description: Some(Some("new desc".to_string())),
                    ..Default::default()
                },
            )
            .await
            .unwrap();

        assert_eq!(updated.name, "OriginalName");
        assert_eq!(updated.description.as_deref(), Some("new desc"));
        assert_eq!(updated.properties, Some(serde_json::json!({"k":"v"})));
    }

    // `description: Some(None)` in the patch clears an existing description.
    #[tokio::test]
    async fn update_entity_clear_description_with_some_none() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let entity = rt
            .create_entity(
                &tok,
                "concept",
                None,
                "ClearDesc",
                Some("has description"),
                None,
                vec![],
            )
            .await
            .unwrap();

        let updated = rt
            .update_entity(
                &tok,
                entity.id,
                EntityPatch {
                    description: Some(None),
                    ..Default::default()
                },
            )
            .await
            .unwrap();

        assert!(
            updated.description.is_none(),
            "description should be cleared"
        );
    }

    // After a rename, text search must find the entity under the new name
    // and no longer under the old one.
    #[tokio::test]
    async fn update_entity_reindexes_when_name_changes() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let entity = rt
            .create_entity(&tok, "concept", None, "OldName", None, None, vec![])
            .await
            .unwrap();

        // Precondition: the entity is searchable by its original name.
        let hits_before = fts_hit(&rt, &tok, "OldName").await;
        assert!(
            hits_before.contains(&entity.id),
            "entity should be findable by old name"
        );

        rt.update_entity(
            &tok,
            entity.id,
            EntityPatch {
                name: Some("NewName".to_string()),
                ..Default::default()
            },
        )
        .await
        .unwrap();

        let hits_old = fts_hit(&rt, &tok, "OldName").await;
        let hits_new = fts_hit(&rt, &tok, "NewName").await;

        assert!(
            !hits_old.contains(&entity.id),
            "old name should no longer match after rename"
        );
        assert!(
            hits_new.contains(&entity.id),
            "new name should be findable after rename"
        );
    }

    // Patching `properties` with a single key must overwrite that key and
    // keep the other keys that were already stored.
    #[tokio::test]
    async fn update_entity_properties_merges_preserving_existing_keys() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let entity = rt
            .create_entity(
                &tok,
                "concept",
                None,
                "MergeProps",
                None,
                Some(serde_json::json!({
                    "domain": "inference",
                    "repo": "lattice",
                    "status": "researched",
                })),
                vec![],
            )
            .await
            .unwrap();

        let updated = rt
            .update_entity(
                &tok,
                entity.id,
                EntityPatch {
                    properties: Some(serde_json::json!({"status": "implemented"})),
                    ..Default::default()
                },
            )
            .await
            .unwrap();

        let props = updated.properties.expect("properties should remain set");
        assert_eq!(props["domain"], "inference", "domain key must be preserved");
        assert_eq!(props["repo"], "lattice", "repo key must be preserved");
        assert_eq!(
            props["status"], "implemented",
            "status key must be updated by patch"
        );
    }

    // A properties-only patch must leave the entity searchable by name.
    // Note: the assertions check searchability only; they do not observe
    // whether a reindex actually ran.
    #[tokio::test]
    async fn update_entity_skips_reindex_when_only_properties_change() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let entity = rt
            .create_entity(&tok, "concept", None, "StableIndexed", None, None, vec![])
            .await
            .unwrap();

        let hits_before = fts_hit(&rt, &tok, "StableIndexed").await;
        assert!(hits_before.contains(&entity.id));

        rt.update_entity(
            &tok,
            entity.id,
            EntityPatch {
                properties: Some(serde_json::json!({"new": "prop"})),
                ..Default::default()
            },
        )
        .await
        .unwrap();

        let hits_after = fts_hit(&rt, &tok, "StableIndexed").await;
        assert!(
            hits_after.contains(&entity.id),
            "still findable after props-only patch"
        );
    }

    // Merging B into D must repoint the A->B and C->B edges at D.
    #[tokio::test]
    async fn merge_entity_rewires_edges() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let a = rt
            .create_entity(&tok, "concept", None, "A", None, None, vec![])
            .await
            .unwrap();
        let b = rt
            .create_entity(&tok, "concept", None, "B", None, None, vec![])
            .await
            .unwrap();
        let c = rt
            .create_entity(&tok, "concept", None, "C", None, None, vec![])
            .await
            .unwrap();
        let d = rt
            .create_entity(&tok, "concept", None, "D", None, None, vec![])
            .await
            .unwrap();

        // Two edges that both target B, the entity about to be merged away.
        rt.link(&tok, a.id, b.id, EdgeRelation::Extends, 1.0, None)
            .await
            .unwrap();
        rt.link(&tok, c.id, b.id, EdgeRelation::Extends, 1.0, None)
            .await
            .unwrap();

        let summary = rt
            .merge_entity(&tok, d.id, b.id, EntityDedupMergePolicy::PreferInto, false)
            .await
            .unwrap();

        assert_eq!(summary.kept_id, d.id);
        assert_eq!(summary.removed_id, b.id);
        assert_eq!(summary.edges_rewired, 2);

        // Each source now has exactly one outgoing neighbor, and it is D.
        let a_neighbors = rt
            .neighbors(&tok, a.id, Direction::Out, None, None)
            .await
            .unwrap();
        assert_eq!(a_neighbors.len(), 1);
        assert_eq!(a_neighbors[0].node_id, d.id);

        let c_neighbors = rt
            .neighbors(&tok, c.id, Direction::Out, None, None)
            .await
            .unwrap();
        assert_eq!(c_neighbors.len(), 1);
        assert_eq!(c_neighbors[0].node_id, d.id);
    }

    // Passing the same id as both merge target and source must be an error.
    #[tokio::test]
    async fn merge_entity_self_merge_rejected() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let a = rt
            .create_entity(&tok, "concept", None, "A", None, None, vec![])
            .await
            .unwrap();
        let err = rt
            .merge_entity(&tok, a.id, a.id, EntityDedupMergePolicy::PreferInto, false)
            .await
            .unwrap_err();
        assert!(
            format!("{err:?}").contains("cannot merge an entity into itself"),
            "expected self-merge rejection, got: {err:?}"
        );
    }

    // PreferInto: a conflicting key keeps the kept entity's value, while a
    // key only present on the removed entity is carried over.
    #[tokio::test]
    async fn merge_entity_prefer_into_strategy() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let into = rt
            .create_entity(
                &tok,
                "concept",
                None,
                "Into",
                None,
                Some(serde_json::json!({"a": 1})),
                vec![],
            )
            .await
            .unwrap();
        let from = rt
            .create_entity(
                &tok,
                "concept",
                None,
                "From",
                None,
                Some(serde_json::json!({"a": 2, "b": 3})),
                vec![],
            )
            .await
            .unwrap();

        rt.merge_entity(
            &tok,
            into.id,
            from.id,
            EntityDedupMergePolicy::PreferInto,
            false,
        )
        .await
        .unwrap();

        let kept = rt.get_entity(&tok, into.id).await.unwrap();
        let props = kept.properties.unwrap();
        assert_eq!(props["a"], 1);
        assert_eq!(props["b"], 3);
    }

    // PreferFrom: a conflicting key takes the removed entity's value, and a
    // key only present on the removed entity is carried over.
    #[tokio::test]
    async fn merge_entity_prefer_from_strategy() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let into = rt
            .create_entity(
                &tok,
                "concept",
                None,
                "Into",
                None,
                Some(serde_json::json!({"a": 1})),
                vec![],
            )
            .await
            .unwrap();
        let from = rt
            .create_entity(
                &tok,
                "concept",
                None,
                "From",
                None,
                Some(serde_json::json!({"a": 2, "b": 3})),
                vec![],
            )
            .await
            .unwrap();

        rt.merge_entity(
            &tok,
            into.id,
            from.id,
            EntityDedupMergePolicy::PreferFrom,
            false,
        )
        .await
        .unwrap();

        let kept = rt.get_entity(&tok, into.id).await.unwrap();
        let props = kept.properties.unwrap();
        assert_eq!(props["a"], 2);
        assert_eq!(props["b"], 3);
    }

    // Union: a conflicting scalar key keeps the kept entity's value, and a
    // key only present on the removed entity is carried over.
    #[tokio::test]
    async fn merge_entity_union_strategy() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let into = rt
            .create_entity(
                &tok,
                "concept",
                None,
                "Into",
                None,
                Some(serde_json::json!({"a": 1})),
                vec![],
            )
            .await
            .unwrap();
        let from = rt
            .create_entity(
                &tok,
                "concept",
                None,
                "From",
                None,
                Some(serde_json::json!({"a": 2, "b": 3})),
                vec![],
            )
            .await
            .unwrap();

        rt.merge_entity(&tok, into.id, from.id, EntityDedupMergePolicy::Union, false)
            .await
            .unwrap();

        let kept = rt.get_entity(&tok, into.id).await.unwrap();
        let props = kept.properties.unwrap();
        assert_eq!(props["a"], 1);
        assert_eq!(props["b"], 3);
    }

    // Tags of both entities are unioned without duplicates; this run uses
    // the PreferInto policy.
    #[tokio::test]
    async fn merge_entity_unions_tags() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let into = rt
            .create_entity(
                &tok,
                "concept",
                None,
                "Into",
                None,
                None,
                vec!["x".to_string(), "y".to_string()],
            )
            .await
            .unwrap();
        let from = rt
            .create_entity(
                &tok,
                "concept",
                None,
                "From",
                None,
                None,
                vec!["y".to_string(), "z".to_string()],
            )
            .await
            .unwrap();

        rt.merge_entity(
            &tok,
            into.id,
            from.id,
            EntityDedupMergePolicy::PreferInto,
            false,
        )
        .await
        .unwrap();

        let kept = rt.get_entity(&tok, into.id).await.unwrap();
        // Sort so the comparison does not depend on stored tag order.
        let mut tags = kept.tags.clone();
        tags.sort();
        assert_eq!(tags, vec!["x", "y", "z"]);
    }

    // An A->B edge would become A->A when B is merged into A; it must be
    // dropped and must not count as rewired.
    #[tokio::test]
    async fn merge_entity_drops_self_loops() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let a = rt
            .create_entity(&tok, "concept", None, "A", None, None, vec![])
            .await
            .unwrap();
        let b = rt
            .create_entity(&tok, "concept", None, "B", None, None, vec![])
            .await
            .unwrap();

        rt.link(&tok, a.id, b.id, EdgeRelation::Extends, 1.0, None)
            .await
            .unwrap();

        let summary = rt
            .merge_entity(&tok, a.id, b.id, EntityDedupMergePolicy::PreferInto, false)
            .await
            .unwrap();

        assert_eq!(
            summary.edges_rewired, 0,
            "self-loop should be dropped, not rewired"
        );

        let a_out = rt
            .neighbors(&tok, a.id, Direction::Out, None, None)
            .await
            .unwrap();
        assert!(a_out.is_empty(), "no self-loop should remain");
    }

    // Unit test: a tag present on both sides appears once, and only the
    // genuinely new tag is counted as added.
    #[test]
    fn union_tags_deduplicates() {
        let (tags, added) = union_tags(
            &["x".to_string(), "y".to_string()],
            &["y".to_string(), "z".to_string()],
        );
        let mut sorted = tags.clone();
        sorted.sort();
        assert_eq!(sorted, vec!["x", "y", "z"]);
        assert_eq!(added, 1);
    }

    // Unit test: PreferInto keeps the existing value for a shared key and
    // counts only the key that was missing from `into`.
    #[test]
    fn merge_properties_prefer_into_fills_missing_keys() {
        let a = serde_json::json!({"a": 1});
        let b = serde_json::json!({"a": 99, "b": 2});
        let (merged, added) =
            merge_properties(&Some(a), &Some(b), EntityDedupMergePolicy::PreferInto);
        let m = merged.unwrap();
        assert_eq!(m["a"], 1);
        assert_eq!(m["b"], 2);
        assert_eq!(added, 1);
    }

    // After a merge the source entity is hidden from `get_entity`, but its
    // row still exists with `deleted_at` set and `merged_into` pointing at
    // the kept entity.
    #[tokio::test]
    async fn merge_entity_tombstones_source_with_provenance() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let into = rt
            .create_entity(&tok, "concept", None, "Into", None, None, vec![])
            .await
            .unwrap();
        let from = rt
            .create_entity(&tok, "concept", None, "From", None, None, vec![])
            .await
            .unwrap();
        let from_id = from.id;

        rt.merge_entity(
            &tok,
            into.id,
            from_id,
            EntityDedupMergePolicy::PreferInto,
            false,
        )
        .await
        .unwrap();

        assert!(
            rt.get_entity(&tok, from_id).await.is_err(),
            "tombstoned source should not be returned by get_entity"
        );

        // Read the raw row directly, since `get_entity` no longer returns it.
        // The query runs inside `spawn_blocking` on a connection taken from
        // the backend pool.
        let pool = rt.backend().pool_arc();
        let (deleted_at, merged_into): (Option<i64>, Option<String>) =
            tokio::task::spawn_blocking(move || {
                let guard = pool.writer().unwrap();
                guard
                    .conn()
                    .query_row(
                        "SELECT deleted_at, merged_into FROM entities WHERE id = ?1",
                        [from_id.to_string()],
                        |row| Ok((row.get(0)?, row.get(1)?)),
                    )
                    .unwrap()
            })
            .await
            .unwrap();
        assert!(
            deleted_at.is_some(),
            "tombstoned entity must have deleted_at set"
        );
        assert_eq!(
            merged_into.as_deref(),
            Some(into.id.to_string().as_str()),
            "merged_into must point to into_id"
        );
    }

    // Merging two notes of the same kind with `ContentMergeStrategy::Append`
    // reports appended content and makes the source note unavailable.
    #[tokio::test]
    async fn merge_note_same_kind_appends_content() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let into = rt
            .create_note(
                &tok,
                "observation",
                None,
                "Into content",
                None,
                None,
                vec![],
            )
            .await
            .unwrap();
        let from = rt
            .create_note(
                &tok,
                "observation",
                None,
                "From content",
                None,
                None,
                vec![],
            )
            .await
            .unwrap();
        let from_id = from.id;

        let summary = rt
            .merge_note(
                &tok,
                into.id,
                from_id,
                EntityDedupMergePolicy::PreferInto,
                ContentMergeStrategy::Append,
                false,
            )
            .await
            .unwrap();

        assert_eq!(summary.kept_id, into.id);
        assert_eq!(summary.removed_id, from_id);
        assert!(summary.content_appended);
        assert!(!summary.dry_run);

        // `get_note` returning `None` is what this test treats as soft-deleted.
        let from_store = rt.notes(&tok).unwrap();
        assert!(
            from_store.get_note(from_id).await.unwrap().is_none(),
            "merged-from note should be soft-deleted"
        );
    }

    // Notes of different kinds ("observation" vs "decision") cannot be merged.
    #[tokio::test]
    async fn merge_note_different_kinds_rejected() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let into = rt
            .create_note(&tok, "observation", None, "Into", None, None, vec![])
            .await
            .unwrap();
        let from = rt
            .create_note(&tok, "decision", None, "From", None, None, vec![])
            .await
            .unwrap();

        let result = rt
            .merge_note(
                &tok,
                into.id,
                from.id,
                EntityDedupMergePolicy::PreferInto,
                ContentMergeStrategy::Append,
                false,
            )
            .await;
        assert!(result.is_err(), "merging different note kinds must fail");
    }

    // With `dry_run = true` both notes must still be retrievable with their
    // original content after the call.
    #[tokio::test]
    async fn merge_note_dry_run_leaves_notes_unchanged() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let into = rt
            .create_note(
                &tok,
                "observation",
                None,
                "Into content",
                None,
                None,
                vec![],
            )
            .await
            .unwrap();
        let from = rt
            .create_note(
                &tok,
                "observation",
                None,
                "From content",
                None,
                None,
                vec![],
            )
            .await
            .unwrap();
        let into_id = into.id;
        let from_id = from.id;

        let summary = rt
            .merge_note(
                &tok,
                into_id,
                from_id,
                EntityDedupMergePolicy::PreferInto,
                ContentMergeStrategy::Append,
                true,
            )
            .await
            .unwrap();

        assert!(summary.dry_run);

        let store = rt.notes(&tok).unwrap();
        let into_after = store.get_note(into_id).await.unwrap().unwrap();
        let from_after = store.get_note(from_id).await.unwrap().unwrap();
        assert_eq!(
            into_after.content, "Into content",
            "dry_run must not mutate into-note"
        );
        assert_eq!(
            from_after.content, "From content",
            "dry_run must not mutate from-note"
        );
    }

    // Patching only `properties` on an edge stores them as edge metadata and
    // leaves the weight at its original value.
    #[tokio::test]
    async fn update_edge_updates_properties() {
        use khive_storage::EdgeRelation;
        let rt = rt();
        let tok = NamespaceToken::local();
        let a = rt
            .create_entity(&tok, "concept", None, "A", None, None, vec![])
            .await
            .unwrap();
        let b = rt
            .create_entity(&tok, "concept", None, "B", None, None, vec![])
            .await
            .unwrap();
        let edge = rt
            .link(&tok, a.id, b.id, EdgeRelation::Extends, 0.5, None)
            .await
            .unwrap();
        let edge_id: Uuid = edge.id.into();

        let updated = rt
            .update_edge(
                &tok,
                edge_id,
                EdgePatch {
                    properties: Some(serde_json::json!({"source": "manual"})),
                    ..Default::default()
                },
            )
            .await
            .unwrap();

        assert_eq!(updated.metadata.as_ref().unwrap()["source"], "manual");
        assert!((updated.weight - 0.5).abs() < 0.001, "weight unchanged");
    }

    // When A and B each have an Extends edge to the same third entity,
    // merging B into A must succeed, leave exactly one A->shared edge, and
    // tombstone B.
    #[tokio::test]
    async fn merge_entity_survives_shared_edge_to_third_party() {
        use khive_storage::EdgeRelation;
        let rt = rt();
        let tok = NamespaceToken::local();

        let a = rt
            .create_entity(&tok, "concept", None, "A", None, None, vec![])
            .await
            .unwrap();
        let b = rt
            .create_entity(&tok, "concept", None, "B", None, None, vec![])
            .await
            .unwrap();
        let shared = rt
            .create_entity(&tok, "concept", None, "Shared", None, None, vec![])
            .await
            .unwrap();

        // Rewiring B's edge onto A would duplicate A's existing edge.
        rt.link(&tok, a.id, shared.id, EdgeRelation::Extends, 1.0, None)
            .await
            .unwrap();
        rt.link(&tok, b.id, shared.id, EdgeRelation::Extends, 1.0, None)
            .await
            .unwrap();

        let summary = rt
            .merge_entity(
                &tok,
                a.id,
                b.id,
                crate::EntityDedupMergePolicy::PreferInto,
                false,
            )
            .await
            .expect(
                "C1: merge must succeed even when both entities share an edge to a third party",
            );

        assert_eq!(summary.kept_id, a.id);
        assert_eq!(summary.removed_id, b.id);
        let a_edges = rt
            .list_edges(
                &tok,
                crate::EdgeListFilter {
                    source_id: Some(a.id),
                    target_id: Some(shared.id),
                    relations: vec![EdgeRelation::Extends],
                    ..Default::default()
                },
                10,
            )
            .await
            .unwrap();
        assert_eq!(
            a_edges.len(),
            1,
            "C1: exactly one live A→shared Extends edge must exist after merge; got: {a_edges:?}"
        );

        let b_after = rt.entities(&tok).unwrap().get_entity(b.id).await.unwrap();
        assert!(
            b_after.is_none(),
            "C3: from_entity must be tombstoned (get_entity returns None for deleted) after merge; got: {b_after:?}"
        );
    }

    // Merging entities of different kinds ("concept" vs "project") must fail
    // with `InvalidInput` and leave both entities retrievable.
    #[tokio::test]
    async fn merge_entity_cross_kind_rejected_at_runtime() {
        let rt = rt();
        let tok = NamespaceToken::local();

        let concept = rt
            .create_entity(&tok, "concept", None, "H2Concept", None, None, vec![])
            .await
            .unwrap();
        let project = rt
            .create_entity(&tok, "project", None, "H2Project", None, None, vec![])
            .await
            .unwrap();

        let err = rt
            .merge_entity(
                &tok,
                concept.id,
                project.id,
                crate::EntityDedupMergePolicy::PreferInto,
                false,
            )
            .await
            .expect_err("H2: cross-kind merge must be rejected by runtime");
        assert!(
            matches!(err, crate::RuntimeError::InvalidInput(_)),
            "H2: expected InvalidInput, got: {err:?}"
        );

        // The rejected merge must not have tombstoned either side.
        let concept_after = rt.get_entity(&tok, concept.id).await;
        let project_after = rt.get_entity(&tok, project.id).await;
        assert!(
            concept_after.is_ok(),
            "H2: concept must remain live after rejected merge; got: {concept_after:?}"
        );
        assert!(
            project_after.is_ok(),
            "H2: project must remain live after rejected merge; got: {project_after:?}"
        );
    }

    // Counterpart to the cross-kind test: two "concept" entities merge
    // successfully and the source is tombstoned.
    #[tokio::test]
    async fn merge_entity_same_kind_succeeds() {
        let rt = rt();
        let tok = NamespaceToken::local();

        let c1 = rt
            .create_entity(&tok, "concept", None, "Concept1", None, None, vec![])
            .await
            .unwrap();
        let c2 = rt
            .create_entity(&tok, "concept", None, "Concept2", None, None, vec![])
            .await
            .unwrap();

        let summary = rt
            .merge_entity(
                &tok,
                c1.id,
                c2.id,
                crate::EntityDedupMergePolicy::PreferInto,
                false,
            )
            .await
            .expect("same-kind merge must succeed");
        assert_eq!(summary.kept_id, c1.id);
        assert_eq!(summary.removed_id, c2.id);

        let c2_after = rt.entities(&tok).unwrap().get_entity(c2.id).await.unwrap();
        assert!(c2_after.is_none(), "from_entity must be tombstoned");
    }

    // A merge issued with the `ns-a` token must return `NotFound` when
    // either the target or the source id belongs to a note created in `ns-b`.
    #[tokio::test]
    async fn merge_note_cross_namespace_either_id_returns_not_found() {
        use crate::error::RuntimeError;
        use crate::Namespace;

        let rt = rt();
        let ns_a = NamespaceToken::for_namespace(Namespace::parse("ns-a").unwrap());
        let ns_b = NamespaceToken::for_namespace(Namespace::parse("ns-b").unwrap());

        let into_a = rt
            .create_note(&ns_a, "observation", None, "Into A", None, None, vec![])
            .await
            .unwrap();
        let from_a = rt
            .create_note(&ns_a, "observation", None, "From A", None, None, vec![])
            .await
            .unwrap();
        let note_b = rt
            .create_note(&ns_b, "observation", None, "Note B", None, None, vec![])
            .await
            .unwrap();

        // Case 1: the merge target (`into_id`) lives in the other namespace.
        let foreign_into = rt
            .merge_note(
                &ns_a,
                note_b.id,
                from_a.id,
                EntityDedupMergePolicy::PreferInto,
                ContentMergeStrategy::Append,
                false,
            )
            .await;
        assert!(
            matches!(foreign_into, Err(RuntimeError::NotFound(_))),
            "foreign into_id must be denied before merge, got {foreign_into:?}"
        );

        // Case 2: the merge source (`from_id`) lives in the other namespace.
        let foreign_from = rt
            .merge_note(
                &ns_a,
                into_a.id,
                note_b.id,
                EntityDedupMergePolicy::PreferInto,
                ContentMergeStrategy::Append,
                false,
            )
            .await;
        assert!(
            matches!(foreign_from, Err(RuntimeError::NotFound(_))),
            "foreign from_id must be denied before merge, got {foreign_from:?}"
        );
    }
}