1use std::collections::HashSet;
12
13use serde::{Deserialize, Serialize};
14use serde_json::Value;
15use uuid::Uuid;
16
17use khive_db::SqliteError;
18use khive_storage::types::{EdgeFilter, TextDocument};
19use khive_storage::{EdgeRelation, Entity, SubstrateKind};
20use khive_types::EventKind;
21use rusqlite::OptionalExtension;
22
23use crate::error::{RuntimeError, RuntimeResult};
24use crate::operations::canonical_edge_endpoints;
25use crate::runtime::{KhiveRuntime, NamespaceToken};
26
27#[derive(Clone, Debug, Default)]
49pub struct EntityPatch {
50 pub name: Option<String>,
51 pub description: Option<Option<String>>,
52 pub properties: Option<Value>,
53 pub tags: Option<Vec<String>>,
54}
55
56#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
58#[serde(rename_all = "snake_case")]
59pub enum EntityDedupMergePolicy {
60 #[default]
63 PreferInto,
64 PreferFrom,
66 Union,
68}
69
70#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
72#[serde(rename_all = "snake_case")]
73pub enum ContentMergeStrategy {
74 #[default]
75 Append,
76 PreferInto,
77 PreferFrom,
78}
79
80#[derive(Clone, Debug, Serialize, Deserialize)]
82pub struct MergeSummary {
83 pub kept_id: Uuid,
84 pub removed_id: Uuid,
85 pub edges_rewired: usize,
86 pub properties_merged: usize,
87 pub tags_unioned: usize,
88 pub content_appended: bool,
89 pub dry_run: bool,
90}
91
92#[derive(Clone, Debug, Default)]
97pub struct EdgePatch {
98 pub relation: Option<EdgeRelation>,
99 pub weight: Option<f64>,
100 pub properties: Option<Value>,
101}
102
103#[derive(Clone, Debug, Default)]
110pub struct NotePatch {
111 pub name: Option<Option<String>>,
112 pub content: Option<String>,
113 pub salience: Option<Option<f64>>,
114 pub decay_factor: Option<Option<f64>>,
115 pub properties: Option<Value>,
116 pub(crate) kind_status: Option<String>,
117}
118
119impl NotePatch {
120 pub fn new(
123 name: Option<Option<String>>,
124 content: Option<String>,
125 salience: Option<Option<f64>>,
126 decay_factor: Option<Option<f64>>,
127 properties: Option<Value>,
128 ) -> Self {
129 Self {
130 name,
131 content,
132 salience,
133 decay_factor,
134 properties,
135 kind_status: None,
136 }
137 }
138}
139
140#[derive(Clone, Debug, Default)]
142pub struct EdgeListFilter {
143 pub source_id: Option<Uuid>,
144 pub target_id: Option<Uuid>,
145 pub relations: Vec<EdgeRelation>,
147 pub min_weight: Option<f64>,
148 pub max_weight: Option<f64>,
149}
150
151impl From<EdgeListFilter> for EdgeFilter {
152 fn from(f: EdgeListFilter) -> Self {
153 EdgeFilter {
154 source_ids: f.source_id.into_iter().collect(),
155 target_ids: f.target_id.into_iter().collect(),
156 relations: f.relations,
157 min_weight: f.min_weight,
158 max_weight: f.max_weight,
159 ..Default::default()
160 }
161 }
162}
163
164impl KhiveRuntime {
169 pub async fn update_entity(
177 &self,
178 token: &NamespaceToken,
179 id: Uuid,
180 patch: EntityPatch,
181 ) -> RuntimeResult<Entity> {
182 let store = self.entities(token)?;
183 let mut entity = store
184 .get_entity(id)
185 .await?
186 .ok_or_else(|| RuntimeError::NotFound(format!("entity {id}")))?;
187
188 Self::ensure_namespace(&entity.namespace, token.namespace().as_str())?;
189
190 let mut text_changed = false;
191 let mut changed_fields: Vec<&'static str> = Vec::new();
192
193 if let Some(name) = patch.name {
194 text_changed |= entity.name != name;
195 entity.name = name;
196 changed_fields.push("name");
197 }
198 if let Some(desc_patch) = patch.description {
199 text_changed |= entity.description != desc_patch;
200 entity.description = desc_patch;
201 changed_fields.push("description");
202 }
203 if let Some(props) = patch.properties {
204 let (merged, _) = merge_properties(
205 &entity.properties,
206 &Some(props),
207 EntityDedupMergePolicy::PreferFrom,
208 );
209 entity.properties = merged;
210 changed_fields.push("properties");
211 }
212 if let Some(tags) = patch.tags {
213 entity.tags = tags;
214 changed_fields.push("tags");
215 }
216
217 entity.updated_at = chrono::Utc::now().timestamp_micros();
218 store.upsert_entity(entity.clone()).await?;
219
220 if text_changed {
221 self.reindex_entity(token, &entity).await?;
222 }
223
224 let event_store = self.events(token)?;
225 let event = khive_storage::event::Event::new(
226 entity.namespace.clone(),
227 "update",
228 EventKind::EntityUpdated,
229 SubstrateKind::Entity,
230 "",
231 )
232 .with_target(entity.id)
233 .with_payload(serde_json::json!({
234 "id": entity.id,
235 "namespace": entity.namespace,
236 "changed_fields": changed_fields,
237 }));
238 event_store.append_event(event).await.map_err(|e| {
239 RuntimeError::Internal(format!("update_entity: event store write failed: {e}"))
240 })?;
241
242 Ok(entity)
243 }
244
245 pub async fn merge_entity(
258 &self,
259 token: &NamespaceToken,
260 into_id: Uuid,
261 from_id: Uuid,
262 strategy: EntityDedupMergePolicy,
263 dry_run: bool,
264 ) -> RuntimeResult<MergeSummary> {
265 if into_id == from_id {
266 return Err(RuntimeError::InvalidInput(
267 "cannot merge an entity into itself".into(),
268 ));
269 }
270 {
274 let into_entity = self.get_entity(token, into_id).await?;
275 let from_entity = self.get_entity(token, from_id).await?;
276 if into_entity.kind != from_entity.kind {
277 return Err(RuntimeError::InvalidInput(format!(
278 "cannot merge entities of different kinds: into={} ({}), from={} ({}); \
279 merge requires both entities to share the same kind",
280 into_id, into_entity.kind, from_id, from_entity.kind
281 )));
282 }
283 }
284 let ns = token.namespace().as_str().to_owned();
285 let sanitized_ns: String = ns
286 .chars()
287 .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
288 .collect();
289 let fts_table = format!("fts_entities_{}", sanitized_ns);
290 let vec_table = self.config().embedding_model.map(|model| {
291 let key: String = model
292 .to_string()
293 .chars()
294 .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
295 .collect();
296 format!("vec_{}", key)
297 });
298
299 let _ = self.entities(token)?;
302 let _ = self.graph(token)?;
303 let _ = self.text(token)?;
304 if self.config().embedding_model.is_some() {
305 let _ = self.vectors(token)?;
306 }
307
308 let pool = self.backend().pool_arc();
309
310 let (summary, updated_entity) = tokio::task::spawn_blocking(move || {
311 let guard = pool.writer()?;
312 guard.transaction(|conn| {
313 merge_entity_sql(
314 conn, ns, fts_table, vec_table, into_id, from_id, strategy, dry_run,
315 )
316 })
317 })
318 .await
319 .map_err(|e| RuntimeError::Internal(e.to_string()))??;
320
321 if !dry_run && self.config().embedding_model.is_some() {
324 self.reindex_entity(token, &updated_entity).await?;
325 }
326
327 let event_store = self.events(token)?;
328 let policy_str = match strategy {
331 EntityDedupMergePolicy::PreferInto => "prefer_into",
332 EntityDedupMergePolicy::PreferFrom => "prefer_from",
333 EntityDedupMergePolicy::Union => "union",
334 };
335 let event = khive_storage::event::Event::new(
336 updated_entity.namespace.clone(),
337 "merge",
338 EventKind::EntityMerged,
339 SubstrateKind::Entity,
340 "",
341 )
342 .with_target(summary.kept_id)
343 .with_payload(serde_json::json!({
344 "into_id": summary.kept_id,
345 "from_id": summary.removed_id,
346 "policy": policy_str,
347 "edges_rewired": summary.edges_rewired,
348 }));
349 event_store.append_event(event).await.map_err(|e| {
350 RuntimeError::Internal(format!("merge_entity: event store write failed: {e}"))
351 })?;
352
353 Ok(summary)
354 }
355
356 pub(crate) async fn reindex_entity(
364 &self,
365 token: &NamespaceToken,
366 entity: &Entity,
367 ) -> RuntimeResult<()> {
368 let body = match &entity.description {
369 Some(d) if !d.is_empty() => format!("{} {}", entity.name, d),
370 _ => entity.name.clone(),
371 };
372 let ns = entity.namespace.clone();
374 self.text(token)?
375 .upsert_document(TextDocument {
376 subject_id: entity.id,
377 kind: SubstrateKind::Entity,
378 title: Some(entity.name.clone()),
379 body: body.clone(),
380 tags: entity.tags.clone(),
381 namespace: ns.clone(),
382 metadata: entity.properties.clone(),
383 updated_at: chrono::Utc::now(),
384 })
385 .await?;
386
387 if self.config().embedding_model.is_some() {
388 let vector = self.embed(&body).await?;
389 self.vectors(token)?
390 .insert(
391 entity.id,
392 SubstrateKind::Entity,
393 &ns,
394 "entity.body",
395 vec![vector],
396 )
397 .await?;
398 }
399
400 Ok(())
401 }
402
403 pub(crate) async fn remove_from_indexes(
405 &self,
406 token: &NamespaceToken,
407 id: Uuid,
408 ) -> RuntimeResult<()> {
409 let ns = token.namespace().as_str().to_owned();
410 self.text(token)?.delete_document(&ns, id).await?;
411 if self.config().embedding_model.is_some() {
412 self.vectors(token)?.delete(id).await?;
413 }
414 Ok(())
415 }
416
417 pub(crate) async fn reindex_note(
419 &self,
420 token: &NamespaceToken,
421 note: &khive_storage::note::Note,
422 ) -> RuntimeResult<()> {
423 let ns = note.namespace.clone();
424 self.text_for_notes(token)?
425 .upsert_document(TextDocument {
426 subject_id: note.id,
427 kind: SubstrateKind::Note,
428 title: note.name.clone(),
429 body: note.content.clone(),
430 tags: Vec::new(),
431 namespace: ns.clone(),
432 metadata: note.properties.clone(),
433 updated_at: chrono::Utc::now(),
434 })
435 .await?;
436
437 if self.config().embedding_model.is_some() {
438 let vector = self.embed(¬e.content).await?;
439 self.vectors(token)?
440 .insert(
441 note.id,
442 SubstrateKind::Note,
443 &ns,
444 "note.content",
445 vec![vector],
446 )
447 .await?;
448 }
449 Ok(())
450 }
451
452 pub async fn update_note(
454 &self,
455 token: &NamespaceToken,
456 id: Uuid,
457 patch: NotePatch,
458 ) -> RuntimeResult<khive_storage::note::Note> {
459 let store = self.notes(token)?;
460 let mut note = store
461 .get_note(id)
462 .await?
463 .ok_or_else(|| RuntimeError::NotFound(format!("note {id}")))?;
464
465 Self::ensure_namespace(¬e.namespace, token.namespace().as_str())?;
466
467 let mut text_changed = false;
468
469 if let Some(name_patch) = patch.name {
470 text_changed |= note.name != name_patch;
471 note.name = name_patch;
472 }
473 if let Some(content) = patch.content {
474 text_changed |= note.content != content;
475 note.content = content;
476 }
477 if let Some(salience_patch) = patch.salience {
478 if let Some(s) = salience_patch {
481 if !s.is_finite() || !(0.0..=1.0).contains(&s) {
482 return Err(crate::RuntimeError::InvalidInput(format!(
483 "salience must be a finite value in [0.0, 1.0]; got {s}"
484 )));
485 }
486 }
487 note.salience = salience_patch;
488 }
489 if let Some(decay_patch) = patch.decay_factor {
490 if let Some(d) = decay_patch {
492 if !d.is_finite() || d < 0.0 {
493 return Err(crate::RuntimeError::InvalidInput(format!(
494 "decay_factor must be a finite value >= 0.0; got {d}"
495 )));
496 }
497 }
498 note.decay_factor = decay_patch;
499 }
500 if let Some(props) = patch.properties {
501 let (merged, _) = merge_properties(
502 ¬e.properties,
503 &Some(props),
504 EntityDedupMergePolicy::PreferFrom,
505 );
506 note.properties = merged;
507 }
508 if let Some(status) = patch.kind_status {
509 note.status = status;
510 }
511
512 note.updated_at = chrono::Utc::now().timestamp_micros();
513 store.upsert_note(note.clone()).await?;
514
515 if text_changed {
516 self.reindex_note(token, ¬e).await?;
517 }
518
519 Ok(note)
520 }
521
522 pub async fn merge_note(
531 &self,
532 token: &NamespaceToken,
533 into_id: Uuid,
534 from_id: Uuid,
535 strategy: EntityDedupMergePolicy,
536 content_strategy: ContentMergeStrategy,
537 dry_run: bool,
538 ) -> RuntimeResult<MergeSummary> {
539 if into_id == from_id {
540 return Err(RuntimeError::InvalidInput(
541 "cannot merge a note into itself".into(),
542 ));
543 }
544 let ns = token.namespace().as_str().to_string();
545 let sanitized_ns: String = ns
546 .chars()
547 .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
548 .collect();
549 let fts_table = format!("fts_notes_{}", sanitized_ns);
550 let vec_table = self.config().embedding_model.map(|model| {
551 let key: String = model
552 .to_string()
553 .chars()
554 .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
555 .collect();
556 format!("vec_{}", key)
557 });
558
559 let note_store = self.notes(token)?;
560 let into_note = note_store
561 .get_note(into_id)
562 .await?
563 .ok_or_else(|| RuntimeError::NotFound("not found in this namespace".into()))?;
564 Self::ensure_namespace(&into_note.namespace, &ns)?;
565
566 let from_note = note_store
567 .get_note(from_id)
568 .await?
569 .ok_or_else(|| RuntimeError::NotFound("not found in this namespace".into()))?;
570 Self::ensure_namespace(&from_note.namespace, &ns)?;
571
572 let _ = self.graph(token)?;
573 let _ = self.text_for_notes(token)?;
574 if self.config().embedding_model.is_some() {
575 let _ = self.vectors(token)?;
576 }
577
578 let pool = self.backend().pool_arc();
579 let (summary, updated_note) = tokio::task::spawn_blocking(move || {
580 let guard = pool.writer()?;
581 guard.transaction(|conn| {
582 merge_note_sql(
583 conn,
584 ns,
585 fts_table,
586 vec_table,
587 into_id,
588 from_id,
589 strategy,
590 content_strategy,
591 dry_run,
592 )
593 })
594 })
595 .await
596 .map_err(|e| RuntimeError::Internal(e.to_string()))??;
597
598 if !dry_run && self.config().embedding_model.is_some() {
599 self.reindex_note(token, &updated_note).await?;
600 }
601 Ok(summary)
602 }
603}
604
605fn read_merge_entity(
611 conn: &rusqlite::Connection,
612 id: Uuid,
613 namespace: &str,
614) -> Result<Entity, SqliteError> {
615 let id_str = id.to_string();
616 let mut stmt = conn.prepare(
617 "SELECT id, namespace, kind, entity_type, name, description, properties, tags, \
618 created_at, updated_at, deleted_at, merged_into, merge_event_id \
619 FROM entities WHERE id = ?1 AND deleted_at IS NULL",
620 )?;
621 let mut rows = stmt.query(rusqlite::params![id_str])?;
622 let row = rows
623 .next()?
624 .ok_or_else(|| SqliteError::InvalidData(format!("entity {id} not found")))?;
625
626 let id_s: String = row.get(0)?;
627 let ns: String = row.get(1)?;
628 let kind: String = row.get(2)?;
629 let entity_type: Option<String> = row.get(3)?;
630 let name: String = row.get(4)?;
631 let description: Option<String> = row.get(5)?;
632 let properties_str: Option<String> = row.get(6)?;
633 let tags_str: String = row.get(7)?;
634 let created_at: i64 = row.get(8)?;
635 let updated_at: i64 = row.get(9)?;
636 let deleted_at: Option<i64> = row.get(10)?;
637 let merged_into_str: Option<String> = row.get(11)?;
638 let merge_event_id_str: Option<String> = row.get(12)?;
639
640 if ns != namespace {
641 return Err(SqliteError::InvalidData(format!(
642 "entity {id} belongs to namespace '{ns}', not '{namespace}'"
643 )));
644 }
645
646 let entity_id = Uuid::parse_str(&id_s).map_err(|e| SqliteError::InvalidData(e.to_string()))?;
647 let properties: Option<Value> = properties_str
648 .map(|s| {
649 serde_json::from_str::<Value>(&s).map_err(|e| SqliteError::InvalidData(e.to_string()))
650 })
651 .transpose()?;
652 let tags: Vec<String> =
653 serde_json::from_str(&tags_str).map_err(|e| SqliteError::InvalidData(e.to_string()))?;
654 let merged_into = merged_into_str
655 .as_deref()
656 .map(Uuid::parse_str)
657 .transpose()
658 .map_err(|e| SqliteError::InvalidData(e.to_string()))?;
659 let merge_event_id = merge_event_id_str
660 .as_deref()
661 .map(Uuid::parse_str)
662 .transpose()
663 .map_err(|e| SqliteError::InvalidData(e.to_string()))?;
664
665 Ok(Entity {
666 id: entity_id,
667 namespace: ns,
668 kind,
669 entity_type,
670 name,
671 description,
672 properties,
673 tags,
674 created_at,
675 updated_at,
676 deleted_at,
677 merged_into,
678 merge_event_id,
679 })
680}
681
682#[allow(clippy::too_many_arguments)]
690fn merge_entity_sql(
691 conn: &rusqlite::Connection,
692 namespace: String,
693 fts_table: String,
694 vec_table: Option<String>,
695 into_id: Uuid,
696 from_id: Uuid,
697 strategy: EntityDedupMergePolicy,
698 dry_run: bool,
699) -> Result<(MergeSummary, Entity), SqliteError> {
700 let into_entity = read_merge_entity(conn, into_id, &namespace)?;
701 let from_entity = read_merge_entity(conn, from_id, &namespace)?;
702
703 #[allow(dead_code)]
705 struct EdgeRow {
706 id: Uuid,
707 source_id: Uuid,
708 target_id: Uuid,
709 relation: String,
710 weight: f64,
711 created_at: i64,
712 updated_at: i64,
713 deleted_at: Option<i64>,
714 target_backend: Option<String>,
715 metadata: Option<String>,
716 }
717
718 let parse_id =
719 |s: String| Uuid::parse_str(&s).map_err(|e| SqliteError::InvalidData(e.to_string()));
720
721 let from_str = from_id.to_string();
722
723 let mut outbound: Vec<EdgeRow> = Vec::new();
724 {
725 let mut stmt = conn.prepare(
726 "SELECT id, source_id, target_id, relation, weight, created_at, \
727 updated_at, deleted_at, target_backend, metadata \
728 FROM graph_edges WHERE namespace = ?1 AND source_id = ?2",
729 )?;
730 let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
731 while let Some(row) = rows.next()? {
732 outbound.push(EdgeRow {
733 id: parse_id(row.get(0)?)?,
734 source_id: parse_id(row.get(1)?)?,
735 target_id: parse_id(row.get(2)?)?,
736 relation: row.get(3)?,
737 weight: row.get(4)?,
738 created_at: row.get(5)?,
739 updated_at: row.get(6)?,
740 deleted_at: row.get(7)?,
741 target_backend: row.get(8)?,
742 metadata: row.get(9)?,
743 });
744 }
745 }
746
747 let mut inbound: Vec<EdgeRow> = Vec::new();
748 {
749 let mut stmt = conn.prepare(
750 "SELECT id, source_id, target_id, relation, weight, created_at, \
751 updated_at, deleted_at, target_backend, metadata \
752 FROM graph_edges WHERE namespace = ?1 AND target_id = ?2",
753 )?;
754 let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
755 while let Some(row) = rows.next()? {
756 inbound.push(EdgeRow {
757 id: parse_id(row.get(0)?)?,
758 source_id: parse_id(row.get(1)?)?,
759 target_id: parse_id(row.get(2)?)?,
760 relation: row.get(3)?,
761 weight: row.get(4)?,
762 created_at: row.get(5)?,
763 updated_at: row.get(6)?,
764 deleted_at: row.get(7)?,
765 target_backend: row.get(8)?,
766 metadata: row.get(9)?,
767 });
768 }
769 }
770
771 let mut seen: HashSet<Uuid> = HashSet::new();
773 let mut all_edges: Vec<EdgeRow> = Vec::new();
774 for edge in outbound.into_iter().chain(inbound) {
775 if seen.insert(edge.id) {
776 all_edges.push(edge);
777 }
778 }
779
780 let (merged_props, properties_merged) =
782 merge_properties(&into_entity.properties, &from_entity.properties, strategy);
783 let merged_name = merge_string_field(&into_entity.name, &from_entity.name, strategy);
784 let merged_description =
785 merge_option_string_field(&into_entity.description, &from_entity.description, strategy);
786 let (merged_tags, tags_unioned) = union_tags(&into_entity.tags, &from_entity.tags);
787
788 let now = chrono::Utc::now().timestamp_micros();
789 let into_str = into_id.to_string();
790 let props_str = merged_props
791 .as_ref()
792 .map(|v| serde_json::to_string(v).unwrap_or_default());
793 let tags_json = serde_json::to_string(&merged_tags).unwrap_or_else(|_| "[]".to_string());
794
795 let mut edges_rewired = 0usize;
797 if !dry_run {
798 for edge in all_edges {
799 let raw_src = if edge.source_id == from_id {
800 into_id
801 } else {
802 edge.source_id
803 };
804 let raw_tgt = if edge.target_id == from_id {
805 into_id
806 } else {
807 edge.target_id
808 };
809 let (new_src, new_tgt) = match edge.relation.parse::<EdgeRelation>() {
812 Ok(rel) => canonical_edge_endpoints(rel, raw_src, raw_tgt),
813 Err(_) => (raw_src, raw_tgt),
814 };
815
816 if new_src == new_tgt {
817 conn.execute(
818 "DELETE FROM graph_edges WHERE namespace = ?1 AND id = ?2",
819 rusqlite::params![&namespace, edge.id.to_string()],
820 )?;
821 continue;
822 }
823
824 let now_ts = chrono::Utc::now().timestamp();
825 let conflict_id: Option<String> = {
839 let conflict_src = new_src.to_string();
840 let conflict_tgt = new_tgt.to_string();
841 conn.query_row(
842 "SELECT id FROM graph_edges \
843 WHERE namespace = ?1 AND source_id = ?2 AND target_id = ?3 \
844 AND relation = ?4 AND id != ?5",
845 rusqlite::params![
846 &namespace,
847 &conflict_src,
848 &conflict_tgt,
849 &edge.relation,
850 edge.id.to_string(),
851 ],
852 |row| row.get(0),
853 )
854 .optional()
855 .map_err(SqliteError::Rusqlite)?
856 };
857
858 let changed = if let Some(existing_id) = conflict_id {
859 conn.execute(
862 "DELETE FROM graph_edges WHERE namespace = ?1 AND id = ?2",
863 rusqlite::params![&namespace, edge.id.to_string()],
864 )?;
865 conn.execute(
866 "UPDATE graph_edges SET \
867 weight = ?1, updated_at = ?2, deleted_at = NULL, \
868 target_backend = ?3, metadata = ?4 \
869 WHERE namespace = ?5 AND id = ?6",
870 rusqlite::params![
871 edge.weight,
872 now_ts,
873 edge.target_backend,
874 edge.metadata,
875 &namespace,
876 &existing_id,
877 ],
878 )?
879 } else {
880 conn.execute(
883 "UPDATE graph_edges SET \
884 source_id = ?1, target_id = ?2, updated_at = ?3 \
885 WHERE namespace = ?4 AND id = ?5",
886 rusqlite::params![
887 new_src.to_string(),
888 new_tgt.to_string(),
889 now_ts,
890 &namespace,
891 edge.id.to_string(),
892 ],
893 )?
894 };
895 if changed > 0 {
896 edges_rewired += 1;
897 }
898 }
899
900 conn.execute(
902 "INSERT OR REPLACE INTO entities \
903 (id, namespace, kind, name, description, properties, tags, \
904 created_at, updated_at, deleted_at, merged_into, merge_event_id) \
905 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
906 rusqlite::params![
907 &into_str,
908 &namespace,
909 &into_entity.kind,
910 &merged_name,
911 &merged_description,
912 &props_str,
913 &tags_json,
914 into_entity.created_at,
915 now,
916 into_entity.deleted_at,
917 Option::<String>::None,
918 Option::<String>::None,
919 ],
920 )?;
921
922 let fts_body = match &merged_description {
924 Some(d) if !d.is_empty() => format!("{} {}", merged_name, d),
925 _ => merged_name.clone(),
926 };
927 let kind_str = SubstrateKind::Entity.to_string();
928
929 conn.execute(
930 &format!(
931 "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
932 fts_table
933 ),
934 rusqlite::params![&namespace, &into_str],
935 )?;
936 conn.execute(
937 &format!(
938 "INSERT INTO {} \
939 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
940 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
941 fts_table
942 ),
943 rusqlite::params![
944 &into_str,
945 &kind_str,
946 &merged_name,
947 &fts_body,
948 &tags_json,
949 &namespace,
950 &props_str,
951 now,
952 ],
953 )?;
954
955 conn.execute(
957 &format!(
958 "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
959 fts_table
960 ),
961 rusqlite::params![&namespace, &from_str],
962 )?;
963
964 if let Some(ref vec_tbl) = vec_table {
966 conn.execute(
967 &format!(
968 "DELETE FROM {} WHERE subject_id = ?1 AND namespace = ?2",
969 vec_tbl
970 ),
971 rusqlite::params![&from_str, &namespace],
972 )?;
973 }
974
975 let merge_event_id = Uuid::new_v4();
977 conn.execute(
978 "UPDATE entities \
979 SET deleted_at = ?1, merged_into = ?2, merge_event_id = ?3, updated_at = ?1 \
980 WHERE namespace = ?4 AND id = ?5 AND deleted_at IS NULL",
981 rusqlite::params![
982 now,
983 into_str,
984 merge_event_id.to_string(),
985 &namespace,
986 &from_str,
987 ],
988 )?;
989 }
990
991 let updated_entity = Entity {
992 id: into_id,
993 namespace,
994 kind: into_entity.kind,
995 entity_type: into_entity.entity_type,
996 name: merged_name,
997 description: merged_description,
998 properties: merged_props,
999 tags: merged_tags,
1000 created_at: into_entity.created_at,
1001 updated_at: now,
1002 deleted_at: into_entity.deleted_at,
1003 merged_into: None,
1004 merge_event_id: None,
1005 };
1006
1007 Ok((
1008 MergeSummary {
1009 kept_id: into_id,
1010 removed_id: from_id,
1011 edges_rewired,
1012 properties_merged,
1013 tags_unioned,
1014 content_appended: false,
1015 dry_run,
1016 },
1017 updated_entity,
1018 ))
1019}
1020
1021fn read_merge_note(
1027 conn: &rusqlite::Connection,
1028 id: Uuid,
1029 namespace: &str,
1030) -> Result<khive_storage::note::Note, SqliteError> {
1031 use khive_storage::note::Note;
1032 let id_str = id.to_string();
1033 let mut stmt = conn.prepare(
1034 "SELECT id, namespace, kind, status, name, content, salience, decay_factor, \
1035 expires_at, properties, created_at, updated_at, deleted_at \
1036 FROM notes WHERE id = ?1 AND deleted_at IS NULL",
1037 )?;
1038 let mut rows = stmt.query(rusqlite::params![id_str])?;
1039 let row = rows
1040 .next()?
1041 .ok_or_else(|| SqliteError::InvalidData(format!("note {id} not found")))?;
1042
1043 let id_s: String = row.get(0)?;
1044 let ns: String = row.get(1)?;
1045 let kind: String = row.get(2)?;
1046 let status: String = row.get(3)?;
1047 let name: Option<String> = row.get(4)?;
1048 let content: String = row.get(5)?;
1049 let salience: Option<f64> = row.get(6)?;
1050 let decay_factor: Option<f64> = row.get(7)?;
1051 let expires_at: Option<i64> = row.get(8)?;
1052 let properties_str: Option<String> = row.get(9)?;
1053 let created_at: i64 = row.get(10)?;
1054 let updated_at: i64 = row.get(11)?;
1055 let deleted_at: Option<i64> = row.get(12)?;
1056
1057 if ns != namespace {
1058 return Err(SqliteError::InvalidData(format!(
1059 "note {id} belongs to namespace '{ns}', not '{namespace}'"
1060 )));
1061 }
1062
1063 let note_id = Uuid::parse_str(&id_s).map_err(|e| SqliteError::InvalidData(e.to_string()))?;
1064 let properties: Option<serde_json::Value> = properties_str
1065 .map(|s| serde_json::from_str(&s).map_err(|e| SqliteError::InvalidData(e.to_string())))
1066 .transpose()?;
1067
1068 Ok(Note {
1069 id: note_id,
1070 namespace: ns,
1071 kind,
1072 status,
1073 name,
1074 content,
1075 salience,
1076 decay_factor,
1077 expires_at,
1078 properties,
1079 created_at,
1080 updated_at,
1081 deleted_at,
1082 })
1083}
1084
/// Returns the larger of two optional values.
///
/// When only one side is present it is returned unchanged; when both are
/// absent the result is `None`. Two present values are compared with
/// `f64::max`.
fn max_option_f64(a: Option<f64>, b: Option<f64>) -> Option<f64> {
    match (a, b) {
        (Some(x), Some(y)) => Some(x.max(y)),
        // At most one side is present here: hand back whichever it is.
        (present, None) | (None, present) => present,
    }
}
1093
1094fn append_merge_history(props: Option<Value>, entry: Value) -> Result<Option<Value>, SqliteError> {
1095 use serde_json::{json, Map};
1096 let mut obj: Map<String, Value> = match props {
1097 Some(Value::Object(m)) => m,
1098 Some(other) => {
1099 let mut m = Map::new();
1100 m.insert("_value".into(), other);
1101 m
1102 }
1103 None => Map::new(),
1104 };
1105 let history = obj
1106 .entry("_merge_history".to_string())
1107 .or_insert_with(|| json!([]));
1108 if let Value::Array(arr) = history {
1109 arr.push(entry);
1110 }
1111 Ok(Some(Value::Object(obj)))
1112}
1113
1114#[allow(clippy::too_many_arguments)]
1122fn merge_note_sql(
1123 conn: &rusqlite::Connection,
1124 namespace: String,
1125 fts_table: String,
1126 vec_table: Option<String>,
1127 into_id: Uuid,
1128 from_id: Uuid,
1129 strategy: EntityDedupMergePolicy,
1130 content_strategy: ContentMergeStrategy,
1131 dry_run: bool,
1132) -> Result<(MergeSummary, khive_storage::note::Note), SqliteError> {
1133 let into_note = read_merge_note(conn, into_id, &namespace)?;
1134 let from_note = read_merge_note(conn, from_id, &namespace)?;
1135
1136 if into_note.kind != from_note.kind {
1137 return Err(SqliteError::InvalidData(format!(
1138 "cannot merge notes of different kinds: {} vs {}",
1139 into_note.kind, from_note.kind
1140 )));
1141 }
1142
1143 let now = chrono::Utc::now().timestamp_micros();
1144 let into_str = into_id.to_string();
1145 let from_str = from_id.to_string();
1146
1147 #[allow(dead_code)]
1149 struct EdgeRow {
1150 id: Uuid,
1151 source_id: Uuid,
1152 target_id: Uuid,
1153 relation: String,
1154 weight: f64,
1155 created_at: i64,
1156 updated_at: i64,
1157 deleted_at: Option<i64>,
1158 target_backend: Option<String>,
1159 metadata: Option<String>,
1160 }
1161 let parse_id =
1162 |s: String| Uuid::parse_str(&s).map_err(|e| SqliteError::InvalidData(e.to_string()));
1163
1164 let mut outbound: Vec<EdgeRow> = Vec::new();
1165 {
1166 let mut stmt = conn.prepare(
1167 "SELECT id, source_id, target_id, relation, weight, created_at, updated_at, deleted_at, target_backend, metadata \
1168 FROM graph_edges WHERE namespace = ?1 AND source_id = ?2",
1169 )?;
1170 let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
1171 while let Some(row) = rows.next()? {
1172 outbound.push(EdgeRow {
1173 id: parse_id(row.get(0)?)?,
1174 source_id: parse_id(row.get(1)?)?,
1175 target_id: parse_id(row.get(2)?)?,
1176 relation: row.get(3)?,
1177 weight: row.get(4)?,
1178 created_at: row.get(5)?,
1179 updated_at: row.get(6)?,
1180 deleted_at: row.get(7)?,
1181 target_backend: row.get(8)?,
1182 metadata: row.get(9)?,
1183 });
1184 }
1185 }
1186 let mut inbound: Vec<EdgeRow> = Vec::new();
1187 {
1188 let mut stmt = conn.prepare(
1189 "SELECT id, source_id, target_id, relation, weight, created_at, updated_at, deleted_at, target_backend, metadata \
1190 FROM graph_edges WHERE namespace = ?1 AND target_id = ?2",
1191 )?;
1192 let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
1193 while let Some(row) = rows.next()? {
1194 inbound.push(EdgeRow {
1195 id: parse_id(row.get(0)?)?,
1196 source_id: parse_id(row.get(1)?)?,
1197 target_id: parse_id(row.get(2)?)?,
1198 relation: row.get(3)?,
1199 weight: row.get(4)?,
1200 created_at: row.get(5)?,
1201 updated_at: row.get(6)?,
1202 deleted_at: row.get(7)?,
1203 target_backend: row.get(8)?,
1204 metadata: row.get(9)?,
1205 });
1206 }
1207 }
1208 let mut seen: HashSet<Uuid> = HashSet::new();
1209 let mut all_edges: Vec<EdgeRow> = Vec::new();
1210 for edge in outbound.into_iter().chain(inbound) {
1211 if seen.insert(edge.id) {
1212 all_edges.push(edge);
1213 }
1214 }
1215
1216 let (merged_content, content_appended) = match content_strategy {
1218 ContentMergeStrategy::Append => {
1219 if from_note.content.is_empty() {
1220 (into_note.content.clone(), false)
1221 } else {
1222 (
1223 format!("{}\n\n---\n\n{}", into_note.content, from_note.content),
1224 true,
1225 )
1226 }
1227 }
1228 ContentMergeStrategy::PreferInto => (into_note.content.clone(), false),
1229 ContentMergeStrategy::PreferFrom => (from_note.content.clone(), false),
1230 };
1231
1232 let merged_name = match strategy {
1233 EntityDedupMergePolicy::PreferFrom => from_note.name.clone().or(into_note.name.clone()),
1234 _ => into_note.name.clone().or(from_note.name.clone()),
1235 };
1236
1237 let (merged_props, properties_merged) =
1238 merge_properties(&into_note.properties, &from_note.properties, strategy);
1239
1240 let merge_history_entry = serde_json::json!({
1242 "merged_from": from_id.to_string(),
1243 "merged_at": now,
1244 "strategy": format!("{:?}", strategy),
1245 "content_strategy": format!("{:?}", content_strategy),
1246 });
1247 let merged_props = append_merge_history(merged_props, merge_history_entry)?;
1248
1249 let merged_salience = max_option_f64(into_note.salience, from_note.salience);
1250 let merged_expires_at = match (into_note.expires_at, from_note.expires_at) {
1251 (Some(a), Some(b)) => Some(a.max(b)),
1252 (Some(a), None) => Some(a),
1253 (None, Some(b)) => Some(b),
1254 (None, None) => None,
1255 };
1256
1257 let props_str = merged_props
1258 .as_ref()
1259 .map(|v| serde_json::to_string(v).unwrap_or_default());
1260
1261 let mut edges_rewired = 0usize;
1262 if !dry_run {
1263 for edge in all_edges {
1265 let raw_src = if edge.source_id == from_id {
1266 into_id
1267 } else {
1268 edge.source_id
1269 };
1270 let raw_tgt = if edge.target_id == from_id {
1271 into_id
1272 } else {
1273 edge.target_id
1274 };
1275 let (new_src, new_tgt) = match edge.relation.parse::<EdgeRelation>() {
1277 Ok(rel) => canonical_edge_endpoints(rel, raw_src, raw_tgt),
1278 Err(_) => (raw_src, raw_tgt),
1279 };
1280 if new_src == new_tgt {
1281 conn.execute(
1282 "DELETE FROM graph_edges WHERE namespace = ?1 AND id = ?2",
1283 rusqlite::params![&namespace, edge.id.to_string()],
1284 )?;
1285 continue;
1286 }
1287 let now_ts = chrono::Utc::now().timestamp();
1288 let conflict_id: Option<String> = {
1291 let conflict_src = new_src.to_string();
1292 let conflict_tgt = new_tgt.to_string();
1293 conn.query_row(
1294 "SELECT id FROM graph_edges \
1295 WHERE namespace = ?1 AND source_id = ?2 AND target_id = ?3 \
1296 AND relation = ?4 AND id != ?5",
1297 rusqlite::params![
1298 &namespace,
1299 &conflict_src,
1300 &conflict_tgt,
1301 &edge.relation,
1302 edge.id.to_string(),
1303 ],
1304 |row| row.get(0),
1305 )
1306 .optional()
1307 .map_err(SqliteError::Rusqlite)?
1308 };
1309
1310 let changed = if let Some(existing_id) = conflict_id {
1311 conn.execute(
1312 "DELETE FROM graph_edges WHERE namespace = ?1 AND id = ?2",
1313 rusqlite::params![&namespace, edge.id.to_string()],
1314 )?;
1315 conn.execute(
1316 "UPDATE graph_edges SET \
1317 weight = ?1, updated_at = ?2, deleted_at = NULL, \
1318 target_backend = ?3, metadata = ?4 \
1319 WHERE namespace = ?5 AND id = ?6",
1320 rusqlite::params![
1321 edge.weight,
1322 now_ts,
1323 edge.target_backend,
1324 edge.metadata,
1325 &namespace,
1326 &existing_id,
1327 ],
1328 )?
1329 } else {
1330 conn.execute(
1331 "UPDATE graph_edges SET \
1332 source_id = ?1, target_id = ?2, updated_at = ?3 \
1333 WHERE namespace = ?4 AND id = ?5",
1334 rusqlite::params![
1335 new_src.to_string(),
1336 new_tgt.to_string(),
1337 now_ts,
1338 &namespace,
1339 edge.id.to_string(),
1340 ],
1341 )?
1342 };
1343 if changed > 0 {
1344 edges_rewired += 1;
1345 }
1346 }
1347
1348 conn.execute(
1350 "INSERT OR REPLACE INTO notes \
1351 (id, namespace, kind, status, name, content, salience, decay_factor, \
1352 expires_at, properties, created_at, updated_at, deleted_at) \
1353 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
1354 rusqlite::params![
1355 &into_str,
1356 &namespace,
1357 &into_note.kind,
1358 &into_note.status,
1359 &merged_name,
1360 &merged_content,
1361 merged_salience,
1362 into_note.decay_factor,
1363 merged_expires_at,
1364 &props_str,
1365 into_note.created_at,
1366 now,
1367 into_note.deleted_at,
1368 ],
1369 )?;
1370
1371 conn.execute(
1373 &format!(
1374 "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
1375 fts_table
1376 ),
1377 rusqlite::params![&namespace, &into_str],
1378 )?;
1379 conn.execute(
1380 &format!(
1381 "INSERT INTO {} \
1382 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
1383 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1384 fts_table
1385 ),
1386 rusqlite::params![
1387 &into_str,
1388 SubstrateKind::Note.to_string(),
1389 &merged_name,
1390 &merged_content,
1391 "[]",
1392 &namespace,
1393 &props_str,
1394 now,
1395 ],
1396 )?;
1397
1398 conn.execute(
1400 &format!(
1401 "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
1402 fts_table
1403 ),
1404 rusqlite::params![&namespace, &from_str],
1405 )?;
1406
1407 if let Some(ref vec_tbl) = vec_table {
1409 conn.execute(
1410 &format!(
1411 "DELETE FROM {} WHERE subject_id = ?1 AND namespace = ?2",
1412 vec_tbl
1413 ),
1414 rusqlite::params![&from_str, &namespace],
1415 )?;
1416 }
1417
1418 conn.execute(
1420 "UPDATE notes SET status = 'deleted', deleted_at = ?1, updated_at = ?1 \
1421 WHERE namespace = ?2 AND id = ?3 AND deleted_at IS NULL",
1422 rusqlite::params![now, &namespace, &from_str],
1423 )?;
1424 }
1425
1426 let updated_note = khive_storage::note::Note {
1427 id: into_id,
1428 namespace: namespace.clone(),
1429 kind: into_note.kind.clone(),
1430 status: into_note.status.clone(),
1431 name: merged_name,
1432 content: merged_content,
1433 salience: merged_salience,
1434 decay_factor: into_note.decay_factor,
1435 expires_at: merged_expires_at,
1436 properties: merged_props,
1437 created_at: into_note.created_at,
1438 updated_at: now,
1439 deleted_at: into_note.deleted_at,
1440 };
1441
1442 Ok((
1443 MergeSummary {
1444 kept_id: into_id,
1445 removed_id: from_id,
1446 edges_rewired,
1447 properties_merged,
1448 tags_unioned: 0,
1449 content_appended,
1450 dry_run,
1451 },
1452 updated_note,
1453 ))
1454}
1455
1456fn merge_string_field(into: &str, from: &str, strategy: EntityDedupMergePolicy) -> String {
1461 match strategy {
1462 EntityDedupMergePolicy::PreferInto | EntityDedupMergePolicy::Union => into.to_string(),
1463 EntityDedupMergePolicy::PreferFrom => from.to_string(),
1464 }
1465}
1466
1467fn merge_option_string_field(
1468 into: &Option<String>,
1469 from: &Option<String>,
1470 strategy: EntityDedupMergePolicy,
1471) -> Option<String> {
1472 match strategy {
1473 EntityDedupMergePolicy::PreferInto => {
1474 if into.is_some() {
1475 into.clone()
1476 } else {
1477 from.clone()
1478 }
1479 }
1480 EntityDedupMergePolicy::PreferFrom => {
1481 if from.is_some() {
1482 from.clone()
1483 } else {
1484 into.clone()
1485 }
1486 }
1487 EntityDedupMergePolicy::Union => {
1488 match (into, from) {
1490 (Some(a), _) if !a.is_empty() => Some(a.clone()),
1491 (_, Some(b)) => Some(b.clone()),
1492 _ => None,
1493 }
1494 }
1495 }
1496}
1497
1498fn merge_properties(
1500 into: &Option<Value>,
1501 from: &Option<Value>,
1502 strategy: EntityDedupMergePolicy,
1503) -> (Option<Value>, usize) {
1504 match (into, from) {
1505 (None, None) => (None, 0),
1506 (Some(a), None) => (Some(a.clone()), 0),
1507 (None, Some(b)) => {
1508 let count = if let Value::Object(m) = b { m.len() } else { 1 };
1509 (Some(b.clone()), count)
1510 }
1511 (Some(into_val), Some(from_val)) => {
1512 let (merged, added) = merge_json(into_val, from_val, strategy);
1513 (Some(merged), added)
1514 }
1515 }
1516}
1517
1518fn merge_json(into: &Value, from: &Value, strategy: EntityDedupMergePolicy) -> (Value, usize) {
1520 match (into, from, strategy) {
1521 (Value::Object(a), Value::Object(b), EntityDedupMergePolicy::Union) => {
1522 let mut result = a.clone();
1523 let mut added = 0usize;
1524 for (k, v_from) in b {
1525 if let Some(v_into) = a.get(k) {
1526 let (merged, sub_added) =
1527 merge_json(v_into, v_from, EntityDedupMergePolicy::Union);
1528 result.insert(k.clone(), merged);
1529 added += sub_added;
1530 } else {
1531 result.insert(k.clone(), v_from.clone());
1532 added += 1;
1533 }
1534 }
1535 (Value::Object(result), added)
1536 }
1537 (Value::Object(a), Value::Object(b), EntityDedupMergePolicy::PreferInto) => {
1538 let mut result = a.clone();
1539 let mut added = 0usize;
1540 for (k, v) in b {
1541 if !a.contains_key(k) {
1542 result.insert(k.clone(), v.clone());
1543 added += 1;
1544 }
1545 }
1546 (Value::Object(result), added)
1547 }
1548 (Value::Object(a), Value::Object(b), EntityDedupMergePolicy::PreferFrom) => {
1549 let mut result = a.clone();
1550 let mut added = 0usize;
1551 for (k, v) in b {
1552 result.insert(k.clone(), v.clone());
1553 if !a.contains_key(k) {
1554 added += 1;
1555 }
1556 }
1557 (Value::Object(result), added)
1558 }
1559 (_into_val, from_val, EntityDedupMergePolicy::PreferFrom) => (from_val.clone(), 1),
1561 _ => (into.clone(), 0),
1562 }
1563}
1564
/// Appends to `into` every tag of `from` that has not been seen yet.
///
/// Returns the combined list (all of `into` in its original order, followed by the new tags
/// in the order they first appear in `from`) and the number of tags that were appended.
/// Tags repeated inside `from` are appended once; entries of `into` are copied unchanged.
fn union_tags(into: &[String], from: &[String]) -> (Vec<String>, usize) {
    let mut known: HashSet<&str> = into.iter().map(String::as_str).collect();
    let mut combined: Vec<String> = into.to_vec();
    let original_len = combined.len();

    // `HashSet::insert` returns true only for values it had not seen before.
    combined.extend(
        from.iter()
            .filter(|tag| known.insert(tag.as_str()))
            .cloned(),
    );

    let appended = combined.len() - original_len;
    (combined, appended)
}
1577
1578#[cfg(test)]
1587mod tests {
1588 use super::*;
1589 use crate::runtime::{KhiveRuntime, NamespaceToken};
1590 use khive_storage::types::{Direction, TextFilter, TextQueryMode, TextSearchRequest};
1591
    /// Builds the runtime shared by the tests in this module via `KhiveRuntime::memory()`.
    ///
    /// Panics if the runtime cannot be constructed.
    fn rt() -> KhiveRuntime {
        KhiveRuntime::memory().unwrap()
    }
1595
1596 async fn fts_hit(rt: &KhiveRuntime, token: &NamespaceToken, query: &str) -> Vec<Uuid> {
1598 let ns = token.namespace().as_str().to_string();
1599 rt.text(token)
1600 .unwrap()
1601 .search(TextSearchRequest {
1602 query: query.to_string(),
1603 mode: TextQueryMode::Plain,
1604 filter: Some(TextFilter {
1605 namespaces: vec![ns],
1606 ..Default::default()
1607 }),
1608 top_k: 50,
1609 snippet_chars: 100,
1610 })
1611 .await
1612 .unwrap()
1613 .into_iter()
1614 .map(|h| h.subject_id)
1615 .collect()
1616 }
1617
    /// A patch that sets only `description` must leave `name` and `properties` as created.
    #[tokio::test]
    async fn update_entity_patch_changes_only_specified_fields() {
        let rt = rt();
        let tok = NamespaceToken::local();
        // Seed an entity that has a name, a description and one property.
        let entity = rt
            .create_entity(
                &tok,
                "concept",
                None,
                "OriginalName",
                Some("orig desc"),
                Some(serde_json::json!({"k":"v"})),
                vec![],
            )
            .await
            .unwrap();

        // Only `description` is set; the remaining patch fields come from `Default`.
        let updated = rt
            .update_entity(
                &tok,
                entity.id,
                EntityPatch {
                    description: Some(Some("new desc".to_string())),
                    ..Default::default()
                },
            )
            .await
            .unwrap();

        assert_eq!(updated.name, "OriginalName");
        assert_eq!(updated.description.as_deref(), Some("new desc"));
        assert_eq!(updated.properties, Some(serde_json::json!({"k":"v"})));
    }
1651
    /// Patching `description` with `Some(None)` must clear an existing description.
    #[tokio::test]
    async fn update_entity_clear_description_with_some_none() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let entity = rt
            .create_entity(
                &tok,
                "concept",
                None,
                "ClearDesc",
                Some("has description"),
                None,
                vec![],
            )
            .await
            .unwrap();

        // Outer `Some` = "touch this field"; inner `None` = the new value.
        let updated = rt
            .update_entity(
                &tok,
                entity.id,
                EntityPatch {
                    description: Some(None),
                    ..Default::default()
                },
            )
            .await
            .unwrap();

        assert!(
            updated.description.is_none(),
            "description should be cleared"
        );
    }
