1use std::collections::HashSet;
12
13use serde::{Deserialize, Serialize};
14use serde_json::Value;
15use uuid::Uuid;
16
17use khive_db::SqliteError;
18use khive_storage::note::Note;
19use khive_storage::types::{EdgeFilter, TextDocument};
20use khive_storage::{EdgeRelation, Entity, SubstrateKind};
21use khive_types::EventKind;
22use rusqlite::OptionalExtension;
23
24use crate::error::{RuntimeError, RuntimeResult};
25use crate::operations::canonical_edge_endpoints;
26use crate::runtime::{KhiveRuntime, NamespaceToken};
27
28#[derive(Clone, Debug, Default)]
50pub struct EntityPatch {
51 pub name: Option<String>,
52 pub description: Option<Option<String>>,
53 pub properties: Option<Value>,
54 pub tags: Option<Vec<String>>,
55}
56
57#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
59#[serde(rename_all = "snake_case")]
60pub enum EntityDedupMergePolicy {
61 #[default]
64 PreferInto,
65 PreferFrom,
67 Union,
69}
70
71#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
73#[serde(rename_all = "snake_case")]
74pub enum ContentMergeStrategy {
75 #[default]
76 Append,
77 PreferInto,
78 PreferFrom,
79}
80
81#[derive(Clone, Debug, Serialize, Deserialize)]
83pub struct MergeSummary {
84 pub kept_id: Uuid,
85 pub removed_id: Uuid,
86 pub edges_rewired: usize,
87 pub properties_merged: usize,
88 pub tags_unioned: usize,
89 pub content_appended: bool,
90 pub dry_run: bool,
91}
92
93#[derive(Clone, Debug, Default)]
98pub struct EdgePatch {
99 pub relation: Option<EdgeRelation>,
100 pub weight: Option<f64>,
101 pub properties: Option<Value>,
102}
103
104#[derive(Clone, Debug, Default)]
111pub struct NotePatch {
112 pub name: Option<Option<String>>,
113 pub content: Option<String>,
114 pub salience: Option<Option<f64>>,
115 pub decay_factor: Option<Option<f64>>,
116 pub properties: Option<Value>,
117 pub(crate) kind_status: Option<String>,
118}
119
120impl NotePatch {
121 pub fn new(
124 name: Option<Option<String>>,
125 content: Option<String>,
126 salience: Option<Option<f64>>,
127 decay_factor: Option<Option<f64>>,
128 properties: Option<Value>,
129 ) -> Self {
130 Self {
131 name,
132 content,
133 salience,
134 decay_factor,
135 properties,
136 kind_status: None,
137 }
138 }
139}
140
141#[derive(Clone, Debug, Default)]
143pub struct EdgeListFilter {
144 pub source_id: Option<Uuid>,
145 pub target_id: Option<Uuid>,
146 pub relations: Vec<EdgeRelation>,
148 pub min_weight: Option<f64>,
149 pub max_weight: Option<f64>,
150}
151
152impl From<EdgeListFilter> for EdgeFilter {
153 fn from(f: EdgeListFilter) -> Self {
154 EdgeFilter {
155 source_ids: f.source_id.into_iter().collect(),
156 target_ids: f.target_id.into_iter().collect(),
157 relations: f.relations,
158 min_weight: f.min_weight,
159 max_weight: f.max_weight,
160 ..Default::default()
161 }
162 }
163}
164
165impl KhiveRuntime {
170 pub async fn update_entity(
178 &self,
179 token: &NamespaceToken,
180 id: Uuid,
181 patch: EntityPatch,
182 ) -> RuntimeResult<Entity> {
183 if let Some(ref name) = patch.name {
185 crate::secret_gate::check(name)?;
186 }
187 if let Some(Some(ref desc)) = patch.description {
188 crate::secret_gate::check(desc)?;
189 }
190 if let Some(ref props) = patch.properties {
191 crate::secret_gate::check_json(props)?;
192 }
193 if let Some(ref tags) = patch.tags {
194 crate::secret_gate::check_tags(tags)?;
195 }
196 let store = self.entities(token)?;
197 let mut entity = store
198 .get_entity(id)
199 .await?
200 .ok_or_else(|| RuntimeError::NotFound(format!("entity {id}")))?;
201
202 Self::ensure_namespace(&entity.namespace, token.namespace().as_str())?;
203
204 let mut text_changed = false;
205 let mut changed_fields: Vec<&'static str> = Vec::new();
206
207 if let Some(name) = patch.name {
208 text_changed |= entity.name != name;
209 entity.name = name;
210 changed_fields.push("name");
211 }
212 if let Some(desc_patch) = patch.description {
213 text_changed |= entity.description != desc_patch;
214 entity.description = desc_patch;
215 changed_fields.push("description");
216 }
217 if let Some(props) = patch.properties {
218 let (merged, _) = merge_properties(
219 &entity.properties,
220 &Some(props),
221 EntityDedupMergePolicy::PreferFrom,
222 );
223 entity.properties = merged;
224 changed_fields.push("properties");
225 }
226 if let Some(tags) = patch.tags {
227 entity.tags = tags;
228 changed_fields.push("tags");
229 }
230
231 entity.updated_at = chrono::Utc::now().timestamp_micros();
232 store.upsert_entity(entity.clone()).await?;
233
234 if text_changed {
235 self.reindex_entity(token, &entity).await?;
236 }
237
238 let event_store = self.events(token)?;
239 let event = khive_storage::event::Event::new(
240 entity.namespace.clone(),
241 "update",
242 EventKind::EntityUpdated,
243 SubstrateKind::Entity,
244 "",
245 )
246 .with_target(entity.id)
247 .with_payload(serde_json::json!({
248 "id": entity.id,
249 "namespace": entity.namespace,
250 "changed_fields": changed_fields,
251 }));
252 event_store.append_event(event).await.map_err(|e| {
253 RuntimeError::Internal(format!("update_entity: event store write failed: {e}"))
254 })?;
255
256 Ok(entity)
257 }
258
259 pub async fn merge_entity(
272 &self,
273 token: &NamespaceToken,
274 into_id: Uuid,
275 from_id: Uuid,
276 strategy: EntityDedupMergePolicy,
277 dry_run: bool,
278 ) -> RuntimeResult<MergeSummary> {
279 if into_id == from_id {
280 return Err(RuntimeError::InvalidInput(
281 "cannot merge an entity into itself".into(),
282 ));
283 }
284 {
288 let into_entity = self.get_entity(token, into_id).await?;
289 let from_entity = self.get_entity(token, from_id).await?;
290 if into_entity.kind != from_entity.kind {
291 return Err(RuntimeError::InvalidInput(format!(
292 "cannot merge entities of different kinds: into={} ({}), from={} ({}); \
293 merge requires both entities to share the same kind",
294 into_id, into_entity.kind, from_id, from_entity.kind
295 )));
296 }
297 }
298 let ns = token.namespace().as_str().to_owned();
299 let sanitized_ns: String = ns
300 .chars()
301 .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
302 .collect();
303 let fts_table = format!("fts_entities_{}", sanitized_ns);
304 let vec_table = self.config().embedding_model.map(|model| {
305 let key: String = model
306 .to_string()
307 .chars()
308 .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
309 .collect();
310 format!("vec_{}", key)
311 });
312
313 let _ = self.entities(token)?;
316 let _ = self.graph(token)?;
317 let _ = self.text(token)?;
318 if self.config().embedding_model.is_some() {
319 let _ = self.vectors(token)?;
320 }
321
322 let pool = self.backend().pool_arc();
323
324 let (summary, updated_entity) = tokio::task::spawn_blocking(move || {
325 let guard = pool.writer()?;
326 guard.transaction(|conn| {
327 merge_entity_sql(
328 conn, ns, fts_table, vec_table, into_id, from_id, strategy, dry_run,
329 )
330 })
331 })
332 .await
333 .map_err(|e| RuntimeError::Internal(e.to_string()))??;
334
335 if !dry_run && self.config().embedding_model.is_some() {
338 self.reindex_entity(token, &updated_entity).await?;
339 }
340
341 let event_store = self.events(token)?;
342 let policy_str = match strategy {
345 EntityDedupMergePolicy::PreferInto => "prefer_into",
346 EntityDedupMergePolicy::PreferFrom => "prefer_from",
347 EntityDedupMergePolicy::Union => "union",
348 };
349 let event = khive_storage::event::Event::new(
350 updated_entity.namespace.clone(),
351 "merge",
352 EventKind::EntityMerged,
353 SubstrateKind::Entity,
354 "",
355 )
356 .with_target(summary.kept_id)
357 .with_payload(serde_json::json!({
358 "into_id": summary.kept_id,
359 "from_id": summary.removed_id,
360 "policy": policy_str,
361 "edges_rewired": summary.edges_rewired,
362 }));
363 event_store.append_event(event).await.map_err(|e| {
364 RuntimeError::Internal(format!("merge_entity: event store write failed: {e}"))
365 })?;
366
367 Ok(summary)
368 }
369
370 pub(crate) async fn reindex_entity(
378 &self,
379 token: &NamespaceToken,
380 entity: &Entity,
381 ) -> RuntimeResult<()> {
382 let ns = entity.namespace.clone();
384 let doc = entity_fts_document(entity);
385 let embed_body = doc.body.clone();
386 self.text(token)?.upsert_document(doc).await?;
387
388 if self.config().embedding_model.is_some() {
389 let vector = self.embed_document(&embed_body).await?;
390 self.vectors(token)?
391 .insert(
392 entity.id,
393 SubstrateKind::Entity,
394 &ns,
395 "entity.body",
396 vec![vector],
397 )
398 .await?;
399 }
400
401 Ok(())
402 }
403
404 pub(crate) async fn remove_from_indexes(
406 &self,
407 token: &NamespaceToken,
408 id: Uuid,
409 ) -> RuntimeResult<()> {
410 let ns = token.namespace().as_str().to_owned();
411 self.text(token)?.delete_document(&ns, id).await?;
412 if self.config().embedding_model.is_some() {
413 self.vectors(token)?.delete(id).await?;
414 }
415 Ok(())
416 }
417
418 pub(crate) async fn reindex_note(
420 &self,
421 token: &NamespaceToken,
422 note: &khive_storage::note::Note,
423 ) -> RuntimeResult<()> {
424 self.text_for_notes(token)?
425 .upsert_document(note_fts_document(note))
426 .await?;
427
428 if self.config().embedding_model.is_some() {
429 let ns = note.namespace.clone();
430 let vector = self.embed_document(¬e.content).await?;
431 self.vectors(token)?
432 .insert(
433 note.id,
434 SubstrateKind::Note,
435 &ns,
436 "note.content",
437 vec![vector],
438 )
439 .await?;
440 }
441 Ok(())
442 }
443
444 pub async fn update_note(
446 &self,
447 token: &NamespaceToken,
448 id: Uuid,
449 patch: NotePatch,
450 ) -> RuntimeResult<khive_storage::note::Note> {
451 if let Some(ref content) = patch.content {
453 crate::secret_gate::check(content)?;
454 }
455 if let Some(Some(ref name)) = patch.name {
456 crate::secret_gate::check(name)?;
457 }
458 if let Some(ref props) = patch.properties {
459 crate::secret_gate::check_json(props)?;
460 }
461 let store = self.notes(token)?;
462 let mut note = store
463 .get_note(id)
464 .await?
465 .ok_or_else(|| RuntimeError::NotFound(format!("note {id}")))?;
466
467 Self::ensure_namespace(¬e.namespace, token.namespace().as_str())?;
468
469 let mut text_changed = false;
470
471 if let Some(name_patch) = patch.name {
472 text_changed |= note.name != name_patch;
473 note.name = name_patch;
474 }
475 if let Some(content) = patch.content {
476 text_changed |= note.content != content;
477 note.content = content;
478 }
479 if let Some(salience_patch) = patch.salience {
480 if let Some(s) = salience_patch {
483 if !s.is_finite() || !(0.0..=1.0).contains(&s) {
484 return Err(crate::RuntimeError::InvalidInput(format!(
485 "salience must be a finite value in [0.0, 1.0]; got {s}"
486 )));
487 }
488 }
489 note.salience = salience_patch;
490 }
491 if let Some(decay_patch) = patch.decay_factor {
492 if let Some(d) = decay_patch {
494 if !d.is_finite() || d < 0.0 {
495 return Err(crate::RuntimeError::InvalidInput(format!(
496 "decay_factor must be a finite value >= 0.0; got {d}"
497 )));
498 }
499 }
500 note.decay_factor = decay_patch;
501 }
502 if let Some(props) = patch.properties {
503 let (merged, _) = merge_properties(
504 ¬e.properties,
505 &Some(props),
506 EntityDedupMergePolicy::PreferFrom,
507 );
508 note.properties = merged;
509 }
510 if let Some(status) = patch.kind_status {
511 note.status = status;
512 }
513
514 note.updated_at = chrono::Utc::now().timestamp_micros();
515 store.upsert_note(note.clone()).await?;
516
517 if text_changed {
518 self.reindex_note(token, ¬e).await?;
519 }
520
521 Ok(note)
522 }
523
524 pub async fn merge_note(
533 &self,
534 token: &NamespaceToken,
535 into_id: Uuid,
536 from_id: Uuid,
537 strategy: EntityDedupMergePolicy,
538 content_strategy: ContentMergeStrategy,
539 dry_run: bool,
540 ) -> RuntimeResult<MergeSummary> {
541 if into_id == from_id {
542 return Err(RuntimeError::InvalidInput(
543 "cannot merge a note into itself".into(),
544 ));
545 }
546 let ns = token.namespace().as_str().to_string();
547 let sanitized_ns: String = ns
548 .chars()
549 .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
550 .collect();
551 let fts_table = format!("fts_notes_{}", sanitized_ns);
552 let vec_table = self.config().embedding_model.map(|model| {
553 let key: String = model
554 .to_string()
555 .chars()
556 .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
557 .collect();
558 format!("vec_{}", key)
559 });
560
561 let note_store = self.notes(token)?;
562 let into_note = note_store
563 .get_note(into_id)
564 .await?
565 .ok_or_else(|| RuntimeError::NotFound("not found in this namespace".into()))?;
566 Self::ensure_namespace(&into_note.namespace, &ns)?;
567
568 let from_note = note_store
569 .get_note(from_id)
570 .await?
571 .ok_or_else(|| RuntimeError::NotFound("not found in this namespace".into()))?;
572 Self::ensure_namespace(&from_note.namespace, &ns)?;
573
574 let _ = self.graph(token)?;
575 let _ = self.text_for_notes(token)?;
576 if self.config().embedding_model.is_some() {
577 let _ = self.vectors(token)?;
578 }
579
580 let pool = self.backend().pool_arc();
581 let (summary, updated_note) = tokio::task::spawn_blocking(move || {
582 let guard = pool.writer()?;
583 guard.transaction(|conn| {
584 merge_note_sql(
585 conn,
586 ns,
587 fts_table,
588 vec_table,
589 into_id,
590 from_id,
591 strategy,
592 content_strategy,
593 dry_run,
594 )
595 })
596 })
597 .await
598 .map_err(|e| RuntimeError::Internal(e.to_string()))??;
599
600 if !dry_run && self.config().embedding_model.is_some() {
601 self.reindex_note(token, &updated_note).await?;
602 }
603 Ok(summary)
604 }
605}
606
607pub fn entity_fts_document(entity: &Entity) -> TextDocument {
624 let body = match &entity.description {
625 Some(d) if !d.is_empty() => format!("{} {}", entity.name, d),
626 _ => entity.name.clone(),
627 };
628 let updated_at =
629 chrono::DateTime::from_timestamp_micros(entity.updated_at).unwrap_or_else(chrono::Utc::now);
630 TextDocument {
631 subject_id: entity.id,
632 kind: SubstrateKind::Entity,
633 title: Some(entity.name.clone()),
634 body,
635 tags: entity.tags.clone(),
636 namespace: entity.namespace.clone(),
637 metadata: entity.properties.clone(),
638 updated_at,
639 }
640}
641
642pub fn note_fts_document(note: &Note) -> TextDocument {
655 let body = match ¬e.name {
656 Some(n) => format!("{n} {}", note.content),
657 None => note.content.clone(),
658 };
659 let updated_at =
660 chrono::DateTime::from_timestamp_micros(note.updated_at).unwrap_or_else(chrono::Utc::now);
661 TextDocument {
662 subject_id: note.id,
663 kind: SubstrateKind::Note,
664 title: note.name.clone(),
665 body,
666 tags: vec![],
667 namespace: note.namespace.clone(),
668 metadata: note.properties.clone(),
669 updated_at,
670 }
671}
672
/// A note's FTS row flattened to plain SQL-bindable values (produced by
/// `note_fts_scalars`).
pub(crate) struct NoteFtsScalars {
    /// Note name, or an empty string when the note has none.
    pub title: String,
    /// FTS body text (name and content, see `note_fts_document`).
    pub body: String,
    /// JSON-encoded tag list; `note_fts_scalars` always sets `"[]"`.
    pub tags: String,
    /// JSON-encoded properties, if the note has any.
    pub metadata: Option<String>,
    /// Document timestamp in microseconds since the Unix epoch.
    pub updated_at_micros: i64,
}
690
691pub(crate) fn note_fts_scalars(note: &Note) -> NoteFtsScalars {
696 let doc = note_fts_document(note);
697 NoteFtsScalars {
698 title: doc.title.unwrap_or_default(),
699 body: doc.body,
700 tags: "[]".to_string(),
701 metadata: doc
702 .metadata
703 .as_ref()
704 .map(|v| serde_json::to_string(v).unwrap_or_default()),
705 updated_at_micros: doc.updated_at.timestamp_micros(),
706 }
707}
708
709fn read_merge_entity(
715 conn: &rusqlite::Connection,
716 id: Uuid,
717 namespace: &str,
718) -> Result<Entity, SqliteError> {
719 let id_str = id.to_string();
720 let mut stmt = conn.prepare(
721 "SELECT id, namespace, kind, entity_type, name, description, properties, tags, \
722 created_at, updated_at, deleted_at, merged_into, merge_event_id \
723 FROM entities WHERE id = ?1 AND deleted_at IS NULL",
724 )?;
725 let mut rows = stmt.query(rusqlite::params![id_str])?;
726 let row = rows
727 .next()?
728 .ok_or_else(|| SqliteError::InvalidData(format!("entity {id} not found")))?;
729
730 let id_s: String = row.get(0)?;
731 let ns: String = row.get(1)?;
732 let kind: String = row.get(2)?;
733 let entity_type: Option<String> = row.get(3)?;
734 let name: String = row.get(4)?;
735 let description: Option<String> = row.get(5)?;
736 let properties_str: Option<String> = row.get(6)?;
737 let tags_str: String = row.get(7)?;
738 let created_at: i64 = row.get(8)?;
739 let updated_at: i64 = row.get(9)?;
740 let deleted_at: Option<i64> = row.get(10)?;
741 let merged_into_str: Option<String> = row.get(11)?;
742 let merge_event_id_str: Option<String> = row.get(12)?;
743
744 if ns != namespace {
745 return Err(SqliteError::InvalidData(format!(
746 "entity {id} belongs to namespace '{ns}', not '{namespace}'"
747 )));
748 }
749
750 let entity_id = Uuid::parse_str(&id_s).map_err(|e| SqliteError::InvalidData(e.to_string()))?;
751 let properties: Option<Value> = properties_str
752 .map(|s| {
753 serde_json::from_str::<Value>(&s).map_err(|e| SqliteError::InvalidData(e.to_string()))
754 })
755 .transpose()?;
756 let tags: Vec<String> =
757 serde_json::from_str(&tags_str).map_err(|e| SqliteError::InvalidData(e.to_string()))?;
758 let merged_into = merged_into_str
759 .as_deref()
760 .map(Uuid::parse_str)
761 .transpose()
762 .map_err(|e| SqliteError::InvalidData(e.to_string()))?;
763 let merge_event_id = merge_event_id_str
764 .as_deref()
765 .map(Uuid::parse_str)
766 .transpose()
767 .map_err(|e| SqliteError::InvalidData(e.to_string()))?;
768
769 Ok(Entity {
770 id: entity_id,
771 namespace: ns,
772 kind,
773 entity_type,
774 name,
775 description,
776 properties,
777 tags,
778 created_at,
779 updated_at,
780 deleted_at,
781 merged_into,
782 merge_event_id,
783 })
784}
785
786#[allow(clippy::too_many_arguments)]
797fn merge_entity_sql(
798 conn: &rusqlite::Connection,
799 namespace: String,
800 fts_table: String,
801 vec_table: Option<String>,
802 into_id: Uuid,
803 from_id: Uuid,
804 strategy: EntityDedupMergePolicy,
805 dry_run: bool,
806) -> Result<(MergeSummary, Entity), SqliteError> {
807 let into_entity = read_merge_entity(conn, into_id, &namespace)?;
808 let from_entity = read_merge_entity(conn, from_id, &namespace)?;
809
810 #[allow(dead_code)]
815 struct EdgeRow {
816 id: Uuid,
817 source_id: Uuid,
818 target_id: Uuid,
819 relation: String,
820 weight: f64,
821 created_at: i64,
822 updated_at: i64,
823 deleted_at: Option<i64>,
824 target_backend: Option<String>,
825 metadata: Option<String>,
826 }
827
828 let parse_id =
829 |s: String| Uuid::parse_str(&s).map_err(|e| SqliteError::InvalidData(e.to_string()));
830
831 let from_str = from_id.to_string();
832
833 let mut outbound: Vec<EdgeRow> = Vec::new();
834 {
835 let mut stmt = conn.prepare(
836 "SELECT id, source_id, target_id, relation, weight, created_at, \
837 updated_at, deleted_at, target_backend, metadata \
838 FROM graph_edges WHERE namespace = ?1 AND source_id = ?2",
839 )?;
840 let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
841 while let Some(row) = rows.next()? {
842 outbound.push(EdgeRow {
843 id: parse_id(row.get(0)?)?,
844 source_id: parse_id(row.get(1)?)?,
845 target_id: parse_id(row.get(2)?)?,
846 relation: row.get(3)?,
847 weight: row.get(4)?,
848 created_at: row.get(5)?,
849 updated_at: row.get(6)?,
850 deleted_at: row.get(7)?,
851 target_backend: row.get(8)?,
852 metadata: row.get(9)?,
853 });
854 }
855 }
856
857 let mut inbound: Vec<EdgeRow> = Vec::new();
858 {
859 let mut stmt = conn.prepare(
860 "SELECT id, source_id, target_id, relation, weight, created_at, \
861 updated_at, deleted_at, target_backend, metadata \
862 FROM graph_edges WHERE namespace = ?1 AND target_id = ?2",
863 )?;
864 let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
865 while let Some(row) = rows.next()? {
866 inbound.push(EdgeRow {
867 id: parse_id(row.get(0)?)?,
868 source_id: parse_id(row.get(1)?)?,
869 target_id: parse_id(row.get(2)?)?,
870 relation: row.get(3)?,
871 weight: row.get(4)?,
872 created_at: row.get(5)?,
873 updated_at: row.get(6)?,
874 deleted_at: row.get(7)?,
875 target_backend: row.get(8)?,
876 metadata: row.get(9)?,
877 });
878 }
879 }
880
881 let mut seen: HashSet<Uuid> = HashSet::new();
883 let mut all_edges: Vec<EdgeRow> = Vec::new();
884 for edge in outbound.into_iter().chain(inbound) {
885 if seen.insert(edge.id) {
886 all_edges.push(edge);
887 }
888 }
889
890 let (merged_props, properties_merged) =
892 merge_properties(&into_entity.properties, &from_entity.properties, strategy);
893 let merged_name = merge_string_field(&into_entity.name, &from_entity.name, strategy);
894 let merged_description =
895 merge_option_string_field(&into_entity.description, &from_entity.description, strategy);
896 let (merged_tags, tags_unioned) = union_tags(&into_entity.tags, &from_entity.tags);
897
898 let now = chrono::Utc::now().timestamp_micros();
899 let into_str = into_id.to_string();
900 let props_str = merged_props
901 .as_ref()
902 .map(|v| serde_json::to_string(v).unwrap_or_default());
903 let tags_json = serde_json::to_string(&merged_tags).unwrap_or_else(|_| "[]".to_string());
904
905 let mut edges_rewired = 0usize;
907 if !dry_run {
908 for edge in all_edges {
909 let raw_src = if edge.source_id == from_id {
910 into_id
911 } else {
912 edge.source_id
913 };
914 let raw_tgt = if edge.target_id == from_id {
915 into_id
916 } else {
917 edge.target_id
918 };
919 let (new_src, new_tgt) = match edge.relation.parse::<EdgeRelation>() {
922 Ok(rel) => canonical_edge_endpoints(rel, raw_src, raw_tgt),
923 Err(_) => (raw_src, raw_tgt),
924 };
925
926 if new_src == new_tgt {
927 conn.execute(
928 "DELETE FROM graph_edges WHERE namespace = ?1 AND id = ?2",
929 rusqlite::params![&namespace, edge.id.to_string()],
930 )?;
931 continue;
932 }
933
934 let now_ts = chrono::Utc::now().timestamp();
935 let conflict_id: Option<String> = {
949 let conflict_src = new_src.to_string();
950 let conflict_tgt = new_tgt.to_string();
951 conn.query_row(
952 "SELECT id FROM graph_edges \
953 WHERE namespace = ?1 AND source_id = ?2 AND target_id = ?3 \
954 AND relation = ?4 AND id != ?5",
955 rusqlite::params![
956 &namespace,
957 &conflict_src,
958 &conflict_tgt,
959 &edge.relation,
960 edge.id.to_string(),
961 ],
962 |row| row.get(0),
963 )
964 .optional()
965 .map_err(SqliteError::Rusqlite)?
966 };
967
968 let changed = if let Some(existing_id) = conflict_id {
969 conn.execute(
972 "DELETE FROM graph_edges WHERE namespace = ?1 AND id = ?2",
973 rusqlite::params![&namespace, edge.id.to_string()],
974 )?;
975 conn.execute(
976 "UPDATE graph_edges SET \
977 weight = ?1, updated_at = ?2, deleted_at = NULL, \
978 target_backend = ?3, metadata = ?4 \
979 WHERE namespace = ?5 AND id = ?6",
980 rusqlite::params![
981 edge.weight,
982 now_ts,
983 edge.target_backend,
984 edge.metadata,
985 &namespace,
986 &existing_id,
987 ],
988 )?
989 } else {
990 conn.execute(
993 "UPDATE graph_edges SET \
994 source_id = ?1, target_id = ?2, updated_at = ?3 \
995 WHERE namespace = ?4 AND id = ?5",
996 rusqlite::params![
997 new_src.to_string(),
998 new_tgt.to_string(),
999 now_ts,
1000 &namespace,
1001 edge.id.to_string(),
1002 ],
1003 )?
1004 };
1005 if changed > 0 {
1006 edges_rewired += 1;
1007 }
1008 }
1009
1010 conn.execute(
1012 "INSERT OR REPLACE INTO entities \
1013 (id, namespace, kind, name, description, properties, tags, \
1014 created_at, updated_at, deleted_at, merged_into, merge_event_id) \
1015 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
1016 rusqlite::params![
1017 &into_str,
1018 &namespace,
1019 &into_entity.kind,
1020 &merged_name,
1021 &merged_description,
1022 &props_str,
1023 &tags_json,
1024 into_entity.created_at,
1025 now,
1026 into_entity.deleted_at,
1027 Option::<String>::None,
1028 Option::<String>::None,
1029 ],
1030 )?;
1031
1032 let fts_body = match &merged_description {
1037 Some(d) if !d.is_empty() => format!("{} {}", merged_name, d),
1038 _ => merged_name.clone(),
1039 };
1040 let kind_str = SubstrateKind::Entity.to_string();
1041
1042 conn.execute(
1043 &format!(
1044 "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
1045 fts_table
1046 ),
1047 rusqlite::params![&namespace, &into_str],
1048 )?;
1049 conn.execute(
1050 &format!(
1051 "INSERT INTO {} \
1052 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
1053 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1054 fts_table
1055 ),
1056 rusqlite::params![
1057 &into_str,
1058 &kind_str,
1059 &merged_name,
1060 &fts_body,
1061 &tags_json,
1062 &namespace,
1063 &props_str,
1064 now,
1065 ],
1066 )?;
1067
1068 conn.execute(
1070 &format!(
1071 "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
1072 fts_table
1073 ),
1074 rusqlite::params![&namespace, &from_str],
1075 )?;
1076
1077 if let Some(ref vec_tbl) = vec_table {
1079 conn.execute(
1080 &format!(
1081 "DELETE FROM {} WHERE subject_id = ?1 AND namespace = ?2",
1082 vec_tbl
1083 ),
1084 rusqlite::params![&from_str, &namespace],
1085 )?;
1086 }
1087
1088 let merge_event_id = Uuid::new_v4();
1090 conn.execute(
1091 "UPDATE entities \
1092 SET deleted_at = ?1, merged_into = ?2, merge_event_id = ?3, updated_at = ?1 \
1093 WHERE namespace = ?4 AND id = ?5 AND deleted_at IS NULL",
1094 rusqlite::params![
1095 now,
1096 into_str,
1097 merge_event_id.to_string(),
1098 &namespace,
1099 &from_str,
1100 ],
1101 )?;
1102 }
1103
1104 let updated_entity = Entity {
1105 id: into_id,
1106 namespace,
1107 kind: into_entity.kind,
1108 entity_type: into_entity.entity_type,
1109 name: merged_name,
1110 description: merged_description,
1111 properties: merged_props,
1112 tags: merged_tags,
1113 created_at: into_entity.created_at,
1114 updated_at: now,
1115 deleted_at: into_entity.deleted_at,
1116 merged_into: None,
1117 merge_event_id: None,
1118 };
1119
1120 Ok((
1121 MergeSummary {
1122 kept_id: into_id,
1123 removed_id: from_id,
1124 edges_rewired,
1125 properties_merged,
1126 tags_unioned,
1127 content_appended: false,
1128 dry_run,
1129 },
1130 updated_entity,
1131 ))
1132}
1133
1134fn read_merge_note(
1140 conn: &rusqlite::Connection,
1141 id: Uuid,
1142 namespace: &str,
1143) -> Result<khive_storage::note::Note, SqliteError> {
1144 use khive_storage::note::Note;
1145 let id_str = id.to_string();
1146 let mut stmt = conn.prepare(
1147 "SELECT id, namespace, kind, status, name, content, salience, decay_factor, \
1148 expires_at, properties, created_at, updated_at, deleted_at \
1149 FROM notes WHERE id = ?1 AND deleted_at IS NULL",
1150 )?;
1151 let mut rows = stmt.query(rusqlite::params![id_str])?;
1152 let row = rows
1153 .next()?
1154 .ok_or_else(|| SqliteError::InvalidData(format!("note {id} not found")))?;
1155
1156 let id_s: String = row.get(0)?;
1157 let ns: String = row.get(1)?;
1158 let kind: String = row.get(2)?;
1159 let status: String = row.get(3)?;
1160 let name: Option<String> = row.get(4)?;
1161 let content: String = row.get(5)?;
1162 let salience: Option<f64> = row.get(6)?;
1163 let decay_factor: Option<f64> = row.get(7)?;
1164 let expires_at: Option<i64> = row.get(8)?;
1165 let properties_str: Option<String> = row.get(9)?;
1166 let created_at: i64 = row.get(10)?;
1167 let updated_at: i64 = row.get(11)?;
1168 let deleted_at: Option<i64> = row.get(12)?;
1169
1170 if ns != namespace {
1171 return Err(SqliteError::InvalidData(format!(
1172 "note {id} belongs to namespace '{ns}', not '{namespace}'"
1173 )));
1174 }
1175
1176 let note_id = Uuid::parse_str(&id_s).map_err(|e| SqliteError::InvalidData(e.to_string()))?;
1177 let properties: Option<serde_json::Value> = properties_str
1178 .map(|s| serde_json::from_str(&s).map_err(|e| SqliteError::InvalidData(e.to_string())))
1179 .transpose()?;
1180
1181 Ok(Note {
1182 id: note_id,
1183 namespace: ns,
1184 kind,
1185 status,
1186 name,
1187 content,
1188 salience,
1189 decay_factor,
1190 expires_at,
1191 properties,
1192 created_at,
1193 updated_at,
1194 deleted_at,
1195 })
1196}
1197
/// Larger of two optional values, ignoring absent ones.
///
/// Returns `None` only when both are `None`. When both are present, `f64::max` decides,
/// so a `NaN` loses to a number.
fn max_option_f64(a: Option<f64>, b: Option<f64>) -> Option<f64> {
    a.into_iter().chain(b).reduce(f64::max)
}
1206
1207fn append_merge_history(props: Option<Value>, entry: Value) -> Result<Option<Value>, SqliteError> {
1208 use serde_json::{json, Map};
1209 let mut obj: Map<String, Value> = match props {
1210 Some(Value::Object(m)) => m,
1211 Some(other) => {
1212 let mut m = Map::new();
1213 m.insert("_value".into(), other);
1214 m
1215 }
1216 None => Map::new(),
1217 };
1218 let history = obj
1219 .entry("_merge_history".to_string())
1220 .or_insert_with(|| json!([]));
1221 if let Value::Array(arr) = history {
1222 arr.push(entry);
1223 }
1224 Ok(Some(Value::Object(obj)))
1225}
1226
1227#[allow(clippy::too_many_arguments)]
1237fn merge_note_sql(
1238 conn: &rusqlite::Connection,
1239 namespace: String,
1240 fts_table: String,
1241 vec_table: Option<String>,
1242 into_id: Uuid,
1243 from_id: Uuid,
1244 strategy: EntityDedupMergePolicy,
1245 content_strategy: ContentMergeStrategy,
1246 dry_run: bool,
1247) -> Result<(MergeSummary, khive_storage::note::Note), SqliteError> {
1248 let into_note = read_merge_note(conn, into_id, &namespace)?;
1249 let from_note = read_merge_note(conn, from_id, &namespace)?;
1250
1251 if into_note.kind != from_note.kind {
1252 return Err(SqliteError::InvalidData(format!(
1253 "cannot merge notes of different kinds: {} vs {}",
1254 into_note.kind, from_note.kind
1255 )));
1256 }
1257
1258 let now = chrono::Utc::now().timestamp_micros();
1259 let into_str = into_id.to_string();
1260 let from_str = from_id.to_string();
1261
1262 #[allow(dead_code)]
1265 struct EdgeRow {
1266 id: Uuid,
1267 source_id: Uuid,
1268 target_id: Uuid,
1269 relation: String,
1270 weight: f64,
1271 created_at: i64,
1272 updated_at: i64,
1273 deleted_at: Option<i64>,
1274 target_backend: Option<String>,
1275 metadata: Option<String>,
1276 }
1277 let parse_id =
1278 |s: String| Uuid::parse_str(&s).map_err(|e| SqliteError::InvalidData(e.to_string()));
1279
1280 let mut outbound: Vec<EdgeRow> = Vec::new();
1281 {
1282 let mut stmt = conn.prepare(
1283 "SELECT id, source_id, target_id, relation, weight, created_at, updated_at, deleted_at, target_backend, metadata \
1284 FROM graph_edges WHERE namespace = ?1 AND source_id = ?2",
1285 )?;
1286 let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
1287 while let Some(row) = rows.next()? {
1288 outbound.push(EdgeRow {
1289 id: parse_id(row.get(0)?)?,
1290 source_id: parse_id(row.get(1)?)?,
1291 target_id: parse_id(row.get(2)?)?,
1292 relation: row.get(3)?,
1293 weight: row.get(4)?,
1294 created_at: row.get(5)?,
1295 updated_at: row.get(6)?,
1296 deleted_at: row.get(7)?,
1297 target_backend: row.get(8)?,
1298 metadata: row.get(9)?,
1299 });
1300 }
1301 }
1302 let mut inbound: Vec<EdgeRow> = Vec::new();
1303 {
1304 let mut stmt = conn.prepare(
1305 "SELECT id, source_id, target_id, relation, weight, created_at, updated_at, deleted_at, target_backend, metadata \
1306 FROM graph_edges WHERE namespace = ?1 AND target_id = ?2",
1307 )?;
1308 let mut rows = stmt.query(rusqlite::params![&namespace, &from_str])?;
1309 while let Some(row) = rows.next()? {
1310 inbound.push(EdgeRow {
1311 id: parse_id(row.get(0)?)?,
1312 source_id: parse_id(row.get(1)?)?,
1313 target_id: parse_id(row.get(2)?)?,
1314 relation: row.get(3)?,
1315 weight: row.get(4)?,
1316 created_at: row.get(5)?,
1317 updated_at: row.get(6)?,
1318 deleted_at: row.get(7)?,
1319 target_backend: row.get(8)?,
1320 metadata: row.get(9)?,
1321 });
1322 }
1323 }
1324 let mut seen: HashSet<Uuid> = HashSet::new();
1325 let mut all_edges: Vec<EdgeRow> = Vec::new();
1326 for edge in outbound.into_iter().chain(inbound) {
1327 if seen.insert(edge.id) {
1328 all_edges.push(edge);
1329 }
1330 }
1331
1332 let (merged_content, content_appended) = match content_strategy {
1334 ContentMergeStrategy::Append => {
1335 if from_note.content.is_empty() {
1336 (into_note.content.clone(), false)
1337 } else {
1338 (
1339 format!("{}\n\n---\n\n{}", into_note.content, from_note.content),
1340 true,
1341 )
1342 }
1343 }
1344 ContentMergeStrategy::PreferInto => (into_note.content.clone(), false),
1345 ContentMergeStrategy::PreferFrom => (from_note.content.clone(), false),
1346 };
1347
1348 let merged_name = match strategy {
1349 EntityDedupMergePolicy::PreferFrom => from_note.name.clone().or(into_note.name.clone()),
1350 _ => into_note.name.clone().or(from_note.name.clone()),
1351 };
1352
1353 let (merged_props, properties_merged) =
1354 merge_properties(&into_note.properties, &from_note.properties, strategy);
1355
1356 let merge_history_entry = serde_json::json!({
1358 "merged_from": from_id.to_string(),
1359 "merged_at": now,
1360 "strategy": format!("{:?}", strategy),
1361 "content_strategy": format!("{:?}", content_strategy),
1362 });
1363 let merged_props = append_merge_history(merged_props, merge_history_entry)?;
1364
1365 let merged_salience = max_option_f64(into_note.salience, from_note.salience);
1366 let merged_expires_at = match (into_note.expires_at, from_note.expires_at) {
1367 (Some(a), Some(b)) => Some(a.max(b)),
1368 (Some(a), None) => Some(a),
1369 (None, Some(b)) => Some(b),
1370 (None, None) => None,
1371 };
1372
1373 let props_str = merged_props
1374 .as_ref()
1375 .map(|v| serde_json::to_string(v).unwrap_or_default());
1376
1377 let mut edges_rewired = 0usize;
1378 if !dry_run {
1379 for edge in all_edges {
1381 let raw_src = if edge.source_id == from_id {
1382 into_id
1383 } else {
1384 edge.source_id
1385 };
1386 let raw_tgt = if edge.target_id == from_id {
1387 into_id
1388 } else {
1389 edge.target_id
1390 };
1391 let (new_src, new_tgt) = match edge.relation.parse::<EdgeRelation>() {
1393 Ok(rel) => canonical_edge_endpoints(rel, raw_src, raw_tgt),
1394 Err(_) => (raw_src, raw_tgt),
1395 };
1396 if new_src == new_tgt {
1397 conn.execute(
1398 "DELETE FROM graph_edges WHERE namespace = ?1 AND id = ?2",
1399 rusqlite::params![&namespace, edge.id.to_string()],
1400 )?;
1401 continue;
1402 }
1403 let now_ts = chrono::Utc::now().timestamp();
1404 let conflict_id: Option<String> = {
1407 let conflict_src = new_src.to_string();
1408 let conflict_tgt = new_tgt.to_string();
1409 conn.query_row(
1410 "SELECT id FROM graph_edges \
1411 WHERE namespace = ?1 AND source_id = ?2 AND target_id = ?3 \
1412 AND relation = ?4 AND id != ?5",
1413 rusqlite::params![
1414 &namespace,
1415 &conflict_src,
1416 &conflict_tgt,
1417 &edge.relation,
1418 edge.id.to_string(),
1419 ],
1420 |row| row.get(0),
1421 )
1422 .optional()
1423 .map_err(SqliteError::Rusqlite)?
1424 };
1425
1426 let changed = if let Some(existing_id) = conflict_id {
1427 conn.execute(
1428 "DELETE FROM graph_edges WHERE namespace = ?1 AND id = ?2",
1429 rusqlite::params![&namespace, edge.id.to_string()],
1430 )?;
1431 conn.execute(
1432 "UPDATE graph_edges SET \
1433 weight = ?1, updated_at = ?2, deleted_at = NULL, \
1434 target_backend = ?3, metadata = ?4 \
1435 WHERE namespace = ?5 AND id = ?6",
1436 rusqlite::params![
1437 edge.weight,
1438 now_ts,
1439 edge.target_backend,
1440 edge.metadata,
1441 &namespace,
1442 &existing_id,
1443 ],
1444 )?
1445 } else {
1446 conn.execute(
1447 "UPDATE graph_edges SET \
1448 source_id = ?1, target_id = ?2, updated_at = ?3 \
1449 WHERE namespace = ?4 AND id = ?5",
1450 rusqlite::params![
1451 new_src.to_string(),
1452 new_tgt.to_string(),
1453 now_ts,
1454 &namespace,
1455 edge.id.to_string(),
1456 ],
1457 )?
1458 };
1459 if changed > 0 {
1460 edges_rewired += 1;
1461 }
1462 }
1463
1464 conn.execute(
1466 "INSERT OR REPLACE INTO notes \
1467 (id, namespace, kind, status, name, content, salience, decay_factor, \
1468 expires_at, properties, created_at, updated_at, deleted_at) \
1469 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
1470 rusqlite::params![
1471 &into_str,
1472 &namespace,
1473 &into_note.kind,
1474 &into_note.status,
1475 &merged_name,
1476 &merged_content,
1477 merged_salience,
1478 into_note.decay_factor,
1479 merged_expires_at,
1480 &props_str,
1481 into_note.created_at,
1482 now,
1483 into_note.deleted_at,
1484 ],
1485 )?;
1486
1487 conn.execute(
1489 &format!(
1490 "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
1491 fts_table
1492 ),
1493 rusqlite::params![&namespace, &into_str],
1494 )?;
1495 let fts_merged = {
1501 let mut merged_note = Note::new(&namespace, &*into_note.kind, &*merged_content);
1502 merged_note.id = into_id;
1503 merged_note.name = merged_name.clone();
1504 merged_note.properties = merged_props.clone();
1505 merged_note.updated_at = now;
1506 note_fts_scalars(&merged_note)
1507 };
1508 conn.execute(
1509 &format!(
1510 "INSERT INTO {} \
1511 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
1512 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1513 fts_table
1514 ),
1515 rusqlite::params![
1516 &into_str,
1517 SubstrateKind::Note.to_string(),
1518 &fts_merged.title,
1519 &fts_merged.body,
1520 &fts_merged.tags,
1521 &namespace,
1522 &fts_merged.metadata,
1523 fts_merged.updated_at_micros,
1524 ],
1525 )?;
1526
1527 conn.execute(
1529 &format!(
1530 "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
1531 fts_table
1532 ),
1533 rusqlite::params![&namespace, &from_str],
1534 )?;
1535
1536 if let Some(ref vec_tbl) = vec_table {
1538 conn.execute(
1539 &format!(
1540 "DELETE FROM {} WHERE subject_id = ?1 AND namespace = ?2",
1541 vec_tbl
1542 ),
1543 rusqlite::params![&from_str, &namespace],
1544 )?;
1545 }
1546
1547 conn.execute(
1549 "UPDATE notes SET status = 'deleted', deleted_at = ?1, updated_at = ?1 \
1550 WHERE namespace = ?2 AND id = ?3 AND deleted_at IS NULL",
1551 rusqlite::params![now, &namespace, &from_str],
1552 )?;
1553 }
1554
1555 let updated_note = khive_storage::note::Note {
1556 id: into_id,
1557 namespace: namespace.clone(),
1558 kind: into_note.kind.clone(),
1559 status: into_note.status.clone(),
1560 name: merged_name,
1561 content: merged_content,
1562 salience: merged_salience,
1563 decay_factor: into_note.decay_factor,
1564 expires_at: merged_expires_at,
1565 properties: merged_props,
1566 created_at: into_note.created_at,
1567 updated_at: now,
1568 deleted_at: into_note.deleted_at,
1569 };
1570
1571 Ok((
1572 MergeSummary {
1573 kept_id: into_id,
1574 removed_id: from_id,
1575 edges_rewired,
1576 properties_merged,
1577 tags_unioned: 0,
1578 content_appended,
1579 dry_run,
1580 },
1581 updated_note,
1582 ))
1583}
1584
1585fn merge_string_field(into: &str, from: &str, strategy: EntityDedupMergePolicy) -> String {
1590 match strategy {
1591 EntityDedupMergePolicy::PreferInto | EntityDedupMergePolicy::Union => into.to_string(),
1592 EntityDedupMergePolicy::PreferFrom => from.to_string(),
1593 }
1594}
1595
1596fn merge_option_string_field(
1597 into: &Option<String>,
1598 from: &Option<String>,
1599 strategy: EntityDedupMergePolicy,
1600) -> Option<String> {
1601 match strategy {
1602 EntityDedupMergePolicy::PreferInto => {
1603 if into.is_some() {
1604 into.clone()
1605 } else {
1606 from.clone()
1607 }
1608 }
1609 EntityDedupMergePolicy::PreferFrom => {
1610 if from.is_some() {
1611 from.clone()
1612 } else {
1613 into.clone()
1614 }
1615 }
1616 EntityDedupMergePolicy::Union => {
1617 match (into, from) {
1619 (Some(a), _) if !a.is_empty() => Some(a.clone()),
1620 (_, Some(b)) => Some(b.clone()),
1621 _ => None,
1622 }
1623 }
1624 }
1625}
1626
1627fn merge_properties(
1629 into: &Option<Value>,
1630 from: &Option<Value>,
1631 strategy: EntityDedupMergePolicy,
1632) -> (Option<Value>, usize) {
1633 match (into, from) {
1634 (None, None) => (None, 0),
1635 (Some(a), None) => (Some(a.clone()), 0),
1636 (None, Some(b)) => {
1637 let count = if let Value::Object(m) = b { m.len() } else { 1 };
1638 (Some(b.clone()), count)
1639 }
1640 (Some(into_val), Some(from_val)) => {
1641 let (merged, added) = merge_json(into_val, from_val, strategy);
1642 (Some(merged), added)
1643 }
1644 }
1645}
1646
1647fn merge_json(into: &Value, from: &Value, strategy: EntityDedupMergePolicy) -> (Value, usize) {
1649 match (into, from, strategy) {
1650 (Value::Object(a), Value::Object(b), EntityDedupMergePolicy::Union) => {
1651 let mut result = a.clone();
1652 let mut added = 0usize;
1653 for (k, v_from) in b {
1654 if let Some(v_into) = a.get(k) {
1655 let (merged, sub_added) =
1656 merge_json(v_into, v_from, EntityDedupMergePolicy::Union);
1657 result.insert(k.clone(), merged);
1658 added += sub_added;
1659 } else {
1660 result.insert(k.clone(), v_from.clone());
1661 added += 1;
1662 }
1663 }
1664 (Value::Object(result), added)
1665 }
1666 (Value::Object(a), Value::Object(b), EntityDedupMergePolicy::PreferInto) => {
1667 let mut result = a.clone();
1668 let mut added = 0usize;
1669 for (k, v) in b {
1670 if !a.contains_key(k) {
1671 result.insert(k.clone(), v.clone());
1672 added += 1;
1673 }
1674 }
1675 (Value::Object(result), added)
1676 }
1677 (Value::Object(a), Value::Object(b), EntityDedupMergePolicy::PreferFrom) => {
1678 let mut result = a.clone();
1679 let mut added = 0usize;
1680 for (k, v) in b {
1681 result.insert(k.clone(), v.clone());
1682 if !a.contains_key(k) {
1683 added += 1;
1684 }
1685 }
1686 (Value::Object(result), added)
1687 }
1688 (_into_val, from_val, EntityDedupMergePolicy::PreferFrom) => (from_val.clone(), 1),
1690 _ => (into.clone(), 0),
1691 }
1692}
1693
/// Appends to `into` every tag from `from` that is not already present.
///
/// `into` is copied verbatim, including any duplicates it already contains. Tags
/// from `from` are appended in their original order, each at most once. The second
/// element of the result is the number of tags appended.
fn union_tags(into: &[String], from: &[String]) -> (Vec<String>, usize) {
    let mut known: HashSet<&str> = into.iter().map(String::as_str).collect();
    // `HashSet::insert` returns false for tags already seen, which filters both
    // tags shared with `into` and repeats within `from`.
    let fresh: Vec<String> = from
        .iter()
        .filter(|tag| known.insert(tag.as_str()))
        .cloned()
        .collect();

    let added = fresh.len();
    let mut merged = Vec::with_capacity(into.len() + added);
    merged.extend_from_slice(into);
    merged.extend(fresh);
    (merged, added)
}
1706
1707#[cfg(test)]
1716mod tests {
1717 use super::*;
1718 use crate::runtime::{KhiveRuntime, NamespaceToken};
1719 use khive_storage::types::{Direction, TextFilter, TextQueryMode, TextSearchRequest};
1720
1721 fn rt() -> KhiveRuntime {
1722 KhiveRuntime::memory().unwrap()
1723 }
1724
1725 async fn fts_hit(rt: &KhiveRuntime, token: &NamespaceToken, query: &str) -> Vec<Uuid> {
1727 let ns = token.namespace().as_str().to_string();
1728 rt.text(token)
1729 .unwrap()
1730 .search(TextSearchRequest {
1731 query: query.to_string(),
1732 mode: TextQueryMode::Plain,
1733 filter: Some(TextFilter {
1734 namespaces: vec![ns],
1735 ..Default::default()
1736 }),
1737 top_k: 50,
1738 snippet_chars: 100,
1739 })
1740 .await
1741 .unwrap()
1742 .into_iter()
1743 .map(|h| h.subject_id)
1744 .collect()
1745 }
1746
1747 #[tokio::test]
1748 async fn update_entity_patch_changes_only_specified_fields() {
1749 let rt = rt();
1750 let tok = NamespaceToken::local();
1751 let entity = rt
1752 .create_entity(
1753 &tok,
1754 "concept",
1755 None,
1756 "OriginalName",
1757 Some("orig desc"),
1758 Some(serde_json::json!({"k":"v"})),
1759 vec![],
1760 )
1761 .await
1762 .unwrap();
1763
1764 let updated = rt
1765 .update_entity(
1766 &tok,
1767 entity.id,
1768 EntityPatch {
1769 description: Some(Some("new desc".to_string())),
1770 ..Default::default()
1771 },
1772 )
1773 .await
1774 .unwrap();
1775
1776 assert_eq!(updated.name, "OriginalName");
1777 assert_eq!(updated.description.as_deref(), Some("new desc"));
1778 assert_eq!(updated.properties, Some(serde_json::json!({"k":"v"})));
1779 }
1780
1781 #[tokio::test]
1782 async fn update_entity_clear_description_with_some_none() {
1783 let rt = rt();
1784 let tok = NamespaceToken::local();
1785 let entity = rt
1786 .create_entity(
1787 &tok,
1788 "concept",
1789 None,
1790 "ClearDesc",
1791 Some("has description"),
1792 None,
1793 vec![],
1794 )
1795 .await
1796 .unwrap();
1797
1798 let updated = rt
1799 .update_entity(
1800 &tok,
1801 entity.id,
1802 EntityPatch {
1803 description: Some(None),
1804 ..Default::default()
1805 },
1806 )
1807 .await
1808 .unwrap();
1809
1810 assert!(
1811 updated.description.is_none(),
1812 "description should be cleared"
1813 );
1814 }
1815
1816 #[tokio::test]
1817 async fn update_entity_reindexes_when_name_changes() {
1818 let rt = rt();
1819 let tok = NamespaceToken::local();
1820 let entity = rt
1821 .create_entity(&tok, "concept", None, "OldName", None, None, vec![])
1822 .await
1823 .unwrap();
1824
1825 let hits_before = fts_hit(&rt, &tok, "OldName").await;
1827 assert!(
1828 hits_before.contains(&entity.id),
1829 "entity should be findable by old name"
1830 );
1831
1832 rt.update_entity(
1833 &tok,
1834 entity.id,
1835 EntityPatch {
1836 name: Some("NewName".to_string()),
1837 ..Default::default()
1838 },
1839 )
1840 .await
1841 .unwrap();
1842
1843 let hits_old = fts_hit(&rt, &tok, "OldName").await;
1844 let hits_new = fts_hit(&rt, &tok, "NewName").await;
1845
1846 assert!(
1848 !hits_old.contains(&entity.id),
1849 "old name should no longer match after rename"
1850 );
1851 assert!(
1852 hits_new.contains(&entity.id),
1853 "new name should be findable after rename"
1854 );
1855 }
1856
1857 #[tokio::test]
1858 async fn update_entity_properties_merges_preserving_existing_keys() {
1859 let rt = rt();
1860 let tok = NamespaceToken::local();
1861 let entity = rt
1862 .create_entity(
1863 &tok,
1864 "concept",
1865 None,
1866 "MergeProps",
1867 None,
1868 Some(serde_json::json!({
1869 "domain": "inference",
1870 "repo": "lattice",
1871 "status": "researched",
1872 })),
1873 vec![],
1874 )
1875 .await
1876 .unwrap();
1877
1878 let updated = rt
1879 .update_entity(
1880 &tok,
1881 entity.id,
1882 EntityPatch {
1883 properties: Some(serde_json::json!({"status": "implemented"})),
1884 ..Default::default()
1885 },
1886 )
1887 .await
1888 .unwrap();
1889
1890 let props = updated.properties.expect("properties should remain set");
1891 assert_eq!(props["domain"], "inference", "domain key must be preserved");
1892 assert_eq!(props["repo"], "lattice", "repo key must be preserved");
1893 assert_eq!(
1894 props["status"], "implemented",
1895 "status key must be updated by patch"
1896 );
1897 }
1898
1899 #[tokio::test]
1900 async fn update_entity_skips_reindex_when_only_properties_change() {
1901 let rt = rt();
1902 let tok = NamespaceToken::local();
1903 let entity = rt
1904 .create_entity(&tok, "concept", None, "StableIndexed", None, None, vec![])
1905 .await
1906 .unwrap();
1907
1908 let hits_before = fts_hit(&rt, &tok, "StableIndexed").await;
1910 assert!(hits_before.contains(&entity.id));
1911
1912 rt.update_entity(
1914 &tok,
1915 entity.id,
1916 EntityPatch {
1917 properties: Some(serde_json::json!({"new": "prop"})),
1918 ..Default::default()
1919 },
1920 )
1921 .await
1922 .unwrap();
1923
1924 let hits_after = fts_hit(&rt, &tok, "StableIndexed").await;
1925 assert!(
1926 hits_after.contains(&entity.id),
1927 "still findable after props-only patch"
1928 );
1929 }
1930
1931 #[tokio::test]
1932 async fn merge_entity_rewires_edges() {
1933 let rt = rt();
1934 let tok = NamespaceToken::local();
1935 let a = rt
1936 .create_entity(&tok, "concept", None, "A", None, None, vec![])
1937 .await
1938 .unwrap();
1939 let b = rt
1940 .create_entity(&tok, "concept", None, "B", None, None, vec![])
1941 .await
1942 .unwrap();
1943 let c = rt
1944 .create_entity(&tok, "concept", None, "C", None, None, vec![])
1945 .await
1946 .unwrap();
1947 let d = rt
1948 .create_entity(&tok, "concept", None, "D", None, None, vec![])
1949 .await
1950 .unwrap();
1951
1952 rt.link(&tok, a.id, b.id, EdgeRelation::Extends, 1.0, None)
1954 .await
1955 .unwrap();
1956 rt.link(&tok, c.id, b.id, EdgeRelation::Extends, 1.0, None)
1957 .await
1958 .unwrap();
1959
1960 let summary = rt
1961 .merge_entity(&tok, d.id, b.id, EntityDedupMergePolicy::PreferInto, false)
1962 .await
1963 .unwrap();
1964
1965 assert_eq!(summary.kept_id, d.id);
1966 assert_eq!(summary.removed_id, b.id);
1967 assert_eq!(summary.edges_rewired, 2);
1968
1969 let a_neighbors = rt
1971 .neighbors(&tok, a.id, Direction::Out, None, None)
1972 .await
1973 .unwrap();
1974 assert_eq!(a_neighbors.len(), 1);
1975 assert_eq!(a_neighbors[0].node_id, d.id);
1976
1977 let c_neighbors = rt
1978 .neighbors(&tok, c.id, Direction::Out, None, None)
1979 .await
1980 .unwrap();
1981 assert_eq!(c_neighbors.len(), 1);
1982 assert_eq!(c_neighbors[0].node_id, d.id);
1983 }
1984
1985 #[tokio::test]
1986 async fn merge_entity_self_merge_rejected() {
1987 let rt = rt();
1988 let tok = NamespaceToken::local();
1989 let a = rt
1990 .create_entity(&tok, "concept", None, "A", None, None, vec![])
1991 .await
1992 .unwrap();
1993 let err = rt
1994 .merge_entity(&tok, a.id, a.id, EntityDedupMergePolicy::PreferInto, false)
1995 .await
1996 .unwrap_err();
1997 assert!(
1998 format!("{err:?}").contains("cannot merge an entity into itself"),
1999 "expected self-merge rejection, got: {err:?}"
2000 );
2001 }
2002
2003 #[tokio::test]
2004 async fn merge_entity_prefer_into_strategy() {
2005 let rt = rt();
2006 let tok = NamespaceToken::local();
2007 let into = rt
2008 .create_entity(
2009 &tok,
2010 "concept",
2011 None,
2012 "Into",
2013 None,
2014 Some(serde_json::json!({"a": 1})),
2015 vec![],
2016 )
2017 .await
2018 .unwrap();
2019 let from = rt
2020 .create_entity(
2021 &tok,
2022 "concept",
2023 None,
2024 "From",
2025 None,
2026 Some(serde_json::json!({"a": 2, "b": 3})),
2027 vec![],
2028 )
2029 .await
2030 .unwrap();
2031
2032 rt.merge_entity(
2033 &tok,
2034 into.id,
2035 from.id,
2036 EntityDedupMergePolicy::PreferInto,
2037 false,
2038 )
2039 .await
2040 .unwrap();
2041
2042 let kept = rt.get_entity(&tok, into.id).await.unwrap();
2043 let props = kept.properties.unwrap();
2044 assert_eq!(props["a"], 1);
2046 assert_eq!(props["b"], 3);
2047 }
2048
2049 #[tokio::test]
2050 async fn merge_entity_prefer_from_strategy() {
2051 let rt = rt();
2052 let tok = NamespaceToken::local();
2053 let into = rt
2054 .create_entity(
2055 &tok,
2056 "concept",
2057 None,
2058 "Into",
2059 None,
2060 Some(serde_json::json!({"a": 1})),
2061 vec![],
2062 )
2063 .await
2064 .unwrap();
2065 let from = rt
2066 .create_entity(
2067 &tok,
2068 "concept",
2069 None,
2070 "From",
2071 None,
2072 Some(serde_json::json!({"a": 2, "b": 3})),
2073 vec![],
2074 )
2075 .await
2076 .unwrap();
2077
2078 rt.merge_entity(
2079 &tok,
2080 into.id,
2081 from.id,
2082 EntityDedupMergePolicy::PreferFrom,
2083 false,
2084 )
2085 .await
2086 .unwrap();
2087
2088 let kept = rt.get_entity(&tok, into.id).await.unwrap();
2089 let props = kept.properties.unwrap();
2090 assert_eq!(props["a"], 2);
2092 assert_eq!(props["b"], 3);
2093 }
2094
2095 #[tokio::test]
2096 async fn merge_entity_union_strategy() {
2097 let rt = rt();
2098 let tok = NamespaceToken::local();
2099 let into = rt
2100 .create_entity(
2101 &tok,
2102 "concept",
2103 None,
2104 "Into",
2105 None,
2106 Some(serde_json::json!({"a": 1})),
2107 vec![],
2108 )
2109 .await
2110 .unwrap();
2111 let from = rt
2112 .create_entity(
2113 &tok,
2114 "concept",
2115 None,
2116 "From",
2117 None,
2118 Some(serde_json::json!({"a": 2, "b": 3})),
2119 vec![],
2120 )
2121 .await
2122 .unwrap();
2123
2124 rt.merge_entity(&tok, into.id, from.id, EntityDedupMergePolicy::Union, false)
2125 .await
2126 .unwrap();
2127
2128 let kept = rt.get_entity(&tok, into.id).await.unwrap();
2129 let props = kept.properties.unwrap();
2130 assert_eq!(props["a"], 1);
2132 assert_eq!(props["b"], 3);
2133 }
2134
2135 #[tokio::test]
2136 async fn merge_entity_unions_tags() {
2137 let rt = rt();
2138 let tok = NamespaceToken::local();
2139 let into = rt
2140 .create_entity(
2141 &tok,
2142 "concept",
2143 None,
2144 "Into",
2145 None,
2146 None,
2147 vec!["x".to_string(), "y".to_string()],
2148 )
2149 .await
2150 .unwrap();
2151 let from = rt
2152 .create_entity(
2153 &tok,
2154 "concept",
2155 None,
2156 "From",
2157 None,
2158 None,
2159 vec!["y".to_string(), "z".to_string()],
2160 )
2161 .await
2162 .unwrap();
2163
2164 rt.merge_entity(
2165 &tok,
2166 into.id,
2167 from.id,
2168 EntityDedupMergePolicy::PreferInto,
2169 false,
2170 )
2171 .await
2172 .unwrap();
2173
2174 let kept = rt.get_entity(&tok, into.id).await.unwrap();
2175 let mut tags = kept.tags.clone();
2176 tags.sort();
2177 assert_eq!(tags, vec!["x", "y", "z"]);
2178 }
2179
2180 #[tokio::test]
2181 async fn merge_entity_drops_self_loops() {
2182 let rt = rt();
2183 let tok = NamespaceToken::local();
2184 let a = rt
2185 .create_entity(&tok, "concept", None, "A", None, None, vec![])
2186 .await
2187 .unwrap();
2188 let b = rt
2189 .create_entity(&tok, "concept", None, "B", None, None, vec![])
2190 .await
2191 .unwrap();
2192
2193 rt.link(&tok, a.id, b.id, EdgeRelation::Extends, 1.0, None)
2195 .await
2196 .unwrap();
2197
2198 let summary = rt
2199 .merge_entity(&tok, a.id, b.id, EntityDedupMergePolicy::PreferInto, false)
2200 .await
2201 .unwrap();
2202
2203 assert_eq!(
2204 summary.edges_rewired, 0,
2205 "self-loop should be dropped, not rewired"
2206 );
2207
2208 let a_out = rt
2209 .neighbors(&tok, a.id, Direction::Out, None, None)
2210 .await
2211 .unwrap();
2212 assert!(a_out.is_empty(), "no self-loop should remain");
2213 }
2214
2215 #[test]
2218 fn union_tags_deduplicates() {
2219 let (tags, added) = union_tags(
2220 &["x".to_string(), "y".to_string()],
2221 &["y".to_string(), "z".to_string()],
2222 );
2223 let mut sorted = tags.clone();
2224 sorted.sort();
2225 assert_eq!(sorted, vec!["x", "y", "z"]);
2226 assert_eq!(added, 1);
2227 }
2228
2229 #[test]
2230 fn merge_properties_prefer_into_fills_missing_keys() {
2231 let a = serde_json::json!({"a": 1});
2232 let b = serde_json::json!({"a": 99, "b": 2});
2233 let (merged, added) =
2234 merge_properties(&Some(a), &Some(b), EntityDedupMergePolicy::PreferInto);
2235 let m = merged.unwrap();
2236 assert_eq!(m["a"], 1);
2237 assert_eq!(m["b"], 2);
2238 assert_eq!(added, 1);
2239 }
2240
2241 #[tokio::test]
2244 async fn merge_entity_tombstones_source_with_provenance() {
2245 let rt = rt();
2246 let tok = NamespaceToken::local();
2247 let into = rt
2248 .create_entity(&tok, "concept", None, "Into", None, None, vec![])
2249 .await
2250 .unwrap();
2251 let from = rt
2252 .create_entity(&tok, "concept", None, "From", None, None, vec![])
2253 .await
2254 .unwrap();
2255 let from_id = from.id;
2256
2257 rt.merge_entity(
2258 &tok,
2259 into.id,
2260 from_id,
2261 EntityDedupMergePolicy::PreferInto,
2262 false,
2263 )
2264 .await
2265 .unwrap();
2266
2267 assert!(
2269 rt.get_entity(&tok, from_id).await.is_err(),
2270 "tombstoned source should not be returned by get_entity"
2271 );
2272
2273 let pool = rt.backend().pool_arc();
2275 let (deleted_at, merged_into): (Option<i64>, Option<String>) =
2276 tokio::task::spawn_blocking(move || {
2277 let guard = pool.writer().unwrap();
2278 guard
2279 .conn()
2280 .query_row(
2281 "SELECT deleted_at, merged_into FROM entities WHERE id = ?1",
2282 [from_id.to_string()],
2283 |row| Ok((row.get(0)?, row.get(1)?)),
2284 )
2285 .unwrap()
2286 })
2287 .await
2288 .unwrap();
2289 assert!(
2290 deleted_at.is_some(),
2291 "tombstoned entity must have deleted_at set"
2292 );
2293 assert_eq!(
2294 merged_into.as_deref(),
2295 Some(into.id.to_string().as_str()),
2296 "merged_into must point to into_id"
2297 );
2298 }
2299
2300 #[tokio::test]
2301 async fn merge_note_same_kind_appends_content() {
2302 let rt = rt();
2303 let tok = NamespaceToken::local();
2304 let into = rt
2305 .create_note(
2306 &tok,
2307 "observation",
2308 None,
2309 "Into content",
2310 None,
2311 None,
2312 vec![],
2313 )
2314 .await
2315 .unwrap();
2316 let from = rt
2317 .create_note(
2318 &tok,
2319 "observation",
2320 None,
2321 "From content",
2322 None,
2323 None,
2324 vec![],
2325 )
2326 .await
2327 .unwrap();
2328 let from_id = from.id;
2329
2330 let summary = rt
2331 .merge_note(
2332 &tok,
2333 into.id,
2334 from_id,
2335 EntityDedupMergePolicy::PreferInto,
2336 ContentMergeStrategy::Append,
2337 false,
2338 )
2339 .await
2340 .unwrap();
2341
2342 assert_eq!(summary.kept_id, into.id);
2343 assert_eq!(summary.removed_id, from_id);
2344 assert!(summary.content_appended);
2345 assert!(!summary.dry_run);
2346
2347 let from_store = rt.notes(&tok).unwrap();
2349 assert!(
2350 from_store.get_note(from_id).await.unwrap().is_none(),
2351 "merged-from note should be soft-deleted"
2352 );
2353 }
2354
2355 #[tokio::test]
2356 async fn merge_note_different_kinds_rejected() {
2357 let rt = rt();
2358 let tok = NamespaceToken::local();
2359 let into = rt
2360 .create_note(&tok, "observation", None, "Into", None, None, vec![])
2361 .await
2362 .unwrap();
2363 let from = rt
2364 .create_note(&tok, "decision", None, "From", None, None, vec![])
2365 .await
2366 .unwrap();
2367
2368 let result = rt
2369 .merge_note(
2370 &tok,
2371 into.id,
2372 from.id,
2373 EntityDedupMergePolicy::PreferInto,
2374 ContentMergeStrategy::Append,
2375 false,
2376 )
2377 .await;
2378 assert!(result.is_err(), "merging different note kinds must fail");
2379 }
2380
2381 #[tokio::test]
2382 async fn merge_note_dry_run_leaves_notes_unchanged() {
2383 let rt = rt();
2384 let tok = NamespaceToken::local();
2385 let into = rt
2386 .create_note(
2387 &tok,
2388 "observation",
2389 None,
2390 "Into content",
2391 None,
2392 None,
2393 vec![],
2394 )
2395 .await
2396 .unwrap();
2397 let from = rt
2398 .create_note(
2399 &tok,
2400 "observation",
2401 None,
2402 "From content",
2403 None,
2404 None,
2405 vec![],
2406 )
2407 .await
2408 .unwrap();
2409 let into_id = into.id;
2410 let from_id = from.id;
2411
2412 let summary = rt
2413 .merge_note(
2414 &tok,
2415 into_id,
2416 from_id,
2417 EntityDedupMergePolicy::PreferInto,
2418 ContentMergeStrategy::Append,
2419 true,
2420 )
2421 .await
2422 .unwrap();
2423
2424 assert!(summary.dry_run);
2425
2426 let store = rt.notes(&tok).unwrap();
2428 let into_after = store.get_note(into_id).await.unwrap().unwrap();
2429 let from_after = store.get_note(from_id).await.unwrap().unwrap();
2430 assert_eq!(
2431 into_after.content, "Into content",
2432 "dry_run must not mutate into-note"
2433 );
2434 assert_eq!(
2435 from_after.content, "From content",
2436 "dry_run must not mutate from-note"
2437 );
2438 }
2439
2440 #[tokio::test]
2447 async fn merge_nameless_notes_fts_document_is_parity_correct() {
2448 use khive_storage::types::TextSearchRequest;
2449
2450 let rt = rt(); let tok = NamespaceToken::local();
2452
2453 let into = rt
2454 .create_note(
2455 &tok,
2456 "observation",
2457 None,
2458 "intosentinelzxq body",
2459 None,
2460 Some(serde_json::json!({"src": "into"})),
2461 vec![],
2462 )
2463 .await
2464 .expect("create into-note");
2465 let from = rt
2466 .create_note(
2467 &tok,
2468 "observation",
2469 None,
2470 "fromsentinelzxq body",
2471 None,
2472 None,
2473 vec![],
2474 )
2475 .await
2476 .expect("create from-note");
2477
2478 let into_id = into.id;
2479 let from_id = from.id;
2480
2481 rt.merge_note(
2482 &tok,
2483 into_id,
2484 from_id,
2485 EntityDedupMergePolicy::PreferInto,
2486 ContentMergeStrategy::Append,
2487 false,
2488 )
2489 .await
2490 .expect("merge_note must succeed");
2491
2492 let note_store = rt.notes(&tok).expect("note store");
2494 let merged_note = note_store
2495 .get_note(into_id)
2496 .await
2497 .expect("get_note")
2498 .expect("merged note must exist");
2499
2500 let expected = note_fts_document(&merged_note);
2502
2503 let fts = rt.text_for_notes(&tok).expect("FTS store");
2505 let stored = fts
2506 .get_document("local", into_id)
2507 .await
2508 .expect("get_document must not error")
2509 .expect("FTS document must exist after merge");
2510
2511 assert_eq!(stored.subject_id, expected.subject_id, "subject_id");
2513 assert_eq!(
2514 stored.title, expected.title,
2515 "title (None for nameless note)"
2516 );
2517 assert_eq!(stored.body, expected.body, "body");
2518 assert_eq!(stored.namespace, expected.namespace, "namespace");
2519 assert_eq!(stored.kind, expected.kind, "kind");
2520
2521 assert!(
2523 stored.title.is_none(),
2524 "nameless merged note must have title=None in FTS (was NULL before fix)"
2525 );
2526
2527 let hits = fts
2529 .search(TextSearchRequest {
2530 query: "intosentinelzxq".to_string(),
2531 mode: khive_storage::types::TextQueryMode::Plain,
2532 filter: None,
2533 top_k: 10,
2534 snippet_chars: 0,
2535 })
2536 .await
2537 .expect("search");
2538 assert!(
2539 hits.iter().any(|h| h.subject_id == into_id),
2540 "merged note must be searchable by into-note content"
2541 );
2542 }
2543
2544 #[tokio::test]
2545 async fn update_edge_updates_properties() {
2546 use khive_storage::EdgeRelation;
2547 let rt = rt();
2548 let tok = NamespaceToken::local();
2549 let a = rt
2550 .create_entity(&tok, "concept", None, "A", None, None, vec![])
2551 .await
2552 .unwrap();
2553 let b = rt
2554 .create_entity(&tok, "concept", None, "B", None, None, vec![])
2555 .await
2556 .unwrap();
2557 let edge = rt
2558 .link(&tok, a.id, b.id, EdgeRelation::Extends, 0.5, None)
2559 .await
2560 .unwrap();
2561 let edge_id: Uuid = edge.id.into();
2562
2563 let updated = rt
2564 .update_edge(
2565 &tok,
2566 edge_id,
2567 EdgePatch {
2568 properties: Some(serde_json::json!({"source": "manual"})),
2569 ..Default::default()
2570 },
2571 )
2572 .await
2573 .unwrap();
2574
2575 assert_eq!(updated.metadata.as_ref().unwrap()["source"], "manual");
2576 assert!((updated.weight - 0.5).abs() < 0.001, "weight unchanged");
2577 }
2578
2579 #[tokio::test]
2584 async fn merge_entity_survives_shared_edge_to_third_party() {
2585 use khive_storage::EdgeRelation;
2586 let rt = rt();
2587 let tok = NamespaceToken::local();
2588
2589 let a = rt
2592 .create_entity(&tok, "concept", None, "A", None, None, vec![])
2593 .await
2594 .unwrap();
2595 let b = rt
2596 .create_entity(&tok, "concept", None, "B", None, None, vec![])
2597 .await
2598 .unwrap();
2599 let shared = rt
2600 .create_entity(&tok, "concept", None, "Shared", None, None, vec![])
2601 .await
2602 .unwrap();
2603
2604 rt.link(&tok, a.id, shared.id, EdgeRelation::Extends, 1.0, None)
2607 .await
2608 .unwrap();
2609 rt.link(&tok, b.id, shared.id, EdgeRelation::Extends, 1.0, None)
2610 .await
2611 .unwrap();
2612
2613 let summary = rt
2615 .merge_entity(
2616 &tok,
2617 a.id,
2618 b.id,
2619 crate::EntityDedupMergePolicy::PreferInto,
2620 false,
2621 )
2622 .await
2623 .expect(
2624 "C1: merge must succeed even when both entities share an edge to a third party",
2625 );
2626
2627 assert_eq!(summary.kept_id, a.id);
2628 assert_eq!(summary.removed_id, b.id);
2629 let a_edges = rt
2637 .list_edges(
2638 &tok,
2639 crate::EdgeListFilter {
2640 source_id: Some(a.id),
2641 target_id: Some(shared.id),
2642 relations: vec![EdgeRelation::Extends],
2643 ..Default::default()
2644 },
2645 10,
2646 )
2647 .await
2648 .unwrap();
2649 assert_eq!(
2650 a_edges.len(),
2651 1,
2652 "C1: exactly one live A→shared Extends edge must exist after merge; got: {a_edges:?}"
2653 );
2654
2655 let b_after = rt.entities(&tok).unwrap().get_entity(b.id).await.unwrap();
2658 assert!(
2659 b_after.is_none(),
2660 "C3: from_entity must be tombstoned (get_entity returns None for deleted) after merge; got: {b_after:?}"
2661 );
2662 }
2663
2664 #[tokio::test]
2668 async fn merge_entity_cross_kind_rejected_at_runtime() {
2669 let rt = rt();
2670 let tok = NamespaceToken::local();
2671
2672 let concept = rt
2673 .create_entity(&tok, "concept", None, "H2Concept", None, None, vec![])
2674 .await
2675 .unwrap();
2676 let project = rt
2677 .create_entity(&tok, "project", None, "H2Project", None, None, vec![])
2678 .await
2679 .unwrap();
2680
2681 let err = rt
2683 .merge_entity(
2684 &tok,
2685 concept.id,
2686 project.id,
2687 crate::EntityDedupMergePolicy::PreferInto,
2688 false,
2689 )
2690 .await
2691 .expect_err("H2: cross-kind merge must be rejected by runtime");
2692 assert!(
2693 matches!(err, crate::RuntimeError::InvalidInput(_)),
2694 "H2: expected InvalidInput, got: {err:?}"
2695 );
2696
2697 let concept_after = rt.get_entity(&tok, concept.id).await;
2699 let project_after = rt.get_entity(&tok, project.id).await;
2700 assert!(
2701 concept_after.is_ok(),
2702 "H2: concept must remain live after rejected merge; got: {concept_after:?}"
2703 );
2704 assert!(
2705 project_after.is_ok(),
2706 "H2: project must remain live after rejected merge; got: {project_after:?}"
2707 );
2708 }
2709
2710 #[tokio::test]
2712 async fn merge_entity_same_kind_succeeds() {
2713 let rt = rt();
2714 let tok = NamespaceToken::local();
2715
2716 let c1 = rt
2717 .create_entity(&tok, "concept", None, "Concept1", None, None, vec![])
2718 .await
2719 .unwrap();
2720 let c2 = rt
2721 .create_entity(&tok, "concept", None, "Concept2", None, None, vec![])
2722 .await
2723 .unwrap();
2724
2725 let summary = rt
2726 .merge_entity(
2727 &tok,
2728 c1.id,
2729 c2.id,
2730 crate::EntityDedupMergePolicy::PreferInto,
2731 false,
2732 )
2733 .await
2734 .expect("same-kind merge must succeed");
2735 assert_eq!(summary.kept_id, c1.id);
2736 assert_eq!(summary.removed_id, c2.id);
2737
2738 let c2_after = rt.entities(&tok).unwrap().get_entity(c2.id).await.unwrap();
2740 assert!(c2_after.is_none(), "from_entity must be tombstoned");
2741 }
2742
2743 #[tokio::test]
2746 async fn merge_note_cross_namespace_either_id_returns_not_found() {
2747 use crate::error::RuntimeError;
2748 use crate::Namespace;
2749
2750 let rt = rt();
2751 let ns_a = NamespaceToken::for_namespace(Namespace::parse("ns-a").unwrap());
2752 let ns_b = NamespaceToken::for_namespace(Namespace::parse("ns-b").unwrap());
2753
2754 let into_a = rt
2755 .create_note(&ns_a, "observation", None, "Into A", None, None, vec![])
2756 .await
2757 .unwrap();
2758 let from_a = rt
2759 .create_note(&ns_a, "observation", None, "From A", None, None, vec![])
2760 .await
2761 .unwrap();
2762 let note_b = rt
2763 .create_note(&ns_b, "observation", None, "Note B", None, None, vec![])
2764 .await
2765 .unwrap();
2766
2767 let foreign_into = rt
2769 .merge_note(
2770 &ns_a,
2771 note_b.id,
2772 from_a.id,
2773 EntityDedupMergePolicy::PreferInto,
2774 ContentMergeStrategy::Append,
2775 false,
2776 )
2777 .await;
2778 assert!(
2779 matches!(foreign_into, Err(RuntimeError::NotFound(_))),
2780 "foreign into_id must be denied before merge, got {foreign_into:?}"
2781 );
2782
2783 let foreign_from = rt
2785 .merge_note(
2786 &ns_a,
2787 into_a.id,
2788 note_b.id,
2789 EntityDedupMergePolicy::PreferInto,
2790 ContentMergeStrategy::Append,
2791 false,
2792 )
2793 .await;
2794 assert!(
2795 matches!(foreign_from, Err(RuntimeError::NotFound(_))),
2796 "foreign from_id must be denied before merge, got {foreign_from:?}"
2797 );
2798 }
2799
2800 #[tokio::test]
2803 async fn update_entity_cross_namespace_returns_not_found_and_preserves_source() {
2804 use crate::error::RuntimeError;
2805 use crate::Namespace;
2806
2807 let rt = rt();
2808 let ns_a = NamespaceToken::for_namespace(Namespace::parse("ns-a").unwrap());
2809 let ns_b = NamespaceToken::for_namespace(Namespace::parse("ns-b").unwrap());
2810
2811 let entity = rt
2812 .create_entity(
2813 &ns_a,
2814 "concept",
2815 None,
2816 "Alpha",
2817 Some("original"),
2818 None,
2819 vec![],
2820 )
2821 .await
2822 .unwrap();
2823
2824 let err = rt
2825 .update_entity(
2826 &ns_b,
2827 entity.id,
2828 EntityPatch {
2829 name: Some("Compromised".into()),
2830 ..Default::default()
2831 },
2832 )
2833 .await;
2834
2835 assert!(
2836 matches!(err, Err(RuntimeError::NotFound(_))),
2837 "cross-namespace update must return opaque NotFound, got {err:?}"
2838 );
2839
2840 let after = rt.get_entity(&ns_a, entity.id).await.unwrap();
2841 assert_eq!(
2842 after.name, "Alpha",
2843 "foreign update must not mutate source row"
2844 );
2845 assert_eq!(after.description.as_deref(), Some("original"));
2846 }
2847
2848 #[tokio::test]
2849 async fn merge_entity_cross_namespace_either_id_returns_not_found() {
2850 use crate::error::RuntimeError;
2851 use crate::Namespace;
2852
2853 let rt = rt();
2854 let ns_a = NamespaceToken::for_namespace(Namespace::parse("ns-a").unwrap());
2855 let ns_b = NamespaceToken::for_namespace(Namespace::parse("ns-b").unwrap());
2856
2857 let into_a = rt
2858 .create_entity(&ns_a, "concept", None, "Into A", None, None, vec![])
2859 .await
2860 .unwrap();
2861 let from_a = rt
2862 .create_entity(&ns_a, "concept", None, "From A", None, None, vec![])
2863 .await
2864 .unwrap();
2865 let foreign_b = rt
2866 .create_entity(&ns_b, "concept", None, "Foreign B", None, None, vec![])
2867 .await
2868 .unwrap();
2869
2870 let foreign_into = rt
2872 .merge_entity(
2873 &ns_a,
2874 foreign_b.id,
2875 from_a.id,
2876 EntityDedupMergePolicy::PreferInto,
2877 false,
2878 )
2879 .await;
2880 assert!(
2881 matches!(foreign_into, Err(RuntimeError::NotFound(_))),
2882 "foreign into_id must be denied before merge, got {foreign_into:?}"
2883 );
2884
2885 let foreign_from = rt
2887 .merge_entity(
2888 &ns_a,
2889 into_a.id,
2890 foreign_b.id,
2891 EntityDedupMergePolicy::PreferInto,
2892 false,
2893 )
2894 .await;
2895 assert!(
2896 matches!(foreign_from, Err(RuntimeError::NotFound(_))),
2897 "foreign from_id must be denied before merge, got {foreign_from:?}"
2898 );
2899
2900 assert!(rt.get_entity(&ns_a, into_a.id).await.is_ok());
2902 assert!(rt.get_entity(&ns_a, from_a.id).await.is_ok());
2903 assert!(rt.get_entity(&ns_b, foreign_b.id).await.is_ok());
2904 }
2905
/// With a non-empty description, `entity_fts_document` produces:
/// - the entity's id as `subject_id`;
/// - its namespace;
/// - the name as `title`;
/// - `"<name> <description>"` as `body`;
/// - `SubstrateKind::Entity` as `kind`.
#[test]
fn entity_fts_document_with_description() {
    let entity =
        Entity::new("local", "concept", "MyEntity").with_description("some description text");
    let fts_doc = entity_fts_document(&entity);

    assert_eq!(fts_doc.subject_id, entity.id);
    assert_eq!(fts_doc.namespace, "local");
    assert_eq!(fts_doc.title.as_deref(), Some("MyEntity"));
    assert_eq!(fts_doc.body, "MyEntity some description text");
    assert_eq!(fts_doc.kind, khive_types::SubstrateKind::Entity);
}
2919
/// Without a description, both the title and the body of the FTS document are
/// just the entity name.
#[test]
fn entity_fts_document_without_description() {
    let bare = Entity::new("local", "concept", "NameOnly");
    let fts_doc = entity_fts_document(&bare);

    assert_eq!(fts_doc.title.as_deref(), Some("NameOnly"));
    assert_eq!(fts_doc.body, "NameOnly");
}
2927
/// An explicitly empty description is treated like a missing one: the body is
/// the name alone, with no trailing separator.
#[test]
fn entity_fts_document_empty_description_uses_name_only() {
    let blank_desc = Entity::new("local", "concept", "TitleOnly").with_description("");
    let fts_doc = entity_fts_document(&blank_desc);

    assert_eq!(
        fts_doc.body, "TitleOnly",
        "empty description must not be appended"
    );
}
2938
2939 #[tokio::test]
2943 async fn entity_fts_document_matches_runtime_create_path() {
2944 let rt = rt();
2945 let tok = NamespaceToken::local();
2946
2947 let entity = rt
2948 .create_entity(
2949 &tok,
2950 "concept",
2951 None,
2952 "CrossPathTitle",
2953 Some("cross path description body"),
2954 Some(serde_json::json!({"key": "val"})),
2955 vec!["tag1".to_string()],
2956 )
2957 .await
2958 .expect("create_entity");
2959
2960 let fts = rt.text(&tok).expect("FTS store");
2961 let stored = fts
2962 .get_document("local", entity.id)
2963 .await
2964 .expect("get_document")
2965 .expect("document must exist after create_entity");
2966
2967 let expected = entity_fts_document(&entity);
2968
2969 assert_eq!(stored.subject_id, expected.subject_id, "subject_id");
2970 assert_eq!(stored.kind, expected.kind, "kind");
2971 assert_eq!(stored.title, expected.title, "title");
2972 assert_eq!(stored.body, expected.body, "body");
2973 assert_eq!(stored.namespace, expected.namespace, "namespace");
2974 }
2975
2976 #[tokio::test]
2979 async fn entity_fts_document_matches_runtime_update_path() {
2980 let rt = rt();
2981 let tok = NamespaceToken::local();
2982
2983 let entity = rt
2984 .create_entity(
2985 &tok,
2986 "concept",
2987 None,
2988 "OldName",
2989 Some("old desc"),
2990 None,
2991 vec![],
2992 )
2993 .await
2994 .expect("create_entity");
2995
2996 let updated = rt
2997 .update_entity(
2998 &tok,
2999 entity.id,
3000 EntityPatch {
3001 name: Some("NewName".to_string()),
3002 description: Some(Some("new desc".to_string())),
3003 ..Default::default()
3004 },
3005 )
3006 .await
3007 .expect("update_entity");
3008
3009 let fts = rt.text(&tok).expect("FTS store");
3010 let stored = fts
3011 .get_document("local", updated.id)
3012 .await
3013 .expect("get_document")
3014 .expect("document must exist after update_entity");
3015
3016 let expected = entity_fts_document(&updated);
3017
3018 assert_eq!(stored.title, expected.title, "title after update");
3019 assert_eq!(stored.body, expected.body, "body after update");
3020 }
3021
3022 #[tokio::test]
3025 async fn entity_fts_document_matches_runtime_merge_path() {
3026 let rt = rt();
3027 let tok = NamespaceToken::local();
3028
3029 let into_e = rt
3030 .create_entity(
3031 &tok,
3032 "concept",
3033 None,
3034 "IntoEntity",
3035 Some("into desc"),
3036 None,
3037 vec![],
3038 )
3039 .await
3040 .expect("create into");
3041 let from_e = rt
3042 .create_entity(
3043 &tok,
3044 "concept",
3045 None,
3046 "FromEntity",
3047 Some("from desc"),
3048 None,
3049 vec![],
3050 )
3051 .await
3052 .expect("create from");
3053
3054 let summary = rt
3055 .merge_entity(
3056 &tok,
3057 into_e.id,
3058 from_e.id,
3059 EntityDedupMergePolicy::PreferInto,
3060 false,
3061 )
3062 .await
3063 .expect("merge_entity");
3064
3065 let kept = rt
3066 .get_entity(&tok, summary.kept_id)
3067 .await
3068 .expect("get kept");
3069
3070 let fts = rt.text(&tok).expect("FTS store");
3071 let stored = fts
3072 .get_document("local", kept.id)
3073 .await
3074 .expect("get_document")
3075 .expect("FTS document must exist for kept entity after merge");
3076
3077 let expected = entity_fts_document(&kept);
3078
3079 assert_eq!(stored.title, expected.title, "title after merge");
3080 assert_eq!(stored.body, expected.body, "body after merge");
3081 }
3082}