1use std::collections::{HashMap, HashSet};
2
3use futures::TryStreamExt;
4use hirn_core::revision::{
5 LogicalMemoryId, RecallSnapshot, RevisionId, RevisionOperation, RevisionRef, RevisionState,
6};
7
8use super::*;
9
// Mutation-envelope `kind` tags for the semantic layer. Each one is passed to
// `MutationEnvelopeRecord::pending` by the matching `build_semantic_*_envelope` helper in this
// file. They are `pub(super)`, presumably so code outside this file (e.g. envelope recovery)
// can dispatch on the kind — NOTE(review): confirm against the callers.

/// Kind tag written by `build_semantic_create_envelope`.
pub(super) const SEMANTIC_CREATE_MUTATION_KIND: &str = "semantic_create";
/// Kind tag written by `build_semantic_successor_envelope`.
pub(super) const SEMANTIC_SUCCESSOR_MUTATION_KIND: &str = "semantic_successor";
/// Kind tag written by `build_semantic_merge_envelope`.
pub(super) const SEMANTIC_MERGE_MUTATION_KIND: &str = "semantic_merge";
/// Kind tag written by `build_semantic_contradiction_sync_envelope`.
pub(super) const SEMANTIC_CONTRADICTION_SYNC_MUTATION_KIND: &str = "semantic_contradiction_sync";
/// Kind tag written by `build_semantic_purge_envelope`.
pub(super) const SEMANTIC_PURGE_MUTATION_KIND: &str = "semantic_purge";
/// Kind tag written by `build_semantic_retract_envelope`.
pub(super) const SEMANTIC_RETRACT_MUTATION_KIND: &str = "semantic_retract";
16
17#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
18struct SemanticCreateEnvelope {
19 record_id: MemoryId,
20}
21
22fn encode_semantic_create_envelope(payload: &SemanticCreateEnvelope) -> HirnResult<Vec<u8>> {
23 serde_json::to_vec(payload)
24 .map_err(|error| HirnError::storage(format!("semantic create envelope serialize: {error}")))
25}
26
27fn decode_semantic_create_envelope(
28 envelope: &hirn_storage::MutationEnvelopeRecord,
29) -> HirnResult<SemanticCreateEnvelope> {
30 serde_json::from_slice(&envelope.payload).map_err(|error| {
31 HirnError::storage(format!("semantic create envelope deserialize: {error}"))
32 })
33}
34
35fn build_semantic_create_envelope(
36 record_id: MemoryId,
37) -> HirnResult<hirn_storage::MutationEnvelopeRecord> {
38 let payload = SemanticCreateEnvelope { record_id };
39 let payload = encode_semantic_create_envelope(&payload)?;
40
41 Ok(hirn_storage::MutationEnvelopeRecord::pending(
42 format!("semantic-create:{record_id}"),
43 SEMANTIC_CREATE_MUTATION_KIND,
44 payload,
45 ))
46}
47
48#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
49struct SemanticSuccessorEnvelope {
50 prior_record_id: MemoryId,
51 successor_id: MemoryId,
52}
53
54fn encode_semantic_successor_envelope(payload: &SemanticSuccessorEnvelope) -> HirnResult<Vec<u8>> {
55 serde_json::to_vec(payload).map_err(|error| {
56 HirnError::storage(format!("semantic successor envelope serialize: {error}"))
57 })
58}
59
60fn decode_semantic_successor_envelope(
61 envelope: &hirn_storage::MutationEnvelopeRecord,
62) -> HirnResult<SemanticSuccessorEnvelope> {
63 serde_json::from_slice(&envelope.payload).map_err(|error| {
64 HirnError::storage(format!("semantic successor envelope deserialize: {error}"))
65 })
66}
67
68fn build_semantic_successor_envelope(
69 prior_record_id: MemoryId,
70 successor_id: MemoryId,
71) -> HirnResult<hirn_storage::MutationEnvelopeRecord> {
72 let payload = SemanticSuccessorEnvelope {
73 prior_record_id,
74 successor_id,
75 };
76 let payload = encode_semantic_successor_envelope(&payload)?;
77
78 Ok(hirn_storage::MutationEnvelopeRecord::pending(
79 format!("semantic-successor:{successor_id}"),
80 SEMANTIC_SUCCESSOR_MUTATION_KIND,
81 payload,
82 ))
83}
84
85#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
86struct SemanticMergeEnvelope {
87 prior_target_id: MemoryId,
88 merged_target_id: MemoryId,
89 prior_source_ids: Vec<MemoryId>,
90 merged_source_ids: Vec<MemoryId>,
91}
92
93fn encode_semantic_merge_envelope(payload: &SemanticMergeEnvelope) -> HirnResult<Vec<u8>> {
94 serde_json::to_vec(payload)
95 .map_err(|error| HirnError::storage(format!("semantic merge envelope serialize: {error}")))
96}
97
98fn decode_semantic_merge_envelope(
99 envelope: &hirn_storage::MutationEnvelopeRecord,
100) -> HirnResult<SemanticMergeEnvelope> {
101 serde_json::from_slice(&envelope.payload).map_err(|error| {
102 HirnError::storage(format!("semantic merge envelope deserialize: {error}"))
103 })
104}
105
106fn build_semantic_merge_envelope(
107 prior_target_id: MemoryId,
108 merged_target_id: MemoryId,
109 prior_source_ids: Vec<MemoryId>,
110 merged_source_ids: Vec<MemoryId>,
111) -> HirnResult<hirn_storage::MutationEnvelopeRecord> {
112 let payload = SemanticMergeEnvelope {
113 prior_target_id,
114 merged_target_id,
115 prior_source_ids,
116 merged_source_ids,
117 };
118 let payload = encode_semantic_merge_envelope(&payload)?;
119
120 Ok(hirn_storage::MutationEnvelopeRecord::pending(
121 format!("semantic-merge:{merged_target_id}"),
122 SEMANTIC_MERGE_MUTATION_KIND,
123 payload,
124 ))
125}
126
127#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
128struct SemanticContradictionSyncEnvelope {
129 prior_record_ids: Vec<MemoryId>,
130 successor_ids: Vec<MemoryId>,
131}
132
133fn encode_semantic_contradiction_sync_envelope(
134 payload: &SemanticContradictionSyncEnvelope,
135) -> HirnResult<Vec<u8>> {
136 serde_json::to_vec(payload).map_err(|error| {
137 HirnError::storage(format!(
138 "semantic contradiction sync envelope serialize: {error}"
139 ))
140 })
141}
142
143fn decode_semantic_contradiction_sync_envelope(
144 envelope: &hirn_storage::MutationEnvelopeRecord,
145) -> HirnResult<SemanticContradictionSyncEnvelope> {
146 serde_json::from_slice(&envelope.payload).map_err(|error| {
147 HirnError::storage(format!(
148 "semantic contradiction sync envelope deserialize: {error}"
149 ))
150 })
151}
152
153fn build_semantic_contradiction_sync_envelope(
154 mut prior_record_ids: Vec<MemoryId>,
155 mut successor_ids: Vec<MemoryId>,
156) -> HirnResult<hirn_storage::MutationEnvelopeRecord> {
157 prior_record_ids.sort_unstable();
158 prior_record_ids.dedup();
159 successor_ids.sort_unstable();
160 successor_ids.dedup();
161
162 let envelope_suffix = successor_ids
163 .iter()
164 .map(ToString::to_string)
165 .collect::<Vec<_>>()
166 .join("+");
167
168 let payload = SemanticContradictionSyncEnvelope {
169 prior_record_ids,
170 successor_ids,
171 };
172 let payload = encode_semantic_contradiction_sync_envelope(&payload)?;
173
174 Ok(hirn_storage::MutationEnvelopeRecord::pending(
175 format!("semantic-contradiction-sync:{envelope_suffix}"),
176 SEMANTIC_CONTRADICTION_SYNC_MUTATION_KIND,
177 payload,
178 ))
179}
180
181#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
182struct SemanticPurgeEnvelope {
183 logical_memory_id: LogicalMemoryId,
184 revision_ids: Vec<MemoryId>,
185}
186
187fn encode_semantic_purge_envelope(payload: &SemanticPurgeEnvelope) -> HirnResult<Vec<u8>> {
188 serde_json::to_vec(payload)
189 .map_err(|error| HirnError::storage(format!("semantic purge envelope serialize: {error}")))
190}
191
192fn decode_semantic_purge_envelope(
193 envelope: &hirn_storage::MutationEnvelopeRecord,
194) -> HirnResult<SemanticPurgeEnvelope> {
195 serde_json::from_slice(&envelope.payload).map_err(|error| {
196 HirnError::storage(format!("semantic purge envelope deserialize: {error}"))
197 })
198}
199
200fn build_semantic_purge_envelope(
201 logical_memory_id: LogicalMemoryId,
202 mut revision_ids: Vec<MemoryId>,
203) -> HirnResult<hirn_storage::MutationEnvelopeRecord> {
204 revision_ids.sort_unstable();
205 revision_ids.dedup();
206
207 let payload = SemanticPurgeEnvelope {
208 logical_memory_id,
209 revision_ids,
210 };
211 let payload = encode_semantic_purge_envelope(&payload)?;
212
213 Ok(hirn_storage::MutationEnvelopeRecord::pending(
214 format!("semantic-purge:{logical_memory_id}"),
215 SEMANTIC_PURGE_MUTATION_KIND,
216 payload,
217 ))
218}
219
220#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
221struct SemanticRetractEnvelope {
222 prior_record_id: MemoryId,
223 tombstone_id: MemoryId,
224}
225
226fn encode_semantic_retract_envelope(payload: &SemanticRetractEnvelope) -> HirnResult<Vec<u8>> {
227 serde_json::to_vec(payload).map_err(|error| {
228 HirnError::storage(format!("semantic retract envelope serialize: {error}"))
229 })
230}
231
232fn decode_semantic_retract_envelope(
233 envelope: &hirn_storage::MutationEnvelopeRecord,
234) -> HirnResult<SemanticRetractEnvelope> {
235 serde_json::from_slice(&envelope.payload).map_err(|error| {
236 HirnError::storage(format!("semantic retract envelope deserialize: {error}"))
237 })
238}
239
240fn build_semantic_retract_envelope(
241 prior_record_id: MemoryId,
242 tombstone_id: MemoryId,
243) -> HirnResult<hirn_storage::MutationEnvelopeRecord> {
244 let payload = SemanticRetractEnvelope {
245 prior_record_id,
246 tombstone_id,
247 };
248 let payload = encode_semantic_retract_envelope(&payload)?;
249
250 Ok(hirn_storage::MutationEnvelopeRecord::pending(
251 format!("semantic-retract:{tombstone_id}"),
252 SEMANTIC_RETRACT_MUTATION_KIND,
253 payload,
254 ))
255}
256
257fn semantic_revision_is_newer(candidate: &SemanticRecord, current: &SemanticRecord) -> bool {
258 candidate.version > current.version
259 || (candidate.version == current.version
260 && (candidate.created_at > current.created_at
261 || (candidate.created_at == current.created_at
262 && candidate.revision_id > current.revision_id)))
263}
264
265fn upsert_semantic_head(
266 heads: &mut HashMap<LogicalMemoryId, SemanticRecord>,
267 record: SemanticRecord,
268) {
269 heads
270 .entry(record.logical_memory_id)
271 .and_modify(|current| {
272 if semantic_revision_is_newer(&record, current) {
273 *current = record.clone();
274 }
275 })
276 .or_insert(record);
277}
278
279fn collapse_semantic_heads(
280 records: impl IntoIterator<Item = SemanticRecord>,
281) -> HashMap<LogicalMemoryId, SemanticRecord> {
282 let mut heads = HashMap::new();
283
284 for record in records {
285 upsert_semantic_head(&mut heads, record);
286 }
287
288 heads
289}
290
291fn semantic_record_is_live(record: &SemanticRecord) -> bool {
292 record.is_live()
293}
294
295fn storage_precision_timestamp(ts: Timestamp) -> Timestamp {
296 Timestamp::from_millis(ts.millis())
297}
298
299fn normalize_semantic_record_timestamps(record: &mut SemanticRecord) {
300 record.last_accessed = storage_precision_timestamp(record.last_accessed);
301 record.created_at = storage_precision_timestamp(record.created_at);
302 record.updated_at = storage_precision_timestamp(record.updated_at);
303 record.valid_from = storage_precision_timestamp(record.valid_from);
304 record.valid_until = record.valid_until.map(storage_precision_timestamp);
305}
306
/// Link data associated with a contradiction successor.
///
/// NOTE(review): this struct is not used in the visible part of the file. The field notes
/// below are inferred from field names and types — confirm them against the construction site.
#[derive(Clone)]
struct ContradictionSuccessorLink {
    /// Memory the link points at (presumably the graph edge target).
    target_id: MemoryId,
    /// Link weight (presumably the graph edge weight).
    weight: f32,
    /// Metadata carried with the link.
    metadata: Metadata,
    /// Presumably suppresses writing a graph edge for this link when `true` — confirm.
    skip_graph_edge: bool,
}
314
315#[derive(Clone)]
316enum ContradictionEndpoint {
317 Semantic(Box<SemanticRecord>),
318 Other(MemoryId),
319}
320
321impl ContradictionEndpoint {
322 fn id(&self) -> MemoryId {
323 match self {
324 Self::Semantic(record) => record.id,
325 Self::Other(id) => *id,
326 }
327 }
328
329 fn as_semantic(&self) -> Option<&SemanticRecord> {
330 match self {
331 Self::Semantic(record) => Some(record.as_ref()),
332 Self::Other(_) => None,
333 }
334 }
335}
336
/// A semantic revision paired with the successor revision prepared to replace it.
///
/// NOTE(review): not used in the visible part of the file. The meaning of `current`/`next` is
/// inferred from the names — confirm against the contradiction-sync code.
struct PreparedSemanticContradictionSuccessor {
    /// Revision being superseded (assumed).
    current: SemanticRecord,
    /// Successor revision to be written (assumed).
    next: SemanticRecord,
}
341
/// Outcome of syncing one contradiction: the two memories involved and, when present, the
/// graph edge recorded between them.
///
/// NOTE(review): not used in the visible part of the file. The field roles are inferred from
/// their names — confirm.
struct ContradictionSyncResult {
    /// Memory on the source side of the contradiction.
    source_memory: MemoryId,
    /// Memory on the target side of the contradiction.
    target_memory: MemoryId,
    /// Graph edge id for the contradiction, if one exists.
    contradiction_edge: Option<crate::graph::EdgeId>,
}
347
348fn format_memory_id_list(ids: &[MemoryId]) -> String {
349 ids.iter()
350 .map(ToString::to_string)
351 .collect::<Vec<_>>()
352 .join(",")
353}
354
355fn merged_confidence_and_evidence(records: &[&SemanticRecord]) -> (f32, u32) {
356 let total_weight: u64 = records
357 .iter()
358 .map(|record| u64::from(record.evidence_count.max(1)))
359 .sum();
360 let weighted_confidence = if total_weight == 0 {
361 0.5
362 } else {
363 let weighted_sum: f64 = records
364 .iter()
365 .map(|record| f64::from(record.confidence) * f64::from(record.evidence_count.max(1)))
366 .sum();
367 (weighted_sum / total_weight as f64) as f32
368 };
369 let total_evidence = records.iter().fold(0u32, |acc, record| {
370 acc.saturating_add(record.evidence_count.max(1))
371 });
372 (weighted_confidence.clamp(0.0, 1.0), total_evidence)
373}
374
375fn semantic_snapshot_head_as_of(
376 history: &[SemanticRecord],
377 cutoff: Timestamp,
378) -> Option<SemanticRecord> {
379 history
380 .iter()
381 .filter(|record| record.valid_from <= cutoff)
382 .max_by(|left, right| {
383 left.version
384 .cmp(&right.version)
385 .then_with(|| left.created_at.cmp(&right.created_at))
386 .then_with(|| left.revision_id.cmp(&right.revision_id))
387 })
388 .cloned()
389}
390
391fn semantic_snapshot_head_recorded_at_snapshot(
392 history: &[SemanticRecord],
393 snapshot: ResolvedRecallSnapshot,
394) -> Option<SemanticRecord> {
395 history
396 .iter()
397 .filter(|record| {
398 snapshot.contains_recorded_revision_for_chain(
399 record.logical_memory_id,
400 record.version,
401 record.created_at,
402 record.revision_id,
403 )
404 })
405 .max_by(|left, right| {
406 left.created_at
407 .cmp(&right.created_at)
408 .then_with(|| left.version.cmp(&right.version))
409 .then_with(|| left.revision_id.cmp(&right.revision_id))
410 })
411 .cloned()
412}
413
414fn memory_record_recorded_at(record: &MemoryRecord) -> Timestamp {
415 match record {
416 MemoryRecord::Episodic(record) => record.created_at,
417 MemoryRecord::Semantic(record) => record.created_at,
418 MemoryRecord::Working(record) => record.created_at,
419 MemoryRecord::Procedural(record) => record.created_at,
420 }
421}
422
423fn memory_record_revision_chain(record: &MemoryRecord) -> (LogicalMemoryId, u32) {
424 match record {
425 MemoryRecord::Episodic(record) => (record.logical_memory_id, record.version),
426 MemoryRecord::Semantic(record) => (record.logical_memory_id, record.version),
427 MemoryRecord::Working(record) => (record.logical_memory_id, record.version),
428 MemoryRecord::Procedural(record) => (record.logical_memory_id, record.version),
429 }
430}
431
432fn collect_semantic_logical_ids(results: &[RecallResult]) -> Vec<LogicalMemoryId> {
433 let mut logical_memory_ids = Vec::new();
434 let mut seen = HashSet::new();
435
436 for result in results {
437 let MemoryRecord::Semantic(record) = &result.record else {
438 continue;
439 };
440
441 if seen.insert(record.logical_memory_id) {
442 logical_memory_ids.push(record.logical_memory_id);
443 }
444 }
445
446 logical_memory_ids
447}
448
449fn collect_episodic_logical_ids(results: &[RecallResult]) -> Vec<LogicalMemoryId> {
450 let mut logical_memory_ids = Vec::new();
451 let mut seen = HashSet::new();
452
453 for result in results {
454 let MemoryRecord::Episodic(record) = &result.record else {
455 continue;
456 };
457
458 if seen.insert(record.logical_memory_id) {
459 logical_memory_ids.push(record.logical_memory_id);
460 }
461 }
462
463 logical_memory_ids
464}
465
466#[derive(Clone, Copy)]
467pub(super) enum ResolvedRecallSnapshot {
468 Observed(Timestamp),
469 Recorded(Timestamp),
470 Revision {
471 cutoff: Timestamp,
472 revision_id: RevisionId,
473 logical_memory_id: LogicalMemoryId,
474 version: u32,
475 },
476}
477
478impl ResolvedRecallSnapshot {
479 pub(super) fn contains_recorded_revision(
480 self,
481 created_at: Timestamp,
482 revision_id: RevisionId,
483 ) -> bool {
484 match self {
485 Self::Observed(_) => false,
486 Self::Recorded(cutoff) => created_at <= cutoff,
487 Self::Revision {
488 cutoff,
489 revision_id: boundary_revision_id,
490 ..
491 } => {
492 created_at < cutoff || (created_at == cutoff && revision_id <= boundary_revision_id)
493 }
494 }
495 }
496
497 pub(super) fn contains_recorded_revision_for_chain(
498 self,
499 logical_memory_id: LogicalMemoryId,
500 version: u32,
501 created_at: Timestamp,
502 revision_id: RevisionId,
503 ) -> bool {
504 match self {
505 Self::Revision {
506 cutoff,
507 revision_id: boundary_revision_id,
508 logical_memory_id: boundary_logical_memory_id,
509 version: boundary_version,
510 } if created_at == cutoff && logical_memory_id == boundary_logical_memory_id => {
511 version < boundary_version
512 || (version == boundary_version && revision_id <= boundary_revision_id)
513 }
514 _ => self.contains_recorded_revision(created_at, revision_id),
515 }
516 }
517}
518
519impl HirnDB {
520 pub(crate) async fn store_semantic(&self, mut record: SemanticRecord) -> HirnResult<MemoryId> {
526 self.enforce(
528 record.provenance.created_by.as_str(),
529 crate::policy::Action::Remember,
530 &self.config.default_realm,
531 record.namespace.as_str(),
532 )
533 .await?;
534
535 match self.config.text_retention {
537 hirn_core::TextRetention::Full => {}
538 hirn_core::TextRetention::SummaryOnly | hirn_core::TextRetention::None => {
539 record.description = String::new();
540 }
541 }
542
543 normalize_semantic_record_timestamps(&mut record);
544
545 let id = record.id;
546 let content_preview = record.concept.chars().take(120).collect::<String>();
547 let embedding = record.embedding.clone();
548 let confidence = record.confidence;
549 let created_at = record.created_at;
550 let namespace = record.namespace.clone();
551
552 {
554 let escaped_ns = record.namespace.as_str().replace('\'', "''");
555 let escaped_concept = record.concept.replace('\'', "''");
556 let agent_id = record.provenance.created_by.as_str();
557 let mut batches = self
558 .storage_runtime
559 .scan_stream(
560 hirn_storage::datasets::semantic::DATASET_NAME,
561 hirn_storage::store::ScanOptions {
562 filter: Some(format!(
563 "namespace = '{}' AND concept = '{}'",
564 escaped_ns, escaped_concept
565 )),
566 ..Default::default()
567 },
568 )
569 .await
570 .map_err(HirnError::storage)?;
571 let mut heads = HashMap::new();
572 while let Some(batch) = batches.try_next().await.map_err(HirnError::storage)? {
573 let recs = hirn_storage::datasets::semantic::from_batch(&batch)
574 .map_err(HirnError::storage)?;
575 for rec in recs {
576 upsert_semantic_head(&mut heads, rec);
577 }
578 }
579 if heads.into_values().any(|r| {
580 r.provenance.created_by.as_str() == agent_id && semantic_record_is_live(&r)
581 }) {
582 return Err(HirnError::AlreadyExists(format!(
583 "concept '{}' already exists in namespace '{}' for agent '{}'",
584 record.concept, record.namespace, record.provenance.created_by
585 )));
586 }
587 }
588
589 if let Some(ref emb) = embedding {
591 if emb.len() != self.config.embedding_dimensions.as_usize() {
592 return Err(HirnError::InvalidInput(format!(
593 "embedding dimension mismatch: expected {}, got {}",
594 self.config.embedding_dimensions.as_usize(),
595 emb.len()
596 )));
597 }
598 }
599
600 let envelope = build_semantic_create_envelope(id)?;
601 hirn_storage::append_mutation_envelope(self.storage_backend(), &envelope)
602 .await
603 .map_err(HirnError::storage)?;
604
605 if let Err(error) = self
607 .cached_graph()
608 .add_node(
609 id,
610 Layer::Semantic,
611 confidence,
612 created_at,
613 namespace.clone(),
614 )
615 .await
616 {
617 self.finalize_semantic_create_failure(
618 &envelope,
619 id,
620 error.to_string(),
621 true,
622 "graph add_node",
623 )
624 .await;
625 return Err(error);
626 }
627
628 if let Some(ref emb) = embedding {
630 let candidates = self.find_similarity_candidates(emb).await;
631 if let Err(error) = self.apply_similarity_edges(id, &candidates).await {
632 let cleanup_applied = match self.remove_semantic_graph_nodes_if_present(&[id]).await
633 {
634 Ok(()) => true,
635 Err(cleanup_error) => {
636 tracing::warn!(
637 id = %id,
638 envelope_id = %envelope.id,
639 error = %cleanup_error,
640 "semantic create graph cleanup incomplete after similarity edge error"
641 );
642 false
643 }
644 };
645 self.finalize_semantic_create_failure(
646 &envelope,
647 id,
648 error.to_string(),
649 cleanup_applied,
650 "similarity edge",
651 )
652 .await;
653 return Err(error);
654 }
655 }
656
657 let dims = self.config.embedding_dimensions.as_usize();
659 let batch = match hirn_storage::datasets::semantic::to_batch(
660 std::slice::from_ref(&record),
661 dims,
662 ) {
663 Ok(batch) => batch,
664 Err(error) => {
665 let storage_error = HirnError::storage(error);
666 let cleanup_applied = match self.remove_semantic_graph_nodes_if_present(&[id]).await
667 {
668 Ok(()) => true,
669 Err(cleanup_error) => {
670 tracing::warn!(
671 id = %id,
672 envelope_id = %envelope.id,
673 error = %cleanup_error,
674 "semantic create graph cleanup incomplete after semantic to_batch error"
675 );
676 false
677 }
678 };
679 self.finalize_semantic_create_failure(
680 &envelope,
681 id,
682 storage_error.to_string(),
683 cleanup_applied,
684 "semantic to_batch",
685 )
686 .await;
687 return Err(storage_error);
688 }
689 };
690 if let Err(e) = self
691 .storage_runtime
692 .append(hirn_storage::datasets::semantic::DATASET_NAME, batch)
693 .await
694 {
695 let error = HirnError::storage(e);
696 let cleanup_applied = match self.remove_semantic_graph_nodes_if_present(&[id]).await {
697 Ok(()) => true,
698 Err(cleanup_error) => {
699 tracing::warn!(
700 id = %id,
701 envelope_id = %envelope.id,
702 error = %cleanup_error,
703 "semantic create graph cleanup incomplete after semantic append error"
704 );
705 false
706 }
707 };
708 self.finalize_semantic_create_failure(
709 &envelope,
710 id,
711 error.to_string(),
712 cleanup_applied,
713 "semantic append",
714 )
715 .await;
716 return Err(error);
717 }
718
719 self.cache_semantic_head(&record);
720
721 self.emit_scoped(
722 record.namespace.as_str(),
723 record.provenance.created_by.as_str(),
724 MemoryEvent::SemanticCreated {
725 id,
726 concept_name: content_preview,
727 },
728 )
729 .await;
730 if let Err(error) = hirn_storage::update_mutation_envelope_state(
731 self.storage_backend(),
732 &envelope.id,
733 hirn_storage::MutationEnvelopeState::Applied,
734 None,
735 )
736 .await
737 {
738 tracing::warn!(
739 id = %id,
740 envelope_id = %envelope.id,
741 error = %error,
742 "semantic create mutation envelope finalize failed; recovery will retry"
743 );
744 }
745 Ok(id)
746 }
747
748 pub(crate) async fn batch_store_semantic(
755 &self,
756 records: Vec<SemanticRecord>,
757 ) -> Vec<HirnResult<MemoryId>> {
758 if records.is_empty() {
759 return Vec::new();
760 }
761
762 let n = records.len();
763
764 let agent_id = records[0].provenance.created_by.clone();
766 for rec in records.iter().skip(1) {
767 if rec.provenance.created_by != agent_id {
768 return (0..n)
769 .map(|_| {
770 Err(HirnError::InvalidInput(
771 "batch_store_semantic: all records must have the same agent_id".into(),
772 ))
773 })
774 .collect();
775 }
776 }
777
778 {
780 let mut checked_namespaces = HashSet::new();
781 for rec in &records {
782 if checked_namespaces.insert(rec.namespace.clone()) {
783 if let Err(e) = self
784 .enforce(
785 agent_id.as_str(),
786 crate::policy::Action::Remember,
787 &self.config.default_realm,
788 rec.namespace.as_str(),
789 )
790 .await
791 {
792 let msg = format!("{e}");
793 return (0..n)
794 .map(|_| Err(HirnError::AccessDenied(msg.clone())))
795 .collect();
796 }
797 }
798 }
799 }
800
801 let mut results: Vec<Option<HirnResult<MemoryId>>> = (0..n).map(|_| None).collect();
803
804 let mut records: Vec<(usize, SemanticRecord)> = records
806 .into_iter()
807 .enumerate()
808 .map(|(idx, mut rec)| {
809 match self.config.text_retention {
810 hirn_core::TextRetention::Full => {}
811 hirn_core::TextRetention::SummaryOnly | hirn_core::TextRetention::None => {
812 rec.description = String::new();
813 }
814 }
815 normalize_semantic_record_timestamps(&mut rec);
816 (idx, rec)
817 })
818 .collect();
819
820 {
824 let exists = self
825 .storage_runtime
826 .exists(hirn_storage::datasets::semantic::DATASET_NAME)
827 .await
828 .unwrap_or(false);
829 if exists {
830 let mut pairs: Vec<(String, String)> = Vec::new();
832 for (_, rec) in &records {
833 pairs.push((rec.namespace.as_str().to_owned(), rec.concept.clone()));
834 }
835
836 let clauses: Vec<String> = pairs
838 .iter()
839 .map(|(ns, concept)| {
840 let escaped_ns = ns.replace('\'', "''");
841 let escaped_concept = concept.replace('\'', "''");
842 format!(
843 "(namespace = '{}' AND concept = '{}')",
844 escaped_ns, escaped_concept
845 )
846 })
847 .collect();
848 let filter = clauses.join(" OR ");
849
850 let opts = hirn_storage::store::ScanOptions {
851 filter: Some(filter),
852 ..Default::default()
853 };
854 let mut batches = self
855 .storage_runtime
856 .scan_stream(hirn_storage::datasets::semantic::DATASET_NAME, opts)
857 .await
858 .ok();
859
860 let mut existing: HashSet<(String, String)> = HashSet::new();
862 if let Some(batches) = batches.as_mut() {
863 let mut heads = HashMap::new();
864 while let Ok(Some(batch)) = batches.try_next().await {
865 if let Ok(recs) = hirn_storage::datasets::semantic::from_batch(&batch) {
866 for rec in recs {
867 upsert_semantic_head(&mut heads, rec);
868 }
869 }
870 }
871 for r in heads.into_values() {
872 if r.provenance.created_by.as_str() == agent_id.as_str()
873 && semantic_record_is_live(&r)
874 {
875 existing.insert((r.namespace.to_string(), r.concept.clone()));
876 }
877 }
878 }
879
880 let mut batch_seen: HashSet<(String, String)> = HashSet::new();
882 records.retain(|(idx, rec)| {
883 let key = (rec.namespace.to_string(), rec.concept.clone());
884 if existing.contains(&key) {
885 results[*idx] = Some(Err(HirnError::AlreadyExists(format!(
886 "concept '{}' already exists in namespace '{}' for agent '{}'",
887 rec.concept, rec.namespace, agent_id
888 ))));
889 false
890 } else if !batch_seen.insert(key) {
891 results[*idx] = Some(Err(HirnError::AlreadyExists(format!(
892 "duplicate concept '{}' in batch for namespace '{}'",
893 rec.concept, rec.namespace
894 ))));
895 false
896 } else {
897 true
898 }
899 });
900 }
901 }
902
903 if records.is_empty() {
904 return results
905 .into_iter()
906 .map(|r| r.unwrap_or_else(|| Err(HirnError::InvalidInput("unreachable".into()))))
907 .collect();
908 }
909
910 records.retain(|(idx, rec)| {
912 if let Some(ref emb) = rec.embedding {
913 if emb.len() != self.config.embedding_dimensions.as_usize() {
914 results[*idx] = Some(Err(HirnError::InvalidInput(format!(
915 "embedding dimension mismatch: expected {}, got {}",
916 self.config.embedding_dimensions.as_usize(),
917 emb.len()
918 ))));
919 return false;
920 }
921 }
922 true
923 });
924
925 if records.is_empty() {
926 return results
927 .into_iter()
928 .map(|r| r.unwrap_or_else(|| Err(HirnError::InvalidInput("unreachable".into()))))
929 .collect();
930 }
931
932 struct PreparedRecord {
934 idx: usize,
935 record: SemanticRecord,
936 content_preview: String,
937 create_envelope: hirn_storage::MutationEnvelopeRecord,
938 }
939 let mut prepared: Vec<PreparedRecord> = Vec::with_capacity(records.len());
940
941 for (idx, rec) in records {
942 let id = rec.id;
943 let content_preview = rec.concept.chars().take(120).collect::<String>();
944 let create_envelope = match build_semantic_create_envelope(id) {
945 Ok(envelope) => envelope,
946 Err(error) => {
947 results[idx] = Some(Err(error));
948 continue;
949 }
950 };
951
952 prepared.push(PreparedRecord {
953 idx,
954 record: rec,
955 content_preview,
956 create_envelope,
957 });
958 }
959
960 if prepared.is_empty() {
961 return results
962 .into_iter()
963 .map(|r| r.unwrap_or_else(|| Err(HirnError::InvalidInput("unreachable".into()))))
964 .collect();
965 }
966
967 if let Err(error) = hirn_storage::append_mutation_envelopes(
968 self.storage_backend(),
969 &prepared
970 .iter()
971 .map(|record| record.create_envelope.clone())
972 .collect::<Vec<_>>(),
973 )
974 .await
975 {
976 let message = error.to_string();
977 for record in &prepared {
978 results[record.idx] = Some(Err(HirnError::storage(message.clone())));
979 }
980 return results
981 .into_iter()
982 .map(|r| {
983 r.unwrap_or_else(|| {
984 Err(HirnError::storage("semantic create envelope append failed"))
985 })
986 })
987 .collect();
988 }
989
990 let mut graph_prepared = Vec::with_capacity(prepared.len());
991 for prepared_record in prepared {
992 let id = prepared_record.record.id;
993
994 if let Err(error) = self
996 .cached_graph()
997 .add_node(
998 id,
999 Layer::Semantic,
1000 prepared_record.record.confidence,
1001 prepared_record.record.created_at,
1002 prepared_record.record.namespace,
1003 )
1004 .await
1005 {
1006 self.finalize_semantic_create_failure(
1007 &prepared_record.create_envelope,
1008 id,
1009 error.to_string(),
1010 true,
1011 "graph add_node",
1012 )
1013 .await;
1014 results[prepared_record.idx] = Some(Err(error));
1015 continue;
1016 }
1017
1018 if let Some(ref emb) = prepared_record.record.embedding {
1020 let candidates = self.find_similarity_candidates(emb).await;
1021 if let Err(error) = self.apply_similarity_edges(id, &candidates).await {
1022 let cleanup_applied = match self
1023 .remove_semantic_graph_nodes_if_present(&[id])
1024 .await
1025 {
1026 Ok(()) => true,
1027 Err(cleanup_error) => {
1028 tracing::warn!(
1029 id = %id,
1030 envelope_id = %prepared_record.create_envelope.id,
1031 error = %cleanup_error,
1032 "semantic create graph cleanup incomplete after similarity edge error"
1033 );
1034 false
1035 }
1036 };
1037 self.finalize_semantic_create_failure(
1038 &prepared_record.create_envelope,
1039 id,
1040 error.to_string(),
1041 cleanup_applied,
1042 "similarity edge",
1043 )
1044 .await;
1045 results[prepared_record.idx] = Some(Err(error));
1046 continue;
1047 }
1048 }
1049
1050 graph_prepared.push(prepared_record);
1051 }
1052
1053 let prepared = graph_prepared;
1054 if prepared.is_empty() {
1055 return results
1056 .into_iter()
1057 .map(|r| r.unwrap_or_else(|| Err(HirnError::InvalidInput("unreachable".into()))))
1058 .collect();
1059 }
1060
1061 if !prepared.is_empty() {
1063 let lance_records = prepared
1064 .iter()
1065 .map(|record| record.record.clone())
1066 .collect::<Vec<_>>();
1067 let dims = self.config.embedding_dimensions.as_usize();
1068 match hirn_storage::datasets::semantic::to_batch(&lance_records, dims) {
1069 Ok(batch) => {
1070 if let Err(e) = self
1071 .storage_runtime
1072 .append(hirn_storage::datasets::semantic::DATASET_NAME, batch)
1073 .await
1074 {
1075 tracing::error!(
1076 count = lance_records.len(),
1077 error = %e,
1078 "batch_store_semantic: LanceDB batch append failed"
1079 );
1080 let msg = format!("{e}");
1081 for p in &prepared {
1082 let cleanup_applied = match self
1083 .remove_semantic_graph_nodes_if_present(&[p.record.id])
1084 .await
1085 {
1086 Ok(()) => true,
1087 Err(cleanup_error) => {
1088 tracing::warn!(
1089 id = %p.record.id,
1090 envelope_id = %p.create_envelope.id,
1091 error = %cleanup_error,
1092 "semantic create graph cleanup incomplete after semantic append error"
1093 );
1094 false
1095 }
1096 };
1097 self.finalize_semantic_create_failure(
1098 &p.create_envelope,
1099 p.record.id,
1100 msg.clone(),
1101 cleanup_applied,
1102 "semantic append",
1103 )
1104 .await;
1105 results[p.idx] = Some(Err(HirnError::StorageError(msg.clone().into())));
1106 }
1107 return results
1108 .into_iter()
1109 .map(|r| {
1110 r.unwrap_or_else(|| {
1111 Err(HirnError::storage("LanceDB append failed"))
1112 })
1113 })
1114 .collect();
1115 }
1116
1117 for record in &lance_records {
1118 self.cache_semantic_head(record);
1119 }
1120 }
1121 Err(e) => {
1122 tracing::error!(
1123 count = lance_records.len(),
1124 error = %e,
1125 "batch_store_semantic: LanceDB to_batch failed"
1126 );
1127 let msg = format!("{e}");
1128 for p in &prepared {
1129 let cleanup_applied = match self
1130 .remove_semantic_graph_nodes_if_present(&[p.record.id])
1131 .await
1132 {
1133 Ok(()) => true,
1134 Err(cleanup_error) => {
1135 tracing::warn!(
1136 id = %p.record.id,
1137 envelope_id = %p.create_envelope.id,
1138 error = %cleanup_error,
1139 "semantic create graph cleanup incomplete after semantic to_batch error"
1140 );
1141 false
1142 }
1143 };
1144 self.finalize_semantic_create_failure(
1145 &p.create_envelope,
1146 p.record.id,
1147 msg.clone(),
1148 cleanup_applied,
1149 "semantic to_batch",
1150 )
1151 .await;
1152 results[p.idx] = Some(Err(HirnError::StorageError(msg.clone().into())));
1153 }
1154 return results
1155 .into_iter()
1156 .map(|r| {
1157 r.unwrap_or_else(|| Err(HirnError::storage("LanceDB to_batch failed")))
1158 })
1159 .collect();
1160 }
1161 }
1162 }
1163
1164 for p in &prepared {
1166 results[p.idx] = Some(Ok(p.record.id));
1167 self.emit_scoped(
1168 p.record.namespace.as_str(),
1169 p.record.provenance.created_by.as_str(),
1170 MemoryEvent::SemanticCreated {
1171 id: p.record.id,
1172 concept_name: p.content_preview.clone(),
1173 },
1174 )
1175 .await;
1176 if let Err(error) = hirn_storage::update_mutation_envelope_state(
1177 self.storage_backend(),
1178 &p.create_envelope.id,
1179 hirn_storage::MutationEnvelopeState::Applied,
1180 None,
1181 )
1182 .await
1183 {
1184 tracing::warn!(
1185 id = %p.record.id,
1186 envelope_id = %p.create_envelope.id,
1187 error = %error,
1188 "semantic create mutation envelope finalize failed; recovery will retry"
1189 );
1190 }
1191 }
1192
1193 results
1194 .into_iter()
1195 .map(|r| r.unwrap_or_else(|| Err(HirnError::InvalidInput("unreachable".into()))))
1196 .collect()
1197 }
1198
1199 pub(crate) async fn read_semantic_record(&self, id: MemoryId) -> HirnResult<SemanticRecord> {
1201 let mut batches = self
1202 .storage_runtime
1203 .scan_stream(
1204 hirn_storage::datasets::semantic::DATASET_NAME,
1205 hirn_storage::store::ScanOptions {
1206 exact_filter: Some(hirn_storage::store::ExactMatchFilter::utf8_value(
1207 "id",
1208 id.to_string(),
1209 )),
1210 limit: Some(1),
1211 ..Default::default()
1212 },
1213 )
1214 .await
1215 .map_err(HirnError::storage)?;
1216
1217 while let Some(batch) = batches.try_next().await.map_err(HirnError::storage)? {
1218 let recs =
1219 hirn_storage::datasets::semantic::from_batch(&batch).map_err(HirnError::storage)?;
1220 if let Some(r) = recs.into_iter().next() {
1221 return Ok(r);
1222 }
1223 }
1224 Err(HirnError::NotFound(format!("semantic record {id}")))
1225 }
1226
1227 async fn overwrite_semantic_record(&self, record: &SemanticRecord) -> HirnResult<()> {
1229 let dims = self.config.embedding_dimensions.as_usize();
1230 let exact_filter =
1231 hirn_storage::store::ExactMatchFilter::utf8_value("id", record.id.to_string());
1232 self.storage_runtime
1233 .delete_exact(
1234 hirn_storage::datasets::semantic::DATASET_NAME,
1235 &exact_filter,
1236 )
1237 .await
1238 .map_err(|e| HirnError::storage(e))?;
1239 let batch = hirn_storage::datasets::semantic::to_batch(std::slice::from_ref(record), dims)
1240 .map_err(|e| HirnError::storage(e))?;
1241 self.storage_runtime
1242 .append(hirn_storage::datasets::semantic::DATASET_NAME, batch)
1243 .await
1244 .map_err(|e| HirnError::storage(e))?;
1245 self.evict_semantic_head(record.logical_memory_id);
1246 Ok(())
1247 }
1248
1249 async fn append_semantic_record(&self, record: &SemanticRecord) -> HirnResult<()> {
1250 let dims = self.config.embedding_dimensions.as_usize();
1251 let batch = hirn_storage::datasets::semantic::to_batch(std::slice::from_ref(record), dims)
1252 .map_err(HirnError::storage)?;
1253 self.storage_runtime
1254 .append(hirn_storage::datasets::semantic::DATASET_NAME, batch)
1255 .await
1256 .map_err(HirnError::storage)?;
1257 Ok(())
1258 }
1259
1260 async fn append_semantic_records(&self, records: &[SemanticRecord]) -> HirnResult<()> {
1261 if records.is_empty() {
1262 return Ok(());
1263 }
1264
1265 let dims = self.config.embedding_dimensions.as_usize();
1266 let batch = hirn_storage::datasets::semantic::to_batch(records, dims)
1267 .map_err(HirnError::storage)?;
1268 self.storage_runtime
1269 .append(hirn_storage::datasets::semantic::DATASET_NAME, batch)
1270 .await
1271 .map_err(HirnError::storage)?;
1272 Ok(())
1273 }
1274
1275 async fn finalize_semantic_create_failure(
1276 &self,
1277 envelope: &hirn_storage::MutationEnvelopeRecord,
1278 record_id: MemoryId,
1279 error_message: String,
1280 cleanup_applied: bool,
1281 stage: &'static str,
1282 ) {
1283 if cleanup_applied {
1284 if let Err(update_error) = hirn_storage::update_mutation_envelope_state(
1285 self.storage_backend(),
1286 &envelope.id,
1287 hirn_storage::MutationEnvelopeState::Failed,
1288 Some(error_message.clone()),
1289 )
1290 .await
1291 {
1292 tracing::warn!(
1293 record_id = %record_id,
1294 envelope_id = %envelope.id,
1295 stage = stage,
1296 error = %update_error,
1297 "semantic create mutation envelope fail-fast finalize failed"
1298 );
1299 }
1300 } else {
1301 tracing::warn!(
1302 record_id = %record_id,
1303 envelope_id = %envelope.id,
1304 stage = stage,
1305 error = %error_message,
1306 "semantic create mutation cleanup incomplete; recovery will retry"
1307 );
1308 }
1309 }
1310
1311 fn semantic_logical_exact_filter(
1312 logical_memory_id: LogicalMemoryId,
1313 ) -> hirn_storage::store::ExactMatchFilter {
1314 hirn_storage::store::ExactMatchFilter::utf8_value(
1315 "logical_memory_id",
1316 logical_memory_id.to_string(),
1317 )
1318 }
1319
1320 fn semantic_logical_exact_filter_many(
1321 logical_memory_ids: &[LogicalMemoryId],
1322 ) -> Option<hirn_storage::store::ExactMatchFilter> {
1323 hirn_storage::store::ExactMatchFilter::utf8_values(
1324 "logical_memory_id",
1325 logical_memory_ids
1326 .iter()
1327 .map(ToString::to_string)
1328 .collect::<Vec<_>>(),
1329 )
1330 }
1331
1332 #[cfg(test)]
1333 fn semantic_revision_exact_filter(
1334 revision_id: RevisionId,
1335 ) -> hirn_storage::store::ExactMatchFilter {
1336 hirn_storage::store::ExactMatchFilter::utf8_value("revision_id", revision_id.to_string())
1337 }
1338
1339 fn cached_semantic_head(&self, logical_memory_id: LogicalMemoryId) -> Option<SemanticRecord> {
1340 self.semantic_head_cache_get(logical_memory_id)
1341 }
1342
1343 pub(crate) fn cache_semantic_head(&self, record: &SemanticRecord) {
1344 if let Some(current) = self.cached_semantic_head(record.logical_memory_id) {
1345 if !semantic_revision_is_newer(record, ¤t) {
1346 return;
1347 }
1348 }
1349 self.semantic_head_cache_put(record.clone());
1350 }
1351
1352 pub(crate) fn evict_semantic_head(&self, logical_memory_id: LogicalMemoryId) {
1353 self.semantic_head_cache_evict(logical_memory_id);
1354 }
1355
1356 pub(crate) fn cached_semantic_heads_snapshot(
1357 &self,
1358 ) -> HashMap<LogicalMemoryId, SemanticRecord> {
1359 self.semantic_head_cache_snapshot()
1360 }
1361
1362 pub(crate) fn replace_semantic_heads(&self, records: impl IntoIterator<Item = SemanticRecord>) {
1363 self.semantic_head_cache_replace(records);
1364 }
1365
1366 async fn load_semantic_head_from_storage(
1367 &self,
1368 logical_memory_id: LogicalMemoryId,
1369 ) -> HirnResult<SemanticRecord> {
1370 let mut batches = self
1371 .storage_runtime
1372 .scan_stream(
1373 hirn_storage::datasets::semantic::DATASET_NAME,
1374 hirn_storage::store::ScanOptions {
1375 exact_filter: Some(Self::semantic_logical_exact_filter(logical_memory_id)),
1376 order_by: Some(vec![
1377 hirn_storage::store::ScanOrdering::desc("version"),
1378 hirn_storage::store::ScanOrdering::desc("created_at_ms"),
1379 hirn_storage::store::ScanOrdering::desc("revision_id"),
1380 ]),
1381 limit: Some(1),
1382 ..Default::default()
1383 },
1384 )
1385 .await
1386 .map_err(HirnError::storage)?;
1387
1388 while let Some(batch) = batches.try_next().await.map_err(HirnError::storage)? {
1389 let recs =
1390 hirn_storage::datasets::semantic::from_batch(&batch).map_err(HirnError::storage)?;
1391 if let Some(record) = recs.into_iter().next() {
1392 return Ok(record);
1393 }
1394 }
1395
1396 Err(HirnError::NotFound(format!(
1397 "semantic logical memory {logical_memory_id}"
1398 )))
1399 }
1400
1401 async fn semantic_record_is_current_head(&self, record: &SemanticRecord) -> HirnResult<bool> {
1402 match self
1403 .load_semantic_head_from_storage(record.logical_memory_id)
1404 .await
1405 {
1406 Ok(head) => {
1407 self.cache_semantic_head(&head);
1408 Ok(head.id == record.id)
1409 }
1410 Err(HirnError::NotFound(_)) => Ok(false),
1411 Err(error) => Err(error),
1412 }
1413 }
1414
1415 pub(crate) async fn semantic_head_for_logical_id(
1416 &self,
1417 logical_memory_id: LogicalMemoryId,
1418 ) -> HirnResult<SemanticRecord> {
1419 if let Some(record) = self.cached_semantic_head(logical_memory_id) {
1420 return Ok(record);
1421 }
1422
1423 match self
1424 .load_semantic_head_from_storage(logical_memory_id)
1425 .await
1426 {
1427 Ok(record) => {
1428 self.cache_semantic_head(&record);
1429 Ok(record)
1430 }
1431 Err(HirnError::NotFound(_)) => {
1432 self.evict_semantic_head(logical_memory_id);
1433 Err(HirnError::NotFound(format!(
1434 "semantic logical memory {logical_memory_id}"
1435 )))
1436 }
1437 Err(error) => Err(error),
1438 }
1439 }
1440
1441 async fn semantic_heads_for_logical_ids(
1442 &self,
1443 logical_memory_ids: &[LogicalMemoryId],
1444 ) -> HirnResult<HashMap<LogicalMemoryId, SemanticRecord>> {
1445 let mut heads = HashMap::with_capacity(logical_memory_ids.len());
1446 let mut missing = Vec::new();
1447
1448 for &logical_memory_id in logical_memory_ids {
1449 if let Some(record) = self.cached_semantic_head(logical_memory_id) {
1450 heads.insert(logical_memory_id, record);
1451 } else {
1452 missing.push(logical_memory_id);
1453 }
1454 }
1455
1456 let Some(exact_filter) = Self::semantic_logical_exact_filter_many(&missing) else {
1457 return Ok(heads);
1458 };
1459
1460 let mut batches = self
1461 .storage_runtime
1462 .scan_stream(
1463 hirn_storage::datasets::semantic::DATASET_NAME,
1464 hirn_storage::store::ScanOptions {
1465 exact_filter: Some(exact_filter),
1466 ..Default::default()
1467 },
1468 )
1469 .await
1470 .map_err(HirnError::storage)?;
1471
1472 let mut loaded = HashMap::new();
1473 while let Some(batch) = batches.try_next().await.map_err(HirnError::storage)? {
1474 let recs =
1475 hirn_storage::datasets::semantic::from_batch(&batch).map_err(HirnError::storage)?;
1476 for record in recs {
1477 upsert_semantic_head(&mut loaded, record);
1478 }
1479 }
1480
1481 for (logical_memory_id, record) in loaded {
1482 self.cache_semantic_head(&record);
1483 heads.insert(logical_memory_id, record);
1484 }
1485
1486 for &logical_memory_id in &missing {
1487 if !heads.contains_key(&logical_memory_id) {
1488 self.evict_semantic_head(logical_memory_id);
1489 }
1490 }
1491
1492 Ok(heads)
1493 }
1494
1495 #[cfg(test)]
1496 pub(crate) async fn semantic_edit_target(&self, id: MemoryId) -> HirnResult<SemanticRecord> {
1497 let record = self.read_semantic_record(id).await?;
1498 let head = self
1499 .semantic_head_for_logical_id(record.logical_memory_id)
1500 .await?;
1501
1502 if head.revision_id == record.revision_id {
1503 if head.is_retracted() {
1504 Err(HirnError::InvalidInput(format!(
1505 "semantic logical memory {} is retracted",
1506 head.logical_memory_id
1507 )))
1508 } else if let Some(merged_into) = head.merged_into {
1509 Err(HirnError::InvalidInput(format!(
1510 "semantic logical memory {} has been merged into {}",
1511 head.logical_memory_id, merged_into
1512 )))
1513 } else {
1514 Ok(head)
1515 }
1516 } else {
1517 Err(HirnError::InvalidInput(format!(
1518 "semantic revision {id} is not the active head"
1519 )))
1520 }
1521 }
1522
1523 async fn resolve_active_semantic_head(&self, id: MemoryId) -> HirnResult<SemanticRecord> {
1524 let record = self.read_semantic_record(id).await?;
1525 let head = self
1526 .semantic_head_for_logical_id(record.logical_memory_id)
1527 .await?;
1528
1529 if head.is_retracted() {
1530 Err(HirnError::InvalidInput(format!(
1531 "semantic logical memory {} is retracted",
1532 head.logical_memory_id
1533 )))
1534 } else if let Some(merged_into) = head.merged_into {
1535 Err(HirnError::InvalidInput(format!(
1536 "semantic logical memory {} has been merged into {}",
1537 head.logical_memory_id, merged_into
1538 )))
1539 } else {
1540 Ok(head)
1541 }
1542 }
1543
1544 async fn live_semantic_heads_for_logical_ids(
1545 &self,
1546 logical_memory_ids: &[LogicalMemoryId],
1547 ) -> HirnResult<HashMap<LogicalMemoryId, SemanticRecord>> {
1548 let heads = self
1549 .semantic_heads_for_logical_ids(logical_memory_ids)
1550 .await?;
1551 Ok(heads
1552 .into_iter()
1553 .filter(|(_, record)| semantic_record_is_live(record))
1554 .collect())
1555 }
1556
1557 pub(crate) async fn semantic_revision_as_of(
1558 &self,
1559 logical_memory_id: LogicalMemoryId,
1560 cutoff: Timestamp,
1561 ) -> HirnResult<Option<SemanticRecord>> {
1562 let mut batches = self
1563 .storage_runtime
1564 .scan_stream(
1565 hirn_storage::datasets::semantic::DATASET_NAME,
1566 hirn_storage::store::ScanOptions {
1567 exact_filter: Some(Self::semantic_logical_exact_filter(logical_memory_id)),
1568 order_by: Some(vec![
1569 hirn_storage::store::ScanOrdering::asc("version"),
1570 hirn_storage::store::ScanOrdering::asc("created_at_ms"),
1571 ]),
1572 ..Default::default()
1573 },
1574 )
1575 .await
1576 .map_err(HirnError::storage)?;
1577
1578 let mut history = Vec::new();
1579 while let Some(batch) = batches.try_next().await.map_err(HirnError::storage)? {
1580 let recs =
1581 hirn_storage::datasets::semantic::from_batch(&batch).map_err(HirnError::storage)?;
1582 history.extend(recs);
1583 }
1584
1585 let Some(record) = semantic_snapshot_head_as_of(&history, cutoff) else {
1586 return Ok(None);
1587 };
1588
1589 if !semantic_record_is_live(&record) {
1590 return Ok(None);
1591 }
1592
1593 Ok(Some(record))
1594 }
1595
1596 async fn semantic_revision_recorded_at_snapshot(
1597 &self,
1598 logical_memory_id: LogicalMemoryId,
1599 snapshot: ResolvedRecallSnapshot,
1600 ) -> HirnResult<Option<SemanticRecord>> {
1601 let mut batches = self
1602 .storage_runtime
1603 .scan_stream(
1604 hirn_storage::datasets::semantic::DATASET_NAME,
1605 hirn_storage::store::ScanOptions {
1606 exact_filter: Some(Self::semantic_logical_exact_filter(logical_memory_id)),
1607 order_by: Some(vec![
1608 hirn_storage::store::ScanOrdering::asc("created_at_ms"),
1609 hirn_storage::store::ScanOrdering::asc("version"),
1610 ]),
1611 ..Default::default()
1612 },
1613 )
1614 .await
1615 .map_err(HirnError::storage)?;
1616
1617 let mut history = Vec::new();
1618 while let Some(batch) = batches.try_next().await.map_err(HirnError::storage)? {
1619 let recs =
1620 hirn_storage::datasets::semantic::from_batch(&batch).map_err(HirnError::storage)?;
1621 history.extend(recs);
1622 }
1623
1624 let Some(record) = semantic_snapshot_head_recorded_at_snapshot(&history, snapshot) else {
1625 return Ok(None);
1626 };
1627
1628 if !semantic_record_is_live(&record) {
1629 return Ok(None);
1630 }
1631
1632 Ok(Some(record))
1633 }
1634
1635 async fn semantic_revisions_for_logical_ids_at_snapshot(
1636 &self,
1637 logical_memory_ids: &[LogicalMemoryId],
1638 snapshot: ResolvedRecallSnapshot,
1639 ) -> HirnResult<HashMap<LogicalMemoryId, SemanticRecord>> {
1640 if let [logical_memory_id] = logical_memory_ids {
1641 let revision = match snapshot {
1642 ResolvedRecallSnapshot::Observed(cutoff) => {
1643 self.semantic_revision_as_of(*logical_memory_id, cutoff)
1644 .await?
1645 }
1646 recorded_snapshot => {
1647 self.semantic_revision_recorded_at_snapshot(
1648 *logical_memory_id,
1649 recorded_snapshot,
1650 )
1651 .await?
1652 }
1653 };
1654
1655 let mut resolved = HashMap::new();
1656 if let Some(revision) = revision {
1657 resolved.insert(*logical_memory_id, revision);
1658 }
1659 return Ok(resolved);
1660 }
1661
1662 let Some(exact_filter) = Self::semantic_logical_exact_filter_many(logical_memory_ids)
1663 else {
1664 return Ok(HashMap::new());
1665 };
1666
1667 let mut batches = self
1668 .storage_runtime
1669 .scan_stream(
1670 hirn_storage::datasets::semantic::DATASET_NAME,
1671 hirn_storage::store::ScanOptions {
1672 exact_filter: Some(exact_filter),
1673 order_by: Some(vec![
1674 hirn_storage::store::ScanOrdering::asc("version"),
1675 hirn_storage::store::ScanOrdering::asc("created_at_ms"),
1676 ]),
1677 ..Default::default()
1678 },
1679 )
1680 .await
1681 .map_err(HirnError::storage)?;
1682
1683 let mut histories: HashMap<LogicalMemoryId, Vec<SemanticRecord>> =
1684 HashMap::with_capacity(logical_memory_ids.len());
1685 while let Some(batch) = batches.try_next().await.map_err(HirnError::storage)? {
1686 let recs =
1687 hirn_storage::datasets::semantic::from_batch(&batch).map_err(HirnError::storage)?;
1688 for record in recs {
1689 histories
1690 .entry(record.logical_memory_id)
1691 .or_default()
1692 .push(record);
1693 }
1694 }
1695
1696 let mut resolved = HashMap::with_capacity(histories.len());
1697 for (logical_memory_id, mut history) in histories {
1698 history.sort_by(|left, right| {
1699 left.version
1700 .cmp(&right.version)
1701 .then_with(|| left.created_at.cmp(&right.created_at))
1702 });
1703
1704 let revision = match snapshot {
1705 ResolvedRecallSnapshot::Observed(cutoff) => {
1706 semantic_snapshot_head_as_of(&history, cutoff)
1707 }
1708 recorded_snapshot => {
1709 semantic_snapshot_head_recorded_at_snapshot(&history, recorded_snapshot)
1710 }
1711 };
1712
1713 if let Some(revision) = revision.filter(semantic_record_is_live) {
1714 resolved.insert(logical_memory_id, revision);
1715 }
1716 }
1717
1718 Ok(resolved)
1719 }
1720
1721 pub(super) async fn resolve_recall_snapshot(
1722 &self,
1723 snapshot: RecallSnapshot,
1724 ) -> HirnResult<ResolvedRecallSnapshot> {
1725 match snapshot {
1726 RecallSnapshot::Observed(cutoff) => Ok(ResolvedRecallSnapshot::Observed(cutoff)),
1727 RecallSnapshot::Recorded(cutoff) => Ok(ResolvedRecallSnapshot::Recorded(cutoff)),
1728 RecallSnapshot::Revision(revision_id) => {
1729 let boundary_record = self.get_memory(revision_id.as_memory_id()).await?;
1730 let (logical_memory_id, version) = memory_record_revision_chain(&boundary_record);
1731 Ok(ResolvedRecallSnapshot::Revision {
1732 cutoff: memory_record_recorded_at(&boundary_record),
1733 revision_id,
1734 logical_memory_id,
1735 version,
1736 })
1737 }
1738 }
1739 }
1740
1741 pub(crate) async fn semantic_revision_for_logical_id_at_snapshot(
1742 &self,
1743 logical_memory_id: LogicalMemoryId,
1744 snapshot: RecallSnapshot,
1745 ) -> HirnResult<Option<SemanticRecord>> {
1746 let snapshot = self.resolve_recall_snapshot(snapshot).await?;
1747 let mut semantic_revisions = self
1748 .semantic_revisions_for_logical_ids_at_snapshot(&[logical_memory_id], snapshot)
1749 .await?;
1750
1751 Ok(semantic_revisions.remove(&logical_memory_id))
1752 }
1753
1754 pub(crate) async fn normalize_current_recall_results(
1755 &self,
1756 results: Vec<RecallResult>,
1757 ) -> HirnResult<Vec<RecallResult>> {
1758 let semantic_heads = self
1759 .live_semantic_heads_for_logical_ids(&collect_semantic_logical_ids(&results))
1760 .await?;
1761 let episodic_heads = self
1762 .live_episodic_heads_for_logical_ids(&collect_episodic_logical_ids(&results))
1763 .await?;
1764 let mut resolved = Vec::with_capacity(results.len());
1765 let mut seen_semantic = HashSet::new();
1766 let mut seen_episodic = HashSet::new();
1767 let mut seen_procedural = HashSet::new();
1768 let mut seen_working = HashSet::new();
1769
1770 for mut result in results {
1771 match &result.record {
1772 MemoryRecord::Semantic(record) => {
1773 if !seen_semantic.insert(record.logical_memory_id) {
1774 continue;
1775 }
1776
1777 let Some(head) = semantic_heads.get(&record.logical_memory_id) else {
1778 continue;
1779 };
1780
1781 result.record = MemoryRecord::Semantic(head.clone());
1782 result.revision = Some(RevisionRef {
1783 logical_memory_id: head.logical_memory_id,
1784 revision_id: head.revision_id,
1785 state: RevisionState::Active,
1786 });
1787 resolved.push(result);
1788 }
1789 MemoryRecord::Episodic(record) => {
1790 if !seen_episodic.insert(record.logical_memory_id) {
1791 continue;
1792 }
1793
1794 let Some(head) = episodic_heads.get(&record.logical_memory_id) else {
1795 continue;
1796 };
1797
1798 result.record = MemoryRecord::Episodic(head.clone());
1799 result.revision = Some(RevisionRef {
1800 logical_memory_id: head.logical_memory_id,
1801 revision_id: head.revision_id,
1802 state: RevisionState::Active,
1803 });
1804 resolved.push(result);
1805 }
1806 MemoryRecord::Procedural(record) => {
1807 if !seen_procedural.insert(record.logical_memory_id) {
1808 continue;
1809 }
1810
1811 let Ok(head) = self
1812 .procedural_head_for_logical_id(record.logical_memory_id)
1813 .await
1814 else {
1815 continue;
1816 };
1817 if !head.is_live() {
1818 continue;
1819 }
1820
1821 result.record = MemoryRecord::Procedural(head.clone());
1822 result.revision = Some(RevisionRef {
1823 logical_memory_id: head.logical_memory_id,
1824 revision_id: head.revision_id,
1825 state: RevisionState::Active,
1826 });
1827 resolved.push(result);
1828 }
1829 MemoryRecord::Working(record) => {
1830 if !seen_working.insert(record.logical_memory_id) {
1831 continue;
1832 }
1833
1834 let Ok(head) = self
1835 .working_head_for_logical_id(record.logical_memory_id)
1836 .await
1837 else {
1838 continue;
1839 };
1840 if !head.is_live() {
1841 continue;
1842 }
1843
1844 result.record = MemoryRecord::Working(head.clone());
1845 result.revision = Some(RevisionRef {
1846 logical_memory_id: head.logical_memory_id,
1847 revision_id: head.revision_id,
1848 state: RevisionState::Active,
1849 });
1850 resolved.push(result);
1851 }
1852 }
1853 }
1854
1855 Ok(resolved)
1856 }
1857
1858 pub(crate) async fn normalize_recall_results_at_snapshot(
1859 &self,
1860 results: Vec<RecallResult>,
1861 snapshot: RecallSnapshot,
1862 ) -> HirnResult<Vec<RecallResult>> {
1863 let requested_snapshot = snapshot;
1864 let snapshot = self.resolve_recall_snapshot(snapshot).await?;
1865 let semantic_revisions = self
1866 .semantic_revisions_for_logical_ids_at_snapshot(
1867 &collect_semantic_logical_ids(&results),
1868 snapshot,
1869 )
1870 .await?;
1871 let mut resolved = Vec::with_capacity(results.len());
1872 let mut seen_semantic = HashSet::new();
1873 let mut seen_episodic = HashSet::new();
1874 let mut seen_procedural = HashSet::new();
1875 let mut seen_working = HashSet::new();
1876
1877 for mut result in results {
1878 match &result.record {
1879 MemoryRecord::Semantic(record) => {
1880 if !seen_semantic.insert(record.logical_memory_id) {
1881 continue;
1882 }
1883
1884 let Some(revision) = semantic_revisions.get(&record.logical_memory_id) else {
1885 continue;
1886 };
1887
1888 result.record = MemoryRecord::Semantic(revision.clone());
1889 result.revision = Some(RevisionRef {
1890 logical_memory_id: revision.logical_memory_id,
1891 revision_id: revision.revision_id,
1892 state: revision.logical_state(),
1893 });
1894 resolved.push(result);
1895 }
1896 MemoryRecord::Episodic(record) => {
1897 if !seen_episodic.insert(record.logical_memory_id) {
1898 continue;
1899 }
1900
1901 let Ok(Some(revision)) = self
1902 .episodic_revision_for_logical_id_at_snapshot(
1903 record.logical_memory_id,
1904 requested_snapshot,
1905 )
1906 .await
1907 else {
1908 continue;
1909 };
1910 if !revision.is_live() {
1911 continue;
1912 }
1913
1914 result.record = MemoryRecord::Episodic(revision.clone());
1915 result.revision = Some(RevisionRef {
1916 logical_memory_id: revision.logical_memory_id,
1917 revision_id: revision.revision_id,
1918 state: RevisionState::Active,
1919 });
1920 resolved.push(result);
1921 }
1922 MemoryRecord::Procedural(record) => {
1923 if !seen_procedural.insert(record.logical_memory_id) {
1924 continue;
1925 }
1926
1927 let Ok(Some(revision)) = self
1928 .procedural_revision_for_logical_id_at_snapshot(
1929 record.logical_memory_id,
1930 requested_snapshot,
1931 )
1932 .await
1933 else {
1934 continue;
1935 };
1936 if !revision.is_live() {
1937 continue;
1938 }
1939
1940 result.record = MemoryRecord::Procedural(revision.clone());
1941 result.revision = Some(RevisionRef {
1942 logical_memory_id: revision.logical_memory_id,
1943 revision_id: revision.revision_id,
1944 state: RevisionState::Active,
1945 });
1946 resolved.push(result);
1947 }
1948 MemoryRecord::Working(record) => {
1949 if !seen_working.insert(record.logical_memory_id) {
1950 continue;
1951 }
1952
1953 let Ok(Some(revision)) = self
1954 .working_revision_for_logical_id_at_snapshot(
1955 record.logical_memory_id,
1956 requested_snapshot,
1957 )
1958 .await
1959 else {
1960 continue;
1961 };
1962 if !revision.is_live() {
1963 continue;
1964 }
1965
1966 result.record = MemoryRecord::Working(revision.clone());
1967 result.revision = Some(RevisionRef {
1968 logical_memory_id: revision.logical_memory_id,
1969 revision_id: revision.revision_id,
1970 state: RevisionState::Active,
1971 });
1972 resolved.push(result);
1973 }
1974 }
1975 }
1976
1977 Ok(resolved)
1978 }
1979
1980 pub(crate) async fn get_semantic(&self, id: MemoryId) -> HirnResult<SemanticRecord> {
1985 let record = self.read_semantic_record(id).await?;
1986
1987 self.graph_runtime().buffer_semantic_access(id);
1989
1990 Ok(record)
1991 }
1992
1993 pub(crate) async fn get_semantic_by_concept(&self, name: &str) -> HirnResult<SemanticRecord> {
1995 self.get_semantic_by_concept_ns(name, &Namespace::default())
1996 .await
1997 }
1998
1999 pub(crate) async fn get_semantic_by_concept_ns(
2004 &self,
2005 name: &str,
2006 namespace: &Namespace,
2007 ) -> HirnResult<SemanticRecord> {
2008 let escaped_ns = namespace.as_str().replace('\'', "''");
2009 let escaped_concept = name.replace('\'', "''");
2010 let filter = format!(
2011 "namespace = '{}' AND concept = '{}'",
2012 escaped_ns, escaped_concept
2013 );
2014 let opts = hirn_storage::store::ScanOptions {
2015 filter: Some(filter),
2016 order_by: Some(vec![
2017 hirn_storage::store::ScanOrdering::desc("version"),
2018 hirn_storage::store::ScanOrdering::desc("created_at_ms"),
2019 hirn_storage::store::ScanOrdering::desc("revision_id"),
2020 ]),
2021 ..Default::default()
2022 };
2023 let mut batches = self
2024 .storage_runtime
2025 .scan_stream(hirn_storage::datasets::semantic::DATASET_NAME, opts)
2026 .await
2027 .map_err(HirnError::storage)?;
2028
2029 let mut candidates = Vec::new();
2030 while let Some(batch) = batches.try_next().await.map_err(HirnError::storage)? {
2031 let recs =
2032 hirn_storage::datasets::semantic::from_batch(&batch).map_err(HirnError::storage)?;
2033 candidates.extend(recs);
2034 }
2035
2036 if let Some(rec) = collapse_semantic_heads(candidates)
2037 .into_values()
2038 .find(semantic_record_is_live)
2039 {
2040 return Ok(rec);
2041 }
2042 Err(HirnError::NotFound(format!(
2043 "concept '{name}' in namespace '{namespace}'"
2044 )))
2045 }
2046
2047 pub(crate) async fn list_semantics(
2049 &self,
2050 filter: &SemanticFilter,
2051 ) -> HirnResult<Vec<SemanticRecord>> {
2052 let mut parts = Vec::new();
2053 if let Some(ref kt) = filter.knowledge_type {
2054 parts.push(format!("knowledge_type = '{:?}'", kt));
2055 }
2056 if let Some(min_conf) = filter.min_confidence {
2057 parts.push(format!("confidence >= {}", min_conf));
2058 }
2059 if let Some(ref ns) = filter.namespace {
2060 let escaped = ns.as_str().replace('\'', "''");
2061 parts.push(format!("namespace = '{}'", escaped));
2062 }
2063
2064 let lance_filter = if parts.is_empty() {
2065 None
2066 } else {
2067 Some(parts.join(" AND "))
2068 };
2069
2070 let opts = hirn_storage::store::ScanOptions {
2071 filter: lance_filter,
2072 ..Default::default()
2073 };
2074 let mut batches = self
2075 .storage_runtime
2076 .scan_stream(hirn_storage::datasets::semantic::DATASET_NAME, opts)
2077 .await
2078 .map_err(HirnError::storage)?;
2079
2080 let mut results = Vec::new();
2081 while let Some(batch) = batches.try_next().await.map_err(HirnError::storage)? {
2082 let recs =
2083 hirn_storage::datasets::semantic::from_batch(&batch).map_err(HirnError::storage)?;
2084 results.extend(recs);
2085 }
2086
2087 let mut heads: Vec<_> = collapse_semantic_heads(results)
2088 .into_values()
2089 .filter(semantic_record_is_live)
2090 .collect();
2091 heads.sort_by_key(|r| std::cmp::Reverse(r.version));
2092 if let Some(limit) = filter.limit {
2093 heads.truncate(limit);
2094 }
2095
2096 Ok(heads)
2097 }
2098
2099 pub(crate) async fn semantic_history(&self, id: MemoryId) -> HirnResult<Vec<SemanticRecord>> {
2101 let record = self.read_semantic_record(id).await?;
2102 let mut batches = self
2103 .storage_runtime
2104 .scan_stream(
2105 hirn_storage::datasets::semantic::DATASET_NAME,
2106 hirn_storage::store::ScanOptions {
2107 exact_filter: Some(Self::semantic_logical_exact_filter(
2108 record.logical_memory_id,
2109 )),
2110 order_by: Some(vec![
2111 hirn_storage::store::ScanOrdering::asc("version"),
2112 hirn_storage::store::ScanOrdering::asc("created_at_ms"),
2113 ]),
2114 ..Default::default()
2115 },
2116 )
2117 .await
2118 .map_err(HirnError::storage)?;
2119
2120 let mut history = Vec::new();
2121 while let Some(batch) = batches.try_next().await.map_err(HirnError::storage)? {
2122 let recs =
2123 hirn_storage::datasets::semantic::from_batch(&batch).map_err(HirnError::storage)?;
2124 history.extend(recs);
2125 }
2126
2127 history.sort_by(|left, right| {
2128 left.version
2129 .cmp(&right.version)
2130 .then_with(|| left.created_at.cmp(&right.created_at))
2131 });
2132 Ok(history)
2133 }
2134
2135 async fn contradiction_successor_links(
2136 &self,
2137 current: &SemanticRecord,
2138 ) -> Vec<ContradictionSuccessorLink> {
2139 let mut links = Vec::new();
2140 let mut seen_targets = HashSet::new();
2141
2142 for contradiction_id in ¤t.contradiction_ids {
2143 let (resolved_target, skip_graph_edge) = match self.get_memory(*contradiction_id).await
2144 {
2145 Ok(MemoryRecord::Semantic(record)) => match self
2146 .semantic_head_for_logical_id(record.logical_memory_id)
2147 .await
2148 {
2149 Ok(head) => (head.id, !semantic_record_is_live(&head)),
2150 Err(_) => (record.id, !semantic_record_is_live(&record)),
2151 },
2152 Ok(_) => (*contradiction_id, false),
2153 Err(_) => continue,
2154 };
2155
2156 if resolved_target == current.id || !seen_targets.insert(resolved_target) {
2157 continue;
2158 }
2159
2160 let mut template = self
2161 .cached_graph()
2162 .get_edges_between(current.id, *contradiction_id)
2163 .await
2164 .unwrap_or_default()
2165 .into_iter()
2166 .find(|edge| edge.relation == EdgeRelation::Contradicts);
2167
2168 if template.is_none() && resolved_target != *contradiction_id {
2169 template = self
2170 .cached_graph()
2171 .get_edges_between(current.id, resolved_target)
2172 .await
2173 .unwrap_or_default()
2174 .into_iter()
2175 .find(|edge| edge.relation == EdgeRelation::Contradicts);
2176 }
2177
2178 links.push(ContradictionSuccessorLink {
2179 target_id: resolved_target,
2180 weight: template.as_ref().map_or(1.0, |edge| {
2181 edge.confidence().unwrap_or(edge.weight).clamp(0.0, 1.0)
2182 }),
2183 metadata: template.map_or_else(Metadata::new, |edge| edge.metadata),
2184 skip_graph_edge,
2185 });
2186 }
2187
2188 links
2189 }
2190
2191 async fn attach_contradiction_successor_links(
2192 &self,
2193 current: &SemanticRecord,
2194 next: &mut SemanticRecord,
2195 ) {
2196 if current.contradiction_ids.is_empty() {
2197 next.contradiction_ids.clear();
2198 return;
2199 }
2200
2201 let links = self.contradiction_successor_links(current).await;
2202 let mut attached_ids = Vec::new();
2203
2204 for link in links {
2205 if link.target_id == next.id {
2206 continue;
2207 }
2208
2209 if !link.skip_graph_edge {
2210 match self
2211 .cached_graph()
2212 .add_edge(
2213 next.id,
2214 link.target_id,
2215 EdgeRelation::Contradicts,
2216 link.weight,
2217 link.metadata,
2218 )
2219 .await
2220 {
2221 Ok(_) => {}
2222 Err(error) => {
2223 let target_present = self
2224 .cached_graph()
2225 .has_node(link.target_id)
2226 .await
2227 .unwrap_or(false);
2228 if target_present {
2229 tracing::warn!(
2230 current_id = %current.id,
2231 next_id = %next.id,
2232 target_id = %link.target_id,
2233 error = %error,
2234 "failed to carry contradiction edge to semantic successor"
2235 );
2236 } else {
2237 tracing::debug!(
2238 current_id = %current.id,
2239 next_id = %next.id,
2240 target_id = %link.target_id,
2241 error = %error,
2242 "carrying contradiction lineage without graph edge for non-live target"
2243 );
2244 }
2245 }
2246 }
2247 }
2248
2249 attached_ids.push(link.target_id);
2250 }
2251
2252 attached_ids.sort();
2253 attached_ids.dedup();
2254 next.contradiction_ids = attached_ids;
2255 }
2256
2257 async fn resolve_contradiction_endpoint(
2258 &self,
2259 id: MemoryId,
2260 ) -> HirnResult<ContradictionEndpoint> {
2261 let record = match self.get_memory(id).await {
2262 Ok(record) => record,
2263 Err(HirnError::NotFound(_)) => {
2264 if self.cached_graph().has_node(id).await.unwrap_or(false) {
2265 return Ok(ContradictionEndpoint::Other(id));
2266 }
2267 return Err(HirnError::NotFound(format!("memory record {id}")));
2268 }
2269 Err(error) => return Err(error),
2270 };
2271
2272 match record {
2273 MemoryRecord::Semantic(record) => {
2274 let head = self
2275 .semantic_head_for_logical_id(record.logical_memory_id)
2276 .await?;
2277 if head.is_retracted() {
2278 return Err(HirnError::InvalidInput(format!(
2279 "semantic logical memory {} is retracted",
2280 head.logical_memory_id
2281 )));
2282 }
2283 if let Some(merged_into) = head.merged_into {
2284 return Err(HirnError::InvalidInput(format!(
2285 "semantic logical memory {} has been merged into {}",
2286 head.logical_memory_id, merged_into
2287 )));
2288 }
2289 Ok(ContradictionEndpoint::Semantic(Box::new(head)))
2290 }
2291 MemoryRecord::Episodic(record) => {
2292 let head = self
2293 .episodic_head_for_logical_id(record.logical_memory_id)
2294 .await?;
2295 if !head.is_live() {
2296 return Err(HirnError::InvalidInput(format!(
2297 "episodic logical memory {} is not live",
2298 head.logical_memory_id
2299 )));
2300 }
2301 Ok(ContradictionEndpoint::Other(head.id))
2302 }
2303 MemoryRecord::Procedural(record) => {
2304 let head = self
2305 .procedural_head_for_logical_id(record.logical_memory_id)
2306 .await?;
2307 if !head.is_live() {
2308 return Err(HirnError::InvalidInput(format!(
2309 "procedural logical memory {} is not live",
2310 head.logical_memory_id
2311 )));
2312 }
2313 Ok(ContradictionEndpoint::Other(head.id))
2314 }
2315 MemoryRecord::Working(record) => {
2316 let head = self
2317 .working_head_for_logical_id(record.logical_memory_id)
2318 .await?;
2319 if !head.is_live() {
2320 return Err(HirnError::InvalidInput(format!(
2321 "working logical memory {} is not live",
2322 head.logical_memory_id
2323 )));
2324 }
2325 Ok(ContradictionEndpoint::Other(head.id))
2326 }
2327 }
2328 }
2329
2330 fn prepare_semantic_contradiction_successor(
2331 &self,
2332 current: &SemanticRecord,
2333 next_id: MemoryId,
2334 contradiction_target_id: MemoryId,
2335 now: Timestamp,
2336 ) -> SemanticRecord {
2337 let mut next = current.clone();
2338 next.id = next_id;
2339 next.revision_id = RevisionId::from_memory_id(next_id);
2340 next.version = current.version + 1;
2341 next.revision_operation = RevisionOperation::Correct;
2342 next.revision_reason = Some("contradiction relation updated".to_string());
2343 next.revision_causation_id = Some(contradiction_target_id);
2344 next.created_at = now;
2345 next.updated_at = now;
2346 next.valid_from = current.valid_from;
2347 next.valid_until = None;
2348 next.superseded_by = None;
2349 next.merged_into = None;
2350 next.provenance.created_by = AgentId::well_known("system");
2351
2352 let old_contradictions = format_memory_id_list(¤t.contradiction_ids);
2353 let mut new_contradictions = current.contradiction_ids.clone();
2354 new_contradictions.push(contradiction_target_id);
2355 new_contradictions.sort_unstable();
2356 new_contradictions.dedup();
2357
2358 next.provenance
2359 .record_mutation(hirn_core::provenance::Mutation {
2360 timestamp: now,
2361 trigger: MutationTrigger::Manual,
2362 field: "contradiction_ids".to_string(),
2363 old_value: old_contradictions,
2364 new_value: format_memory_id_list(&new_contradictions),
2365 reason: "contradiction relation updated".to_string(),
2366 });
2367
2368 normalize_semantic_record_timestamps(&mut next);
2369 next
2370 }
2371
2372 async fn synchronize_contradiction_refs(
2373 &self,
2374 source: MemoryId,
2375 target: MemoryId,
2376 edge_spec: Option<(f32, Metadata)>,
2377 ) -> HirnResult<ContradictionSyncResult> {
2378 let source_endpoint = self.resolve_contradiction_endpoint(source).await?;
2379 let target_endpoint = self.resolve_contradiction_endpoint(target).await?;
2380 let source_id = source_endpoint.id();
2381 let target_id = target_endpoint.id();
2382
2383 if source_id == target_id {
2384 return Err(HirnError::InvalidInput(
2385 "a record cannot contradict itself".into(),
2386 ));
2387 }
2388
2389 if let (Some(source_record), Some(target_record)) =
2390 (source_endpoint.as_semantic(), target_endpoint.as_semantic())
2391 {
2392 if source_record.logical_memory_id == target_record.logical_memory_id {
2393 return Err(HirnError::InvalidInput(format!(
2394 "semantic logical memory {} cannot contradict itself",
2395 source_record.logical_memory_id
2396 )));
2397 }
2398 }
2399
2400 let semantic_pair_requires_successors =
2401 match (source_endpoint.as_semantic(), target_endpoint.as_semantic()) {
2402 (Some(source_record), Some(target_record)) => {
2403 !source_record.contradiction_ids.contains(&target_id)
2404 || !target_record.contradiction_ids.contains(&source_id)
2405 }
2406 _ => false,
2407 };
2408
2409 let source_requires_successor =
2410 match (source_endpoint.as_semantic(), target_endpoint.as_semantic()) {
2411 (Some(_), Some(_)) => semantic_pair_requires_successors,
2412 (Some(source_record), None) => {
2413 !source_record.contradiction_ids.contains(&target_id)
2414 }
2415 _ => false,
2416 };
2417 let target_requires_successor =
2418 match (source_endpoint.as_semantic(), target_endpoint.as_semantic()) {
2419 (Some(_), Some(_)) => semantic_pair_requires_successors,
2420 (None, Some(target_record)) => {
2421 !target_record.contradiction_ids.contains(&source_id)
2422 }
2423 _ => false,
2424 };
2425
2426 if !source_requires_successor && !target_requires_successor {
2427 let edge_id = if let Some((weight, metadata)) = edge_spec {
2428 let existing = self
2429 .cached_graph()
2430 .get_edges_between(source_id, target_id)
2431 .await
2432 .unwrap_or_default()
2433 .into_iter()
2434 .find(|edge| edge.relation == EdgeRelation::Contradicts)
2435 .map(|edge| edge.id);
2436
2437 match existing {
2438 Some(edge_id) => Some(edge_id),
2439 None => Some(
2440 self.cached_graph()
2441 .add_edge(
2442 source_id,
2443 target_id,
2444 EdgeRelation::Contradicts,
2445 weight,
2446 metadata,
2447 )
2448 .await?,
2449 ),
2450 }
2451 } else {
2452 None
2453 };
2454
2455 return Ok(ContradictionSyncResult {
2456 source_memory: source_id,
2457 target_memory: target_id,
2458 contradiction_edge: edge_id,
2459 });
2460 }
2461
2462 let now = Timestamp::now();
2463 let final_source_id = if source_requires_successor {
2464 MemoryId::new()
2465 } else {
2466 source_id
2467 };
2468 let final_target_id = if target_requires_successor {
2469 MemoryId::new()
2470 } else {
2471 target_id
2472 };
2473
2474 let mut source_prepared = source_endpoint
2475 .as_semantic()
2476 .filter(|_| source_requires_successor)
2477 .map(|current| PreparedSemanticContradictionSuccessor {
2478 current: current.clone(),
2479 next: self.prepare_semantic_contradiction_successor(
2480 current,
2481 final_source_id,
2482 final_target_id,
2483 now,
2484 ),
2485 });
2486 let mut target_prepared = target_endpoint
2487 .as_semantic()
2488 .filter(|_| target_requires_successor)
2489 .map(|current| PreparedSemanticContradictionSuccessor {
2490 current: current.clone(),
2491 next: self.prepare_semantic_contradiction_successor(
2492 current,
2493 final_target_id,
2494 final_source_id,
2495 now,
2496 ),
2497 });
2498
2499 let envelope = build_semantic_contradiction_sync_envelope(
2500 [source_prepared.as_ref(), target_prepared.as_ref()]
2501 .into_iter()
2502 .flatten()
2503 .map(|prepared| prepared.current.id)
2504 .collect(),
2505 [source_prepared.as_ref(), target_prepared.as_ref()]
2506 .into_iter()
2507 .flatten()
2508 .map(|prepared| prepared.next.id)
2509 .collect(),
2510 )?;
2511 hirn_storage::append_mutation_envelope(self.storage_backend(), &envelope)
2512 .await
2513 .map_err(HirnError::storage)?;
2514
2515 let mut added_nodes = Vec::new();
2516 for prepared in [source_prepared.as_ref(), target_prepared.as_ref()]
2517 .into_iter()
2518 .flatten()
2519 {
2520 if let Err(error) = self
2521 .cached_graph()
2522 .add_node(
2523 prepared.next.id,
2524 Layer::Semantic,
2525 prepared.next.confidence,
2526 prepared.next.created_at,
2527 prepared.next.namespace,
2528 )
2529 .await
2530 {
2531 for added_id in added_nodes.into_iter().rev() {
2532 let _ = self.cached_graph().remove_node(added_id).await;
2533 }
2534 if let Err(update_error) = hirn_storage::update_mutation_envelope_state(
2535 self.storage_backend(),
2536 &envelope.id,
2537 hirn_storage::MutationEnvelopeState::Failed,
2538 Some(error.to_string()),
2539 )
2540 .await
2541 {
2542 tracing::warn!(
2543 source_id = %source_id,
2544 target_id = %target_id,
2545 envelope_id = %envelope.id,
2546 error = %update_error,
2547 "semantic contradiction sync envelope fail-fast finalize failed after graph add_node error"
2548 );
2549 }
2550 return Err(error);
2551 }
2552 added_nodes.push(prepared.next.id);
2553 }
2554
2555 for prepared in [source_prepared.as_ref(), target_prepared.as_ref()]
2556 .into_iter()
2557 .flatten()
2558 {
2559 if let Some(ref embedding) = prepared.next.embedding {
2560 let candidates = self.find_similarity_candidates(embedding).await;
2561 if let Err(error) = self
2562 .apply_similarity_edges(prepared.next.id, &candidates)
2563 .await
2564 {
2565 for added_id in added_nodes.iter().rev().copied() {
2566 let _ = self.cached_graph().remove_node(added_id).await;
2567 }
2568 if let Err(update_error) = hirn_storage::update_mutation_envelope_state(
2569 self.storage_backend(),
2570 &envelope.id,
2571 hirn_storage::MutationEnvelopeState::Failed,
2572 Some(error.to_string()),
2573 )
2574 .await
2575 {
2576 tracing::warn!(
2577 source_id = %source_id,
2578 target_id = %target_id,
2579 envelope_id = %envelope.id,
2580 error = %update_error,
2581 "semantic contradiction sync envelope fail-fast finalize failed after similarity edge error"
2582 );
2583 }
2584 return Err(error);
2585 }
2586 }
2587 }
2588
2589 if let Some(prepared) = source_prepared.as_mut() {
2590 self.attach_contradiction_successor_links(&prepared.current, &mut prepared.next)
2591 .await;
2592 if semantic_pair_requires_successors {
2593 prepared
2594 .next
2595 .contradiction_ids
2596 .retain(|id| *id != target_id);
2597 }
2598 prepared.next.contradiction_ids.push(final_target_id);
2599 prepared.next.contradiction_ids.sort_unstable();
2600 prepared.next.contradiction_ids.dedup();
2601 }
2602 if let Some(prepared) = target_prepared.as_mut() {
2603 self.attach_contradiction_successor_links(&prepared.current, &mut prepared.next)
2604 .await;
2605 if semantic_pair_requires_successors {
2606 prepared
2607 .next
2608 .contradiction_ids
2609 .retain(|id| *id != source_id);
2610 }
2611 prepared.next.contradiction_ids.push(final_source_id);
2612 prepared.next.contradiction_ids.sort_unstable();
2613 prepared.next.contradiction_ids.dedup();
2614 }
2615
2616 let edge_id = if let Some((weight, metadata)) = edge_spec {
2617 match self
2618 .cached_graph()
2619 .add_edge(
2620 final_source_id,
2621 final_target_id,
2622 EdgeRelation::Contradicts,
2623 weight,
2624 metadata,
2625 )
2626 .await
2627 {
2628 Ok(edge_id) => Some(edge_id),
2629 Err(error) => {
2630 for added_id in added_nodes.iter().rev().copied() {
2631 let _ = self.cached_graph().remove_node(added_id).await;
2632 }
2633 if let Err(update_error) = hirn_storage::update_mutation_envelope_state(
2634 self.storage_backend(),
2635 &envelope.id,
2636 hirn_storage::MutationEnvelopeState::Failed,
2637 Some(error.to_string()),
2638 )
2639 .await
2640 {
2641 tracing::warn!(
2642 source_id = %source_id,
2643 target_id = %target_id,
2644 envelope_id = %envelope.id,
2645 error = %update_error,
2646 "semantic contradiction sync envelope fail-fast finalize failed after contradiction edge error"
2647 );
2648 }
2649 return Err(error);
2650 }
2651 }
2652 } else {
2653 None
2654 };
2655
2656 let next_records: Vec<SemanticRecord> =
2657 [source_prepared.as_ref(), target_prepared.as_ref()]
2658 .into_iter()
2659 .flatten()
2660 .map(|prepared| prepared.next.clone())
2661 .collect();
2662
2663 if let Err(error) = self.append_semantic_records(&next_records).await {
2664 for added_id in added_nodes.iter().rev().copied() {
2665 let _ = self.cached_graph().remove_node(added_id).await;
2666 }
2667 if let Err(update_error) = hirn_storage::update_mutation_envelope_state(
2668 self.storage_backend(),
2669 &envelope.id,
2670 hirn_storage::MutationEnvelopeState::Failed,
2671 Some(error.to_string()),
2672 )
2673 .await
2674 {
2675 tracing::warn!(
2676 source_id = %source_id,
2677 target_id = %target_id,
2678 envelope_id = %envelope.id,
2679 error = %update_error,
2680 "semantic contradiction sync envelope fail-fast finalize failed after semantic append error"
2681 );
2682 }
2683 return Err(error);
2684 }
2685
2686 if let Some(prepared) = source_prepared.as_ref() {
2687 self.cache_semantic_head(&prepared.next);
2688 }
2689 if let Some(prepared) = target_prepared.as_ref() {
2690 self.cache_semantic_head(&prepared.next);
2691 }
2692
2693 let mut predecessors_removed = true;
2694
2695 for prepared in [source_prepared.as_ref(), target_prepared.as_ref()]
2696 .into_iter()
2697 .flatten()
2698 {
2699 match self.cached_graph().has_node(prepared.current.id).await {
2700 Ok(true) => {
2701 if let Err(error) = self.cached_graph().remove_node(prepared.current.id).await {
2702 tracing::warn!(
2703 id = %prepared.current.id,
2704 error = %error,
2705 "failed to remove superseded semantic graph node after contradiction sync"
2706 );
2707 predecessors_removed = false;
2708 }
2709 }
2710 Ok(false) => {}
2711 Err(error) => {
2712 tracing::warn!(
2713 id = %prepared.current.id,
2714 error = %error,
2715 "failed to inspect superseded semantic graph node after contradiction sync"
2716 );
2717 predecessors_removed = false;
2718 }
2719 }
2720 }
2721
2722 if predecessors_removed {
2723 if let Err(error) = hirn_storage::update_mutation_envelope_state(
2724 self.storage_backend(),
2725 &envelope.id,
2726 hirn_storage::MutationEnvelopeState::Applied,
2727 None,
2728 )
2729 .await
2730 {
2731 tracing::warn!(
2732 source_id = %source_id,
2733 target_id = %target_id,
2734 envelope_id = %envelope.id,
2735 error = %error,
2736 "semantic contradiction sync envelope finalize failed; recovery will retry predecessor cleanup"
2737 );
2738 }
2739 }
2740
2741 Ok(ContradictionSyncResult {
2742 source_memory: final_source_id,
2743 target_memory: final_target_id,
2744 contradiction_edge: edge_id,
2745 })
2746 }
2747
2748 pub(crate) async fn connect_contradiction(
2749 &self,
2750 source: MemoryId,
2751 target: MemoryId,
2752 weight: f32,
2753 metadata: Metadata,
2754 ) -> HirnResult<crate::graph::EdgeId> {
2755 let synced = self
2756 .synchronize_contradiction_refs(source, target, Some((weight, metadata)))
2757 .await?;
2758
2759 synced.contradiction_edge.ok_or_else(|| {
2760 HirnError::InvalidInput(format!(
2761 "failed to materialize contradiction edge between {} and {}",
2762 synced.source_memory, synced.target_memory
2763 ))
2764 })
2765 }
2766
2767 async fn append_semantic_successor(
2768 &self,
2769 current: &SemanticRecord,
2770 update: SemanticUpdate,
2771 operation: RevisionOperation,
2772 preserve_valid_from: bool,
2773 authorization_action: crate::policy::Action,
2774 ) -> HirnResult<SemanticRecord> {
2775 let actor_id = update.actor_id;
2776 self.enforce(
2777 actor_id.as_str(),
2778 authorization_action,
2779 &self.config.default_realm,
2780 current.namespace.as_str(),
2781 )
2782 .await?;
2783
2784 let now = Timestamp::now();
2785 let mut next = current.clone();
2786 let new_id = MemoryId::new();
2787 next.id = new_id;
2788 next.revision_id = RevisionId::from_memory_id(new_id);
2789 next.version = current.version + 1;
2790 next.revision_operation = operation;
2791 next.revision_reason.clone_from(&update.reason);
2792 next.revision_causation_id = Some(update.causation_id);
2793 next.created_at = now;
2794 next.updated_at = now;
2795 next.valid_from = if preserve_valid_from {
2796 update.observed_at.unwrap_or(current.valid_from)
2797 } else {
2798 update.observed_at.unwrap_or(now)
2799 };
2800 next.valid_until = None;
2801 next.superseded_by = None;
2802 next.merged_into = None;
2803 next.provenance.created_by = actor_id;
2804
2805 let reason = update.reason.clone().unwrap_or_else(|| match operation {
2806 RevisionOperation::Override => "override".to_string(),
2807 RevisionOperation::Supersede => "supersede".to_string(),
2808 _ => "update".to_string(),
2809 });
2810
2811 self.apply_semantic_update_fields(current, &mut next, &update, &reason, now)
2812 .await?;
2813 normalize_semantic_record_timestamps(&mut next);
2814
2815 let envelope = build_semantic_successor_envelope(current.id, next.id)?;
2816 hirn_storage::append_mutation_envelope(self.storage_backend(), &envelope)
2817 .await
2818 .map_err(HirnError::storage)?;
2819
2820 if let Err(error) = self
2821 .cached_graph()
2822 .add_node(
2823 next.id,
2824 Layer::Semantic,
2825 next.confidence,
2826 next.created_at,
2827 next.namespace,
2828 )
2829 .await
2830 {
2831 if let Err(update_error) = hirn_storage::update_mutation_envelope_state(
2832 self.storage_backend(),
2833 &envelope.id,
2834 hirn_storage::MutationEnvelopeState::Failed,
2835 Some(error.to_string()),
2836 )
2837 .await
2838 {
2839 tracing::warn!(
2840 current_id = %current.id,
2841 next_id = %next.id,
2842 envelope_id = %envelope.id,
2843 error = %update_error,
2844 "semantic successor mutation envelope fail-fast finalize failed after graph add_node error"
2845 );
2846 }
2847 return Err(error);
2848 }
2849
2850 if let Some(ref emb) = next.embedding {
2851 let candidates = self.find_similarity_candidates(emb).await;
2852 if let Err(error) = self.apply_similarity_edges(next.id, &candidates).await {
2853 let _ = self.cached_graph().remove_node(next.id).await;
2854 if let Err(update_error) = hirn_storage::update_mutation_envelope_state(
2855 self.storage_backend(),
2856 &envelope.id,
2857 hirn_storage::MutationEnvelopeState::Failed,
2858 Some(error.to_string()),
2859 )
2860 .await
2861 {
2862 tracing::warn!(
2863 current_id = %current.id,
2864 next_id = %next.id,
2865 envelope_id = %envelope.id,
2866 error = %update_error,
2867 "semantic successor mutation envelope fail-fast finalize failed after similarity edge error"
2868 );
2869 }
2870 return Err(error);
2871 }
2872 }
2873
2874 self.attach_contradiction_successor_links(current, &mut next)
2875 .await;
2876
2877 if let Err(error) = self.append_semantic_record(&next).await {
2878 let _ = self.cached_graph().remove_node(next.id).await;
2879 if let Err(update_error) = hirn_storage::update_mutation_envelope_state(
2880 self.storage_backend(),
2881 &envelope.id,
2882 hirn_storage::MutationEnvelopeState::Failed,
2883 Some(error.to_string()),
2884 )
2885 .await
2886 {
2887 tracing::warn!(
2888 current_id = %current.id,
2889 next_id = %next.id,
2890 envelope_id = %envelope.id,
2891 error = %update_error,
2892 "semantic successor mutation envelope fail-fast finalize failed after semantic append error"
2893 );
2894 }
2895 return Err(error);
2896 }
2897
2898 self.cache_semantic_head(&next);
2899
2900 let predecessor_removed = match self.cached_graph().has_node(current.id).await {
2901 Ok(true) => match self.cached_graph().remove_node(current.id).await {
2902 Ok(_) => true,
2903 Err(error) => {
2904 tracing::warn!(id = %current.id, error = %error, "failed to remove superseded semantic graph node");
2905 false
2906 }
2907 },
2908 Ok(false) => true,
2909 Err(error) => {
2910 tracing::warn!(id = %current.id, error = %error, "failed to inspect superseded semantic graph node state");
2911 false
2912 }
2913 };
2914
2915 if predecessor_removed {
2916 if let Err(error) = hirn_storage::update_mutation_envelope_state(
2917 self.storage_backend(),
2918 &envelope.id,
2919 hirn_storage::MutationEnvelopeState::Applied,
2920 None,
2921 )
2922 .await
2923 {
2924 tracing::warn!(
2925 current_id = %current.id,
2926 next_id = %next.id,
2927 envelope_id = %envelope.id,
2928 error = %error,
2929 "semantic successor mutation envelope finalize failed; recovery will retry predecessor cleanup"
2930 );
2931 }
2932 }
2933
2934 Ok(next)
2935 }
2936
2937 async fn apply_semantic_update_fields(
2938 &self,
2939 current: &SemanticRecord,
2940 next: &mut SemanticRecord,
2941 update: &SemanticUpdate,
2942 reason: &str,
2943 now: Timestamp,
2944 ) -> HirnResult<()> {
2945 if let Some(desc) = update.description.clone() {
2946 let mutation = hirn_core::provenance::Mutation {
2947 timestamp: now,
2948 trigger: MutationTrigger::Manual,
2949 field: "description".to_string(),
2950 old_value: current.description.clone(),
2951 new_value: desc.clone(),
2952 reason: reason.to_string(),
2953 };
2954 next.provenance.record_mutation(mutation);
2955 next.description = desc;
2956 next.embedding = Some(self.embed_text(&next.description).await?);
2957 }
2958
2959 if let Some(conf) = update.confidence {
2960 let clamped = conf.clamp(0.0, 1.0);
2961 let mutation = hirn_core::provenance::Mutation {
2962 timestamp: now,
2963 trigger: MutationTrigger::Manual,
2964 field: "confidence".to_string(),
2965 old_value: current.confidence.to_string(),
2966 new_value: clamped.to_string(),
2967 reason: reason.to_string(),
2968 };
2969 next.provenance.record_mutation(mutation);
2970 next.confidence = clamped;
2971 }
2972
2973 if let Some(count) = update.evidence_count {
2974 next.evidence_count = count;
2975 }
2976
2977 Ok(())
2978 }
2979
2980 pub(crate) async fn correct_semantic(
2982 &self,
2983 id: MemoryId,
2984 update: SemanticUpdate,
2985 ) -> HirnResult<SemanticRecord> {
2986 let current = self.resolve_active_semantic_head(id).await?;
2987 let next = self
2988 .append_semantic_successor(
2989 ¤t,
2990 update,
2991 RevisionOperation::Correct,
2992 true,
2993 crate::policy::Action::Correct,
2994 )
2995 .await?;
2996
2997 self.emit_scoped(
2998 next.namespace.as_str(),
2999 next.provenance.created_by.as_str(),
3000 MemoryEvent::MemoryCorrected {
3001 logical_memory_id: current.logical_memory_id,
3002 old_revision_id: current.revision_id,
3003 new_revision_id: next.revision_id,
3004 reason: next.revision_reason.clone(),
3005 },
3006 )
3007 .await;
3008
3009 Ok(next)
3010 }
3011
3012 pub(crate) async fn supersede_semantic(
3014 &self,
3015 id: MemoryId,
3016 supersession: SemanticSupersession,
3017 ) -> HirnResult<SemanticRecord> {
3018 let current = self.resolve_active_semantic_head(id).await?;
3019 let next = self
3020 .append_semantic_successor(
3021 ¤t,
3022 supersession.into(),
3023 RevisionOperation::Supersede,
3024 false,
3025 crate::policy::Action::Supersede,
3026 )
3027 .await?;
3028
3029 self.emit_scoped(
3030 next.namespace.as_str(),
3031 next.provenance.created_by.as_str(),
3032 MemoryEvent::MemorySuperseded {
3033 logical_memory_id: current.logical_memory_id,
3034 prior_revision_id: current.revision_id,
3035 new_revision_id: next.revision_id,
3036 reason: next.revision_reason.clone(),
3037 },
3038 )
3039 .await;
3040
3041 Ok(next)
3042 }
3043
3044 pub(crate) async fn override_semantic(
3046 &self,
3047 id: MemoryId,
3048 override_request: SemanticOverride,
3049 ) -> HirnResult<SemanticRecord> {
3050 let current = self.resolve_active_semantic_head(id).await?;
3051 let actor_id = override_request.actor_id;
3052 let reason = override_request
3053 .reason
3054 .clone()
3055 .map(|reason| reason.trim().to_string())
3056 .filter(|reason| !reason.is_empty())
3057 .ok_or_else(|| {
3058 HirnError::InvalidInput("semantic override requires a non-empty reason".into())
3059 })?;
3060 let next = self
3061 .append_semantic_successor(
3062 ¤t,
3063 SemanticUpdate {
3064 description: override_request.description,
3065 confidence: override_request.confidence,
3066 evidence_count: override_request.evidence_count,
3067 reason: Some(reason.clone()),
3068 actor_id,
3069 observed_at: override_request.observed_at,
3070 causation_id: override_request.causation_id,
3071 },
3072 RevisionOperation::Override,
3073 false,
3074 crate::policy::Action::Admin,
3075 )
3076 .await?;
3077
3078 self.append_audit(
3079 Some(actor_id.clone()),
3080 hirn_core::audit::AuditAction::BeliefOverride {
3081 logical_memory_id: current.logical_memory_id,
3082 prior_revision_id: current.revision_id,
3083 override_revision_id: next.revision_id,
3084 namespace: next.namespace.as_str().to_string(),
3085 reason: reason.clone(),
3086 },
3087 )
3088 .await?;
3089
3090 self.emit_scoped(
3091 next.namespace.as_str(),
3092 actor_id.as_str(),
3093 MemoryEvent::MemoryOverridden {
3094 logical_memory_id: current.logical_memory_id,
3095 prior_revision_id: current.revision_id,
3096 override_revision_id: next.revision_id,
3097 reason: Some(reason),
3098 },
3099 )
3100 .await;
3101
3102 Ok(next)
3103 }
3104
3105 pub(crate) async fn merge_semantic(
3107 &self,
3108 target_id: MemoryId,
3109 merge: SemanticMerge,
3110 ) -> HirnResult<SemanticMergeOutcome> {
3111 if merge.source_ids.is_empty() {
3112 return Err(HirnError::InvalidInput(
3113 "MERGE MEMORY requires at least one source memory".into(),
3114 ));
3115 }
3116
3117 let current = self.resolve_active_semantic_head(target_id).await?;
3118 let actor_id = merge.actor_id;
3119 self.enforce(
3120 actor_id.as_str(),
3121 crate::policy::Action::Merge,
3122 &self.config.default_realm,
3123 current.namespace.as_str(),
3124 )
3125 .await?;
3126
3127 let mut source_heads = Vec::with_capacity(merge.source_ids.len());
3128 let mut seen_sources = HashSet::new();
3129 for source_id in &merge.source_ids {
3130 let source = self.resolve_active_semantic_head(*source_id).await?;
3131 if source.logical_memory_id == current.logical_memory_id {
3132 return Err(HirnError::InvalidInput(format!(
3133 "MERGE MEMORY source '{}' resolves to the target logical memory {}",
3134 source_id, current.logical_memory_id
3135 )));
3136 }
3137 if !seen_sources.insert(source.logical_memory_id) {
3138 return Err(HirnError::InvalidInput(format!(
3139 "MERGE MEMORY source '{}' duplicates logical memory {}",
3140 source_id, source.logical_memory_id
3141 )));
3142 }
3143 if source.namespace != current.namespace {
3144 return Err(HirnError::InvalidInput(format!(
3145 "MERGE MEMORY source {} is in namespace '{}' but target is in '{}'",
3146 source.id,
3147 source.namespace.as_str(),
3148 current.namespace.as_str()
3149 )));
3150 }
3151 if source.concept != current.concept {
3152 return Err(HirnError::InvalidInput(format!(
3153 "MERGE MEMORY source {} uses concept '{}' but target uses '{}'",
3154 source.id, source.concept, current.concept
3155 )));
3156 }
3157 source_heads.push(source);
3158 }
3159
3160 let now = Timestamp::now();
3161 let observed_at = merge.observed_at.unwrap_or(now);
3162 let causation_id = merge.causation_id;
3163 let reason = merge.reason.clone().unwrap_or_else(|| "merge".to_string());
3164
3165 let participants: Vec<&SemanticRecord> = std::iter::once(¤t)
3166 .chain(source_heads.iter())
3167 .collect();
3168 let (default_confidence, default_evidence_count) =
3169 merged_confidence_and_evidence(&participants);
3170
3171 let mut target = current.clone();
3172 let new_target_id = MemoryId::new();
3173 target.id = new_target_id;
3174 target.revision_id = RevisionId::from_memory_id(new_target_id);
3175 target.version = current.version + 1;
3176 target.revision_operation = RevisionOperation::Merge;
3177 target.revision_reason.clone_from(&merge.reason);
3178 target.revision_causation_id = Some(causation_id);
3179 target.created_at = now;
3180 target.updated_at = now;
3181 target.valid_from = observed_at;
3182 target.valid_until = None;
3183 target.superseded_by = None;
3184 target.merged_into = None;
3185 target.provenance.created_by = actor_id.clone();
3186
3187 let mut merged_source_episodes = target.source_episodes.clone();
3188 let mut merged_contradictions = target.contradiction_ids.clone();
3189 for source in &source_heads {
3190 merged_source_episodes.extend(source.source_episodes.iter().copied());
3191 merged_contradictions.extend(source.contradiction_ids.iter().copied());
3192 }
3193 merged_source_episodes.sort();
3194 merged_source_episodes.dedup();
3195 merged_contradictions.sort();
3196 merged_contradictions.dedup();
3197 target.source_episodes = merged_source_episodes;
3198 target.contradiction_ids = merged_contradictions;
3199
3200 let target_update = SemanticUpdate {
3201 description: merge.description.clone(),
3202 confidence: merge.confidence.or(Some(default_confidence)),
3203 evidence_count: merge.evidence_count.or(Some(default_evidence_count)),
3204 reason: merge.reason.clone(),
3205 actor_id,
3206 observed_at: Some(observed_at),
3207 causation_id,
3208 };
3209 self.apply_semantic_update_fields(¤t, &mut target, &target_update, &reason, now)
3210 .await?;
3211 normalize_semantic_record_timestamps(&mut target);
3212
3213 let mut merged_sources = Vec::with_capacity(source_heads.len());
3214 for source in &source_heads {
3215 let merged_source_id = MemoryId::new();
3216 let mut merged_source = source.clone();
3217 merged_source.id = merged_source_id;
3218 merged_source.revision_id = RevisionId::from_memory_id(merged_source_id);
3219 merged_source.version = source.version + 1;
3220 merged_source.revision_operation = RevisionOperation::Merge;
3221 merged_source.revision_reason.clone_from(&merge.reason);
3222 merged_source.revision_causation_id = Some(target.id);
3223 merged_source.created_at = now;
3224 merged_source.updated_at = now;
3225 merged_source.valid_from = observed_at;
3226 merged_source.valid_until = None;
3227 merged_source.superseded_by = None;
3228 merged_source.merged_into = Some(target.logical_memory_id);
3229 merged_source.provenance.created_by = actor_id.clone();
3230 normalize_semantic_record_timestamps(&mut merged_source);
3231 merged_sources.push(merged_source);
3232 }
3233
3234 let envelope = build_semantic_merge_envelope(
3235 current.id,
3236 target.id,
3237 source_heads.iter().map(|source| source.id).collect(),
3238 merged_sources.iter().map(|source| source.id).collect(),
3239 )?;
3240 hirn_storage::append_mutation_envelope(self.storage_backend(), &envelope)
3241 .await
3242 .map_err(HirnError::storage)?;
3243
3244 let mut added_nodes = Vec::with_capacity(1 + merged_sources.len());
3245 let graph_setup = async {
3246 self.cached_graph()
3247 .add_node(
3248 target.id,
3249 Layer::Semantic,
3250 target.confidence,
3251 target.created_at,
3252 target.namespace,
3253 )
3254 .await?;
3255 added_nodes.push(target.id);
3256
3257 for merged_source in &merged_sources {
3258 self.cached_graph()
3259 .add_node(
3260 merged_source.id,
3261 Layer::Semantic,
3262 merged_source.confidence,
3263 merged_source.created_at,
3264 merged_source.namespace,
3265 )
3266 .await?;
3267 added_nodes.push(merged_source.id);
3268 }
3269
3270 if let Some(ref emb) = target.embedding {
3271 let candidates = self.find_similarity_candidates(emb).await;
3272 self.apply_similarity_edges(target.id, &candidates).await?;
3273 }
3274
3275 for merged_source in &merged_sources {
3276 self.connect_with(
3277 target.id,
3278 merged_source.id,
3279 EdgeRelation::DerivedFrom,
3280 1.0,
3281 Metadata::default(),
3282 )
3283 .await?;
3284 }
3285
3286 HirnResult::Ok(())
3287 }
3288 .await;
3289
3290 if let Err(error) = graph_setup {
3291 for node_id in added_nodes {
3292 let _ = self.cached_graph().remove_node(node_id).await;
3293 }
3294 if let Err(update_error) = hirn_storage::update_mutation_envelope_state(
3295 self.storage_backend(),
3296 &envelope.id,
3297 hirn_storage::MutationEnvelopeState::Failed,
3298 Some(error.to_string()),
3299 )
3300 .await
3301 {
3302 tracing::warn!(
3303 current_id = %current.id,
3304 next_id = %target.id,
3305 envelope_id = %envelope.id,
3306 error = %update_error,
3307 "semantic merge mutation envelope fail-fast finalize failed after graph setup error"
3308 );
3309 }
3310 return Err(error);
3311 }
3312
3313 let mut appended_records = Vec::with_capacity(1 + merged_sources.len());
3314 appended_records.push(target.clone());
3315 appended_records.extend(merged_sources.iter().cloned());
3316 if let Err(error) = self.append_semantic_records(&appended_records).await {
3317 for node_id in [target.id]
3318 .into_iter()
3319 .chain(merged_sources.iter().map(|record| record.id))
3320 {
3321 let _ = self.cached_graph().remove_node(node_id).await;
3322 }
3323 if let Err(update_error) = hirn_storage::update_mutation_envelope_state(
3324 self.storage_backend(),
3325 &envelope.id,
3326 hirn_storage::MutationEnvelopeState::Failed,
3327 Some(error.to_string()),
3328 )
3329 .await
3330 {
3331 tracing::warn!(
3332 current_id = %current.id,
3333 next_id = %target.id,
3334 envelope_id = %envelope.id,
3335 error = %update_error,
3336 "semantic merge mutation envelope fail-fast finalize failed after semantic append error"
3337 );
3338 }
3339 return Err(error);
3340 }
3341
3342 self.cache_semantic_head(&target);
3343 for merged_source in &merged_sources {
3344 self.cache_semantic_head(merged_source);
3345 }
3346
3347 let mut predecessors_removed = true;
3348
3349 match self.cached_graph().has_node(current.id).await {
3350 Ok(true) => {
3351 if let Err(error) = self.cached_graph().remove_node(current.id).await {
3352 tracing::warn!(id = %current.id, error = %error, "failed to remove merged target predecessor graph node");
3353 predecessors_removed = false;
3354 }
3355 }
3356 Ok(false) => {}
3357 Err(error) => {
3358 tracing::warn!(id = %current.id, error = %error, "failed to inspect merged target predecessor graph node state");
3359 predecessors_removed = false;
3360 }
3361 }
3362
3363 for source in &source_heads {
3364 match self.cached_graph().has_node(source.id).await {
3365 Ok(true) => {
3366 if let Err(error) = self.cached_graph().remove_node(source.id).await {
3367 tracing::warn!(id = %source.id, error = %error, "failed to remove merged source predecessor graph node");
3368 predecessors_removed = false;
3369 }
3370 }
3371 Ok(false) => {}
3372 Err(error) => {
3373 tracing::warn!(id = %source.id, error = %error, "failed to inspect merged source predecessor graph node state");
3374 predecessors_removed = false;
3375 }
3376 }
3377 }
3378
3379 if predecessors_removed {
3380 if let Err(error) = hirn_storage::update_mutation_envelope_state(
3381 self.storage_backend(),
3382 &envelope.id,
3383 hirn_storage::MutationEnvelopeState::Applied,
3384 None,
3385 )
3386 .await
3387 {
3388 tracing::warn!(
3389 current_id = %current.id,
3390 next_id = %target.id,
3391 envelope_id = %envelope.id,
3392 error = %error,
3393 "semantic merge mutation envelope finalize failed; recovery will retry predecessor cleanup"
3394 );
3395 }
3396 }
3397
3398 self.emit_scoped(
3399 target.namespace.as_str(),
3400 target.provenance.created_by.as_str(),
3401 MemoryEvent::MemoryMerged {
3402 target_logical_memory_id: target.logical_memory_id,
3403 prior_target_revision_id: current.revision_id,
3404 new_target_revision_id: target.revision_id,
3405 source_logical_memory_ids: source_heads
3406 .iter()
3407 .map(|source| source.logical_memory_id)
3408 .collect(),
3409 source_revision_ids: merged_sources
3410 .iter()
3411 .map(|source| source.revision_id)
3412 .collect(),
3413 reason: target.revision_reason.clone(),
3414 },
3415 )
3416 .await;
3417
3418 Ok(SemanticMergeOutcome {
3419 target,
3420 merged_sources,
3421 })
3422 }
3423
3424 pub(crate) async fn retract_semantic(
3426 &self,
3427 id: MemoryId,
3428 retraction: SemanticRetraction,
3429 ) -> HirnResult<SemanticRecord> {
3430 let rec = self.resolve_active_semantic_head(id).await?;
3431
3432 let actor_id = retraction.actor_id;
3433
3434 self.enforce(
3435 actor_id.as_str(),
3436 crate::policy::Action::Retract,
3437 &self.config.default_realm,
3438 rec.namespace.as_str(),
3439 )
3440 .await?;
3441
3442 let now = Timestamp::now();
3443 let new_id = MemoryId::new();
3444 let mut tombstone = rec.clone();
3445 tombstone.id = new_id;
3446 tombstone.revision_id = RevisionId::from_memory_id(new_id);
3447 tombstone.version = rec.version + 1;
3448 tombstone.revision_operation = RevisionOperation::Retract;
3449 tombstone.revision_reason.clone_from(&retraction.reason);
3450 tombstone.revision_causation_id = Some(retraction.causation_id);
3451 tombstone.created_at = now;
3452 tombstone.updated_at = now;
3453 tombstone.valid_from = retraction.observed_at.unwrap_or(now);
3454 tombstone.valid_until = None;
3455 tombstone.superseded_by = None;
3456 tombstone.merged_into = None;
3457 tombstone.provenance.created_by = actor_id;
3458 normalize_semantic_record_timestamps(&mut tombstone);
3459
3460 let envelope = build_semantic_retract_envelope(rec.id, tombstone.id)?;
3461 hirn_storage::append_mutation_envelope(self.storage_backend(), &envelope)
3462 .await
3463 .map_err(HirnError::storage)?;
3464
3465 self.append_semantic_record(&tombstone).await?;
3466
3467 self.cache_semantic_head(&tombstone);
3468
3469 let node_removed = match self.cached_graph().remove_node(rec.id).await {
3470 Ok(_) => true,
3471 Err(error) => {
3472 tracing::warn!(id = %rec.id, error = %error, "failed to remove retracted semantic graph node");
3473 false
3474 }
3475 };
3476
3477 self.emit_scoped(
3478 tombstone.namespace.as_str(),
3479 tombstone.provenance.created_by.as_str(),
3480 MemoryEvent::MemoryRetracted {
3481 logical_memory_id: rec.logical_memory_id,
3482 prior_revision_id: rec.revision_id,
3483 tombstone_revision_id: tombstone.revision_id,
3484 reason: tombstone.revision_reason.clone(),
3485 },
3486 )
3487 .await;
3488
3489 if node_removed {
3490 if let Err(error) = hirn_storage::update_mutation_envelope_state(
3491 self.storage_backend(),
3492 &envelope.id,
3493 hirn_storage::MutationEnvelopeState::Applied,
3494 None,
3495 )
3496 .await
3497 {
3498 tracing::warn!(
3499 id = %rec.id,
3500 envelope_id = %envelope.id,
3501 error = %error,
3502 "semantic retract mutation envelope finalize failed; recovery will retry graph cleanup"
3503 );
3504 }
3505 }
3506 Ok(tombstone)
3507 }
3508
3509 pub(crate) async fn reconcile_pending_semantic_create_mutations(&self) -> HirnResult<usize> {
3510 let envelopes = hirn_storage::list_pending_mutation_envelopes(
3511 self.storage_backend(),
3512 Some(SEMANTIC_CREATE_MUTATION_KIND),
3513 )
3514 .await
3515 .map_err(HirnError::storage)?;
3516 let mut reconciled = 0usize;
3517
3518 for envelope in envelopes {
3519 match self
3520 .reconcile_single_pending_semantic_create_mutation(&envelope)
3521 .await
3522 {
3523 Ok(true) => reconciled += 1,
3524 Ok(false) => {}
3525 Err(error) => {
3526 hirn_storage::update_mutation_envelope_state(
3527 self.storage_backend(),
3528 &envelope.id,
3529 hirn_storage::MutationEnvelopeState::Failed,
3530 Some(error.to_string()),
3531 )
3532 .await
3533 .map_err(HirnError::storage)?;
3534 }
3535 }
3536 }
3537
3538 Ok(reconciled)
3539 }
3540
3541 pub(crate) async fn reconcile_pending_semantic_retract_mutations(&self) -> HirnResult<usize> {
3542 let envelopes = hirn_storage::list_pending_mutation_envelopes(
3543 self.storage_backend(),
3544 Some(SEMANTIC_RETRACT_MUTATION_KIND),
3545 )
3546 .await
3547 .map_err(HirnError::storage)?;
3548 let mut reconciled = 0usize;
3549
3550 for envelope in envelopes {
3551 match self
3552 .reconcile_single_pending_semantic_retract_mutation(&envelope)
3553 .await
3554 {
3555 Ok(true) => reconciled += 1,
3556 Ok(false) => {}
3557 Err(error) => {
3558 hirn_storage::update_mutation_envelope_state(
3559 self.storage_backend(),
3560 &envelope.id,
3561 hirn_storage::MutationEnvelopeState::Failed,
3562 Some(error.to_string()),
3563 )
3564 .await
3565 .map_err(HirnError::storage)?;
3566 }
3567 }
3568 }
3569
3570 Ok(reconciled)
3571 }
3572
3573 pub(crate) async fn reconcile_pending_semantic_successor_mutations(&self) -> HirnResult<usize> {
3574 let envelopes = hirn_storage::list_pending_mutation_envelopes(
3575 self.storage_backend(),
3576 Some(SEMANTIC_SUCCESSOR_MUTATION_KIND),
3577 )
3578 .await
3579 .map_err(HirnError::storage)?;
3580 let mut reconciled = 0usize;
3581
3582 for envelope in envelopes {
3583 match self
3584 .reconcile_single_pending_semantic_successor_mutation(&envelope)
3585 .await
3586 {
3587 Ok(true) => reconciled += 1,
3588 Ok(false) => {}
3589 Err(error) => {
3590 hirn_storage::update_mutation_envelope_state(
3591 self.storage_backend(),
3592 &envelope.id,
3593 hirn_storage::MutationEnvelopeState::Failed,
3594 Some(error.to_string()),
3595 )
3596 .await
3597 .map_err(HirnError::storage)?;
3598 }
3599 }
3600 }
3601
3602 Ok(reconciled)
3603 }
3604
3605 pub(crate) async fn reconcile_pending_semantic_merge_mutations(&self) -> HirnResult<usize> {
3606 let envelopes = hirn_storage::list_pending_mutation_envelopes(
3607 self.storage_backend(),
3608 Some(SEMANTIC_MERGE_MUTATION_KIND),
3609 )
3610 .await
3611 .map_err(HirnError::storage)?;
3612 let mut reconciled = 0usize;
3613
3614 for envelope in envelopes {
3615 match self
3616 .reconcile_single_pending_semantic_merge_mutation(&envelope)
3617 .await
3618 {
3619 Ok(true) => reconciled += 1,
3620 Ok(false) => {}
3621 Err(error) => {
3622 hirn_storage::update_mutation_envelope_state(
3623 self.storage_backend(),
3624 &envelope.id,
3625 hirn_storage::MutationEnvelopeState::Failed,
3626 Some(error.to_string()),
3627 )
3628 .await
3629 .map_err(HirnError::storage)?;
3630 }
3631 }
3632 }
3633
3634 Ok(reconciled)
3635 }
3636
3637 pub(crate) async fn reconcile_pending_semantic_contradiction_sync_mutations(
3638 &self,
3639 ) -> HirnResult<usize> {
3640 let envelopes = hirn_storage::list_pending_mutation_envelopes(
3641 self.storage_backend(),
3642 Some(SEMANTIC_CONTRADICTION_SYNC_MUTATION_KIND),
3643 )
3644 .await
3645 .map_err(HirnError::storage)?;
3646 let mut reconciled = 0usize;
3647
3648 for envelope in envelopes {
3649 match self
3650 .reconcile_single_pending_semantic_contradiction_sync_mutation(&envelope)
3651 .await
3652 {
3653 Ok(true) => reconciled += 1,
3654 Ok(false) => {}
3655 Err(error) => {
3656 hirn_storage::update_mutation_envelope_state(
3657 self.storage_backend(),
3658 &envelope.id,
3659 hirn_storage::MutationEnvelopeState::Failed,
3660 Some(error.to_string()),
3661 )
3662 .await
3663 .map_err(HirnError::storage)?;
3664 }
3665 }
3666 }
3667
3668 Ok(reconciled)
3669 }
3670
3671 pub(crate) async fn reconcile_pending_semantic_purge_mutations(&self) -> HirnResult<usize> {
3672 let envelopes = hirn_storage::list_pending_mutation_envelopes(
3673 self.storage_backend(),
3674 Some(SEMANTIC_PURGE_MUTATION_KIND),
3675 )
3676 .await
3677 .map_err(HirnError::storage)?;
3678 let mut reconciled = 0usize;
3679
3680 for envelope in envelopes {
3681 match self
3682 .reconcile_single_pending_semantic_purge_mutation(&envelope)
3683 .await
3684 {
3685 Ok(true) => reconciled += 1,
3686 Ok(false) => {}
3687 Err(error) => {
3688 hirn_storage::update_mutation_envelope_state(
3689 self.storage_backend(),
3690 &envelope.id,
3691 hirn_storage::MutationEnvelopeState::Failed,
3692 Some(error.to_string()),
3693 )
3694 .await
3695 .map_err(HirnError::storage)?;
3696 }
3697 }
3698 }
3699
3700 Ok(reconciled)
3701 }
3702
3703 async fn remove_semantic_graph_nodes_if_present(&self, ids: &[MemoryId]) -> HirnResult<()> {
3704 for &id in ids {
3705 if self.cached_graph().has_node(id).await? {
3706 self.cached_graph().remove_node(id).await?;
3707 }
3708 }
3709 Ok(())
3710 }
3711
3712 async fn reconcile_single_pending_semantic_create_mutation(
3713 &self,
3714 envelope: &hirn_storage::MutationEnvelopeRecord,
3715 ) -> HirnResult<bool> {
3716 let payload = decode_semantic_create_envelope(envelope)?;
3717
3718 match self.read_semantic_record(payload.record_id).await {
3719 Ok(record) => {
3720 if self.semantic_record_is_current_head(&record).await? {
3721 if !self.cached_graph().has_node(record.id).await? {
3722 self.cached_graph()
3723 .add_node(
3724 record.id,
3725 Layer::Semantic,
3726 record.confidence,
3727 record.created_at,
3728 record.namespace,
3729 )
3730 .await?;
3731 }
3732 self.cache_semantic_head(&record);
3733 } else if self.cached_graph().has_node(record.id).await? {
3734 self.cached_graph().remove_node(record.id).await?;
3735 }
3736
3737 hirn_storage::update_mutation_envelope_state(
3738 self.storage_backend(),
3739 &envelope.id,
3740 hirn_storage::MutationEnvelopeState::Applied,
3741 None,
3742 )
3743 .await
3744 .map_err(HirnError::storage)?;
3745 Ok(true)
3746 }
3747 Err(HirnError::NotFound(_)) => {
3748 if self.cached_graph().has_node(payload.record_id).await? {
3749 self.cached_graph().remove_node(payload.record_id).await?;
3750 }
3751 hirn_storage::update_mutation_envelope_state(
3752 self.storage_backend(),
3753 &envelope.id,
3754 hirn_storage::MutationEnvelopeState::Failed,
3755 Some(format!(
3756 "semantic create record missing during recovery: {}",
3757 payload.record_id
3758 )),
3759 )
3760 .await
3761 .map_err(HirnError::storage)?;
3762 Ok(true)
3763 }
3764 Err(error) => Err(error),
3765 }
3766 }
3767
3768 async fn apply_semantic_purge_storage_delete(
3769 &self,
3770 logical_memory_id: LogicalMemoryId,
3771 ) -> HirnResult<()> {
3772 let exact_filter = Self::semantic_logical_exact_filter(logical_memory_id);
3773 self.storage_runtime
3774 .delete_exact(
3775 hirn_storage::datasets::semantic::DATASET_NAME,
3776 &exact_filter,
3777 )
3778 .await
3779 .map_err(HirnError::storage)?;
3780 self.evict_semantic_head(logical_memory_id);
3781 Ok(())
3782 }
3783
3784 async fn reconcile_single_pending_semantic_successor_mutation(
3785 &self,
3786 envelope: &hirn_storage::MutationEnvelopeRecord,
3787 ) -> HirnResult<bool> {
3788 let payload = decode_semantic_successor_envelope(envelope)?;
3789
3790 match self.read_semantic_record(payload.successor_id).await {
3791 Ok(successor) => {
3792 if self.semantic_record_is_current_head(&successor).await?
3793 && !self.cached_graph().has_node(payload.successor_id).await?
3794 {
3795 self.cached_graph()
3796 .add_node(
3797 successor.id,
3798 Layer::Semantic,
3799 successor.confidence,
3800 successor.created_at,
3801 successor.namespace,
3802 )
3803 .await?;
3804 }
3805 if self
3806 .cached_graph()
3807 .has_node(payload.prior_record_id)
3808 .await?
3809 {
3810 self.cached_graph()
3811 .remove_node(payload.prior_record_id)
3812 .await?;
3813 }
3814 hirn_storage::update_mutation_envelope_state(
3815 self.storage_backend(),
3816 &envelope.id,
3817 hirn_storage::MutationEnvelopeState::Applied,
3818 None,
3819 )
3820 .await
3821 .map_err(HirnError::storage)?;
3822 Ok(true)
3823 }
3824 Err(HirnError::NotFound(_)) => {
3825 self.remove_semantic_graph_nodes_if_present(std::slice::from_ref(
3826 &payload.successor_id,
3827 ))
3828 .await?;
3829 hirn_storage::update_mutation_envelope_state(
3830 self.storage_backend(),
3831 &envelope.id,
3832 hirn_storage::MutationEnvelopeState::Failed,
3833 Some(format!(
3834 "semantic successor record missing during recovery: {}",
3835 payload.successor_id
3836 )),
3837 )
3838 .await
3839 .map_err(HirnError::storage)?;
3840 Ok(true)
3841 }
3842 Err(error) => Err(error),
3843 }
3844 }
3845
3846 async fn reconcile_single_pending_semantic_merge_mutation(
3847 &self,
3848 envelope: &hirn_storage::MutationEnvelopeRecord,
3849 ) -> HirnResult<bool> {
3850 let payload = decode_semantic_merge_envelope(envelope)?;
3851
3852 let mut merged_node_ids = Vec::with_capacity(1 + payload.merged_source_ids.len());
3853 merged_node_ids.push(payload.merged_target_id);
3854 merged_node_ids.extend(payload.merged_source_ids.iter().copied());
3855
3856 let target = match self.read_semantic_record(payload.merged_target_id).await {
3857 Ok(target) => target,
3858 Err(HirnError::NotFound(_)) => {
3859 self.remove_semantic_graph_nodes_if_present(&merged_node_ids)
3860 .await?;
3861 hirn_storage::update_mutation_envelope_state(
3862 self.storage_backend(),
3863 &envelope.id,
3864 hirn_storage::MutationEnvelopeState::Failed,
3865 Some(format!(
3866 "semantic merge target missing during recovery: {}",
3867 payload.merged_target_id
3868 )),
3869 )
3870 .await
3871 .map_err(HirnError::storage)?;
3872 return Ok(true);
3873 }
3874 Err(error) => return Err(error),
3875 };
3876
3877 let mut merged_sources = Vec::with_capacity(payload.merged_source_ids.len());
3878 for merged_source_id in &payload.merged_source_ids {
3879 match self.read_semantic_record(*merged_source_id).await {
3880 Ok(record) => merged_sources.push(record),
3881 Err(HirnError::NotFound(_)) => {
3882 self.remove_semantic_graph_nodes_if_present(&merged_node_ids)
3883 .await?;
3884 hirn_storage::update_mutation_envelope_state(
3885 self.storage_backend(),
3886 &envelope.id,
3887 hirn_storage::MutationEnvelopeState::Failed,
3888 Some(format!(
3889 "semantic merge source missing during recovery: {}",
3890 merged_source_id
3891 )),
3892 )
3893 .await
3894 .map_err(HirnError::storage)?;
3895 return Ok(true);
3896 }
3897 Err(error) => return Err(error),
3898 }
3899 }
3900
3901 if self.semantic_record_is_current_head(&target).await?
3902 && !self.cached_graph().has_node(target.id).await?
3903 {
3904 self.cached_graph()
3905 .add_node(
3906 target.id,
3907 Layer::Semantic,
3908 target.confidence,
3909 target.created_at,
3910 target.namespace,
3911 )
3912 .await?;
3913 }
3914
3915 for merged_source in &merged_sources {
3916 if self.semantic_record_is_current_head(merged_source).await?
3917 && !self.cached_graph().has_node(merged_source.id).await?
3918 {
3919 self.cached_graph()
3920 .add_node(
3921 merged_source.id,
3922 Layer::Semantic,
3923 merged_source.confidence,
3924 merged_source.created_at,
3925 merged_source.namespace,
3926 )
3927 .await?;
3928 }
3929 }
3930
3931 let mut predecessor_ids = Vec::with_capacity(1 + payload.prior_source_ids.len());
3932 predecessor_ids.push(payload.prior_target_id);
3933 predecessor_ids.extend(payload.prior_source_ids.iter().copied());
3934 self.remove_semantic_graph_nodes_if_present(&predecessor_ids)
3935 .await?;
3936
3937 hirn_storage::update_mutation_envelope_state(
3938 self.storage_backend(),
3939 &envelope.id,
3940 hirn_storage::MutationEnvelopeState::Applied,
3941 None,
3942 )
3943 .await
3944 .map_err(HirnError::storage)?;
3945 Ok(true)
3946 }
3947
3948 async fn reconcile_single_pending_semantic_contradiction_sync_mutation(
3949 &self,
3950 envelope: &hirn_storage::MutationEnvelopeRecord,
3951 ) -> HirnResult<bool> {
3952 let payload = decode_semantic_contradiction_sync_envelope(envelope)?;
3953
3954 let mut successors = Vec::with_capacity(payload.successor_ids.len());
3955 for successor_id in &payload.successor_ids {
3956 match self.read_semantic_record(*successor_id).await {
3957 Ok(record) => successors.push(record),
3958 Err(HirnError::NotFound(_)) => {
3959 self.remove_semantic_graph_nodes_if_present(&payload.successor_ids)
3960 .await?;
3961 hirn_storage::update_mutation_envelope_state(
3962 self.storage_backend(),
3963 &envelope.id,
3964 hirn_storage::MutationEnvelopeState::Failed,
3965 Some(format!(
3966 "semantic contradiction successor missing during recovery: {}",
3967 successor_id
3968 )),
3969 )
3970 .await
3971 .map_err(HirnError::storage)?;
3972 return Ok(true);
3973 }
3974 Err(error) => return Err(error),
3975 }
3976 }
3977
3978 for successor in &successors {
3979 if self.semantic_record_is_current_head(successor).await?
3980 && !self.cached_graph().has_node(successor.id).await?
3981 {
3982 self.cached_graph()
3983 .add_node(
3984 successor.id,
3985 Layer::Semantic,
3986 successor.confidence,
3987 successor.created_at,
3988 successor.namespace,
3989 )
3990 .await?;
3991 }
3992 }
3993
3994 self.remove_semantic_graph_nodes_if_present(&payload.prior_record_ids)
3995 .await?;
3996
3997 hirn_storage::update_mutation_envelope_state(
3998 self.storage_backend(),
3999 &envelope.id,
4000 hirn_storage::MutationEnvelopeState::Applied,
4001 None,
4002 )
4003 .await
4004 .map_err(HirnError::storage)?;
4005 Ok(true)
4006 }
4007
4008 async fn reconcile_single_pending_semantic_purge_mutation(
4009 &self,
4010 envelope: &hirn_storage::MutationEnvelopeRecord,
4011 ) -> HirnResult<bool> {
4012 let payload = decode_semantic_purge_envelope(envelope)?;
4013
4014 self.apply_semantic_purge_storage_delete(payload.logical_memory_id)
4015 .await?;
4016 self.remove_semantic_graph_nodes_if_present(&payload.revision_ids)
4017 .await?;
4018
4019 hirn_storage::update_mutation_envelope_state(
4020 self.storage_backend(),
4021 &envelope.id,
4022 hirn_storage::MutationEnvelopeState::Applied,
4023 None,
4024 )
4025 .await
4026 .map_err(HirnError::storage)?;
4027 Ok(true)
4028 }
4029
4030 async fn reconcile_single_pending_semantic_retract_mutation(
4031 &self,
4032 envelope: &hirn_storage::MutationEnvelopeRecord,
4033 ) -> HirnResult<bool> {
4034 let payload = decode_semantic_retract_envelope(envelope)?;
4035
4036 match self.read_semantic_record(payload.tombstone_id).await {
4037 Ok(tombstone) => {
4038 self.cache_semantic_head(&tombstone);
4039 if self
4040 .cached_graph()
4041 .has_node(payload.prior_record_id)
4042 .await?
4043 {
4044 self.cached_graph()
4045 .remove_node(payload.prior_record_id)
4046 .await?;
4047 }
4048 hirn_storage::update_mutation_envelope_state(
4049 self.storage_backend(),
4050 &envelope.id,
4051 hirn_storage::MutationEnvelopeState::Applied,
4052 None,
4053 )
4054 .await
4055 .map_err(HirnError::storage)?;
4056 Ok(true)
4057 }
4058 Err(HirnError::NotFound(_)) => {
4059 hirn_storage::update_mutation_envelope_state(
4060 self.storage_backend(),
4061 &envelope.id,
4062 hirn_storage::MutationEnvelopeState::Failed,
4063 Some(format!(
4064 "semantic retract tombstone missing during recovery: {}",
4065 payload.tombstone_id
4066 )),
4067 )
4068 .await
4069 .map_err(HirnError::storage)?;
4070 Ok(true)
4071 }
4072 Err(error) => Err(error),
4073 }
4074 }
4075
4076 pub(crate) async fn purge_semantic(&self, id: MemoryId) -> HirnResult<()> {
4078 self.purge_semantic_as(id, None).await
4079 }
4080
4081 pub(crate) async fn purge_semantic_as(
4082 &self,
4083 id: MemoryId,
4084 actor_id: Option<AgentId>,
4085 ) -> HirnResult<()> {
4086 let rec = self.read_semantic_record(id).await?;
4087 let actor_id = actor_id.unwrap_or_else(|| rec.provenance.created_by.clone());
4088 self.enforce(
4089 actor_id.as_str(),
4090 crate::policy::Action::Purge,
4091 &self.config.default_realm,
4092 rec.namespace.as_str(),
4093 )
4094 .await?;
4095
4096 let history = self.semantic_history(id).await?;
4097 let revision_ids = history
4098 .iter()
4099 .map(|revision| revision.id)
4100 .collect::<Vec<_>>();
4101 let envelope = build_semantic_purge_envelope(rec.logical_memory_id, revision_ids.clone())?;
4102 hirn_storage::append_mutation_envelope(self.storage_backend(), &envelope)
4103 .await
4104 .map_err(HirnError::storage)?;
4105
4106 self.apply_semantic_purge_storage_delete(rec.logical_memory_id)
4107 .await?;
4108
4109 let graph_cleanup_applied = match self
4110 .remove_semantic_graph_nodes_if_present(&revision_ids)
4111 .await
4112 {
4113 Ok(()) => true,
4114 Err(error) => {
4115 tracing::warn!(
4116 logical_memory_id = %rec.logical_memory_id,
4117 envelope_id = %envelope.id,
4118 error = %error,
4119 "semantic purge graph cleanup incomplete; recovery will retry"
4120 );
4121 false
4122 }
4123 };
4124
4125 self.emit_scoped(
4126 rec.namespace.as_str(),
4127 actor_id.as_str(),
4128 MemoryEvent::Forgotten { id },
4129 )
4130 .await;
4131
4132 if graph_cleanup_applied {
4133 if let Err(error) = hirn_storage::update_mutation_envelope_state(
4134 self.storage_backend(),
4135 &envelope.id,
4136 hirn_storage::MutationEnvelopeState::Applied,
4137 None,
4138 )
4139 .await
4140 {
4141 tracing::warn!(
4142 logical_memory_id = %rec.logical_memory_id,
4143 envelope_id = %envelope.id,
4144 error = %error,
4145 "semantic purge mutation envelope finalize failed; recovery will retry graph cleanup"
4146 );
4147 }
4148 }
4149 Ok(())
4150 }
4151
4152 pub(crate) async fn get_memory(&self, id: MemoryId) -> HirnResult<MemoryRecord> {
4156 let exact_filter = hirn_storage::store::ExactMatchFilter::utf8_value("id", id.to_string());
4157 let opts = || hirn_storage::store::ScanOptions {
4158 exact_filter: Some(exact_filter.clone()),
4159 limit: Some(1),
4160 ..Default::default()
4161 };
4162
4163 if let Some(record) = Self::scan_memory_dataset_record(
4165 self.storage_backend(),
4166 "memory lookup",
4167 hirn_storage::datasets::working::DATASET_NAME,
4168 opts(),
4169 hirn_storage::datasets::working::from_batch,
4170 )
4171 .await?
4172 {
4173 return Ok(MemoryRecord::Working(record));
4174 }
4175
4176 if let Some(record) = Self::scan_memory_dataset_record(
4178 self.storage_backend(),
4179 "memory lookup",
4180 hirn_storage::datasets::episodic::DATASET_NAME,
4181 opts(),
4182 hirn_storage::datasets::episodic::from_batch,
4183 )
4184 .await?
4185 {
4186 return Ok(MemoryRecord::Episodic(record));
4187 }
4188
4189 if let Some(record) = Self::scan_memory_dataset_record(
4191 self.storage_backend(),
4192 "memory lookup",
4193 hirn_storage::datasets::semantic::DATASET_NAME,
4194 opts(),
4195 hirn_storage::datasets::semantic::from_batch,
4196 )
4197 .await?
4198 {
4199 return Ok(MemoryRecord::Semantic(record));
4200 }
4201
4202 if let Some(record) = Self::scan_memory_dataset_record(
4204 self.storage_backend(),
4205 "memory lookup",
4206 hirn_storage::datasets::procedural::DATASET_NAME,
4207 opts(),
4208 hirn_storage::datasets::procedural::from_batch,
4209 )
4210 .await?
4211 {
4212 return Ok(MemoryRecord::Procedural(record));
4213 }
4214
4215 Err(HirnError::NotFound(format!("memory record {id}")))
4216 }
4217
4218 pub(crate) async fn get_memories_batch(
4223 &self,
4224 ids: &[MemoryId],
4225 ) -> HirnResult<HashMap<MemoryId, MemoryRecord>> {
4226 if ids.is_empty() {
4227 return Ok(HashMap::new());
4228 }
4229
4230 let (working, episodic, semantic, procedural) = tokio::try_join!(
4231 Self::scan_memory_dataset_records_for_ids(
4232 self.storage_backend(),
4233 "recall hydration",
4234 hirn_storage::datasets::working::DATASET_NAME,
4235 ids,
4236 hirn_storage::datasets::working::from_batch,
4237 MemoryRecord::Working,
4238 ),
4239 Self::scan_memory_dataset_records_for_ids(
4240 self.storage_backend(),
4241 "recall hydration",
4242 hirn_storage::datasets::episodic::DATASET_NAME,
4243 ids,
4244 hirn_storage::datasets::episodic::from_batch,
4245 MemoryRecord::Episodic,
4246 ),
4247 Self::scan_memory_dataset_records_for_ids(
4248 self.storage_backend(),
4249 "recall hydration",
4250 hirn_storage::datasets::semantic::DATASET_NAME,
4251 ids,
4252 hirn_storage::datasets::semantic::from_batch,
4253 MemoryRecord::Semantic,
4254 ),
4255 Self::scan_memory_dataset_records_for_ids(
4256 self.storage_backend(),
4257 "recall hydration",
4258 hirn_storage::datasets::procedural::DATASET_NAME,
4259 ids,
4260 hirn_storage::datasets::procedural::from_batch,
4261 MemoryRecord::Procedural,
4262 ),
4263 )?;
4264
4265 let mut result: HashMap<MemoryId, MemoryRecord> = HashMap::with_capacity(ids.len());
4266 result.extend(working);
4267 result.extend(episodic);
4268 result.extend(semantic);
4269 result.extend(procedural);
4270
4271 Ok(result)
4272 }
4273
4274 pub(crate) async fn get_memories_batch_with_hints(
4275 &self,
4276 ids: &[MemoryId],
4277 layer_hints: &HashMap<MemoryId, Layer>,
4278 ) -> HirnResult<HashMap<MemoryId, MemoryRecord>> {
4279 if ids.is_empty() {
4280 return Ok(HashMap::new());
4281 }
4282
4283 let mut working_ids = Vec::new();
4284 let mut episodic_ids = Vec::new();
4285 let mut semantic_ids = Vec::new();
4286 let mut procedural_ids = Vec::new();
4287 let mut unknown_ids = Vec::new();
4288
4289 for &id in ids {
4290 match layer_hints.get(&id).copied() {
4291 Some(Layer::Working) => working_ids.push(id),
4292 Some(Layer::Episodic) => episodic_ids.push(id),
4293 Some(Layer::Semantic) => semantic_ids.push(id),
4294 Some(Layer::Procedural) => procedural_ids.push(id),
4295 None => unknown_ids.push(id),
4296 }
4297 }
4298
4299 let mut working_scan_ids = working_ids;
4300 let mut episodic_scan_ids = episodic_ids;
4301 let mut semantic_scan_ids = semantic_ids;
4302 let mut procedural_scan_ids = procedural_ids;
4303
4304 if !unknown_ids.is_empty() {
4305 working_scan_ids.extend_from_slice(&unknown_ids);
4306 episodic_scan_ids.extend_from_slice(&unknown_ids);
4307 semantic_scan_ids.extend_from_slice(&unknown_ids);
4308 procedural_scan_ids.extend_from_slice(&unknown_ids);
4309 }
4310
4311 let (working, episodic, semantic, procedural) = tokio::try_join!(
4312 Self::scan_memory_dataset_records_for_ids_projected(
4313 self.storage_backend(),
4314 "recall hydration",
4315 hirn_storage::datasets::working::DATASET_NAME,
4316 &working_scan_ids,
4317 None, hirn_storage::datasets::working::from_batch,
4319 MemoryRecord::Working,
4320 ),
4321 Self::scan_memory_dataset_records_for_ids_projected(
4322 self.storage_backend(),
4323 "recall hydration",
4324 hirn_storage::datasets::episodic::DATASET_NAME,
4325 &episodic_scan_ids,
4326 Some(hirn_storage::datasets::episodic::RECALL_HYDRATION_COLUMNS),
4327 hirn_storage::datasets::episodic::from_batch,
4328 MemoryRecord::Episodic,
4329 ),
4330 Self::scan_memory_dataset_records_for_ids_projected(
4331 self.storage_backend(),
4332 "recall hydration",
4333 hirn_storage::datasets::semantic::DATASET_NAME,
4334 &semantic_scan_ids,
4335 Some(hirn_storage::datasets::semantic::RECALL_HYDRATION_COLUMNS),
4336 hirn_storage::datasets::semantic::from_batch,
4337 MemoryRecord::Semantic,
4338 ),
4339 Self::scan_memory_dataset_records_for_ids_projected(
4340 self.storage_backend(),
4341 "recall hydration",
4342 hirn_storage::datasets::procedural::DATASET_NAME,
4343 &procedural_scan_ids,
4344 Some(hirn_storage::datasets::procedural::RECALL_HYDRATION_COLUMNS),
4345 hirn_storage::datasets::procedural::from_batch,
4346 MemoryRecord::Procedural,
4347 ),
4348 )?;
4349
4350 let mut result: HashMap<MemoryId, MemoryRecord> = HashMap::with_capacity(ids.len());
4351 result.extend(working);
4352 result.extend(episodic);
4353 result.extend(semantic);
4354 result.extend(procedural);
4355
4356 Ok(result)
4357 }
4358
4359 fn memory_ids_exact_filter(ids: &[MemoryId]) -> Option<hirn_storage::store::ExactMatchFilter> {
4360 hirn_storage::store::ExactMatchFilter::utf8_values(
4361 "id",
4362 ids.iter().map(ToString::to_string),
4363 )
4364 }
4365
4366 async fn scan_memory_dataset_records_for_ids<T, F>(
4367 storage: &dyn hirn_storage::PhysicalStore,
4368 context: &'static str,
4369 dataset: &'static str,
4370 ids: &[MemoryId],
4371 from_batch: fn(&arrow_array::RecordBatch) -> Result<Vec<T>, hirn_storage::HirnDbError>,
4372 wrap: F,
4373 ) -> HirnResult<HashMap<MemoryId, MemoryRecord>>
4374 where
4375 F: Fn(T) -> MemoryRecord,
4376 {
4377 Self::scan_memory_dataset_records_for_ids_projected(
4378 storage, context, dataset, ids, None, from_batch, wrap,
4379 )
4380 .await
4381 }
4382
4383 async fn scan_memory_dataset_records_for_ids_projected<T, F>(
4388 storage: &dyn hirn_storage::PhysicalStore,
4389 context: &'static str,
4390 dataset: &'static str,
4391 ids: &[MemoryId],
4392 columns: Option<&[&str]>,
4393 from_batch: fn(&arrow_array::RecordBatch) -> Result<Vec<T>, hirn_storage::HirnDbError>,
4394 wrap: F,
4395 ) -> HirnResult<HashMap<MemoryId, MemoryRecord>>
4396 where
4397 F: Fn(T) -> MemoryRecord,
4398 {
4399 let Some(exact_filter) = Self::memory_ids_exact_filter(ids) else {
4400 return Ok(HashMap::new());
4401 };
4402
4403 Self::scan_memory_dataset_records(
4404 storage,
4405 context,
4406 dataset,
4407 hirn_storage::store::ScanOptions {
4408 exact_filter: Some(exact_filter),
4409 columns: columns.map(|cols| cols.iter().map(|c| (*c).to_string()).collect()),
4410 limit: None,
4411 ..Default::default()
4412 },
4413 from_batch,
4414 wrap,
4415 )
4416 .await
4417 }
4418
4419 async fn scan_memory_dataset_record<T>(
4420 storage: &dyn hirn_storage::PhysicalStore,
4421 context: &'static str,
4422 dataset: &'static str,
4423 options: hirn_storage::store::ScanOptions,
4424 from_batch: fn(&arrow_array::RecordBatch) -> Result<Vec<T>, hirn_storage::HirnDbError>,
4425 ) -> HirnResult<Option<T>> {
4426 let mut stream = storage
4427 .scan_stream(dataset, options)
4428 .await
4429 .map_err(|error| {
4430 HirnError::storage(format!(
4431 "failed to scan {context} dataset `{dataset}`: {error}"
4432 ))
4433 })?;
4434
4435 while let Some(batch) = stream.try_next().await.map_err(|error| {
4436 HirnError::storage(format!(
4437 "failed to stream {context} dataset `{dataset}`: {error}"
4438 ))
4439 })? {
4440 let records = from_batch(&batch).map_err(|error| {
4441 HirnError::storage(format!(
4442 "failed to decode {context} dataset `{dataset}`: {error}"
4443 ))
4444 })?;
4445 if let Some(record) = records.into_iter().next() {
4446 return Ok(Some(record));
4447 }
4448 }
4449
4450 Ok(None)
4451 }
4452
4453 async fn scan_memory_dataset_records<T, F>(
4454 storage: &dyn hirn_storage::PhysicalStore,
4455 context: &'static str,
4456 dataset: &'static str,
4457 options: hirn_storage::store::ScanOptions,
4458 from_batch: fn(&arrow_array::RecordBatch) -> Result<Vec<T>, hirn_storage::HirnDbError>,
4459 wrap: F,
4460 ) -> HirnResult<HashMap<MemoryId, MemoryRecord>>
4461 where
4462 F: Fn(T) -> MemoryRecord,
4463 {
4464 let mut stream = storage
4465 .scan_stream(dataset, options)
4466 .await
4467 .map_err(|error| {
4468 HirnError::storage(format!(
4469 "failed to scan {context} dataset `{dataset}`: {error}"
4470 ))
4471 })?;
4472
4473 let mut records = HashMap::new();
4474 while let Some(batch) = stream.try_next().await.map_err(|error| {
4475 HirnError::storage(format!(
4476 "failed to stream {context} dataset `{dataset}`: {error}"
4477 ))
4478 })? {
4479 let entries = from_batch(&batch).map_err(|error| {
4480 HirnError::storage(format!(
4481 "failed to decode {context} dataset `{dataset}`: {error}"
4482 ))
4483 })?;
4484 for entry in entries {
4485 let record = wrap(entry);
4486 records.insert(record.id(), record);
4487 }
4488 }
4489
4490 Ok(records)
4491 }
4492 pub(crate) async fn count(&self) -> HirnResult<LayerCounts> {
4494 let storage = self.storage_backend();
4495 let working = self
4496 .storage_backend()
4497 .count(hirn_storage::datasets::working::DATASET_NAME, None)
4498 .await
4499 .map_err(|e| HirnError::storage(e))?;
4500
4501 let episodic = storage
4502 .count(hirn_storage::datasets::episodic::DATASET_NAME, None)
4503 .await
4504 .map_err(|e| HirnError::storage(e))?;
4505
4506 let semantic = storage
4507 .count(hirn_storage::datasets::semantic::DATASET_NAME, None)
4508 .await
4509 .map_err(|e| HirnError::storage(e))?;
4510
4511 let procedural = storage
4512 .count(hirn_storage::datasets::procedural::DATASET_NAME, None)
4513 .await
4514 .map_err(|e| HirnError::storage(e))?;
4515
4516 Ok(LayerCounts {
4517 working,
4518 episodic,
4519 semantic,
4520 procedural,
4521 total: working + episodic + semantic + procedural,
4522 })
4523 }
4524
4525 pub(crate) async fn stats(&self) -> HirnResult<DbStats> {
4527 let counts = self.count().await?;
4528
4529 let file_size_bytes = self.file_size_bytes();
4530
4531 let edge_count = self.cached_graph().edge_count().await.unwrap_or(0) as u64;
4532
4533 let node_count = self.cached_graph().node_count().await.unwrap_or(0) as u64;
4534
4535 metrics::gauge!(crate::metrics::MEMORY_COUNT).set(counts.total as f64);
4537 metrics::gauge!(crate::metrics::GRAPH_NODE_COUNT).set(node_count as f64);
4538 metrics::gauge!(crate::metrics::GRAPH_EDGES_TOTAL).set(edge_count as f64);
4539
4540 Ok(DbStats {
4541 working_count: counts.working,
4542 episodic_count: counts.episodic,
4543 semantic_count: counts.semantic,
4544 procedural_count: counts.procedural,
4545 total_count: counts.total,
4546 edge_count,
4547 file_size_bytes,
4548 })
4549 }
4550
4551 pub(crate) async fn episodes_in_range(
4556 &self,
4557 after: Timestamp,
4558 before: Timestamp,
4559 ) -> HirnResult<Vec<EpisodicRecord>> {
4560 self.list_episodes(&EpisodicFilter {
4561 after: Some(after),
4562 before: Some(before),
4563 ..Default::default()
4564 })
4565 .await
4566 }
4567
4568 pub(crate) async fn episodes_after(&self, after: Timestamp) -> HirnResult<Vec<EpisodicRecord>> {
4570 self.list_episodes(&EpisodicFilter {
4571 after: Some(after),
4572 ..Default::default()
4573 })
4574 .await
4575 }
4576
4577 pub(crate) async fn episodes_before(
4579 &self,
4580 before: Timestamp,
4581 ) -> HirnResult<Vec<EpisodicRecord>> {
4582 self.list_episodes(&EpisodicFilter {
4583 before: Some(before),
4584 ..Default::default()
4585 })
4586 .await
4587 }
4588
4589 pub(crate) async fn episodes_reverse(&self) -> HirnResult<Vec<EpisodicRecord>> {
4591 let mut records = self.list_episodes(&EpisodicFilter::default()).await?;
4592 records.reverse();
4593 Ok(records)
4594 }
4595
4596 pub(crate) fn recall(&self, query_embedding: Vec<f32>) -> RecallBuilder<'_> {
4600 RecallBuilder::new(self, query_embedding)
4601 }
4602
4603 pub async fn batch_recall<'a>(
4605 &'a self,
4606 builders: Vec<RecallBuilder<'a>>,
4607 ) -> Vec<HirnResult<Vec<RecallResult>>> {
4608 if builders.is_empty() {
4609 return Vec::new();
4610 }
4611
4612 {
4614 let mut checked: HashSet<(String, String)> = HashSet::new();
4615 for b in &builders {
4616 let agent = b.agent_id.as_deref().unwrap_or("anonymous").to_string();
4617 let ns = b
4618 .namespace
4619 .as_ref()
4620 .map_or(String::new(), |n| n.as_str().to_string());
4621 if checked.insert((agent.clone(), ns.clone())) {
4622 if let Err(e) = self
4623 .enforce(
4624 &agent,
4625 crate::policy::Action::Recall,
4626 &self.config.default_realm,
4627 &ns,
4628 )
4629 .await
4630 {
4631 let msg = format!("{e}");
4632 return builders
4633 .iter()
4634 .map(|_| Err(HirnError::AccessDenied(msg.clone())))
4635 .collect();
4636 }
4637 }
4638 }
4639 }
4640
4641 let futs = builders.into_iter().map(|b| b.execute());
4643 futures::future::join_all(futs).await
4644 }
4645
4646 pub(crate) fn think(&self, query_embedding: Vec<f32>) -> crate::think::ThinkBuilder<'_> {
4648 crate::think::ThinkBuilder::new(self, query_embedding)
4649 }
4650
4651 pub(crate) fn inspect(&self, id: MemoryId) -> crate::inspect::InspectBuilder<'_> {
4653 crate::inspect::InspectBuilder::new(self, id)
4654 }
4655
4656 pub(crate) fn trace(&self, id: MemoryId) -> crate::trace::TraceBuilder<'_> {
4658 crate::trace::TraceBuilder::new(self, id)
4659 }
4660
4661 pub fn embedding_dims(&self) -> usize {
4663 self.config.embedding_dimensions.as_usize()
4664 }
4665
4666 pub(crate) async fn mark_contradiction(
4668 &self,
4669 id: MemoryId,
4670 contradicts: MemoryId,
4671 ) -> HirnResult<()> {
4672 let _ = self
4673 .synchronize_contradiction_refs(id, contradicts, None)
4674 .await?;
4675 Ok(())
4676 }
4677
4678 pub(crate) async fn flush_semantic_access(&self) -> HirnResult<()> {
4681 let pending = self.graph_runtime().drain_semantic_access();
4682
4683 if pending.is_empty() {
4684 return Ok(());
4685 }
4686
4687 for (id, count) in &pending {
4688 if let Ok(mut record) = self.read_semantic_record(*id).await {
4689 for _ in 0..*count {
4690 record.record_access();
4691 }
4692 let _ = self.overwrite_semantic_record(&record).await;
4693 }
4694 }
4695
4696 Ok(())
4697 }
4698}
4699
4700#[cfg(test)]
4701mod tests {
4702 use std::sync::Arc;
4703
4704 use hirn_core::RevisionOperation;
4705 use hirn_core::Timestamp;
4706 use hirn_core::episodic::EpisodicRecord;
4707 use hirn_core::id::MemoryId;
4708 use hirn_core::record::MemoryRecord;
4709 use hirn_core::revision::{LogicalMemoryId, RevisionId};
4710 use hirn_core::types::{AgentId, EdgeRelation, EventType, Origin};
4711 use hirn_storage::memory_store::MemoryStore;
4712
4713 use super::*;
4714 use crate::retrieval::recall::{RecallPresentation, RecallResult};
4715 use crate::scoring::ScoreBreakdown;
4716
4717 fn agent() -> AgentId {
4718 AgentId::new("semantic_test").unwrap()
4719 }
4720
4721 async fn temp_db() -> HirnDB {
4722 let dir = tempfile::tempdir().unwrap();
4723 let config = HirnConfig::builder()
4724 .db_path(dir.path().join("semantic-db"))
4725 .working_memory_token_limit(1000)
4726 .build()
4727 .unwrap();
4728 HirnDB::open_with_config(config, Arc::new(MemoryStore::new()))
4729 .await
4730 .unwrap()
4731 }
4732
4733 fn semantic_record(
4734 id: MemoryId,
4735 logical_memory_id: LogicalMemoryId,
4736 created_at: Timestamp,
4737 version: u32,
4738 ) -> SemanticRecord {
4739 let mut record = SemanticRecord::builder()
4740 .concept("deploy_status")
4741 .description("deployment status")
4742 .agent_id(agent())
4743 .build()
4744 .unwrap();
4745 record.id = id;
4746 record.logical_memory_id = logical_memory_id;
4747 record.revision_id = RevisionId::from_memory_id(id);
4748 record.version = version;
4749 record.created_at = created_at;
4750 record.updated_at = created_at;
4751 record.last_accessed = created_at;
4752 record.valid_from = created_at;
4753 record.valid_until = None;
4754 record
4755 }
4756
4757 fn recall_result(record: MemoryRecord) -> RecallResult {
4758 RecallResult {
4759 record,
4760 similarity: 1.0,
4761 composite_score: 1.0,
4762 score_breakdown: ScoreBreakdown {
4763 similarity: 1.0,
4764 importance: 1.0,
4765 recency: 1.0,
4766 activation: 0.0,
4767 causal_relevance: 0.0,
4768 surprise: 0.0,
4769 source_reliability: 1.0,
4770 },
4771 revision: None,
4772 resource_evidence: Vec::new(),
4773 resource_preview_packages: Vec::new(),
4774 resource_score_attribution: Vec::new(),
4775 presentation: RecallPresentation::default(),
4776 }
4777 }
4778
4779 #[test]
4780 fn revision_snapshot_preserves_exact_recorded_boundary_when_timestamps_tie() {
4781 let created_at = Timestamp::from_millis(1_700_000_000_000);
4782 let original_id = MemoryId::parse("01ARZ3NDEKTSV4RRFFQ69G5FAW").unwrap();
4783 let successor_id = MemoryId::parse("01ARZ3NDEKTSV4RRFFQ69G5FAV").unwrap();
4784 let logical_memory_id = LogicalMemoryId::from_memory_id(original_id);
4785
4786 let original = semantic_record(original_id, logical_memory_id, created_at, 1);
4787 let mut successor = semantic_record(successor_id, logical_memory_id, created_at, 2);
4788 successor.revision_operation = RevisionOperation::Correct;
4789 successor.revision_reason = Some("post-incident correction".to_string());
4790 successor.revision_causation_id = Some(original.id);
4791
4792 let revision = semantic_snapshot_head_recorded_at_snapshot(
4793 &[original.clone(), successor],
4794 ResolvedRecallSnapshot::Revision {
4795 cutoff: created_at,
4796 revision_id: original.revision_id,
4797 logical_memory_id,
4798 version: original.version,
4799 },
4800 )
4801 .unwrap();
4802
4803 assert_eq!(revision.id, original.id);
4804 assert_eq!(revision.revision_id, original.revision_id);
4805 assert_eq!(revision.version, 1);
4806 }
4807
4808 #[tokio::test(flavor = "multi_thread")]
4809 async fn override_preserves_conflict_group_visibility_via_carried_contradictions() {
4810 let db = temp_db().await;
4811 let id_a = db
4812 .store_semantic(
4813 SemanticRecord::builder()
4814 .concept("status_a")
4815 .description("deployment succeeded")
4816 .origin(Origin::CrossAgent)
4817 .agent_id(agent())
4818 .build()
4819 .unwrap(),
4820 )
4821 .await
4822 .unwrap();
4823 let id_b = db
4824 .store_semantic(
4825 SemanticRecord::builder()
4826 .concept("status_b")
4827 .description("deployment failed")
4828 .origin(Origin::DirectObservation)
4829 .agent_id(AgentId::new("other_agent").unwrap())
4830 .build()
4831 .unwrap(),
4832 )
4833 .await
4834 .unwrap();
4835
4836 db.connect_with(id_a, id_b, EdgeRelation::Contradicts, 0.9, Metadata::new())
4837 .await
4838 .unwrap();
4839 db.mark_contradiction(id_a, id_b).await.unwrap();
4840
4841 let override_head = db
4842 .override_semantic(
4843 id_a,
4844 SemanticOverride {
4845 reason: Some("operator confirmed the successful rollout".into()),
4846 ..SemanticOverride::with_metadata(agent(), id_a)
4847 },
4848 )
4849 .await
4850 .unwrap();
4851
4852 let summary = crate::ql::context::detect_conflicts_for_record(
4853 &db,
4854 &MemoryRecord::Semantic(override_head.clone()),
4855 None,
4856 )
4857 .await;
4858
4859 assert_eq!(summary.groups.len(), 1);
4860 assert_eq!(
4861 summary.groups[0].preferred_memory_id,
4862 Some(override_head.id)
4863 );
4864 }
4865
4866 #[tokio::test(flavor = "multi_thread")]
4867 async fn normalize_current_recall_results_batches_episodic_heads() {
4868 let db = temp_db().await;
4869 let now = Timestamp::now();
4870
4871 let first_id = db
4872 .remember(
4873 EpisodicRecord::builder()
4874 .event_type(EventType::Observation)
4875 .content("first chain original")
4876 .summary("first chain original")
4877 .importance(0.6)
4878 .timestamp(now)
4879 .agent_id(agent())
4880 .build()
4881 .unwrap(),
4882 )
4883 .await
4884 .unwrap();
4885 let first_original = db.read_episodic_record(first_id).await.unwrap();
4886 let first_head = db
4887 .append_episodic_successor(
4888 &first_original,
4889 RevisionOperation::Correct,
4890 Some("normalize episodic head".to_string()),
4891 |next| {
4892 next.importance = 0.9;
4893 },
4894 )
4895 .await
4896 .unwrap();
4897
4898 let second_id = db
4899 .remember(
4900 EpisodicRecord::builder()
4901 .event_type(EventType::Observation)
4902 .content("second chain current")
4903 .summary("second chain current")
4904 .importance(0.5)
4905 .timestamp(now)
4906 .agent_id(agent())
4907 .build()
4908 .unwrap(),
4909 )
4910 .await
4911 .unwrap();
4912 let second_head = db.read_episodic_record(second_id).await.unwrap();
4913
4914 let normalized = db
4915 .normalize_current_recall_results(vec![
4916 recall_result(MemoryRecord::Episodic(first_original.clone())),
4917 recall_result(MemoryRecord::Episodic(second_head.clone())),
4918 recall_result(MemoryRecord::Episodic(first_head.clone())),
4919 ])
4920 .await
4921 .unwrap();
4922
4923 assert_eq!(normalized.len(), 2);
4924
4925 let normalized_ids = normalized
4926 .iter()
4927 .map(|result| match &result.record {
4928 MemoryRecord::Episodic(record) => record.id,
4929 other => panic!("expected episodic record, got {other:?}"),
4930 })
4931 .collect::<Vec<_>>();
4932
4933 assert!(normalized_ids.contains(&first_head.id));
4934 assert!(normalized_ids.contains(&second_head.id));
4935 assert!(!normalized_ids.contains(&first_original.id));
4936 assert!(normalized.iter().all(|result| result.revision.is_some()));
4937 }
4938
4939 #[tokio::test(flavor = "multi_thread")]
4940 async fn semantic_successors_preserve_nonsemantic_contradictions() {
4941 let db = temp_db().await;
4942 let semantic_id = db
4943 .store_semantic(
4944 SemanticRecord::builder()
4945 .concept("deploy_health")
4946 .description("deployment remained healthy")
4947 .agent_id(agent())
4948 .build()
4949 .unwrap(),
4950 )
4951 .await
4952 .unwrap();
4953 let episodic_id = db
4954 .remember_bypass_admission(
4955 EpisodicRecord::builder()
4956 .event_type(EventType::Observation)
4957 .content("deployment triggered error spikes")
4958 .summary("error spikes after deployment")
4959 .importance(0.9)
4960 .agent_id(agent())
4961 .build()
4962 .unwrap(),
4963 )
4964 .await
4965 .unwrap();
4966
4967 db.connect_with(
4968 semantic_id,
4969 episodic_id,
4970 EdgeRelation::Contradicts,
4971 0.9,
4972 Metadata::new(),
4973 )
4974 .await
4975 .unwrap();
4976
4977 let history_after_connect = db.semantic_history(semantic_id).await.unwrap();
4978 assert_eq!(history_after_connect.len(), 2);
4979 assert!(history_after_connect[0].contradiction_ids.is_empty());
4980 assert_eq!(
4981 history_after_connect[1].contradiction_ids,
4982 vec![episodic_id]
4983 );
4984
4985 let next = db
4986 .correct_semantic(
4987 semantic_id,
4988 SemanticUpdate {
4989 description: Some("deployment required rollback".into()),
4990 reason: Some("post-incident correction".into()),
4991 ..SemanticUpdate::with_metadata(agent(), semantic_id)
4992 },
4993 )
4994 .await
4995 .unwrap();
4996
4997 let current = db.read_semantic_record(next.id).await.unwrap();
4998 assert_eq!(current.contradiction_ids, vec![episodic_id]);
4999
5000 let edges = db
5001 .cached_graph()
5002 .get_edges_between(next.id, episodic_id)
5003 .await
5004 .unwrap();
5005 assert!(
5006 edges
5007 .iter()
5008 .any(|edge| edge.relation == EdgeRelation::Contradicts)
5009 );
5010
5011 let history = db.semantic_history(semantic_id).await.unwrap();
5012 assert_eq!(history.len(), 3);
5013 assert!(history[0].contradiction_ids.is_empty());
5014 assert_eq!(history[1].contradiction_ids, vec![episodic_id]);
5015 assert_eq!(history[2].contradiction_ids, vec![episodic_id]);
5016 }
5017
5018 #[tokio::test(flavor = "multi_thread")]
5019 async fn retract_semantic_records_applied_mutation_envelope() {
5020 let store = Arc::new(MemoryStore::new());
5021 let dir = tempfile::tempdir().unwrap();
5022 let config = HirnConfig::builder()
5023 .db_path(dir.path().join("semantic-retract-envelope"))
5024 .working_memory_token_limit(1000)
5025 .build()
5026 .unwrap();
5027 let db = HirnDB::open_with_config(config, store.clone())
5028 .await
5029 .unwrap();
5030
5031 let id = db
5032 .store_semantic(
5033 SemanticRecord::builder()
5034 .concept("retract_target")
5035 .description("retract me")
5036 .agent_id(agent())
5037 .build()
5038 .unwrap(),
5039 )
5040 .await
5041 .unwrap();
5042
5043 let tombstone = db
5044 .retract_semantic(id, SemanticRetraction::with_metadata(agent(), id))
5045 .await
5046 .unwrap();
5047
5048 let envelope = hirn_storage::get_mutation_envelope(
5049 store.as_ref(),
5050 &format!("semantic-retract:{}", tombstone.id),
5051 )
5052 .await
5053 .unwrap()
5054 .unwrap();
5055 assert_eq!(envelope.state, hirn_storage::MutationEnvelopeState::Applied);
5056 }
5057
5058 #[tokio::test(flavor = "multi_thread")]
5059 async fn store_semantic_records_applied_mutation_envelope() {
5060 let store = Arc::new(MemoryStore::new());
5061 let dir = tempfile::tempdir().unwrap();
5062 let config = HirnConfig::builder()
5063 .db_path(dir.path().join("semantic-create-envelope"))
5064 .working_memory_token_limit(1000)
5065 .build()
5066 .unwrap();
5067 let db = HirnDB::open_with_config(config, store.clone())
5068 .await
5069 .unwrap();
5070
5071 let id = db
5072 .store_semantic(
5073 SemanticRecord::builder()
5074 .concept("create_target")
5075 .description("create me")
5076 .agent_id(agent())
5077 .build()
5078 .unwrap(),
5079 )
5080 .await
5081 .unwrap();
5082
5083 let envelope =
5084 hirn_storage::get_mutation_envelope(store.as_ref(), &format!("semantic-create:{id}"))
5085 .await
5086 .unwrap()
5087 .unwrap();
5088 assert_eq!(envelope.state, hirn_storage::MutationEnvelopeState::Applied);
5089 assert!(db.cached_graph().has_node(id).await.unwrap());
5090 }
5091
5092 #[tokio::test(flavor = "multi_thread")]
5093 async fn batch_store_semantic_records_applied_mutation_envelopes() {
5094 let store = Arc::new(MemoryStore::new());
5095 let dir = tempfile::tempdir().unwrap();
5096 let config = HirnConfig::builder()
5097 .db_path(dir.path().join("semantic-batch-create-envelope"))
5098 .working_memory_token_limit(1000)
5099 .build()
5100 .unwrap();
5101 let db = HirnDB::open_with_config(config, store.clone())
5102 .await
5103 .unwrap();
5104
5105 let records = vec![
5106 SemanticRecord::builder()
5107 .concept("batch_create_a")
5108 .description("alpha")
5109 .agent_id(agent())
5110 .build()
5111 .unwrap(),
5112 SemanticRecord::builder()
5113 .concept("batch_create_b")
5114 .description("beta")
5115 .agent_id(agent())
5116 .build()
5117 .unwrap(),
5118 ];
5119 let record_ids = records.iter().map(|record| record.id).collect::<Vec<_>>();
5120
5121 let results = db.batch_store_semantic(records).await;
5122
5123 assert!(results.iter().all(|result| result.is_ok()));
5124 for id in record_ids {
5125 let envelope = hirn_storage::get_mutation_envelope(
5126 store.as_ref(),
5127 &format!("semantic-create:{id}"),
5128 )
5129 .await
5130 .unwrap()
5131 .unwrap();
5132 assert_eq!(envelope.state, hirn_storage::MutationEnvelopeState::Applied);
5133 assert!(db.cached_graph().has_node(id).await.unwrap());
5134 }
5135 }
5136
5137 #[tokio::test(flavor = "multi_thread")]
5138 async fn correct_semantic_records_applied_mutation_envelope() {
5139 let store = Arc::new(MemoryStore::new());
5140 let dir = tempfile::tempdir().unwrap();
5141 let config = HirnConfig::builder()
5142 .db_path(dir.path().join("semantic-successor-envelope"))
5143 .working_memory_token_limit(1000)
5144 .build()
5145 .unwrap();
5146 let db = HirnDB::open_with_config(config, store.clone())
5147 .await
5148 .unwrap();
5149
5150 let id = db
5151 .store_semantic(
5152 SemanticRecord::builder()
5153 .concept("successor_target")
5154 .description("original description")
5155 .agent_id(agent())
5156 .build()
5157 .unwrap(),
5158 )
5159 .await
5160 .unwrap();
5161
5162 let corrected = db
5163 .correct_semantic(
5164 id,
5165 SemanticUpdate {
5166 description: Some("corrected description".into()),
5167 ..SemanticUpdate::with_metadata(agent(), id)
5168 },
5169 )
5170 .await
5171 .unwrap();
5172
5173 let envelope = hirn_storage::get_mutation_envelope(
5174 store.as_ref(),
5175 &format!("semantic-successor:{}", corrected.id),
5176 )
5177 .await
5178 .unwrap()
5179 .unwrap();
5180 assert_eq!(envelope.state, hirn_storage::MutationEnvelopeState::Applied);
5181 assert!(!db.cached_graph().has_node(id).await.unwrap());
5182 }
5183
5184 #[tokio::test(flavor = "multi_thread")]
5185 async fn merge_semantic_records_applied_mutation_envelope() {
5186 let store = Arc::new(MemoryStore::new());
5187 let dir = tempfile::tempdir().unwrap();
5188 let config = HirnConfig::builder()
5189 .db_path(dir.path().join("semantic-merge-envelope"))
5190 .working_memory_token_limit(1000)
5191 .build()
5192 .unwrap();
5193 let db = HirnDB::open_with_config(config, store.clone())
5194 .await
5195 .unwrap();
5196
5197 let target_id = db
5198 .store_semantic(
5199 SemanticRecord::builder()
5200 .concept("deploy_summary")
5201 .description("deployment succeeded")
5202 .agent_id(agent())
5203 .build()
5204 .unwrap(),
5205 )
5206 .await
5207 .unwrap();
5208 let source_id = db
5209 .store_semantic(
5210 SemanticRecord::builder()
5211 .concept("deploy_summary")
5212 .description("deployment recovered after retry")
5213 .agent_id(AgentId::new("other_agent").unwrap())
5214 .build()
5215 .unwrap(),
5216 )
5217 .await
5218 .unwrap();
5219
5220 let outcome = db
5221 .merge_semantic(
5222 target_id,
5223 SemanticMerge {
5224 source_ids: vec![source_id],
5225 description: Some("canonical deployment summary".into()),
5226 reason: Some("dedupe".into()),
5227 ..SemanticMerge::with_metadata(agent(), target_id)
5228 },
5229 )
5230 .await
5231 .unwrap();
5232
5233 let envelope = hirn_storage::get_mutation_envelope(
5234 store.as_ref(),
5235 &format!("semantic-merge:{}", outcome.target.id),
5236 )
5237 .await
5238 .unwrap()
5239 .unwrap();
5240 assert_eq!(envelope.state, hirn_storage::MutationEnvelopeState::Applied);
5241 assert!(!db.cached_graph().has_node(target_id).await.unwrap());
5242 assert!(!db.cached_graph().has_node(source_id).await.unwrap());
5243 assert!(db.cached_graph().has_node(outcome.target.id).await.unwrap());
5244 assert!(
5245 db.cached_graph()
5246 .has_node(outcome.merged_sources[0].id)
5247 .await
5248 .unwrap()
5249 );
5250 }
5251
5252 #[tokio::test(flavor = "multi_thread")]
5253 async fn contradiction_connect_records_applied_mutation_envelope() {
5254 let store = Arc::new(MemoryStore::new());
5255 let dir = tempfile::tempdir().unwrap();
5256 let config = HirnConfig::builder()
5257 .db_path(dir.path().join("semantic-contradiction-envelope"))
5258 .working_memory_token_limit(1000)
5259 .build()
5260 .unwrap();
5261 let db = HirnDB::open_with_config(config, store.clone())
5262 .await
5263 .unwrap();
5264
5265 let id_a = db
5266 .store_semantic(
5267 SemanticRecord::builder()
5268 .concept("deploy_status_a")
5269 .description("deployment succeeded")
5270 .agent_id(agent())
5271 .build()
5272 .unwrap(),
5273 )
5274 .await
5275 .unwrap();
5276 let id_b = db
5277 .store_semantic(
5278 SemanticRecord::builder()
5279 .concept("deploy_status_b")
5280 .description("deployment failed")
5281 .agent_id(AgentId::new("other_agent").unwrap())
5282 .build()
5283 .unwrap(),
5284 )
5285 .await
5286 .unwrap();
5287
5288 db.connect_with(id_a, id_b, EdgeRelation::Contradicts, 0.9, Metadata::new())
5289 .await
5290 .unwrap();
5291
5292 let history_a = db.semantic_history(id_a).await.unwrap();
5293 let history_b = db.semantic_history(id_b).await.unwrap();
5294 let head_a = history_a.last().unwrap();
5295 let head_b = history_b.last().unwrap();
5296 let envelope = build_semantic_contradiction_sync_envelope(
5297 vec![id_a, id_b],
5298 vec![head_a.id, head_b.id],
5299 )
5300 .unwrap();
5301
5302 let stored = hirn_storage::get_mutation_envelope(store.as_ref(), &envelope.id)
5303 .await
5304 .unwrap()
5305 .unwrap();
5306 assert_eq!(stored.state, hirn_storage::MutationEnvelopeState::Applied);
5307 assert!(!db.cached_graph().has_node(id_a).await.unwrap());
5308 assert!(!db.cached_graph().has_node(id_b).await.unwrap());
5309 }
5310
5311 #[tokio::test(flavor = "multi_thread")]
5312 async fn purge_semantic_records_applied_mutation_envelope() {
5313 let store = Arc::new(MemoryStore::new());
5314 let dir = tempfile::tempdir().unwrap();
5315 let config = HirnConfig::builder()
5316 .db_path(dir.path().join("semantic-purge-envelope"))
5317 .working_memory_token_limit(1000)
5318 .build()
5319 .unwrap();
5320 let db = HirnDB::open_with_config(config, store.clone())
5321 .await
5322 .unwrap();
5323
5324 let id = db
5325 .store_semantic(
5326 SemanticRecord::builder()
5327 .concept("purge_target")
5328 .description("purge me")
5329 .agent_id(agent())
5330 .build()
5331 .unwrap(),
5332 )
5333 .await
5334 .unwrap();
5335 let corrected = db
5336 .correct_semantic(
5337 id,
5338 SemanticUpdate {
5339 description: Some("purge me v2".into()),
5340 ..SemanticUpdate::with_metadata(agent(), id)
5341 },
5342 )
5343 .await
5344 .unwrap();
5345 let logical_memory_id = db
5346 .read_semantic_record(corrected.id)
5347 .await
5348 .unwrap()
5349 .logical_memory_id;
5350
5351 db.purge_semantic(corrected.id).await.unwrap();
5352
5353 let envelope = hirn_storage::get_mutation_envelope(
5354 store.as_ref(),
5355 &format!("semantic-purge:{logical_memory_id}"),
5356 )
5357 .await
5358 .unwrap()
5359 .unwrap();
5360 assert_eq!(envelope.state, hirn_storage::MutationEnvelopeState::Applied);
5361 assert!(!db.cached_graph().has_node(id).await.unwrap());
5362 assert!(!db.cached_graph().has_node(corrected.id).await.unwrap());
5363 assert!(matches!(
5364 db.semantic_head_for_logical_id(logical_memory_id).await,
5365 Err(HirnError::NotFound(_))
5366 ));
5367 }
5368
5369 #[tokio::test(flavor = "multi_thread")]
5370 async fn open_reconciles_pending_semantic_retract_mutations() {
5371 let dir = tempfile::tempdir().unwrap();
5372 let path = dir.path().join("semantic-retract-envelope-recovery");
5373 let store = Arc::new(MemoryStore::new());
5374 let config = HirnConfig::builder()
5375 .db_path(&path)
5376 .working_memory_token_limit(1000)
5377 .build()
5378 .unwrap();
5379
5380 let db = HirnDB::open_with_config(config.clone(), store.clone())
5381 .await
5382 .unwrap();
5383 let id = db
5384 .store_semantic(
5385 SemanticRecord::builder()
5386 .concept("recovery_target")
5387 .description("pending retract")
5388 .agent_id(agent())
5389 .build()
5390 .unwrap(),
5391 )
5392 .await
5393 .unwrap();
5394 let current = db.read_semantic_record(id).await.unwrap();
5395
5396 let mut tombstone = current.clone();
5397 let tombstone_id = MemoryId::new();
5398 let now = Timestamp::now();
5399 tombstone.id = tombstone_id;
5400 tombstone.revision_id = RevisionId::from_memory_id(tombstone_id);
5401 tombstone.version = current.version + 1;
5402 tombstone.revision_operation = RevisionOperation::Retract;
5403 tombstone.revision_reason = Some("manual recovery test".into());
5404 tombstone.revision_causation_id = Some(current.id);
5405 tombstone.created_at = now;
5406 tombstone.updated_at = now;
5407 tombstone.valid_from = now;
5408 tombstone.valid_until = None;
5409 tombstone.superseded_by = None;
5410 tombstone.merged_into = None;
5411 normalize_semantic_record_timestamps(&mut tombstone);
5412
5413 db.append_semantic_record(&tombstone).await.unwrap();
5414 let envelope = build_semantic_retract_envelope(current.id, tombstone.id).unwrap();
5415 hirn_storage::append_mutation_envelope(store.as_ref(), &envelope)
5416 .await
5417 .unwrap();
5418
5419 assert!(db.cached_graph().has_node(current.id).await.unwrap());
5420 drop(db);
5421
5422 let reopened = HirnDB::open_with_config(config, store.clone())
5423 .await
5424 .unwrap();
5425
5426 assert!(!reopened.cached_graph().has_node(current.id).await.unwrap());
5427 let stored_envelope = hirn_storage::get_mutation_envelope(store.as_ref(), &envelope.id)
5428 .await
5429 .unwrap()
5430 .unwrap();
5431 assert_eq!(
5432 stored_envelope.state,
5433 hirn_storage::MutationEnvelopeState::Applied
5434 );
5435 }
5436
5437 #[tokio::test(flavor = "multi_thread")]
5438 async fn open_reconciles_pending_semantic_create_mutations_without_resurrecting_stale_heads() {
5439 let dir = tempfile::tempdir().unwrap();
5440 let path = dir
5441 .path()
5442 .join("semantic-create-envelope-recovery-stale-head");
5443 let store = Arc::new(MemoryStore::new());
5444 let config = HirnConfig::builder()
5445 .db_path(&path)
5446 .working_memory_token_limit(1000)
5447 .build()
5448 .unwrap();
5449
5450 let db = HirnDB::open_with_config(config.clone(), store.clone())
5451 .await
5452 .unwrap();
5453 let id = db
5454 .store_semantic(
5455 SemanticRecord::builder()
5456 .concept("create_recovery_target")
5457 .description("pending create")
5458 .agent_id(agent())
5459 .build()
5460 .unwrap(),
5461 )
5462 .await
5463 .unwrap();
5464 let create_envelope = build_semantic_create_envelope(id).unwrap();
5465 hirn_storage::append_mutation_envelope(store.as_ref(), &create_envelope)
5466 .await
5467 .unwrap();
5468
5469 let corrected = db
5470 .correct_semantic(
5471 id,
5472 SemanticUpdate {
5473 description: Some("pending create v2".into()),
5474 ..SemanticUpdate::with_metadata(agent(), id)
5475 },
5476 )
5477 .await
5478 .unwrap();
5479 let corrected_record = db.read_semantic_record(corrected.id).await.unwrap();
5480
5481 assert!(!db.cached_graph().has_node(id).await.unwrap());
5482 assert!(db.cached_graph().has_node(corrected.id).await.unwrap());
5483 drop(db);
5484
5485 let reopened = HirnDB::open_with_config(config, store.clone())
5486 .await
5487 .unwrap();
5488
5489 assert!(!reopened.cached_graph().has_node(id).await.unwrap());
5490 assert!(
5491 reopened
5492 .cached_graph()
5493 .has_node(corrected.id)
5494 .await
5495 .unwrap()
5496 );
5497 let head = reopened
5498 .semantic_head_for_logical_id(corrected_record.logical_memory_id)
5499 .await
5500 .unwrap();
5501 assert_eq!(head.id, corrected.id);
5502 let stored_envelope =
5503 hirn_storage::get_mutation_envelope(store.as_ref(), &create_envelope.id)
5504 .await
5505 .unwrap()
5506 .unwrap();
5507 assert_eq!(
5508 stored_envelope.state,
5509 hirn_storage::MutationEnvelopeState::Applied
5510 );
5511 }
5512
5513 #[tokio::test(flavor = "multi_thread")]
5514 async fn open_marks_missing_pending_semantic_create_mutations_failed_and_cleans_graph() {
5515 let dir = tempfile::tempdir().unwrap();
5516 let path = dir.path().join("semantic-create-envelope-recovery-missing");
5517 let store = Arc::new(MemoryStore::new());
5518 let config = HirnConfig::builder()
5519 .db_path(&path)
5520 .working_memory_token_limit(1000)
5521 .build()
5522 .unwrap();
5523
5524 let db = HirnDB::open_with_config(config.clone(), store.clone())
5525 .await
5526 .unwrap();
5527 let record = SemanticRecord::builder()
5528 .concept("missing_create_recovery")
5529 .description("orphaned graph node")
5530 .agent_id(agent())
5531 .build()
5532 .unwrap();
5533 let envelope = build_semantic_create_envelope(record.id).unwrap();
5534 hirn_storage::append_mutation_envelope(store.as_ref(), &envelope)
5535 .await
5536 .unwrap();
5537 db.cached_graph()
5538 .add_node(
5539 record.id,
5540 Layer::Semantic,
5541 record.confidence,
5542 record.created_at,
5543 record.namespace,
5544 )
5545 .await
5546 .unwrap();
5547
5548 assert!(db.cached_graph().has_node(record.id).await.unwrap());
5549 drop(db);
5550
5551 let reopened = HirnDB::open_with_config(config, store.clone())
5552 .await
5553 .unwrap();
5554
5555 assert!(!reopened.cached_graph().has_node(record.id).await.unwrap());
5556 let stored_envelope = hirn_storage::get_mutation_envelope(store.as_ref(), &envelope.id)
5557 .await
5558 .unwrap()
5559 .unwrap();
5560 assert_eq!(
5561 stored_envelope.state,
5562 hirn_storage::MutationEnvelopeState::Failed
5563 );
5564 }
5565
5566 #[tokio::test(flavor = "multi_thread")]
5567 async fn open_reconciles_pending_semantic_successor_mutations() {
5568 let dir = tempfile::tempdir().unwrap();
5569 let path = dir.path().join("semantic-successor-envelope-recovery");
5570 let store = Arc::new(MemoryStore::new());
5571 let config = HirnConfig::builder()
5572 .db_path(&path)
5573 .working_memory_token_limit(1000)
5574 .build()
5575 .unwrap();
5576
5577 let db = HirnDB::open_with_config(config.clone(), store.clone())
5578 .await
5579 .unwrap();
5580 let id = db
5581 .store_semantic(
5582 SemanticRecord::builder()
5583 .concept("successor_recovery_target")
5584 .description("pending successor")
5585 .agent_id(agent())
5586 .build()
5587 .unwrap(),
5588 )
5589 .await
5590 .unwrap();
5591 let current = db.read_semantic_record(id).await.unwrap();
5592
5593 let mut successor = current.clone();
5594 let successor_id = MemoryId::new();
5595 let now = Timestamp::now();
5596 successor.id = successor_id;
5597 successor.revision_id = RevisionId::from_memory_id(successor_id);
5598 successor.version = current.version + 1;
5599 successor.revision_operation = RevisionOperation::Correct;
5600 successor.revision_reason = Some("manual recovery test".into());
5601 successor.revision_causation_id = Some(current.id);
5602 successor.description = "recovered successor".into();
5603 successor.created_at = now;
5604 successor.updated_at = now;
5605 successor.valid_from = current.valid_from;
5606 successor.valid_until = None;
5607 successor.superseded_by = None;
5608 successor.merged_into = None;
5609 normalize_semantic_record_timestamps(&mut successor);
5610
5611 db.append_semantic_record(&successor).await.unwrap();
5612 let envelope = build_semantic_successor_envelope(current.id, successor.id).unwrap();
5613 hirn_storage::append_mutation_envelope(store.as_ref(), &envelope)
5614 .await
5615 .unwrap();
5616
5617 assert!(db.cached_graph().has_node(current.id).await.unwrap());
5618 drop(db);
5619
5620 let reopened = HirnDB::open_with_config(config, store.clone())
5621 .await
5622 .unwrap();
5623
5624 assert!(!reopened.cached_graph().has_node(current.id).await.unwrap());
5625 assert!(
5626 reopened
5627 .cached_graph()
5628 .has_node(successor.id)
5629 .await
5630 .unwrap()
5631 );
5632 let head = reopened
5633 .semantic_head_for_logical_id(current.logical_memory_id)
5634 .await
5635 .unwrap();
5636 assert_eq!(head.id, successor.id);
5637 let stored_envelope = hirn_storage::get_mutation_envelope(store.as_ref(), &envelope.id)
5638 .await
5639 .unwrap()
5640 .unwrap();
5641 assert_eq!(
5642 stored_envelope.state,
5643 hirn_storage::MutationEnvelopeState::Applied
5644 );
5645 }
5646
5647 #[tokio::test(flavor = "multi_thread")]
5648 async fn open_reconciles_pending_semantic_merge_mutations_without_resurrecting_stale_heads() {
5649 let dir = tempfile::tempdir().unwrap();
5650 let path = dir.path().join("semantic-merge-envelope-recovery");
5651 let store = Arc::new(MemoryStore::new());
5652 let config = HirnConfig::builder()
5653 .db_path(&path)
5654 .working_memory_token_limit(1000)
5655 .build()
5656 .unwrap();
5657
5658 let db = HirnDB::open_with_config(config.clone(), store.clone())
5659 .await
5660 .unwrap();
5661 let target_id = db
5662 .store_semantic(
5663 SemanticRecord::builder()
5664 .concept("merge_recovery")
5665 .description("target head")
5666 .agent_id(agent())
5667 .build()
5668 .unwrap(),
5669 )
5670 .await
5671 .unwrap();
5672 let source_id = db
5673 .store_semantic(
5674 SemanticRecord::builder()
5675 .concept("merge_recovery")
5676 .description("source head")
5677 .agent_id(AgentId::new("merge_source_agent").unwrap())
5678 .build()
5679 .unwrap(),
5680 )
5681 .await
5682 .unwrap();
5683 let current = db.read_semantic_record(target_id).await.unwrap();
5684 let source = db.read_semantic_record(source_id).await.unwrap();
5685
5686 let successor_id = MemoryId::new();
5687 let successor_now = Timestamp::now();
5688 let mut successor = current.clone();
5689 successor.id = successor_id;
5690 successor.revision_id = RevisionId::from_memory_id(successor_id);
5691 successor.version = current.version + 1;
5692 successor.revision_operation = RevisionOperation::Correct;
5693 successor.revision_reason = Some("manual successor recovery".into());
5694 successor.revision_causation_id = Some(current.id);
5695 successor.description = "intermediate target head".into();
5696 successor.created_at = successor_now;
5697 successor.updated_at = successor_now;
5698 successor.valid_from = current.valid_from;
5699 successor.valid_until = None;
5700 successor.superseded_by = None;
5701 successor.merged_into = None;
5702 normalize_semantic_record_timestamps(&mut successor);
5703
5704 db.append_semantic_record(&successor).await.unwrap();
5705 let successor_envelope =
5706 build_semantic_successor_envelope(current.id, successor.id).unwrap();
5707 hirn_storage::append_mutation_envelope(store.as_ref(), &successor_envelope)
5708 .await
5709 .unwrap();
5710
5711 let merge_now = Timestamp::now();
5712 let merged_target_id = MemoryId::new();
5713 let mut merged_target = successor.clone();
5714 merged_target.id = merged_target_id;
5715 merged_target.revision_id = RevisionId::from_memory_id(merged_target_id);
5716 merged_target.version = successor.version + 1;
5717 merged_target.revision_operation = RevisionOperation::Merge;
5718 merged_target.revision_reason = Some("manual merge recovery".into());
5719 merged_target.revision_causation_id = Some(successor.id);
5720 merged_target.description = "canonical merged target".into();
5721 merged_target.created_at = merge_now;
5722 merged_target.updated_at = merge_now;
5723 merged_target.valid_from = merge_now;
5724 merged_target.valid_until = None;
5725 merged_target.superseded_by = None;
5726 merged_target.merged_into = None;
5727 normalize_semantic_record_timestamps(&mut merged_target);
5728
5729 let merged_source_id = MemoryId::new();
5730 let mut merged_source = source.clone();
5731 merged_source.id = merged_source_id;
5732 merged_source.revision_id = RevisionId::from_memory_id(merged_source_id);
5733 merged_source.version = source.version + 1;
5734 merged_source.revision_operation = RevisionOperation::Merge;
5735 merged_source.revision_reason = Some("manual merge recovery".into());
5736 merged_source.revision_causation_id = Some(merged_target.id);
5737 merged_source.created_at = merge_now;
5738 merged_source.updated_at = merge_now;
5739 merged_source.valid_from = merge_now;
5740 merged_source.valid_until = None;
5741 merged_source.superseded_by = None;
5742 merged_source.merged_into = Some(merged_target.logical_memory_id);
5743 normalize_semantic_record_timestamps(&mut merged_source);
5744
5745 db.append_semantic_records(&[merged_target.clone(), merged_source.clone()])
5746 .await
5747 .unwrap();
5748 let merge_envelope = build_semantic_merge_envelope(
5749 successor.id,
5750 merged_target.id,
5751 vec![source.id],
5752 vec![merged_source.id],
5753 )
5754 .unwrap();
5755 hirn_storage::append_mutation_envelope(store.as_ref(), &merge_envelope)
5756 .await
5757 .unwrap();
5758
5759 assert!(db.cached_graph().has_node(current.id).await.unwrap());
5760 assert!(db.cached_graph().has_node(source.id).await.unwrap());
5761 drop(db);
5762
5763 let reopened = HirnDB::open_with_config(config, store.clone())
5764 .await
5765 .unwrap();
5766
5767 assert!(!reopened.cached_graph().has_node(current.id).await.unwrap());
5768 assert!(!reopened.cached_graph().has_node(source.id).await.unwrap());
5769 assert!(
5770 !reopened
5771 .cached_graph()
5772 .has_node(successor.id)
5773 .await
5774 .unwrap()
5775 );
5776 assert!(
5777 reopened
5778 .cached_graph()
5779 .has_node(merged_target.id)
5780 .await
5781 .unwrap()
5782 );
5783 assert!(
5784 reopened
5785 .cached_graph()
5786 .has_node(merged_source.id)
5787 .await
5788 .unwrap()
5789 );
5790
5791 let target_head = reopened
5792 .semantic_head_for_logical_id(current.logical_memory_id)
5793 .await
5794 .unwrap();
5795 let source_head = reopened
5796 .semantic_head_for_logical_id(source.logical_memory_id)
5797 .await
5798 .unwrap();
5799 assert_eq!(target_head.id, merged_target.id);
5800 assert_eq!(source_head.id, merged_source.id);
5801
5802 let stored_successor_envelope =
5803 hirn_storage::get_mutation_envelope(store.as_ref(), &successor_envelope.id)
5804 .await
5805 .unwrap()
5806 .unwrap();
5807 assert_eq!(
5808 stored_successor_envelope.state,
5809 hirn_storage::MutationEnvelopeState::Applied
5810 );
5811 let stored_merge_envelope =
5812 hirn_storage::get_mutation_envelope(store.as_ref(), &merge_envelope.id)
5813 .await
5814 .unwrap()
5815 .unwrap();
5816 assert_eq!(
5817 stored_merge_envelope.state,
5818 hirn_storage::MutationEnvelopeState::Applied
5819 );
5820 }
5821
5822 #[tokio::test(flavor = "multi_thread")]
5823 async fn open_reconciles_pending_semantic_contradiction_sync_mutations() {
5824 let dir = tempfile::tempdir().unwrap();
5825 let path = dir.path().join("semantic-contradiction-envelope-recovery");
5826 let store = Arc::new(MemoryStore::new());
5827 let config = HirnConfig::builder()
5828 .db_path(&path)
5829 .working_memory_token_limit(1000)
5830 .build()
5831 .unwrap();
5832
5833 let db = HirnDB::open_with_config(config.clone(), store.clone())
5834 .await
5835 .unwrap();
5836 let id_a = db
5837 .store_semantic(
5838 SemanticRecord::builder()
5839 .concept("contradiction_recovery_a")
5840 .description("deployment succeeded")
5841 .agent_id(agent())
5842 .build()
5843 .unwrap(),
5844 )
5845 .await
5846 .unwrap();
5847 let id_b = db
5848 .store_semantic(
5849 SemanticRecord::builder()
5850 .concept("contradiction_recovery_b")
5851 .description("deployment failed")
5852 .agent_id(AgentId::new("contradiction_source_agent").unwrap())
5853 .build()
5854 .unwrap(),
5855 )
5856 .await
5857 .unwrap();
5858 let current_a = db.read_semantic_record(id_a).await.unwrap();
5859 let current_b = db.read_semantic_record(id_b).await.unwrap();
5860
5861 let now = Timestamp::now();
5862 let successor_a_id = MemoryId::new();
5863 let successor_b_id = MemoryId::new();
5864 let successor_a = db.prepare_semantic_contradiction_successor(
5865 ¤t_a,
5866 successor_a_id,
5867 successor_b_id,
5868 now,
5869 );
5870 let successor_b = db.prepare_semantic_contradiction_successor(
5871 ¤t_b,
5872 successor_b_id,
5873 successor_a_id,
5874 now,
5875 );
5876
5877 db.append_semantic_records(&[successor_a.clone(), successor_b.clone()])
5878 .await
5879 .unwrap();
5880 let envelope = build_semantic_contradiction_sync_envelope(
5881 vec![current_a.id, current_b.id],
5882 vec![successor_a.id, successor_b.id],
5883 )
5884 .unwrap();
5885 hirn_storage::append_mutation_envelope(store.as_ref(), &envelope)
5886 .await
5887 .unwrap();
5888
5889 assert!(db.cached_graph().has_node(current_a.id).await.unwrap());
5890 assert!(db.cached_graph().has_node(current_b.id).await.unwrap());
5891 drop(db);
5892
5893 let reopened = HirnDB::open_with_config(config, store.clone())
5894 .await
5895 .unwrap();
5896
5897 assert!(
5898 !reopened
5899 .cached_graph()
5900 .has_node(current_a.id)
5901 .await
5902 .unwrap()
5903 );
5904 assert!(
5905 !reopened
5906 .cached_graph()
5907 .has_node(current_b.id)
5908 .await
5909 .unwrap()
5910 );
5911 assert!(
5912 reopened
5913 .cached_graph()
5914 .has_node(successor_a.id)
5915 .await
5916 .unwrap()
5917 );
5918 assert!(
5919 reopened
5920 .cached_graph()
5921 .has_node(successor_b.id)
5922 .await
5923 .unwrap()
5924 );
5925
5926 let head_a = reopened
5927 .semantic_head_for_logical_id(current_a.logical_memory_id)
5928 .await
5929 .unwrap();
5930 let head_b = reopened
5931 .semantic_head_for_logical_id(current_b.logical_memory_id)
5932 .await
5933 .unwrap();
5934 assert_eq!(head_a.id, successor_a.id);
5935 assert_eq!(head_b.id, successor_b.id);
5936
5937 let stored = hirn_storage::get_mutation_envelope(store.as_ref(), &envelope.id)
5938 .await
5939 .unwrap()
5940 .unwrap();
5941 assert_eq!(stored.state, hirn_storage::MutationEnvelopeState::Applied);
5942 }
5943
5944 #[tokio::test(flavor = "multi_thread")]
5945 async fn open_reconciles_pending_semantic_purge_mutations() {
5946 let dir = tempfile::tempdir().unwrap();
5947 let path = dir.path().join("semantic-purge-envelope-recovery");
5948 let store = Arc::new(MemoryStore::new());
5949 let config = HirnConfig::builder()
5950 .db_path(&path)
5951 .working_memory_token_limit(1000)
5952 .build()
5953 .unwrap();
5954
5955 let db = HirnDB::open_with_config(config.clone(), store.clone())
5956 .await
5957 .unwrap();
5958 let id = db
5959 .store_semantic(
5960 SemanticRecord::builder()
5961 .concept("purge_recovery_target")
5962 .description("pending purge")
5963 .agent_id(agent())
5964 .build()
5965 .unwrap(),
5966 )
5967 .await
5968 .unwrap();
5969 let corrected = db
5970 .correct_semantic(
5971 id,
5972 SemanticUpdate {
5973 description: Some("pending purge v2".into()),
5974 ..SemanticUpdate::with_metadata(agent(), id)
5975 },
5976 )
5977 .await
5978 .unwrap();
5979 let current = db.read_semantic_record(corrected.id).await.unwrap();
5980 let history = db.semantic_history(corrected.id).await.unwrap();
5981 let revision_ids = history
5982 .iter()
5983 .map(|revision| revision.id)
5984 .collect::<Vec<_>>();
5985 let envelope =
5986 build_semantic_purge_envelope(current.logical_memory_id, revision_ids.clone()).unwrap();
5987 hirn_storage::append_mutation_envelope(store.as_ref(), &envelope)
5988 .await
5989 .unwrap();
5990
5991 assert!(!db.cached_graph().has_node(id).await.unwrap());
5992 assert!(db.cached_graph().has_node(corrected.id).await.unwrap());
5993 drop(db);
5994
5995 let reopened = HirnDB::open_with_config(config, store.clone())
5996 .await
5997 .unwrap();
5998
5999 for revision_id in &revision_ids {
6000 assert!(
6001 !reopened
6002 .cached_graph()
6003 .has_node(*revision_id)
6004 .await
6005 .unwrap()
6006 );
6007 }
6008 assert!(matches!(
6009 reopened
6010 .semantic_head_for_logical_id(current.logical_memory_id)
6011 .await,
6012 Err(HirnError::NotFound(_))
6013 ));
6014 let stored = hirn_storage::get_mutation_envelope(store.as_ref(), &envelope.id)
6015 .await
6016 .unwrap()
6017 .unwrap();
6018 assert_eq!(stored.state, hirn_storage::MutationEnvelopeState::Applied);
6019 }
6020
6021 #[tokio::test(flavor = "multi_thread")]
6022 async fn contradiction_connect_appends_revision_native_semantic_successors() {
6023 let db = temp_db().await;
6024 let id_a = db
6025 .store_semantic(
6026 SemanticRecord::builder()
6027 .concept("deploy_status_a")
6028 .description("deployment succeeded")
6029 .agent_id(agent())
6030 .build()
6031 .unwrap(),
6032 )
6033 .await
6034 .unwrap();
6035 let id_b = db
6036 .store_semantic(
6037 SemanticRecord::builder()
6038 .concept("deploy_status_b")
6039 .description("deployment failed")
6040 .agent_id(AgentId::new("other_agent").unwrap())
6041 .build()
6042 .unwrap(),
6043 )
6044 .await
6045 .unwrap();
6046
6047 db.connect_with(id_a, id_b, EdgeRelation::Contradicts, 0.9, Metadata::new())
6048 .await
6049 .unwrap();
6050
6051 let history_a = db.semantic_history(id_a).await.unwrap();
6052 let history_b = db.semantic_history(id_b).await.unwrap();
6053
6054 assert_eq!(history_a.len(), 2);
6055 assert_eq!(history_b.len(), 2);
6056 assert!(history_a[0].contradiction_ids.is_empty());
6057 assert!(history_b[0].contradiction_ids.is_empty());
6058
6059 let head_a = history_a.last().unwrap();
6060 let head_b = history_b.last().unwrap();
6061
6062 assert_eq!(head_a.revision_operation, RevisionOperation::Correct);
6063 assert_eq!(head_b.revision_operation, RevisionOperation::Correct);
6064 assert_eq!(head_a.contradiction_ids, vec![head_b.id]);
6065 assert_eq!(head_b.contradiction_ids, vec![head_a.id]);
6066 }
6067
6068 #[tokio::test(flavor = "multi_thread")]
6069 async fn contradiction_connect_canonicalizes_stale_semantic_revision_ids() {
6070 let db = temp_db().await;
6071 let original_a = db
6072 .store_semantic(
6073 SemanticRecord::builder()
6074 .concept("deploy_status_a")
6075 .description("deployment succeeded")
6076 .agent_id(agent())
6077 .build()
6078 .unwrap(),
6079 )
6080 .await
6081 .unwrap();
6082 let corrected_a = db
6083 .correct_semantic(
6084 original_a,
6085 SemanticUpdate {
6086 description: Some("deployment succeeded after rollback".into()),
6087 reason: Some("postmortem correction".into()),
6088 ..SemanticUpdate::with_metadata(agent(), original_a)
6089 },
6090 )
6091 .await
6092 .unwrap();
6093 let id_b = db
6094 .store_semantic(
6095 SemanticRecord::builder()
6096 .concept("deploy_status_b")
6097 .description("deployment failed")
6098 .agent_id(AgentId::new("other_agent").unwrap())
6099 .build()
6100 .unwrap(),
6101 )
6102 .await
6103 .unwrap();
6104
6105 db.connect_with(
6106 original_a,
6107 id_b,
6108 EdgeRelation::Contradicts,
6109 0.9,
6110 Metadata::new(),
6111 )
6112 .await
6113 .unwrap();
6114
6115 let history_a = db.semantic_history(original_a).await.unwrap();
6116 let history_b = db.semantic_history(id_b).await.unwrap();
6117
6118 assert_eq!(history_a.len(), 3);
6119 assert_eq!(history_b.len(), 2);
6120 assert!(history_a[0].contradiction_ids.is_empty());
6121 assert!(history_a[1].contradiction_ids.is_empty());
6122 assert!(history_b[0].contradiction_ids.is_empty());
6123
6124 let head_a = history_a.last().unwrap();
6125 let head_b = history_b.last().unwrap();
6126
6127 assert_eq!(history_a[1].id, corrected_a.id);
6128 assert_eq!(head_a.revision_operation, RevisionOperation::Correct);
6129 assert_eq!(head_b.revision_operation, RevisionOperation::Correct);
6130 assert_eq!(head_a.contradiction_ids, vec![head_b.id]);
6131 assert_eq!(head_b.contradiction_ids, vec![head_a.id]);
6132 }
6133
/// The logical-id and revision-id lookup filters are structured UTF-8 exact-match filters.
/// They target the `logical_memory_id` and `revision_id` fields, keyed by the id's string form.
#[test]
fn semantic_filters_use_structured_exact_match() {
    use hirn_storage::store::ExactMatchFilter;

    let logical = LogicalMemoryId::new();
    let revision = RevisionId::new();

    let expected_logical = ExactMatchFilter::utf8_value("logical_memory_id", logical.to_string());
    let expected_revision = ExactMatchFilter::utf8_value("revision_id", revision.to_string());

    assert_eq!(
        HirnDB::semantic_logical_exact_filter(logical),
        expected_logical
    );
    assert_eq!(
        HirnDB::semantic_revision_exact_filter(revision),
        expected_revision
    );
}
6154
6155 #[tokio::test(flavor = "multi_thread")]
6156 async fn semantic_edit_target_rejects_stale_initial_revision_ids() {
6157 let db = temp_db().await;
6158 let initial = db
6159 .store_semantic(
6160 SemanticRecord::builder()
6161 .concept("deploy_status")
6162 .description("deployment succeeded")
6163 .origin(Origin::CrossAgent)
6164 .agent_id(agent())
6165 .build()
6166 .unwrap(),
6167 )
6168 .await
6169 .unwrap();
6170
6171 let corrected = db
6172 .correct_semantic(
6173 initial,
6174 SemanticUpdate {
6175 description: Some("deployment succeeded after retry".into()),
6176 reason: Some("postmortem correction".into()),
6177 ..SemanticUpdate::with_metadata(agent(), initial)
6178 },
6179 )
6180 .await
6181 .unwrap();
6182
6183 let stale = db.semantic_edit_target(initial).await.unwrap_err();
6184 assert!(matches!(stale, HirnError::InvalidInput(_)));
6185
6186 let head = db.semantic_edit_target(corrected.id).await.unwrap();
6187 assert_eq!(head.id, corrected.id);
6188 assert_eq!(head.revision_operation, RevisionOperation::Correct);
6189 }
6190}