1686
    /// Renaming an entity must make it searchable by the new name and no longer by the old one.
    #[tokio::test]
    async fn update_entity_reindexes_when_name_changes() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let entity = rt
            .create_entity(&tok, "concept", None, "OldName", None, None, vec![])
            .await
            .unwrap();

        // Baseline: the freshly created entity is found under its original name.
        let hits_before = fts_hit(&rt, &tok, "OldName").await;
        assert!(
            hits_before.contains(&entity.id),
            "entity should be findable by old name"
        );

        rt.update_entity(
            &tok,
            entity.id,
            EntityPatch {
                name: Some("NewName".to_string()),
                ..Default::default()
            },
        )
        .await
        .unwrap();

        let hits_old = fts_hit(&rt, &tok, "OldName").await;
        let hits_new = fts_hit(&rt, &tok, "NewName").await;

        // After the rename the search results must follow the new name.
        assert!(
            !hits_old.contains(&entity.id),
            "old name should no longer match after rename"
        );
        assert!(
            hits_new.contains(&entity.id),
            "new name should be findable after rename"
        );
    }
1727
    /// A properties patch must update the keys it names and keep every other existing key.
    #[tokio::test]
    async fn update_entity_properties_merges_preserving_existing_keys() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let entity = rt
            .create_entity(
                &tok,
                "concept",
                None,
                "MergeProps",
                None,
                Some(serde_json::json!({
                    "domain": "inference",
                    "repo": "lattice",
                    "status": "researched",
                })),
                vec![],
            )
            .await
            .unwrap();

        // The patch mentions only `status`; `domain` and `repo` are not part of it.
        let updated = rt
            .update_entity(
                &tok,
                entity.id,
                EntityPatch {
                    properties: Some(serde_json::json!({"status": "implemented"})),
                    ..Default::default()
                },
            )
            .await
            .unwrap();

        let props = updated.properties.expect("properties should remain set");
        assert_eq!(props["domain"], "inference", "domain key must be preserved");
        assert_eq!(props["repo"], "lattice", "repo key must be preserved");
        assert_eq!(
            props["status"], "implemented",
            "status key must be updated by patch"
        );
    }
1769
    /// A properties-only patch must leave the entity findable by its name in text search.
    #[tokio::test]
    async fn update_entity_skips_reindex_when_only_properties_change() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let entity = rt
            .create_entity(&tok, "concept", None, "StableIndexed", None, None, vec![])
            .await
            .unwrap();

        // Baseline: the entity is indexed under its name.
        let hits_before = fts_hit(&rt, &tok, "StableIndexed").await;
        assert!(hits_before.contains(&entity.id));

        // Patch touches neither name nor description.
        rt.update_entity(
            &tok,
            entity.id,
            EntityPatch {
                properties: Some(serde_json::json!({"new": "prop"})),
                ..Default::default()
            },
        )
        .await
        .unwrap();

        let hits_after = fts_hit(&rt, &tok, "StableIndexed").await;
        assert!(
            hits_after.contains(&entity.id),
            "still findable after props-only patch"
        );
    }
1801
    /// Merging B into D must repoint the edges A -> B and C -> B so they end at D.
    #[tokio::test]
    async fn merge_entity_rewires_edges() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let a = rt
            .create_entity(&tok, "concept", None, "A", None, None, vec![])
            .await
            .unwrap();
        let b = rt
            .create_entity(&tok, "concept", None, "B", None, None, vec![])
            .await
            .unwrap();
        let c = rt
            .create_entity(&tok, "concept", None, "C", None, None, vec![])
            .await
            .unwrap();
        let d = rt
            .create_entity(&tok, "concept", None, "D", None, None, vec![])
            .await
            .unwrap();

        // Two incoming `Extends` edges on B: A -> B and C -> B.
        rt.link(&tok, a.id, b.id, EdgeRelation::Extends, 1.0, None)
            .await
            .unwrap();
        rt.link(&tok, c.id, b.id, EdgeRelation::Extends, 1.0, None)
            .await
            .unwrap();

        // D is kept, B is removed (see the `kept_id` / `removed_id` assertions below).
        let summary = rt
            .merge_entity(&tok, d.id, b.id, EntityDedupMergePolicy::PreferInto, false)
            .await
            .unwrap();

        assert_eq!(summary.kept_id, d.id);
        assert_eq!(summary.removed_id, b.id);
        assert_eq!(summary.edges_rewired, 2);

        // Each source now has exactly one outgoing neighbor, and it is D.
        let a_neighbors = rt
            .neighbors(&tok, a.id, Direction::Out, None, None)
            .await
            .unwrap();
        assert_eq!(a_neighbors.len(), 1);
        assert_eq!(a_neighbors[0].node_id, d.id);

        let c_neighbors = rt
            .neighbors(&tok, c.id, Direction::Out, None, None)
            .await
            .unwrap();
        assert_eq!(c_neighbors.len(), 1);
        assert_eq!(c_neighbors[0].node_id, d.id);
    }
1855
    /// Passing the same id as both sides of a merge must fail with the self-merge message.
    #[tokio::test]
    async fn merge_entity_self_merge_rejected() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let a = rt
            .create_entity(&tok, "concept", None, "A", None, None, vec![])
            .await
            .unwrap();
        let err = rt
            .merge_entity(&tok, a.id, a.id, EntityDedupMergePolicy::PreferInto, false)
            .await
            .unwrap_err();
        // The check is made on the error's `Debug` rendering, not on a specific variant.
        assert!(
            format!("{err:?}").contains("cannot merge an entity into itself"),
            "expected self-merge rejection, got: {err:?}"
        );
    }
1873
    /// With `PreferInto`, a conflicting key keeps the kept entity's value and a key that only
    /// the merged-from entity has is added.
    #[tokio::test]
    async fn merge_entity_prefer_into_strategy() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let into = rt
            .create_entity(
                &tok,
                "concept",
                None,
                "Into",
                None,
                Some(serde_json::json!({"a": 1})),
                vec![],
            )
            .await
            .unwrap();
        let from = rt
            .create_entity(
                &tok,
                "concept",
                None,
                "From",
                None,
                Some(serde_json::json!({"a": 2, "b": 3})),
                vec![],
            )
            .await
            .unwrap();

        rt.merge_entity(
            &tok,
            into.id,
            from.id,
            EntityDedupMergePolicy::PreferInto,
            false,
        )
        .await
        .unwrap();

        let kept = rt.get_entity(&tok, into.id).await.unwrap();
        let props = kept.properties.unwrap();
        // Conflict on "a": the `into` value (1) wins.
        assert_eq!(props["a"], 1);
        // "b" existed only on `from`, so it is carried over.
        assert_eq!(props["b"], 3);
    }
1919
    /// With `PreferFrom`, a conflicting key takes the merged-from entity's value and a key that
    /// only the merged-from entity has is added.
    #[tokio::test]
    async fn merge_entity_prefer_from_strategy() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let into = rt
            .create_entity(
                &tok,
                "concept",
                None,
                "Into",
                None,
                Some(serde_json::json!({"a": 1})),
                vec![],
            )
            .await
            .unwrap();
        let from = rt
            .create_entity(
                &tok,
                "concept",
                None,
                "From",
                None,
                Some(serde_json::json!({"a": 2, "b": 3})),
                vec![],
            )
            .await
            .unwrap();

        rt.merge_entity(
            &tok,
            into.id,
            from.id,
            EntityDedupMergePolicy::PreferFrom,
            false,
        )
        .await
        .unwrap();

        let kept = rt.get_entity(&tok, into.id).await.unwrap();
        let props = kept.properties.unwrap();
        // Conflict on "a": the `from` value (2) wins.
        assert_eq!(props["a"], 2);
        // "b" existed only on `from`, so it is carried over.
        assert_eq!(props["b"], 3);
    }
1965
    /// With `Union`, a conflicting scalar key keeps the kept entity's value and a key that only
    /// the merged-from entity has is added.
    #[tokio::test]
    async fn merge_entity_union_strategy() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let into = rt
            .create_entity(
                &tok,
                "concept",
                None,
                "Into",
                None,
                Some(serde_json::json!({"a": 1})),
                vec![],
            )
            .await
            .unwrap();
        let from = rt
            .create_entity(
                &tok,
                "concept",
                None,
                "From",
                None,
                Some(serde_json::json!({"a": 2, "b": 3})),
                vec![],
            )
            .await
            .unwrap();

        rt.merge_entity(&tok, into.id, from.id, EntityDedupMergePolicy::Union, false)
            .await
            .unwrap();

        let kept = rt.get_entity(&tok, into.id).await.unwrap();
        let props = kept.properties.unwrap();
        // Conflict on the scalar "a": the `into` value (1) is retained.
        assert_eq!(props["a"], 1);
        // "b" existed only on `from`, so it is carried over.
        assert_eq!(props["b"], 3);
    }
2005
    /// After a merge the kept entity carries the de-duplicated union of both tag lists.
    #[tokio::test]
    async fn merge_entity_unions_tags() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let into = rt
            .create_entity(
                &tok,
                "concept",
                None,
                "Into",
                None,
                None,
                vec!["x".to_string(), "y".to_string()],
            )
            .await
            .unwrap();
        let from = rt
            .create_entity(
                &tok,
                "concept",
                None,
                "From",
                None,
                None,
                vec!["y".to_string(), "z".to_string()],
            )
            .await
            .unwrap();

        // Tags are expected to be unioned even though the policy here is `PreferInto`.
        rt.merge_entity(
            &tok,
            into.id,
            from.id,
            EntityDedupMergePolicy::PreferInto,
            false,
        )
        .await
        .unwrap();

        let kept = rt.get_entity(&tok, into.id).await.unwrap();
        // Sort a copy so the assertion does not depend on tag order.
        let mut tags = kept.tags.clone();
        tags.sort();
        assert_eq!(tags, vec!["x", "y", "z"]);
    }
2050
    /// An edge between the two merged entities must be dropped rather than become a self-loop.
    #[tokio::test]
    async fn merge_entity_drops_self_loops() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let a = rt
            .create_entity(&tok, "concept", None, "A", None, None, vec![])
            .await
            .unwrap();
        let b = rt
            .create_entity(&tok, "concept", None, "B", None, None, vec![])
            .await
            .unwrap();

        // A -> B: once B is merged into A, both endpoints would be A.
        rt.link(&tok, a.id, b.id, EdgeRelation::Extends, 1.0, None)
            .await
            .unwrap();

        let summary = rt
            .merge_entity(&tok, a.id, b.id, EntityDedupMergePolicy::PreferInto, false)
            .await
            .unwrap();

        assert_eq!(
            summary.edges_rewired, 0,
            "self-loop should be dropped, not rewired"
        );

        let a_out = rt
            .neighbors(&tok, a.id, Direction::Out, None, None)
            .await
            .unwrap();
        assert!(a_out.is_empty(), "no self-loop should remain");
    }
2085
2086 #[test]
2089 fn union_tags_deduplicates() {
2090 let (tags, added) = union_tags(
2091 &["x".to_string(), "y".to_string()],
2092 &["y".to_string(), "z".to_string()],
2093 );
2094 let mut sorted = tags.clone();
2095 sorted.sort();
2096 assert_eq!(sorted, vec!["x", "y", "z"]);
2097 assert_eq!(added, 1);
2098 }
2099
2100 #[test]
2101 fn merge_properties_prefer_into_fills_missing_keys() {
2102 let a = serde_json::json!({"a": 1});
2103 let b = serde_json::json!({"a": 99, "b": 2});
2104 let (merged, added) =
2105 merge_properties(&Some(a), &Some(b), EntityDedupMergePolicy::PreferInto);
2106 let m = merged.unwrap();
2107 assert_eq!(m["a"], 1);
2108 assert_eq!(m["b"], 2);
2109 assert_eq!(added, 1);
2110 }
2111
    /// After a merge the merged-from row must still exist in `entities`, with `deleted_at` set
    /// and `merged_into` pointing at the kept entity.
    #[tokio::test]
    async fn merge_entity_tombstones_source_with_provenance() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let into = rt
            .create_entity(&tok, "concept", None, "Into", None, None, vec![])
            .await
            .unwrap();
        let from = rt
            .create_entity(&tok, "concept", None, "From", None, None, vec![])
            .await
            .unwrap();
        let from_id = from.id;

        rt.merge_entity(
            &tok,
            into.id,
            from_id,
            EntityDedupMergePolicy::PreferInto,
            false,
        )
        .await
        .unwrap();

        // Through the normal read path the merged-from entity is no longer retrievable.
        assert!(
            rt.get_entity(&tok, from_id).await.is_err(),
            "tombstoned source should not be returned by get_entity"
        );

        // Read the raw row directly; the query runs on a blocking thread.
        let pool = rt.backend().pool_arc();
        let (deleted_at, merged_into): (Option<i64>, Option<String>) =
            tokio::task::spawn_blocking(move || {
                let guard = pool.writer().unwrap();
                guard
                    .conn()
                    .query_row(
                        "SELECT deleted_at, merged_into FROM entities WHERE id = ?1",
                        [from_id.to_string()],
                        |row| Ok((row.get(0)?, row.get(1)?)),
                    )
                    .unwrap()
            })
            .await
            .unwrap();
        assert!(
            deleted_at.is_some(),
            "tombstoned entity must have deleted_at set"
        );
        assert_eq!(
            merged_into.as_deref(),
            Some(into.id.to_string().as_str()),
            "merged_into must point to into_id"
        );
    }
2170
    /// Merging two notes of the same kind with `ContentMergeStrategy::Append` must report
    /// appended content and make the merged-from note unavailable through the note store.
    #[tokio::test]
    async fn merge_note_same_kind_appends_content() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let into = rt
            .create_note(
                &tok,
                "observation",
                None,
                "Into content",
                None,
                None,
                vec![],
            )
            .await
            .unwrap();
        let from = rt
            .create_note(
                &tok,
                "observation",
                None,
                "From content",
                None,
                None,
                vec![],
            )
            .await
            .unwrap();
        let from_id = from.id;

        // Final `false` is the dry-run flag (asserted via `summary.dry_run` below).
        let summary = rt
            .merge_note(
                &tok,
                into.id,
                from_id,
                EntityDedupMergePolicy::PreferInto,
                ContentMergeStrategy::Append,
                false,
            )
            .await
            .unwrap();

        assert_eq!(summary.kept_id, into.id);
        assert_eq!(summary.removed_id, from_id);
        assert!(summary.content_appended);
        assert!(!summary.dry_run);

        // The store lookup returns `None` for the merged-from note.
        let from_store = rt.notes(&tok).unwrap();
        assert!(
            from_store.get_note(from_id).await.unwrap().is_none(),
            "merged-from note should be soft-deleted"
        );
    }
2225
    /// Merging an "observation" note with a "decision" note must return an error.
    #[tokio::test]
    async fn merge_note_different_kinds_rejected() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let into = rt
            .create_note(&tok, "observation", None, "Into", None, None, vec![])
            .await
            .unwrap();
        let from = rt
            .create_note(&tok, "decision", None, "From", None, None, vec![])
            .await
            .unwrap();

        // Only failure is asserted; the specific error variant is not checked here.
        let result = rt
            .merge_note(
                &tok,
                into.id,
                from.id,
                EntityDedupMergePolicy::PreferInto,
                ContentMergeStrategy::Append,
                false,
            )
            .await;
        assert!(result.is_err(), "merging different note kinds must fail");
    }
2251
    /// A dry-run note merge must report `dry_run` and leave both stored notes' content as is.
    #[tokio::test]
    async fn merge_note_dry_run_leaves_notes_unchanged() {
        let rt = rt();
        let tok = NamespaceToken::local();
        let into = rt
            .create_note(
                &tok,
                "observation",
                None,
                "Into content",
                None,
                None,
                vec![],
            )
            .await
            .unwrap();
        let from = rt
            .create_note(
                &tok,
                "observation",
                None,
                "From content",
                None,
                None,
                vec![],
            )
            .await
            .unwrap();
        let into_id = into.id;
        let from_id = from.id;

        // Final `true` requests a dry run.
        let summary = rt
            .merge_note(
                &tok,
                into_id,
                from_id,
                EntityDedupMergePolicy::PreferInto,
                ContentMergeStrategy::Append,
                true,
            )
            .await
            .unwrap();

        assert!(summary.dry_run);

        // Both notes must still be retrievable with their original content.
        let store = rt.notes(&tok).unwrap();
        let into_after = store.get_note(into_id).await.unwrap().unwrap();
        let from_after = store.get_note(from_id).await.unwrap().unwrap();
        assert_eq!(
            into_after.content, "Into content",
            "dry_run must not mutate into-note"
        );
        assert_eq!(
            from_after.content, "From content",
            "dry_run must not mutate from-note"
        );
    }
2310
    /// An `EdgePatch` that sets only `properties` must store them in the edge's `metadata`
    /// and leave the weight unchanged.
    #[tokio::test]
    async fn update_edge_updates_properties() {
        use khive_storage::EdgeRelation;
        let rt = rt();
        let tok = NamespaceToken::local();
        let a = rt
            .create_entity(&tok, "concept", None, "A", None, None, vec![])
            .await
            .unwrap();
        let b = rt
            .create_entity(&tok, "concept", None, "B", None, None, vec![])
            .await
            .unwrap();
        let edge = rt
            .link(&tok, a.id, b.id, EdgeRelation::Extends, 0.5, None)
            .await
            .unwrap();
        // `update_edge` is called with a plain `Uuid`, so convert the edge's id type.
        let edge_id: Uuid = edge.id.into();

        let updated = rt
            .update_edge(
                &tok,
                edge_id,
                EdgePatch {
                    properties: Some(serde_json::json!({"source": "manual"})),
                    ..Default::default()
                },
            )
            .await
            .unwrap();

        assert_eq!(updated.metadata.as_ref().unwrap()["source"], "manual");
        // Float comparison with a tolerance against the weight the edge was created with.
        assert!((updated.weight - 0.5).abs() < 0.001, "weight unchanged");
    }
2345
    /// When both merged entities have the same `Extends` edge to a third entity, the merge must
    /// succeed, leave exactly one such edge on the kept entity, and tombstone the other entity.
    #[tokio::test]
    async fn merge_entity_survives_shared_edge_to_third_party() {
        use khive_storage::EdgeRelation;
        let rt = rt();
        let tok = NamespaceToken::local();

        let a = rt
            .create_entity(&tok, "concept", None, "A", None, None, vec![])
            .await
            .unwrap();
        let b = rt
            .create_entity(&tok, "concept", None, "B", None, None, vec![])
            .await
            .unwrap();
        let shared = rt
            .create_entity(&tok, "concept", None, "Shared", None, None, vec![])
            .await
            .unwrap();

        // A -> Shared and B -> Shared: rewiring B's edge onto A would duplicate A's edge.
        rt.link(&tok, a.id, shared.id, EdgeRelation::Extends, 1.0, None)
            .await
            .unwrap();
        rt.link(&tok, b.id, shared.id, EdgeRelation::Extends, 1.0, None)
            .await
            .unwrap();

        // A is kept, B is removed.
        let summary = rt
            .merge_entity(
                &tok,
                a.id,
                b.id,
                crate::EntityDedupMergePolicy::PreferInto,
                false,
            )
            .await
            .expect(
                "C1: merge must succeed even when both entities share an edge to a third party",
            );

        assert_eq!(summary.kept_id, a.id);
        assert_eq!(summary.removed_id, b.id);
        // List A -> Shared `Extends` edges (limit 10) and require exactly one.
        let a_edges = rt
            .list_edges(
                &tok,
                crate::EdgeListFilter {
                    source_id: Some(a.id),
                    target_id: Some(shared.id),
                    relations: vec![EdgeRelation::Extends],
                    ..Default::default()
                },
                10,
            )
            .await
            .unwrap();
        assert_eq!(
            a_edges.len(),
            1,
            "C1: exactly one live A→shared Extends edge must exist after merge; got: {a_edges:?}"
        );

        // The entity store lookup must no longer return B.
        let b_after = rt.entities(&tok).unwrap().get_entity(b.id).await.unwrap();
        assert!(
            b_after.is_none(),
            "C3: from_entity must be tombstoned (get_entity returns None for deleted) after merge; got: {b_after:?}"
        );
    }
2430
    /// Merging a "project" entity into a "concept" entity must fail with `InvalidInput` and
    /// leave both entities retrievable.
    #[tokio::test]
    async fn merge_entity_cross_kind_rejected_at_runtime() {
        let rt = rt();
        let tok = NamespaceToken::local();

        let concept = rt
            .create_entity(&tok, "concept", None, "H2Concept", None, None, vec![])
            .await
            .unwrap();
        let project = rt
            .create_entity(&tok, "project", None, "H2Project", None, None, vec![])
            .await
            .unwrap();

        // The two entities differ in kind, so the runtime is expected to refuse.
        let err = rt
            .merge_entity(
                &tok,
                concept.id,
                project.id,
                crate::EntityDedupMergePolicy::PreferInto,
                false,
            )
            .await
            .expect_err("H2: cross-kind merge must be rejected by runtime");
        assert!(
            matches!(err, crate::RuntimeError::InvalidInput(_)),
            "H2: expected InvalidInput, got: {err:?}"
        );

        // A rejected merge must not have removed either entity.
        let concept_after = rt.get_entity(&tok, concept.id).await;
        let project_after = rt.get_entity(&tok, project.id).await;
        assert!(
            concept_after.is_ok(),
            "H2: concept must remain live after rejected merge; got: {concept_after:?}"
        );
        assert!(
            project_after.is_ok(),
            "H2: project must remain live after rejected merge; got: {project_after:?}"
        );
    }
2476
    /// Merging two "concept" entities must succeed and tombstone the merged-from entity.
    #[tokio::test]
    async fn merge_entity_same_kind_succeeds() {
        let rt = rt();
        let tok = NamespaceToken::local();

        let c1 = rt
            .create_entity(&tok, "concept", None, "Concept1", None, None, vec![])
            .await
            .unwrap();
        let c2 = rt
            .create_entity(&tok, "concept", None, "Concept2", None, None, vec![])
            .await
            .unwrap();

        let summary = rt
            .merge_entity(
                &tok,
                c1.id,
                c2.id,
                crate::EntityDedupMergePolicy::PreferInto,
                false,
            )
            .await
            .expect("same-kind merge must succeed");
        assert_eq!(summary.kept_id, c1.id);
        assert_eq!(summary.removed_id, c2.id);

        // The entity store lookup must no longer return the merged-from entity.
        let c2_after = rt.entities(&tok).unwrap().get_entity(c2.id).await.unwrap();
        assert!(c2_after.is_none(), "from_entity must be tombstoned");
    }
2509
    /// A note merge issued with an `ns-a` token must return `NotFound` when either the kept or
    /// the merged-from id belongs to a note created in `ns-b`.
    #[tokio::test]
    async fn merge_note_cross_namespace_either_id_returns_not_found() {
        use crate::error::RuntimeError;
        use crate::Namespace;

        let rt = rt();
        let ns_a = NamespaceToken::for_namespace(Namespace::parse("ns-a").unwrap());
        let ns_b = NamespaceToken::for_namespace(Namespace::parse("ns-b").unwrap());

        let into_a = rt
            .create_note(&ns_a, "observation", None, "Into A", None, None, vec![])
            .await
            .unwrap();
        let from_a = rt
            .create_note(&ns_a, "observation", None, "From A", None, None, vec![])
            .await
            .unwrap();
        let note_b = rt
            .create_note(&ns_b, "observation", None, "Note B", None, None, vec![])
            .await
            .unwrap();

        // Case 1: the foreign note is passed as the kept (`into`) side.
        let foreign_into = rt
            .merge_note(
                &ns_a,
                note_b.id,
                from_a.id,
                EntityDedupMergePolicy::PreferInto,
                ContentMergeStrategy::Append,
                false,
            )
            .await;
        assert!(
            matches!(foreign_into, Err(RuntimeError::NotFound(_))),
            "foreign into_id must be denied before merge, got {foreign_into:?}"
        );

        // Case 2: the foreign note is passed as the merged-from side.
        let foreign_from = rt
            .merge_note(
                &ns_a,
                into_a.id,
                note_b.id,
                EntityDedupMergePolicy::PreferInto,
                ContentMergeStrategy::Append,
                false,
            )
            .await;
        assert!(
            matches!(foreign_from, Err(RuntimeError::NotFound(_))),
            "foreign from_id must be denied before merge, got {foreign_from:?}"
        );
    }
2566
    /// Updating an `ns-a` entity with an `ns-b` token must return `NotFound` and leave the
    /// entity's name and description unchanged.
    #[tokio::test]
    async fn update_entity_cross_namespace_returns_not_found_and_preserves_source() {
        use crate::error::RuntimeError;
        use crate::Namespace;

        let rt = rt();
        let ns_a = NamespaceToken::for_namespace(Namespace::parse("ns-a").unwrap());
        let ns_b = NamespaceToken::for_namespace(Namespace::parse("ns-b").unwrap());

        let entity = rt
            .create_entity(
                &ns_a,
                "concept",
                None,
                "Alpha",
                Some("original"),
                None,
                vec![],
            )
            .await
            .unwrap();

        // Attempt the rename through the other namespace's token.
        let err = rt
            .update_entity(
                &ns_b,
                entity.id,
                EntityPatch {
                    name: Some("Compromised".into()),
                    ..Default::default()
                },
            )
            .await;

        assert!(
            matches!(err, Err(RuntimeError::NotFound(_))),
            "cross-namespace update must return opaque NotFound, got {err:?}"
        );

        // Re-read through the owning namespace to confirm nothing was written.
        let after = rt.get_entity(&ns_a, entity.id).await.unwrap();
        assert_eq!(
            after.name, "Alpha",
            "foreign update must not mutate source row"
        );
        assert_eq!(after.description.as_deref(), Some("original"));
    }
2614
    /// An entity merge issued with an `ns-a` token must return `NotFound` when either id belongs
    /// to an entity created in `ns-b`, and all three entities must remain retrievable.
    #[tokio::test]
    async fn merge_entity_cross_namespace_either_id_returns_not_found() {
        use crate::error::RuntimeError;
        use crate::Namespace;

        let rt = rt();
        let ns_a = NamespaceToken::for_namespace(Namespace::parse("ns-a").unwrap());
        let ns_b = NamespaceToken::for_namespace(Namespace::parse("ns-b").unwrap());

        let into_a = rt
            .create_entity(&ns_a, "concept", None, "Into A", None, None, vec![])
            .await
            .unwrap();
        let from_a = rt
            .create_entity(&ns_a, "concept", None, "From A", None, None, vec![])
            .await
            .unwrap();
        let foreign_b = rt
            .create_entity(&ns_b, "concept", None, "Foreign B", None, None, vec![])
            .await
            .unwrap();

        // Case 1: the foreign entity is passed as the kept (`into`) side.
        let foreign_into = rt
            .merge_entity(
                &ns_a,
                foreign_b.id,
                from_a.id,
                EntityDedupMergePolicy::PreferInto,
                false,
            )
            .await;
        assert!(
            matches!(foreign_into, Err(RuntimeError::NotFound(_))),
            "foreign into_id must be denied before merge, got {foreign_into:?}"
        );

        // Case 2: the foreign entity is passed as the merged-from side.
        let foreign_from = rt
            .merge_entity(
                &ns_a,
                into_a.id,
                foreign_b.id,
                EntityDedupMergePolicy::PreferInto,
                false,
            )
            .await;
        assert!(
            matches!(foreign_from, Err(RuntimeError::NotFound(_))),
            "foreign from_id must be denied before merge, got {foreign_from:?}"
        );

        // Neither rejected attempt may have removed an entity from its own namespace.
        assert!(rt.get_entity(&ns_a, into_a.id).await.is_ok());
        assert!(rt.get_entity(&ns_a, from_a.id).await.is_ok());
        assert!(rt.get_entity(&ns_b, foreign_b.id).await.is_ok());
    }
2672}