Skip to main content

hirn_engine/db/
semantic.rs

1use std::collections::{HashMap, HashSet};
2
3use futures::TryStreamExt;
4use hirn_core::revision::{
5    LogicalMemoryId, RecallSnapshot, RevisionId, RevisionOperation, RevisionRef, RevisionState,
6};
7
8use super::*;
9
/// Mutation-kind tag written on envelopes produced by `build_semantic_create_envelope`.
pub(super) const SEMANTIC_CREATE_MUTATION_KIND: &str = "semantic_create";
/// Mutation-kind tag written on envelopes produced by `build_semantic_successor_envelope`.
pub(super) const SEMANTIC_SUCCESSOR_MUTATION_KIND: &str = "semantic_successor";
/// Mutation-kind tag written on envelopes produced by `build_semantic_merge_envelope`.
pub(super) const SEMANTIC_MERGE_MUTATION_KIND: &str = "semantic_merge";
/// Mutation-kind tag written on envelopes produced by
/// `build_semantic_contradiction_sync_envelope`.
pub(super) const SEMANTIC_CONTRADICTION_SYNC_MUTATION_KIND: &str = "semantic_contradiction_sync";
/// Mutation-kind tag written on envelopes produced by `build_semantic_purge_envelope`.
pub(super) const SEMANTIC_PURGE_MUTATION_KIND: &str = "semantic_purge";
/// Mutation-kind tag written on envelopes produced by `build_semantic_retract_envelope`.
pub(super) const SEMANTIC_RETRACT_MUTATION_KIND: &str = "semantic_retract";
16
/// JSON payload carried by a `semantic_create` mutation envelope.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct SemanticCreateEnvelope {
    /// Id of the semantic record the create operation is writing.
    record_id: MemoryId,
}
21
22fn encode_semantic_create_envelope(payload: &SemanticCreateEnvelope) -> HirnResult<Vec<u8>> {
23    serde_json::to_vec(payload)
24        .map_err(|error| HirnError::storage(format!("semantic create envelope serialize: {error}")))
25}
26
27fn decode_semantic_create_envelope(
28    envelope: &hirn_storage::MutationEnvelopeRecord,
29) -> HirnResult<SemanticCreateEnvelope> {
30    serde_json::from_slice(&envelope.payload).map_err(|error| {
31        HirnError::storage(format!("semantic create envelope deserialize: {error}"))
32    })
33}
34
35fn build_semantic_create_envelope(
36    record_id: MemoryId,
37) -> HirnResult<hirn_storage::MutationEnvelopeRecord> {
38    let payload = SemanticCreateEnvelope { record_id };
39    let payload = encode_semantic_create_envelope(&payload)?;
40
41    Ok(hirn_storage::MutationEnvelopeRecord::pending(
42        format!("semantic-create:{record_id}"),
43        SEMANTIC_CREATE_MUTATION_KIND,
44        payload,
45    ))
46}
47
/// JSON payload carried by a `semantic_successor` mutation envelope.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct SemanticSuccessorEnvelope {
    /// Id of the earlier record (presumably the revision being superseded; confirm with callers).
    prior_record_id: MemoryId,
    /// Id of the successor record; also used to form the envelope id.
    successor_id: MemoryId,
}
53
54fn encode_semantic_successor_envelope(payload: &SemanticSuccessorEnvelope) -> HirnResult<Vec<u8>> {
55    serde_json::to_vec(payload).map_err(|error| {
56        HirnError::storage(format!("semantic successor envelope serialize: {error}"))
57    })
58}
59
60fn decode_semantic_successor_envelope(
61    envelope: &hirn_storage::MutationEnvelopeRecord,
62) -> HirnResult<SemanticSuccessorEnvelope> {
63    serde_json::from_slice(&envelope.payload).map_err(|error| {
64        HirnError::storage(format!("semantic successor envelope deserialize: {error}"))
65    })
66}
67
68fn build_semantic_successor_envelope(
69    prior_record_id: MemoryId,
70    successor_id: MemoryId,
71) -> HirnResult<hirn_storage::MutationEnvelopeRecord> {
72    let payload = SemanticSuccessorEnvelope {
73        prior_record_id,
74        successor_id,
75    };
76    let payload = encode_semantic_successor_envelope(&payload)?;
77
78    Ok(hirn_storage::MutationEnvelopeRecord::pending(
79        format!("semantic-successor:{successor_id}"),
80        SEMANTIC_SUCCESSOR_MUTATION_KIND,
81        payload,
82    ))
83}
84
/// JSON payload carried by a `semantic_merge` mutation envelope.
///
/// NOTE(review): the exact meaning of "prior" vs "merged" ids is defined by the merge
/// code outside this view — confirm before relying on these descriptions.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct SemanticMergeEnvelope {
    /// Target record id before the merge.
    prior_target_id: MemoryId,
    /// Target record id after the merge; also used to form the envelope id.
    merged_target_id: MemoryId,
    /// Source record ids before the merge.
    prior_source_ids: Vec<MemoryId>,
    /// Source record ids after the merge.
    merged_source_ids: Vec<MemoryId>,
}
92
93fn encode_semantic_merge_envelope(payload: &SemanticMergeEnvelope) -> HirnResult<Vec<u8>> {
94    serde_json::to_vec(payload)
95        .map_err(|error| HirnError::storage(format!("semantic merge envelope serialize: {error}")))
96}
97
98fn decode_semantic_merge_envelope(
99    envelope: &hirn_storage::MutationEnvelopeRecord,
100) -> HirnResult<SemanticMergeEnvelope> {
101    serde_json::from_slice(&envelope.payload).map_err(|error| {
102        HirnError::storage(format!("semantic merge envelope deserialize: {error}"))
103    })
104}
105
106fn build_semantic_merge_envelope(
107    prior_target_id: MemoryId,
108    merged_target_id: MemoryId,
109    prior_source_ids: Vec<MemoryId>,
110    merged_source_ids: Vec<MemoryId>,
111) -> HirnResult<hirn_storage::MutationEnvelopeRecord> {
112    let payload = SemanticMergeEnvelope {
113        prior_target_id,
114        merged_target_id,
115        prior_source_ids,
116        merged_source_ids,
117    };
118    let payload = encode_semantic_merge_envelope(&payload)?;
119
120    Ok(hirn_storage::MutationEnvelopeRecord::pending(
121        format!("semantic-merge:{merged_target_id}"),
122        SEMANTIC_MERGE_MUTATION_KIND,
123        payload,
124    ))
125}
126
/// JSON payload carried by a `semantic_contradiction_sync` mutation envelope.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct SemanticContradictionSyncEnvelope {
    /// Ids of the prior records; the builder stores them sorted and de-duplicated.
    prior_record_ids: Vec<MemoryId>,
    /// Ids of the successor records; the builder stores them sorted and de-duplicated.
    successor_ids: Vec<MemoryId>,
}
132
133fn encode_semantic_contradiction_sync_envelope(
134    payload: &SemanticContradictionSyncEnvelope,
135) -> HirnResult<Vec<u8>> {
136    serde_json::to_vec(payload).map_err(|error| {
137        HirnError::storage(format!(
138            "semantic contradiction sync envelope serialize: {error}"
139        ))
140    })
141}
142
143fn decode_semantic_contradiction_sync_envelope(
144    envelope: &hirn_storage::MutationEnvelopeRecord,
145) -> HirnResult<SemanticContradictionSyncEnvelope> {
146    serde_json::from_slice(&envelope.payload).map_err(|error| {
147        HirnError::storage(format!(
148            "semantic contradiction sync envelope deserialize: {error}"
149        ))
150    })
151}
152
153fn build_semantic_contradiction_sync_envelope(
154    mut prior_record_ids: Vec<MemoryId>,
155    mut successor_ids: Vec<MemoryId>,
156) -> HirnResult<hirn_storage::MutationEnvelopeRecord> {
157    prior_record_ids.sort_unstable();
158    prior_record_ids.dedup();
159    successor_ids.sort_unstable();
160    successor_ids.dedup();
161
162    let envelope_suffix = successor_ids
163        .iter()
164        .map(ToString::to_string)
165        .collect::<Vec<_>>()
166        .join("+");
167
168    let payload = SemanticContradictionSyncEnvelope {
169        prior_record_ids,
170        successor_ids,
171    };
172    let payload = encode_semantic_contradiction_sync_envelope(&payload)?;
173
174    Ok(hirn_storage::MutationEnvelopeRecord::pending(
175        format!("semantic-contradiction-sync:{envelope_suffix}"),
176        SEMANTIC_CONTRADICTION_SYNC_MUTATION_KIND,
177        payload,
178    ))
179}
180
/// JSON payload carried by a `semantic_purge` mutation envelope.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct SemanticPurgeEnvelope {
    /// Logical memory chain the purge applies to; also used to form the envelope id.
    logical_memory_id: LogicalMemoryId,
    /// Revision ids covered by the purge; the builder stores them sorted and de-duplicated.
    revision_ids: Vec<MemoryId>,
}
186
187fn encode_semantic_purge_envelope(payload: &SemanticPurgeEnvelope) -> HirnResult<Vec<u8>> {
188    serde_json::to_vec(payload)
189        .map_err(|error| HirnError::storage(format!("semantic purge envelope serialize: {error}")))
190}
191
192fn decode_semantic_purge_envelope(
193    envelope: &hirn_storage::MutationEnvelopeRecord,
194) -> HirnResult<SemanticPurgeEnvelope> {
195    serde_json::from_slice(&envelope.payload).map_err(|error| {
196        HirnError::storage(format!("semantic purge envelope deserialize: {error}"))
197    })
198}
199
200fn build_semantic_purge_envelope(
201    logical_memory_id: LogicalMemoryId,
202    mut revision_ids: Vec<MemoryId>,
203) -> HirnResult<hirn_storage::MutationEnvelopeRecord> {
204    revision_ids.sort_unstable();
205    revision_ids.dedup();
206
207    let payload = SemanticPurgeEnvelope {
208        logical_memory_id,
209        revision_ids,
210    };
211    let payload = encode_semantic_purge_envelope(&payload)?;
212
213    Ok(hirn_storage::MutationEnvelopeRecord::pending(
214        format!("semantic-purge:{logical_memory_id}"),
215        SEMANTIC_PURGE_MUTATION_KIND,
216        payload,
217    ))
218}
219
/// JSON payload carried by a `semantic_retract` mutation envelope.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct SemanticRetractEnvelope {
    /// Id of the earlier record (presumably the one being retracted; confirm with callers).
    prior_record_id: MemoryId,
    /// Id of the tombstone record; also used to form the envelope id.
    tombstone_id: MemoryId,
}
225
226fn encode_semantic_retract_envelope(payload: &SemanticRetractEnvelope) -> HirnResult<Vec<u8>> {
227    serde_json::to_vec(payload).map_err(|error| {
228        HirnError::storage(format!("semantic retract envelope serialize: {error}"))
229    })
230}
231
232fn decode_semantic_retract_envelope(
233    envelope: &hirn_storage::MutationEnvelopeRecord,
234) -> HirnResult<SemanticRetractEnvelope> {
235    serde_json::from_slice(&envelope.payload).map_err(|error| {
236        HirnError::storage(format!("semantic retract envelope deserialize: {error}"))
237    })
238}
239
240fn build_semantic_retract_envelope(
241    prior_record_id: MemoryId,
242    tombstone_id: MemoryId,
243) -> HirnResult<hirn_storage::MutationEnvelopeRecord> {
244    let payload = SemanticRetractEnvelope {
245        prior_record_id,
246        tombstone_id,
247    };
248    let payload = encode_semantic_retract_envelope(&payload)?;
249
250    Ok(hirn_storage::MutationEnvelopeRecord::pending(
251        format!("semantic-retract:{tombstone_id}"),
252        SEMANTIC_RETRACT_MUTATION_KIND,
253        payload,
254    ))
255}
256
257fn semantic_revision_is_newer(candidate: &SemanticRecord, current: &SemanticRecord) -> bool {
258    candidate.version > current.version
259        || (candidate.version == current.version
260            && (candidate.created_at > current.created_at
261                || (candidate.created_at == current.created_at
262                    && candidate.revision_id > current.revision_id)))
263}
264
265fn upsert_semantic_head(
266    heads: &mut HashMap<LogicalMemoryId, SemanticRecord>,
267    record: SemanticRecord,
268) {
269    heads
270        .entry(record.logical_memory_id)
271        .and_modify(|current| {
272            if semantic_revision_is_newer(&record, current) {
273                *current = record.clone();
274            }
275        })
276        .or_insert(record);
277}
278
279fn collapse_semantic_heads(
280    records: impl IntoIterator<Item = SemanticRecord>,
281) -> HashMap<LogicalMemoryId, SemanticRecord> {
282    let mut heads = HashMap::new();
283
284    for record in records {
285        upsert_semantic_head(&mut heads, record);
286    }
287
288    heads
289}
290
/// Free-function form of `SemanticRecord::is_live`, usable where a plain
/// function over `&SemanticRecord` is more convenient than a method call.
fn semantic_record_is_live(record: &SemanticRecord) -> bool {
    record.is_live()
}
294
295fn storage_precision_timestamp(ts: Timestamp) -> Timestamp {
296    Timestamp::from_millis(ts.millis())
297}
298
299fn normalize_semantic_record_timestamps(record: &mut SemanticRecord) {
300    record.last_accessed = storage_precision_timestamp(record.last_accessed);
301    record.created_at = storage_precision_timestamp(record.created_at);
302    record.updated_at = storage_precision_timestamp(record.updated_at);
303    record.valid_from = storage_precision_timestamp(record.valid_from);
304    record.valid_until = record.valid_until.map(storage_precision_timestamp);
305}
306
/// Description of a contradiction link to attach to a successor revision.
///
/// NOTE(review): the code that fills in and consumes this struct is outside this
/// view; the field notes below are inferred from names only — confirm.
#[derive(Clone)]
struct ContradictionSuccessorLink {
    /// Id of the memory on the other end of the link.
    target_id: MemoryId,
    /// Weight associated with the link.
    weight: f32,
    /// Metadata associated with the link.
    metadata: Metadata,
    /// Presumably suppresses creation of the graph edge when set — TODO confirm.
    skip_graph_edge: bool,
}
314
/// One endpoint of a contradiction relation.
#[derive(Clone)]
enum ContradictionEndpoint {
    /// The endpoint is a semantic record, carried in full (boxed to keep the enum small).
    Semantic(Box<SemanticRecord>),
    /// The endpoint is any other memory, identified only by its id.
    Other(MemoryId),
}
320
321impl ContradictionEndpoint {
322    fn id(&self) -> MemoryId {
323        match self {
324            Self::Semantic(record) => record.id,
325            Self::Other(id) => *id,
326        }
327    }
328
329    fn as_semantic(&self) -> Option<&SemanticRecord> {
330        match self {
331            Self::Semantic(record) => Some(record.as_ref()),
332            Self::Other(_) => None,
333        }
334    }
335}
336
/// Pair of semantic records prepared for a contradiction successor write.
///
/// NOTE(review): producers and consumers are outside this view; field roles are
/// inferred from their names — confirm.
struct PreparedSemanticContradictionSuccessor {
    /// The record as it currently stands.
    current: SemanticRecord,
    /// The record that is to follow `current`.
    next: SemanticRecord,
}
341
/// Outcome of a contradiction sync.
///
/// NOTE(review): the code that builds this value is outside this view; field
/// notes are inferred from names and types — confirm.
struct ContradictionSyncResult {
    /// Id of the memory on the source side.
    source_memory: MemoryId,
    /// Id of the memory on the target side.
    target_memory: MemoryId,
    /// Id of the contradiction edge in the graph, if there is one.
    contradiction_edge: Option<crate::graph::EdgeId>,
}
347
348fn format_memory_id_list(ids: &[MemoryId]) -> String {
349    ids.iter()
350        .map(ToString::to_string)
351        .collect::<Vec<_>>()
352        .join(",")
353}
354
355fn merged_confidence_and_evidence(records: &[&SemanticRecord]) -> (f32, u32) {
356    let total_weight: u64 = records
357        .iter()
358        .map(|record| u64::from(record.evidence_count.max(1)))
359        .sum();
360    let weighted_confidence = if total_weight == 0 {
361        0.5
362    } else {
363        let weighted_sum: f64 = records
364            .iter()
365            .map(|record| f64::from(record.confidence) * f64::from(record.evidence_count.max(1)))
366            .sum();
367        (weighted_sum / total_weight as f64) as f32
368    };
369    let total_evidence = records.iter().fold(0u32, |acc, record| {
370        acc.saturating_add(record.evidence_count.max(1))
371    });
372    (weighted_confidence.clamp(0.0, 1.0), total_evidence)
373}
374
375fn semantic_snapshot_head_as_of(
376    history: &[SemanticRecord],
377    cutoff: Timestamp,
378) -> Option<SemanticRecord> {
379    history
380        .iter()
381        .filter(|record| record.valid_from <= cutoff)
382        .max_by(|left, right| {
383            left.version
384                .cmp(&right.version)
385                .then_with(|| left.created_at.cmp(&right.created_at))
386                .then_with(|| left.revision_id.cmp(&right.revision_id))
387        })
388        .cloned()
389}
390
391fn semantic_snapshot_head_recorded_at_snapshot(
392    history: &[SemanticRecord],
393    snapshot: ResolvedRecallSnapshot,
394) -> Option<SemanticRecord> {
395    history
396        .iter()
397        .filter(|record| {
398            snapshot.contains_recorded_revision_for_chain(
399                record.logical_memory_id,
400                record.version,
401                record.created_at,
402                record.revision_id,
403            )
404        })
405        .max_by(|left, right| {
406            left.created_at
407                .cmp(&right.created_at)
408                .then_with(|| left.version.cmp(&right.version))
409                .then_with(|| left.revision_id.cmp(&right.revision_id))
410        })
411        .cloned()
412}
413
414fn memory_record_recorded_at(record: &MemoryRecord) -> Timestamp {
415    match record {
416        MemoryRecord::Episodic(record) => record.created_at,
417        MemoryRecord::Semantic(record) => record.created_at,
418        MemoryRecord::Working(record) => record.created_at,
419        MemoryRecord::Procedural(record) => record.created_at,
420    }
421}
422
423fn memory_record_revision_chain(record: &MemoryRecord) -> (LogicalMemoryId, u32) {
424    match record {
425        MemoryRecord::Episodic(record) => (record.logical_memory_id, record.version),
426        MemoryRecord::Semantic(record) => (record.logical_memory_id, record.version),
427        MemoryRecord::Working(record) => (record.logical_memory_id, record.version),
428        MemoryRecord::Procedural(record) => (record.logical_memory_id, record.version),
429    }
430}
431
432fn collect_semantic_logical_ids(results: &[RecallResult]) -> Vec<LogicalMemoryId> {
433    let mut logical_memory_ids = Vec::new();
434    let mut seen = HashSet::new();
435
436    for result in results {
437        let MemoryRecord::Semantic(record) = &result.record else {
438            continue;
439        };
440
441        if seen.insert(record.logical_memory_id) {
442            logical_memory_ids.push(record.logical_memory_id);
443        }
444    }
445
446    logical_memory_ids
447}
448
449fn collect_episodic_logical_ids(results: &[RecallResult]) -> Vec<LogicalMemoryId> {
450    let mut logical_memory_ids = Vec::new();
451    let mut seen = HashSet::new();
452
453    for result in results {
454        let MemoryRecord::Episodic(record) = &result.record else {
455            continue;
456        };
457
458        if seen.insert(record.logical_memory_id) {
459            logical_memory_ids.push(record.logical_memory_id);
460        }
461    }
462
463    logical_memory_ids
464}
465
/// A recall snapshot boundary resolved to concrete values.
#[derive(Clone, Copy)]
pub(super) enum ResolvedRecallSnapshot {
    /// Snapshot along the observed-time axis. Never matches in the recorded-revision
    /// checks implemented on this type.
    Observed(Timestamp),
    /// Snapshot along the recorded-time axis: contains revisions created at or before
    /// the timestamp.
    Recorded(Timestamp),
    /// Snapshot pinned to one specific revision.
    Revision {
        /// Creation time of the boundary revision.
        cutoff: Timestamp,
        /// Id of the boundary revision; breaks ties among revisions created at `cutoff`.
        revision_id: RevisionId,
        /// Logical memory chain the boundary revision belongs to.
        logical_memory_id: LogicalMemoryId,
        /// Version of the boundary revision within its chain.
        version: u32,
    },
}
477
478impl ResolvedRecallSnapshot {
479    pub(super) fn contains_recorded_revision(
480        self,
481        created_at: Timestamp,
482        revision_id: RevisionId,
483    ) -> bool {
484        match self {
485            Self::Observed(_) => false,
486            Self::Recorded(cutoff) => created_at <= cutoff,
487            Self::Revision {
488                cutoff,
489                revision_id: boundary_revision_id,
490                ..
491            } => {
492                created_at < cutoff || (created_at == cutoff && revision_id <= boundary_revision_id)
493            }
494        }
495    }
496
497    pub(super) fn contains_recorded_revision_for_chain(
498        self,
499        logical_memory_id: LogicalMemoryId,
500        version: u32,
501        created_at: Timestamp,
502        revision_id: RevisionId,
503    ) -> bool {
504        match self {
505            Self::Revision {
506                cutoff,
507                revision_id: boundary_revision_id,
508                logical_memory_id: boundary_logical_memory_id,
509                version: boundary_version,
510            } if created_at == cutoff && logical_memory_id == boundary_logical_memory_id => {
511                version < boundary_version
512                    || (version == boundary_version && revision_id <= boundary_revision_id)
513            }
514            _ => self.contains_recorded_revision(created_at, revision_id),
515        }
516    }
517}
518
519impl HirnDB {
520    // ── Semantic Memory ─────────────────────────────────────────────────
521
522    /// Store a semantic record. Enforces concept name uniqueness within namespace.
523    ///
524    /// Also adds a node in the property graph and detects auto-edges.
525    pub(crate) async fn store_semantic(&self, mut record: SemanticRecord) -> HirnResult<MemoryId> {
526        // ── Cedar policy enforcement ──
527        self.enforce(
528            record.provenance.created_by.as_str(),
529            crate::policy::Action::Remember,
530            &self.config.default_realm,
531            record.namespace.as_str(),
532        )
533        .await?;
534
535        // ── Text retention ──
536        match self.config.text_retention {
537            hirn_core::TextRetention::Full => {}
538            hirn_core::TextRetention::SummaryOnly | hirn_core::TextRetention::None => {
539                record.description = String::new();
540            }
541        }
542
543        normalize_semantic_record_timestamps(&mut record);
544
545        let id = record.id;
546        let content_preview = record.concept.chars().take(120).collect::<String>();
547        let embedding = record.embedding.clone();
548        let confidence = record.confidence;
549        let created_at = record.created_at;
550        let namespace = record.namespace.clone();
551
552        // Check concept uniqueness within namespace + agent.
553        {
554            let escaped_ns = record.namespace.as_str().replace('\'', "''");
555            let escaped_concept = record.concept.replace('\'', "''");
556            let agent_id = record.provenance.created_by.as_str();
557            let mut batches = self
558                .storage_runtime
559                .scan_stream(
560                    hirn_storage::datasets::semantic::DATASET_NAME,
561                    hirn_storage::store::ScanOptions {
562                        filter: Some(format!(
563                            "namespace = '{}' AND concept = '{}'",
564                            escaped_ns, escaped_concept
565                        )),
566                        ..Default::default()
567                    },
568                )
569                .await
570                .map_err(HirnError::storage)?;
571            let mut heads = HashMap::new();
572            while let Some(batch) = batches.try_next().await.map_err(HirnError::storage)? {
573                let recs = hirn_storage::datasets::semantic::from_batch(&batch)
574                    .map_err(HirnError::storage)?;
575                for rec in recs {
576                    upsert_semantic_head(&mut heads, rec);
577                }
578            }
579            if heads.into_values().any(|r| {
580                r.provenance.created_by.as_str() == agent_id && semantic_record_is_live(&r)
581            }) {
582                return Err(HirnError::AlreadyExists(format!(
583                    "concept '{}' already exists in namespace '{}' for agent '{}'",
584                    record.concept, record.namespace, record.provenance.created_by
585                )));
586            }
587        }
588
589        // Validate embedding dimensions if present.
590        if let Some(ref emb) = embedding {
591            if emb.len() != self.config.embedding_dimensions.as_usize() {
592                return Err(HirnError::InvalidInput(format!(
593                    "embedding dimension mismatch: expected {}, got {}",
594                    self.config.embedding_dimensions.as_usize(),
595                    emb.len()
596                )));
597            }
598        }
599
600        let envelope = build_semantic_create_envelope(id)?;
601        hirn_storage::append_mutation_envelope(self.storage_backend(), &envelope)
602            .await
603            .map_err(HirnError::storage)?;
604
605        // Add graph node and detect entity edges.
606        if let Err(error) = self
607            .cached_graph()
608            .add_node(
609                id,
610                Layer::Semantic,
611                confidence,
612                created_at,
613                namespace.clone(),
614            )
615            .await
616        {
617            self.finalize_semantic_create_failure(
618                &envelope,
619                id,
620                error.to_string(),
621                true,
622                "graph add_node",
623            )
624            .await;
625            return Err(error);
626        }
627
628        // Auto-detect similarity edges.
629        if let Some(ref emb) = embedding {
630            let candidates = self.find_similarity_candidates(emb).await;
631            if let Err(error) = self.apply_similarity_edges(id, &candidates).await {
632                let cleanup_applied = match self.remove_semantic_graph_nodes_if_present(&[id]).await
633                {
634                    Ok(()) => true,
635                    Err(cleanup_error) => {
636                        tracing::warn!(
637                            id = %id,
638                            envelope_id = %envelope.id,
639                            error = %cleanup_error,
640                            "semantic create graph cleanup incomplete after similarity edge error"
641                        );
642                        false
643                    }
644                };
645                self.finalize_semantic_create_failure(
646                    &envelope,
647                    id,
648                    error.to_string(),
649                    cleanup_applied,
650                    "similarity edge",
651                )
652                .await;
653                return Err(error);
654            }
655        }
656
657        // LanceDB write.
658        let dims = self.config.embedding_dimensions.as_usize();
659        let batch = match hirn_storage::datasets::semantic::to_batch(
660            std::slice::from_ref(&record),
661            dims,
662        ) {
663            Ok(batch) => batch,
664            Err(error) => {
665                let storage_error = HirnError::storage(error);
666                let cleanup_applied = match self.remove_semantic_graph_nodes_if_present(&[id]).await
667                {
668                    Ok(()) => true,
669                    Err(cleanup_error) => {
670                        tracing::warn!(
671                            id = %id,
672                            envelope_id = %envelope.id,
673                            error = %cleanup_error,
674                            "semantic create graph cleanup incomplete after semantic to_batch error"
675                        );
676                        false
677                    }
678                };
679                self.finalize_semantic_create_failure(
680                    &envelope,
681                    id,
682                    storage_error.to_string(),
683                    cleanup_applied,
684                    "semantic to_batch",
685                )
686                .await;
687                return Err(storage_error);
688            }
689        };
690        if let Err(e) = self
691            .storage_runtime
692            .append(hirn_storage::datasets::semantic::DATASET_NAME, batch)
693            .await
694        {
695            let error = HirnError::storage(e);
696            let cleanup_applied = match self.remove_semantic_graph_nodes_if_present(&[id]).await {
697                Ok(()) => true,
698                Err(cleanup_error) => {
699                    tracing::warn!(
700                        id = %id,
701                        envelope_id = %envelope.id,
702                        error = %cleanup_error,
703                        "semantic create graph cleanup incomplete after semantic append error"
704                    );
705                    false
706                }
707            };
708            self.finalize_semantic_create_failure(
709                &envelope,
710                id,
711                error.to_string(),
712                cleanup_applied,
713                "semantic append",
714            )
715            .await;
716            return Err(error);
717        }
718
719        self.cache_semantic_head(&record);
720
721        self.emit_scoped(
722            record.namespace.as_str(),
723            record.provenance.created_by.as_str(),
724            MemoryEvent::SemanticCreated {
725                id,
726                concept_name: content_preview,
727            },
728        )
729        .await;
730        if let Err(error) = hirn_storage::update_mutation_envelope_state(
731            self.storage_backend(),
732            &envelope.id,
733            hirn_storage::MutationEnvelopeState::Applied,
734            None,
735        )
736        .await
737        {
738            tracing::warn!(
739                id = %id,
740                envelope_id = %envelope.id,
741                error = %error,
742                "semantic create mutation envelope finalize failed; recovery will retry"
743            );
744        }
745        Ok(id)
746    }
747
748    /// Store multiple semantic records in a single batch. Returns per-record results.
749    ///
750    /// All records must belong to the same agent (Cedar authorization is checked
751    /// once per unique namespace, not per record). Concept uniqueness is checked
752    /// via a single scan for all records rather than one scan per record.
753    /// LanceDB append is batched for throughput.
754    pub(crate) async fn batch_store_semantic(
755        &self,
756        records: Vec<SemanticRecord>,
757    ) -> Vec<HirnResult<MemoryId>> {
758        if records.is_empty() {
759            return Vec::new();
760        }
761
762        let n = records.len();
763
764        // ── 1. Validate all records share the same agent_id ─────────────
765        let agent_id = records[0].provenance.created_by.clone();
766        for rec in records.iter().skip(1) {
767            if rec.provenance.created_by != agent_id {
768                return (0..n)
769                    .map(|_| {
770                        Err(HirnError::InvalidInput(
771                            "batch_store_semantic: all records must have the same agent_id".into(),
772                        ))
773                    })
774                    .collect();
775            }
776        }
777
778        // ── 2. Cedar enforce once per unique namespace ──────────────────
779        {
780            let mut checked_namespaces = HashSet::new();
781            for rec in &records {
782                if checked_namespaces.insert(rec.namespace.clone()) {
783                    if let Err(e) = self
784                        .enforce(
785                            agent_id.as_str(),
786                            crate::policy::Action::Remember,
787                            &self.config.default_realm,
788                            rec.namespace.as_str(),
789                        )
790                        .await
791                    {
792                        let msg = format!("{e}");
793                        return (0..n)
794                            .map(|_| Err(HirnError::AccessDenied(msg.clone())))
795                            .collect();
796                    }
797                }
798            }
799        }
800
801        // Per-record result slots.
802        let mut results: Vec<Option<HirnResult<MemoryId>>> = (0..n).map(|_| None).collect();
803
804        // ── 3. Text retention ───────────────────────────────────────────
805        let mut records: Vec<(usize, SemanticRecord)> = records
806            .into_iter()
807            .enumerate()
808            .map(|(idx, mut rec)| {
809                match self.config.text_retention {
810                    hirn_core::TextRetention::Full => {}
811                    hirn_core::TextRetention::SummaryOnly | hirn_core::TextRetention::None => {
812                        rec.description = String::new();
813                    }
814                }
815                normalize_semantic_record_timestamps(&mut rec);
816                (idx, rec)
817            })
818            .collect();
819
820        // ── 4. Batch uniqueness check (single scan) ────────────────────
821        // Build one filter that covers all (namespace, concept) pairs and check
822        // against the returned records instead of issuing N individual scans.
823        {
824            let exists = self
825                .storage_runtime
826                .exists(hirn_storage::datasets::semantic::DATASET_NAME)
827                .await
828                .unwrap_or(false);
829            if exists {
830                // Collect all unique (namespace, concept) pairs we need to check.
831                let mut pairs: Vec<(String, String)> = Vec::new();
832                for (_, rec) in &records {
833                    pairs.push((rec.namespace.as_str().to_owned(), rec.concept.clone()));
834                }
835
836                // Build a single OR filter for all namespace+concept pairs.
837                let clauses: Vec<String> = pairs
838                    .iter()
839                    .map(|(ns, concept)| {
840                        let escaped_ns = ns.replace('\'', "''");
841                        let escaped_concept = concept.replace('\'', "''");
842                        format!(
843                            "(namespace = '{}' AND concept = '{}')",
844                            escaped_ns, escaped_concept
845                        )
846                    })
847                    .collect();
848                let filter = clauses.join(" OR ");
849
850                let opts = hirn_storage::store::ScanOptions {
851                    filter: Some(filter),
852                    ..Default::default()
853                };
854                let mut batches = self
855                    .storage_runtime
856                    .scan_stream(hirn_storage::datasets::semantic::DATASET_NAME, opts)
857                    .await
858                    .ok();
859
860                // Build a set of existing (namespace, concept, agent) triples.
861                let mut existing: HashSet<(String, String)> = HashSet::new();
862                if let Some(batches) = batches.as_mut() {
863                    let mut heads = HashMap::new();
864                    while let Ok(Some(batch)) = batches.try_next().await {
865                        if let Ok(recs) = hirn_storage::datasets::semantic::from_batch(&batch) {
866                            for rec in recs {
867                                upsert_semantic_head(&mut heads, rec);
868                            }
869                        }
870                    }
871                    for r in heads.into_values() {
872                        if r.provenance.created_by.as_str() == agent_id.as_str()
873                            && semantic_record_is_live(&r)
874                        {
875                            existing.insert((r.namespace.to_string(), r.concept.clone()));
876                        }
877                    }
878                }
879
880                // Also track concepts within the batch itself (intra-batch dedup).
881                let mut batch_seen: HashSet<(String, String)> = HashSet::new();
882                records.retain(|(idx, rec)| {
883                    let key = (rec.namespace.to_string(), rec.concept.clone());
884                    if existing.contains(&key) {
885                        results[*idx] = Some(Err(HirnError::AlreadyExists(format!(
886                            "concept '{}' already exists in namespace '{}' for agent '{}'",
887                            rec.concept, rec.namespace, agent_id
888                        ))));
889                        false
890                    } else if !batch_seen.insert(key) {
891                        results[*idx] = Some(Err(HirnError::AlreadyExists(format!(
892                            "duplicate concept '{}' in batch for namespace '{}'",
893                            rec.concept, rec.namespace
894                        ))));
895                        false
896                    } else {
897                        true
898                    }
899                });
900            }
901        }
902
903        if records.is_empty() {
904            return results
905                .into_iter()
906                .map(|r| r.unwrap_or_else(|| Err(HirnError::InvalidInput("unreachable".into()))))
907                .collect();
908        }
909
910        // ── 5. Embedding validation ─────────────────────────────────────
911        records.retain(|(idx, rec)| {
912            if let Some(ref emb) = rec.embedding {
913                if emb.len() != self.config.embedding_dimensions.as_usize() {
914                    results[*idx] = Some(Err(HirnError::InvalidInput(format!(
915                        "embedding dimension mismatch: expected {}, got {}",
916                        self.config.embedding_dimensions.as_usize(),
917                        emb.len()
918                    ))));
919                    return false;
920                }
921            }
922            true
923        });
924
925        if records.is_empty() {
926            return results
927                .into_iter()
928                .map(|r| r.unwrap_or_else(|| Err(HirnError::InvalidInput("unreachable".into()))))
929                .collect();
930        }
931
932        // ── 6. Per-record graph + similarity edges ──────────────────────
933        struct PreparedRecord {
934            idx: usize,
935            record: SemanticRecord,
936            content_preview: String,
937            create_envelope: hirn_storage::MutationEnvelopeRecord,
938        }
939        let mut prepared: Vec<PreparedRecord> = Vec::with_capacity(records.len());
940
941        for (idx, rec) in records {
942            let id = rec.id;
943            let content_preview = rec.concept.chars().take(120).collect::<String>();
944            let create_envelope = match build_semantic_create_envelope(id) {
945                Ok(envelope) => envelope,
946                Err(error) => {
947                    results[idx] = Some(Err(error));
948                    continue;
949                }
950            };
951
952            prepared.push(PreparedRecord {
953                idx,
954                record: rec,
955                content_preview,
956                create_envelope,
957            });
958        }
959
960        if prepared.is_empty() {
961            return results
962                .into_iter()
963                .map(|r| r.unwrap_or_else(|| Err(HirnError::InvalidInput("unreachable".into()))))
964                .collect();
965        }
966
967        if let Err(error) = hirn_storage::append_mutation_envelopes(
968            self.storage_backend(),
969            &prepared
970                .iter()
971                .map(|record| record.create_envelope.clone())
972                .collect::<Vec<_>>(),
973        )
974        .await
975        {
976            let message = error.to_string();
977            for record in &prepared {
978                results[record.idx] = Some(Err(HirnError::storage(message.clone())));
979            }
980            return results
981                .into_iter()
982                .map(|r| {
983                    r.unwrap_or_else(|| {
984                        Err(HirnError::storage("semantic create envelope append failed"))
985                    })
986                })
987                .collect();
988        }
989
990        let mut graph_prepared = Vec::with_capacity(prepared.len());
991        for prepared_record in prepared {
992            let id = prepared_record.record.id;
993
994            // Graph node.
995            if let Err(error) = self
996                .cached_graph()
997                .add_node(
998                    id,
999                    Layer::Semantic,
1000                    prepared_record.record.confidence,
1001                    prepared_record.record.created_at,
1002                    prepared_record.record.namespace,
1003                )
1004                .await
1005            {
1006                self.finalize_semantic_create_failure(
1007                    &prepared_record.create_envelope,
1008                    id,
1009                    error.to_string(),
1010                    true,
1011                    "graph add_node",
1012                )
1013                .await;
1014                results[prepared_record.idx] = Some(Err(error));
1015                continue;
1016            }
1017
1018            // Auto-detect similarity edges.
1019            if let Some(ref emb) = prepared_record.record.embedding {
1020                let candidates = self.find_similarity_candidates(emb).await;
1021                if let Err(error) = self.apply_similarity_edges(id, &candidates).await {
1022                    let cleanup_applied = match self
1023                        .remove_semantic_graph_nodes_if_present(&[id])
1024                        .await
1025                    {
1026                        Ok(()) => true,
1027                        Err(cleanup_error) => {
1028                            tracing::warn!(
1029                                id = %id,
1030                                envelope_id = %prepared_record.create_envelope.id,
1031                                error = %cleanup_error,
1032                                "semantic create graph cleanup incomplete after similarity edge error"
1033                            );
1034                            false
1035                        }
1036                    };
1037                    self.finalize_semantic_create_failure(
1038                        &prepared_record.create_envelope,
1039                        id,
1040                        error.to_string(),
1041                        cleanup_applied,
1042                        "similarity edge",
1043                    )
1044                    .await;
1045                    results[prepared_record.idx] = Some(Err(error));
1046                    continue;
1047                }
1048            }
1049
1050            graph_prepared.push(prepared_record);
1051        }
1052
1053        let prepared = graph_prepared;
1054        if prepared.is_empty() {
1055            return results
1056                .into_iter()
1057                .map(|r| r.unwrap_or_else(|| Err(HirnError::InvalidInput("unreachable".into()))))
1058                .collect();
1059        }
1060
1061        // ── 7. Single LanceDB append ────────────────────────────────────
1062        if !prepared.is_empty() {
1063            let lance_records = prepared
1064                .iter()
1065                .map(|record| record.record.clone())
1066                .collect::<Vec<_>>();
1067            let dims = self.config.embedding_dimensions.as_usize();
1068            match hirn_storage::datasets::semantic::to_batch(&lance_records, dims) {
1069                Ok(batch) => {
1070                    if let Err(e) = self
1071                        .storage_runtime
1072                        .append(hirn_storage::datasets::semantic::DATASET_NAME, batch)
1073                        .await
1074                    {
1075                        tracing::error!(
1076                            count = lance_records.len(),
1077                            error = %e,
1078                            "batch_store_semantic: LanceDB batch append failed"
1079                        );
1080                        let msg = format!("{e}");
1081                        for p in &prepared {
1082                            let cleanup_applied = match self
1083                                .remove_semantic_graph_nodes_if_present(&[p.record.id])
1084                                .await
1085                            {
1086                                Ok(()) => true,
1087                                Err(cleanup_error) => {
1088                                    tracing::warn!(
1089                                        id = %p.record.id,
1090                                        envelope_id = %p.create_envelope.id,
1091                                        error = %cleanup_error,
1092                                        "semantic create graph cleanup incomplete after semantic append error"
1093                                    );
1094                                    false
1095                                }
1096                            };
1097                            self.finalize_semantic_create_failure(
1098                                &p.create_envelope,
1099                                p.record.id,
1100                                msg.clone(),
1101                                cleanup_applied,
1102                                "semantic append",
1103                            )
1104                            .await;
1105                            results[p.idx] = Some(Err(HirnError::StorageError(msg.clone().into())));
1106                        }
1107                        return results
1108                            .into_iter()
1109                            .map(|r| {
1110                                r.unwrap_or_else(|| {
1111                                    Err(HirnError::storage("LanceDB append failed"))
1112                                })
1113                            })
1114                            .collect();
1115                    }
1116
1117                    for record in &lance_records {
1118                        self.cache_semantic_head(record);
1119                    }
1120                }
1121                Err(e) => {
1122                    tracing::error!(
1123                        count = lance_records.len(),
1124                        error = %e,
1125                        "batch_store_semantic: LanceDB to_batch failed"
1126                    );
1127                    let msg = format!("{e}");
1128                    for p in &prepared {
1129                        let cleanup_applied = match self
1130                            .remove_semantic_graph_nodes_if_present(&[p.record.id])
1131                            .await
1132                        {
1133                            Ok(()) => true,
1134                            Err(cleanup_error) => {
1135                                tracing::warn!(
1136                                    id = %p.record.id,
1137                                    envelope_id = %p.create_envelope.id,
1138                                    error = %cleanup_error,
1139                                    "semantic create graph cleanup incomplete after semantic to_batch error"
1140                                );
1141                                false
1142                            }
1143                        };
1144                        self.finalize_semantic_create_failure(
1145                            &p.create_envelope,
1146                            p.record.id,
1147                            msg.clone(),
1148                            cleanup_applied,
1149                            "semantic to_batch",
1150                        )
1151                        .await;
1152                        results[p.idx] = Some(Err(HirnError::StorageError(msg.clone().into())));
1153                    }
1154                    return results
1155                        .into_iter()
1156                        .map(|r| {
1157                            r.unwrap_or_else(|| Err(HirnError::storage("LanceDB to_batch failed")))
1158                        })
1159                        .collect();
1160                }
1161            }
1162        }
1163
1164        // ── 8. Events ───────────────────────────────────────────────────
1165        for p in &prepared {
1166            results[p.idx] = Some(Ok(p.record.id));
1167            self.emit_scoped(
1168                p.record.namespace.as_str(),
1169                p.record.provenance.created_by.as_str(),
1170                MemoryEvent::SemanticCreated {
1171                    id: p.record.id,
1172                    concept_name: p.content_preview.clone(),
1173                },
1174            )
1175            .await;
1176            if let Err(error) = hirn_storage::update_mutation_envelope_state(
1177                self.storage_backend(),
1178                &p.create_envelope.id,
1179                hirn_storage::MutationEnvelopeState::Applied,
1180                None,
1181            )
1182            .await
1183            {
1184                tracing::warn!(
1185                    id = %p.record.id,
1186                    envelope_id = %p.create_envelope.id,
1187                    error = %error,
1188                    "semantic create mutation envelope finalize failed; recovery will retry"
1189                );
1190            }
1191        }
1192
1193        results
1194            .into_iter()
1195            .map(|r| r.unwrap_or_else(|| Err(HirnError::InvalidInput("unreachable".into()))))
1196            .collect()
1197    }
1198
1199    /// Read a single semantic record from LanceDB by ID.
1200    pub(crate) async fn read_semantic_record(&self, id: MemoryId) -> HirnResult<SemanticRecord> {
1201        let mut batches = self
1202            .storage_runtime
1203            .scan_stream(
1204                hirn_storage::datasets::semantic::DATASET_NAME,
1205                hirn_storage::store::ScanOptions {
1206                    exact_filter: Some(hirn_storage::store::ExactMatchFilter::utf8_value(
1207                        "id",
1208                        id.to_string(),
1209                    )),
1210                    limit: Some(1),
1211                    ..Default::default()
1212                },
1213            )
1214            .await
1215            .map_err(HirnError::storage)?;
1216
1217        while let Some(batch) = batches.try_next().await.map_err(HirnError::storage)? {
1218            let recs =
1219                hirn_storage::datasets::semantic::from_batch(&batch).map_err(HirnError::storage)?;
1220            if let Some(r) = recs.into_iter().next() {
1221                return Ok(r);
1222            }
1223        }
1224        Err(HirnError::NotFound(format!("semantic record {id}")))
1225    }
1226
1227    /// Overwrite an existing semantic record in place.
1228    async fn overwrite_semantic_record(&self, record: &SemanticRecord) -> HirnResult<()> {
1229        let dims = self.config.embedding_dimensions.as_usize();
1230        let exact_filter =
1231            hirn_storage::store::ExactMatchFilter::utf8_value("id", record.id.to_string());
1232        self.storage_runtime
1233            .delete_exact(
1234                hirn_storage::datasets::semantic::DATASET_NAME,
1235                &exact_filter,
1236            )
1237            .await
1238            .map_err(|e| HirnError::storage(e))?;
1239        let batch = hirn_storage::datasets::semantic::to_batch(std::slice::from_ref(record), dims)
1240            .map_err(|e| HirnError::storage(e))?;
1241        self.storage_runtime
1242            .append(hirn_storage::datasets::semantic::DATASET_NAME, batch)
1243            .await
1244            .map_err(|e| HirnError::storage(e))?;
1245        self.evict_semantic_head(record.logical_memory_id);
1246        Ok(())
1247    }
1248
1249    async fn append_semantic_record(&self, record: &SemanticRecord) -> HirnResult<()> {
1250        let dims = self.config.embedding_dimensions.as_usize();
1251        let batch = hirn_storage::datasets::semantic::to_batch(std::slice::from_ref(record), dims)
1252            .map_err(HirnError::storage)?;
1253        self.storage_runtime
1254            .append(hirn_storage::datasets::semantic::DATASET_NAME, batch)
1255            .await
1256            .map_err(HirnError::storage)?;
1257        Ok(())
1258    }
1259
1260    async fn append_semantic_records(&self, records: &[SemanticRecord]) -> HirnResult<()> {
1261        if records.is_empty() {
1262            return Ok(());
1263        }
1264
1265        let dims = self.config.embedding_dimensions.as_usize();
1266        let batch = hirn_storage::datasets::semantic::to_batch(records, dims)
1267            .map_err(HirnError::storage)?;
1268        self.storage_runtime
1269            .append(hirn_storage::datasets::semantic::DATASET_NAME, batch)
1270            .await
1271            .map_err(HirnError::storage)?;
1272        Ok(())
1273    }
1274
1275    async fn finalize_semantic_create_failure(
1276        &self,
1277        envelope: &hirn_storage::MutationEnvelopeRecord,
1278        record_id: MemoryId,
1279        error_message: String,
1280        cleanup_applied: bool,
1281        stage: &'static str,
1282    ) {
1283        if cleanup_applied {
1284            if let Err(update_error) = hirn_storage::update_mutation_envelope_state(
1285                self.storage_backend(),
1286                &envelope.id,
1287                hirn_storage::MutationEnvelopeState::Failed,
1288                Some(error_message.clone()),
1289            )
1290            .await
1291            {
1292                tracing::warn!(
1293                    record_id = %record_id,
1294                    envelope_id = %envelope.id,
1295                    stage = stage,
1296                    error = %update_error,
1297                    "semantic create mutation envelope fail-fast finalize failed"
1298                );
1299            }
1300        } else {
1301            tracing::warn!(
1302                record_id = %record_id,
1303                envelope_id = %envelope.id,
1304                stage = stage,
1305                error = %error_message,
1306                "semantic create mutation cleanup incomplete; recovery will retry"
1307            );
1308        }
1309    }
1310
1311    fn semantic_logical_exact_filter(
1312        logical_memory_id: LogicalMemoryId,
1313    ) -> hirn_storage::store::ExactMatchFilter {
1314        hirn_storage::store::ExactMatchFilter::utf8_value(
1315            "logical_memory_id",
1316            logical_memory_id.to_string(),
1317        )
1318    }
1319
1320    fn semantic_logical_exact_filter_many(
1321        logical_memory_ids: &[LogicalMemoryId],
1322    ) -> Option<hirn_storage::store::ExactMatchFilter> {
1323        hirn_storage::store::ExactMatchFilter::utf8_values(
1324            "logical_memory_id",
1325            logical_memory_ids
1326                .iter()
1327                .map(ToString::to_string)
1328                .collect::<Vec<_>>(),
1329        )
1330    }
1331
/// Test-only: build an exact-match filter on the `revision_id` column for one
/// revision, using the ID's string form as the match value.
#[cfg(test)]
fn semantic_revision_exact_filter(
    revision_id: RevisionId,
) -> hirn_storage::store::ExactMatchFilter {
    let rendered_id = revision_id.to_string();
    hirn_storage::store::ExactMatchFilter::utf8_value("revision_id", rendered_id)
}
1338
1339    fn cached_semantic_head(&self, logical_memory_id: LogicalMemoryId) -> Option<SemanticRecord> {
1340        self.semantic_head_cache_get(logical_memory_id)
1341    }
1342
1343    pub(crate) fn cache_semantic_head(&self, record: &SemanticRecord) {
1344        if let Some(current) = self.cached_semantic_head(record.logical_memory_id) {
1345            if !semantic_revision_is_newer(record, &current) {
1346                return;
1347            }
1348        }
1349        self.semantic_head_cache_put(record.clone());
1350    }
1351
1352    pub(crate) fn evict_semantic_head(&self, logical_memory_id: LogicalMemoryId) {
1353        self.semantic_head_cache_evict(logical_memory_id);
1354    }
1355
1356    pub(crate) fn cached_semantic_heads_snapshot(
1357        &self,
1358    ) -> HashMap<LogicalMemoryId, SemanticRecord> {
1359        self.semantic_head_cache_snapshot()
1360    }
1361
1362    pub(crate) fn replace_semantic_heads(&self, records: impl IntoIterator<Item = SemanticRecord>) {
1363        self.semantic_head_cache_replace(records);
1364    }
1365
1366    async fn load_semantic_head_from_storage(
1367        &self,
1368        logical_memory_id: LogicalMemoryId,
1369    ) -> HirnResult<SemanticRecord> {
1370        let mut batches = self
1371            .storage_runtime
1372            .scan_stream(
1373                hirn_storage::datasets::semantic::DATASET_NAME,
1374                hirn_storage::store::ScanOptions {
1375                    exact_filter: Some(Self::semantic_logical_exact_filter(logical_memory_id)),
1376                    order_by: Some(vec![
1377                        hirn_storage::store::ScanOrdering::desc("version"),
1378                        hirn_storage::store::ScanOrdering::desc("created_at_ms"),
1379                        hirn_storage::store::ScanOrdering::desc("revision_id"),
1380                    ]),
1381                    limit: Some(1),
1382                    ..Default::default()
1383                },
1384            )
1385            .await
1386            .map_err(HirnError::storage)?;
1387
1388        while let Some(batch) = batches.try_next().await.map_err(HirnError::storage)? {
1389            let recs =
1390                hirn_storage::datasets::semantic::from_batch(&batch).map_err(HirnError::storage)?;
1391            if let Some(record) = recs.into_iter().next() {
1392                return Ok(record);
1393            }
1394        }
1395
1396        Err(HirnError::NotFound(format!(
1397            "semantic logical memory {logical_memory_id}"
1398        )))
1399    }
1400
1401    async fn semantic_record_is_current_head(&self, record: &SemanticRecord) -> HirnResult<bool> {
1402        match self
1403            .load_semantic_head_from_storage(record.logical_memory_id)
1404            .await
1405        {
1406            Ok(head) => {
1407                self.cache_semantic_head(&head);
1408                Ok(head.id == record.id)
1409            }
1410            Err(HirnError::NotFound(_)) => Ok(false),
1411            Err(error) => Err(error),
1412        }
1413    }
1414
1415    pub(crate) async fn semantic_head_for_logical_id(
1416        &self,
1417        logical_memory_id: LogicalMemoryId,
1418    ) -> HirnResult<SemanticRecord> {
1419        if let Some(record) = self.cached_semantic_head(logical_memory_id) {
1420            return Ok(record);
1421        }
1422
1423        match self
1424            .load_semantic_head_from_storage(logical_memory_id)
1425            .await
1426        {
1427            Ok(record) => {
1428                self.cache_semantic_head(&record);
1429                Ok(record)
1430            }
1431            Err(HirnError::NotFound(_)) => {
1432                self.evict_semantic_head(logical_memory_id);
1433                Err(HirnError::NotFound(format!(
1434                    "semantic logical memory {logical_memory_id}"
1435                )))
1436            }
1437            Err(error) => Err(error),
1438        }
1439    }
1440
1441    async fn semantic_heads_for_logical_ids(
1442        &self,
1443        logical_memory_ids: &[LogicalMemoryId],
1444    ) -> HirnResult<HashMap<LogicalMemoryId, SemanticRecord>> {
1445        let mut heads = HashMap::with_capacity(logical_memory_ids.len());
1446        let mut missing = Vec::new();
1447
1448        for &logical_memory_id in logical_memory_ids {
1449            if let Some(record) = self.cached_semantic_head(logical_memory_id) {
1450                heads.insert(logical_memory_id, record);
1451            } else {
1452                missing.push(logical_memory_id);
1453            }
1454        }
1455
1456        let Some(exact_filter) = Self::semantic_logical_exact_filter_many(&missing) else {
1457            return Ok(heads);
1458        };
1459
1460        let mut batches = self
1461            .storage_runtime
1462            .scan_stream(
1463                hirn_storage::datasets::semantic::DATASET_NAME,
1464                hirn_storage::store::ScanOptions {
1465                    exact_filter: Some(exact_filter),
1466                    ..Default::default()
1467                },
1468            )
1469            .await
1470            .map_err(HirnError::storage)?;
1471
1472        let mut loaded = HashMap::new();
1473        while let Some(batch) = batches.try_next().await.map_err(HirnError::storage)? {
1474            let recs =
1475                hirn_storage::datasets::semantic::from_batch(&batch).map_err(HirnError::storage)?;
1476            for record in recs {
1477                upsert_semantic_head(&mut loaded, record);
1478            }
1479        }
1480
1481        for (logical_memory_id, record) in loaded {
1482            self.cache_semantic_head(&record);
1483            heads.insert(logical_memory_id, record);
1484        }
1485
1486        for &logical_memory_id in &missing {
1487            if !heads.contains_key(&logical_memory_id) {
1488                self.evict_semantic_head(logical_memory_id);
1489            }
1490        }
1491
1492        Ok(heads)
1493    }
1494
/// Test-only: resolve the record that an edit addressed at `id` would target.
///
/// Reads the record for `id`, resolves the head of its logical memory, and
/// returns that head only if it is the same revision as the record and is
/// neither retracted nor merged into another memory.
///
/// # Errors
///
/// Propagates lookup errors, and returns [`HirnError::InvalidInput`] when
/// `id` is not the active head, or the head is retracted or merged.
#[cfg(test)]
pub(crate) async fn semantic_edit_target(&self, id: MemoryId) -> HirnResult<SemanticRecord> {
    let record = self.read_semantic_record(id).await?;
    let head = self
        .semantic_head_for_logical_id(record.logical_memory_id)
        .await?;

    if head.revision_id != record.revision_id {
        return Err(HirnError::InvalidInput(format!(
            "semantic revision {id} is not the active head"
        )));
    }

    if head.is_retracted() {
        return Err(HirnError::InvalidInput(format!(
            "semantic logical memory {} is retracted",
            head.logical_memory_id
        )));
    }

    if let Some(merged_into) = head.merged_into {
        return Err(HirnError::InvalidInput(format!(
            "semantic logical memory {} has been merged into {}",
            head.logical_memory_id, merged_into
        )));
    }

    Ok(head)
}
1522
1523    async fn resolve_active_semantic_head(&self, id: MemoryId) -> HirnResult<SemanticRecord> {
1524        let record = self.read_semantic_record(id).await?;
1525        let head = self
1526            .semantic_head_for_logical_id(record.logical_memory_id)
1527            .await?;
1528
1529        if head.is_retracted() {
1530            Err(HirnError::InvalidInput(format!(
1531                "semantic logical memory {} is retracted",
1532                head.logical_memory_id
1533            )))
1534        } else if let Some(merged_into) = head.merged_into {
1535            Err(HirnError::InvalidInput(format!(
1536                "semantic logical memory {} has been merged into {}",
1537                head.logical_memory_id, merged_into
1538            )))
1539        } else {
1540            Ok(head)
1541        }
1542    }
1543
1544    async fn live_semantic_heads_for_logical_ids(
1545        &self,
1546        logical_memory_ids: &[LogicalMemoryId],
1547    ) -> HirnResult<HashMap<LogicalMemoryId, SemanticRecord>> {
1548        let heads = self
1549            .semantic_heads_for_logical_ids(logical_memory_ids)
1550            .await?;
1551        Ok(heads
1552            .into_iter()
1553            .filter(|(_, record)| semantic_record_is_live(record))
1554            .collect())
1555    }
1556
1557    pub(crate) async fn semantic_revision_as_of(
1558        &self,
1559        logical_memory_id: LogicalMemoryId,
1560        cutoff: Timestamp,
1561    ) -> HirnResult<Option<SemanticRecord>> {
1562        let mut batches = self
1563            .storage_runtime
1564            .scan_stream(
1565                hirn_storage::datasets::semantic::DATASET_NAME,
1566                hirn_storage::store::ScanOptions {
1567                    exact_filter: Some(Self::semantic_logical_exact_filter(logical_memory_id)),
1568                    order_by: Some(vec![
1569                        hirn_storage::store::ScanOrdering::asc("version"),
1570                        hirn_storage::store::ScanOrdering::asc("created_at_ms"),
1571                    ]),
1572                    ..Default::default()
1573                },
1574            )
1575            .await
1576            .map_err(HirnError::storage)?;
1577
1578        let mut history = Vec::new();
1579        while let Some(batch) = batches.try_next().await.map_err(HirnError::storage)? {
1580            let recs =
1581                hirn_storage::datasets::semantic::from_batch(&batch).map_err(HirnError::storage)?;
1582            history.extend(recs);
1583        }
1584
1585        let Some(record) = semantic_snapshot_head_as_of(&history, cutoff) else {
1586            return Ok(None);
1587        };
1588
1589        if !semantic_record_is_live(&record) {
1590            return Ok(None);
1591        }
1592
1593        Ok(Some(record))
1594    }
1595
1596    async fn semantic_revision_recorded_at_snapshot(
1597        &self,
1598        logical_memory_id: LogicalMemoryId,
1599        snapshot: ResolvedRecallSnapshot,
1600    ) -> HirnResult<Option<SemanticRecord>> {
1601        let mut batches = self
1602            .storage_runtime
1603            .scan_stream(
1604                hirn_storage::datasets::semantic::DATASET_NAME,
1605                hirn_storage::store::ScanOptions {
1606                    exact_filter: Some(Self::semantic_logical_exact_filter(logical_memory_id)),
1607                    order_by: Some(vec![
1608                        hirn_storage::store::ScanOrdering::asc("created_at_ms"),
1609                        hirn_storage::store::ScanOrdering::asc("version"),
1610                    ]),
1611                    ..Default::default()
1612                },
1613            )
1614            .await
1615            .map_err(HirnError::storage)?;
1616
1617        let mut history = Vec::new();
1618        while let Some(batch) = batches.try_next().await.map_err(HirnError::storage)? {
1619            let recs =
1620                hirn_storage::datasets::semantic::from_batch(&batch).map_err(HirnError::storage)?;
1621            history.extend(recs);
1622        }
1623
1624        let Some(record) = semantic_snapshot_head_recorded_at_snapshot(&history, snapshot) else {
1625            return Ok(None);
1626        };
1627
1628        if !semantic_record_is_live(&record) {
1629            return Ok(None);
1630        }
1631
1632        Ok(Some(record))
1633    }
1634
1635    async fn semantic_revisions_for_logical_ids_at_snapshot(
1636        &self,
1637        logical_memory_ids: &[LogicalMemoryId],
1638        snapshot: ResolvedRecallSnapshot,
1639    ) -> HirnResult<HashMap<LogicalMemoryId, SemanticRecord>> {
1640        if let [logical_memory_id] = logical_memory_ids {
1641            let revision = match snapshot {
1642                ResolvedRecallSnapshot::Observed(cutoff) => {
1643                    self.semantic_revision_as_of(*logical_memory_id, cutoff)
1644                        .await?
1645                }
1646                recorded_snapshot => {
1647                    self.semantic_revision_recorded_at_snapshot(
1648                        *logical_memory_id,
1649                        recorded_snapshot,
1650                    )
1651                    .await?
1652                }
1653            };
1654
1655            let mut resolved = HashMap::new();
1656            if let Some(revision) = revision {
1657                resolved.insert(*logical_memory_id, revision);
1658            }
1659            return Ok(resolved);
1660        }
1661
1662        let Some(exact_filter) = Self::semantic_logical_exact_filter_many(logical_memory_ids)
1663        else {
1664            return Ok(HashMap::new());
1665        };
1666
1667        let mut batches = self
1668            .storage_runtime
1669            .scan_stream(
1670                hirn_storage::datasets::semantic::DATASET_NAME,
1671                hirn_storage::store::ScanOptions {
1672                    exact_filter: Some(exact_filter),
1673                    order_by: Some(vec![
1674                        hirn_storage::store::ScanOrdering::asc("version"),
1675                        hirn_storage::store::ScanOrdering::asc("created_at_ms"),
1676                    ]),
1677                    ..Default::default()
1678                },
1679            )
1680            .await
1681            .map_err(HirnError::storage)?;
1682
1683        let mut histories: HashMap<LogicalMemoryId, Vec<SemanticRecord>> =
1684            HashMap::with_capacity(logical_memory_ids.len());
1685        while let Some(batch) = batches.try_next().await.map_err(HirnError::storage)? {
1686            let recs =
1687                hirn_storage::datasets::semantic::from_batch(&batch).map_err(HirnError::storage)?;
1688            for record in recs {
1689                histories
1690                    .entry(record.logical_memory_id)
1691                    .or_default()
1692                    .push(record);
1693            }
1694        }
1695
1696        let mut resolved = HashMap::with_capacity(histories.len());
1697        for (logical_memory_id, mut history) in histories {
1698            history.sort_by(|left, right| {
1699                left.version
1700                    .cmp(&right.version)
1701                    .then_with(|| left.created_at.cmp(&right.created_at))
1702            });
1703
1704            let revision = match snapshot {
1705                ResolvedRecallSnapshot::Observed(cutoff) => {
1706                    semantic_snapshot_head_as_of(&history, cutoff)
1707                }
1708                recorded_snapshot => {
1709                    semantic_snapshot_head_recorded_at_snapshot(&history, recorded_snapshot)
1710                }
1711            };
1712
1713            if let Some(revision) = revision.filter(semantic_record_is_live) {
1714                resolved.insert(logical_memory_id, revision);
1715            }
1716        }
1717
1718        Ok(resolved)
1719    }
1720
1721    pub(super) async fn resolve_recall_snapshot(
1722        &self,
1723        snapshot: RecallSnapshot,
1724    ) -> HirnResult<ResolvedRecallSnapshot> {
1725        match snapshot {
1726            RecallSnapshot::Observed(cutoff) => Ok(ResolvedRecallSnapshot::Observed(cutoff)),
1727            RecallSnapshot::Recorded(cutoff) => Ok(ResolvedRecallSnapshot::Recorded(cutoff)),
1728            RecallSnapshot::Revision(revision_id) => {
1729                let boundary_record = self.get_memory(revision_id.as_memory_id()).await?;
1730                let (logical_memory_id, version) = memory_record_revision_chain(&boundary_record);
1731                Ok(ResolvedRecallSnapshot::Revision {
1732                    cutoff: memory_record_recorded_at(&boundary_record),
1733                    revision_id,
1734                    logical_memory_id,
1735                    version,
1736                })
1737            }
1738        }
1739    }
1740
1741    pub(crate) async fn semantic_revision_for_logical_id_at_snapshot(
1742        &self,
1743        logical_memory_id: LogicalMemoryId,
1744        snapshot: RecallSnapshot,
1745    ) -> HirnResult<Option<SemanticRecord>> {
1746        let snapshot = self.resolve_recall_snapshot(snapshot).await?;
1747        let mut semantic_revisions = self
1748            .semantic_revisions_for_logical_ids_at_snapshot(&[logical_memory_id], snapshot)
1749            .await?;
1750
1751        Ok(semantic_revisions.remove(&logical_memory_id))
1752    }
1753
1754    pub(crate) async fn normalize_current_recall_results(
1755        &self,
1756        results: Vec<RecallResult>,
1757    ) -> HirnResult<Vec<RecallResult>> {
1758        let semantic_heads = self
1759            .live_semantic_heads_for_logical_ids(&collect_semantic_logical_ids(&results))
1760            .await?;
1761        let episodic_heads = self
1762            .live_episodic_heads_for_logical_ids(&collect_episodic_logical_ids(&results))
1763            .await?;
1764        let mut resolved = Vec::with_capacity(results.len());
1765        let mut seen_semantic = HashSet::new();
1766        let mut seen_episodic = HashSet::new();
1767        let mut seen_procedural = HashSet::new();
1768        let mut seen_working = HashSet::new();
1769
1770        for mut result in results {
1771            match &result.record {
1772                MemoryRecord::Semantic(record) => {
1773                    if !seen_semantic.insert(record.logical_memory_id) {
1774                        continue;
1775                    }
1776
1777                    let Some(head) = semantic_heads.get(&record.logical_memory_id) else {
1778                        continue;
1779                    };
1780
1781                    result.record = MemoryRecord::Semantic(head.clone());
1782                    result.revision = Some(RevisionRef {
1783                        logical_memory_id: head.logical_memory_id,
1784                        revision_id: head.revision_id,
1785                        state: RevisionState::Active,
1786                    });
1787                    resolved.push(result);
1788                }
1789                MemoryRecord::Episodic(record) => {
1790                    if !seen_episodic.insert(record.logical_memory_id) {
1791                        continue;
1792                    }
1793
1794                    let Some(head) = episodic_heads.get(&record.logical_memory_id) else {
1795                        continue;
1796                    };
1797
1798                    result.record = MemoryRecord::Episodic(head.clone());
1799                    result.revision = Some(RevisionRef {
1800                        logical_memory_id: head.logical_memory_id,
1801                        revision_id: head.revision_id,
1802                        state: RevisionState::Active,
1803                    });
1804                    resolved.push(result);
1805                }
1806                MemoryRecord::Procedural(record) => {
1807                    if !seen_procedural.insert(record.logical_memory_id) {
1808                        continue;
1809                    }
1810
1811                    let Ok(head) = self
1812                        .procedural_head_for_logical_id(record.logical_memory_id)
1813                        .await
1814                    else {
1815                        continue;
1816                    };
1817                    if !head.is_live() {
1818                        continue;
1819                    }
1820
1821                    result.record = MemoryRecord::Procedural(head.clone());
1822                    result.revision = Some(RevisionRef {
1823                        logical_memory_id: head.logical_memory_id,
1824                        revision_id: head.revision_id,
1825                        state: RevisionState::Active,
1826                    });
1827                    resolved.push(result);
1828                }
1829                MemoryRecord::Working(record) => {
1830                    if !seen_working.insert(record.logical_memory_id) {
1831                        continue;
1832                    }
1833
1834                    let Ok(head) = self
1835                        .working_head_for_logical_id(record.logical_memory_id)
1836                        .await
1837                    else {
1838                        continue;
1839                    };
1840                    if !head.is_live() {
1841                        continue;
1842                    }
1843
1844                    result.record = MemoryRecord::Working(head.clone());
1845                    result.revision = Some(RevisionRef {
1846                        logical_memory_id: head.logical_memory_id,
1847                        revision_id: head.revision_id,
1848                        state: RevisionState::Active,
1849                    });
1850                    resolved.push(result);
1851                }
1852            }
1853        }
1854
1855        Ok(resolved)
1856    }
1857
1858    pub(crate) async fn normalize_recall_results_at_snapshot(
1859        &self,
1860        results: Vec<RecallResult>,
1861        snapshot: RecallSnapshot,
1862    ) -> HirnResult<Vec<RecallResult>> {
1863        let requested_snapshot = snapshot;
1864        let snapshot = self.resolve_recall_snapshot(snapshot).await?;
1865        let semantic_revisions = self
1866            .semantic_revisions_for_logical_ids_at_snapshot(
1867                &collect_semantic_logical_ids(&results),
1868                snapshot,
1869            )
1870            .await?;
1871        let mut resolved = Vec::with_capacity(results.len());
1872        let mut seen_semantic = HashSet::new();
1873        let mut seen_episodic = HashSet::new();
1874        let mut seen_procedural = HashSet::new();
1875        let mut seen_working = HashSet::new();
1876
1877        for mut result in results {
1878            match &result.record {
1879                MemoryRecord::Semantic(record) => {
1880                    if !seen_semantic.insert(record.logical_memory_id) {
1881                        continue;
1882                    }
1883
1884                    let Some(revision) = semantic_revisions.get(&record.logical_memory_id) else {
1885                        continue;
1886                    };
1887
1888                    result.record = MemoryRecord::Semantic(revision.clone());
1889                    result.revision = Some(RevisionRef {
1890                        logical_memory_id: revision.logical_memory_id,
1891                        revision_id: revision.revision_id,
1892                        state: revision.logical_state(),
1893                    });
1894                    resolved.push(result);
1895                }
1896                MemoryRecord::Episodic(record) => {
1897                    if !seen_episodic.insert(record.logical_memory_id) {
1898                        continue;
1899                    }
1900
1901                    let Ok(Some(revision)) = self
1902                        .episodic_revision_for_logical_id_at_snapshot(
1903                            record.logical_memory_id,
1904                            requested_snapshot,
1905                        )
1906                        .await
1907                    else {
1908                        continue;
1909                    };
1910                    if !revision.is_live() {
1911                        continue;
1912                    }
1913
1914                    result.record = MemoryRecord::Episodic(revision.clone());
1915                    result.revision = Some(RevisionRef {
1916                        logical_memory_id: revision.logical_memory_id,
1917                        revision_id: revision.revision_id,
1918                        state: RevisionState::Active,
1919                    });
1920                    resolved.push(result);
1921                }
1922                MemoryRecord::Procedural(record) => {
1923                    if !seen_procedural.insert(record.logical_memory_id) {
1924                        continue;
1925                    }
1926
1927                    let Ok(Some(revision)) = self
1928                        .procedural_revision_for_logical_id_at_snapshot(
1929                            record.logical_memory_id,
1930                            requested_snapshot,
1931                        )
1932                        .await
1933                    else {
1934                        continue;
1935                    };
1936                    if !revision.is_live() {
1937                        continue;
1938                    }
1939
1940                    result.record = MemoryRecord::Procedural(revision.clone());
1941                    result.revision = Some(RevisionRef {
1942                        logical_memory_id: revision.logical_memory_id,
1943                        revision_id: revision.revision_id,
1944                        state: RevisionState::Active,
1945                    });
1946                    resolved.push(result);
1947                }
1948                MemoryRecord::Working(record) => {
1949                    if !seen_working.insert(record.logical_memory_id) {
1950                        continue;
1951                    }
1952
1953                    let Ok(Some(revision)) = self
1954                        .working_revision_for_logical_id_at_snapshot(
1955                            record.logical_memory_id,
1956                            requested_snapshot,
1957                        )
1958                        .await
1959                    else {
1960                        continue;
1961                    };
1962                    if !revision.is_live() {
1963                        continue;
1964                    }
1965
1966                    result.record = MemoryRecord::Working(revision.clone());
1967                    result.revision = Some(RevisionRef {
1968                        logical_memory_id: revision.logical_memory_id,
1969                        revision_id: revision.revision_id,
1970                        state: RevisionState::Active,
1971                    });
1972                    resolved.push(result);
1973                }
1974            }
1975        }
1976
1977        Ok(resolved)
1978    }
1979
1980    /// Retrieve a semantic record by ID.
1981    ///
1982    /// Access stats are buffered and flushed asynchronously during consolidation
1983    /// (F-015: read operations no longer have write side effects).
1984    pub(crate) async fn get_semantic(&self, id: MemoryId) -> HirnResult<SemanticRecord> {
1985        let record = self.read_semantic_record(id).await?;
1986
1987        // Buffer the access to be flushed later (F-015).
1988        self.graph_runtime().buffer_semantic_access(id);
1989
1990        Ok(record)
1991    }
1992
1993    /// Retrieve a semantic record by concept name (default namespace).
1994    pub(crate) async fn get_semantic_by_concept(&self, name: &str) -> HirnResult<SemanticRecord> {
1995        self.get_semantic_by_concept_ns(name, &Namespace::default())
1996            .await
1997    }
1998
1999    /// Retrieve a semantic record by concept name and namespace.
2000    ///
2001    /// Queries LanceDB for the concept in the given namespace.
2002    /// Returns the first match found.
2003    pub(crate) async fn get_semantic_by_concept_ns(
2004        &self,
2005        name: &str,
2006        namespace: &Namespace,
2007    ) -> HirnResult<SemanticRecord> {
2008        let escaped_ns = namespace.as_str().replace('\'', "''");
2009        let escaped_concept = name.replace('\'', "''");
2010        let filter = format!(
2011            "namespace = '{}' AND concept = '{}'",
2012            escaped_ns, escaped_concept
2013        );
2014        let opts = hirn_storage::store::ScanOptions {
2015            filter: Some(filter),
2016            order_by: Some(vec![
2017                hirn_storage::store::ScanOrdering::desc("version"),
2018                hirn_storage::store::ScanOrdering::desc("created_at_ms"),
2019                hirn_storage::store::ScanOrdering::desc("revision_id"),
2020            ]),
2021            ..Default::default()
2022        };
2023        let mut batches = self
2024            .storage_runtime
2025            .scan_stream(hirn_storage::datasets::semantic::DATASET_NAME, opts)
2026            .await
2027            .map_err(HirnError::storage)?;
2028
2029        let mut candidates = Vec::new();
2030        while let Some(batch) = batches.try_next().await.map_err(HirnError::storage)? {
2031            let recs =
2032                hirn_storage::datasets::semantic::from_batch(&batch).map_err(HirnError::storage)?;
2033            candidates.extend(recs);
2034        }
2035
2036        if let Some(rec) = collapse_semantic_heads(candidates)
2037            .into_values()
2038            .find(semantic_record_is_live)
2039        {
2040            return Ok(rec);
2041        }
2042        Err(HirnError::NotFound(format!(
2043            "concept '{name}' in namespace '{namespace}'"
2044        )))
2045    }
2046
2047    /// List semantic records matching a filter.
2048    pub(crate) async fn list_semantics(
2049        &self,
2050        filter: &SemanticFilter,
2051    ) -> HirnResult<Vec<SemanticRecord>> {
2052        let mut parts = Vec::new();
2053        if let Some(ref kt) = filter.knowledge_type {
2054            parts.push(format!("knowledge_type = '{:?}'", kt));
2055        }
2056        if let Some(min_conf) = filter.min_confidence {
2057            parts.push(format!("confidence >= {}", min_conf));
2058        }
2059        if let Some(ref ns) = filter.namespace {
2060            let escaped = ns.as_str().replace('\'', "''");
2061            parts.push(format!("namespace = '{}'", escaped));
2062        }
2063
2064        let lance_filter = if parts.is_empty() {
2065            None
2066        } else {
2067            Some(parts.join(" AND "))
2068        };
2069
2070        let opts = hirn_storage::store::ScanOptions {
2071            filter: lance_filter,
2072            ..Default::default()
2073        };
2074        let mut batches = self
2075            .storage_runtime
2076            .scan_stream(hirn_storage::datasets::semantic::DATASET_NAME, opts)
2077            .await
2078            .map_err(HirnError::storage)?;
2079
2080        let mut results = Vec::new();
2081        while let Some(batch) = batches.try_next().await.map_err(HirnError::storage)? {
2082            let recs =
2083                hirn_storage::datasets::semantic::from_batch(&batch).map_err(HirnError::storage)?;
2084            results.extend(recs);
2085        }
2086
2087        let mut heads: Vec<_> = collapse_semantic_heads(results)
2088            .into_values()
2089            .filter(semantic_record_is_live)
2090            .collect();
2091        heads.sort_by_key(|r| std::cmp::Reverse(r.version));
2092        if let Some(limit) = filter.limit {
2093            heads.truncate(limit);
2094        }
2095
2096        Ok(heads)
2097    }
2098
2099    /// Return the full immutable revision chain for a semantic memory.
2100    pub(crate) async fn semantic_history(&self, id: MemoryId) -> HirnResult<Vec<SemanticRecord>> {
2101        let record = self.read_semantic_record(id).await?;
2102        let mut batches = self
2103            .storage_runtime
2104            .scan_stream(
2105                hirn_storage::datasets::semantic::DATASET_NAME,
2106                hirn_storage::store::ScanOptions {
2107                    exact_filter: Some(Self::semantic_logical_exact_filter(
2108                        record.logical_memory_id,
2109                    )),
2110                    order_by: Some(vec![
2111                        hirn_storage::store::ScanOrdering::asc("version"),
2112                        hirn_storage::store::ScanOrdering::asc("created_at_ms"),
2113                    ]),
2114                    ..Default::default()
2115                },
2116            )
2117            .await
2118            .map_err(HirnError::storage)?;
2119
2120        let mut history = Vec::new();
2121        while let Some(batch) = batches.try_next().await.map_err(HirnError::storage)? {
2122            let recs =
2123                hirn_storage::datasets::semantic::from_batch(&batch).map_err(HirnError::storage)?;
2124            history.extend(recs);
2125        }
2126
2127        history.sort_by(|left, right| {
2128            left.version
2129                .cmp(&right.version)
2130                .then_with(|| left.created_at.cmp(&right.created_at))
2131        });
2132        Ok(history)
2133    }
2134
2135    async fn contradiction_successor_links(
2136        &self,
2137        current: &SemanticRecord,
2138    ) -> Vec<ContradictionSuccessorLink> {
2139        let mut links = Vec::new();
2140        let mut seen_targets = HashSet::new();
2141
2142        for contradiction_id in &current.contradiction_ids {
2143            let (resolved_target, skip_graph_edge) = match self.get_memory(*contradiction_id).await
2144            {
2145                Ok(MemoryRecord::Semantic(record)) => match self
2146                    .semantic_head_for_logical_id(record.logical_memory_id)
2147                    .await
2148                {
2149                    Ok(head) => (head.id, !semantic_record_is_live(&head)),
2150                    Err(_) => (record.id, !semantic_record_is_live(&record)),
2151                },
2152                Ok(_) => (*contradiction_id, false),
2153                Err(_) => continue,
2154            };
2155
2156            if resolved_target == current.id || !seen_targets.insert(resolved_target) {
2157                continue;
2158            }
2159
2160            let mut template = self
2161                .cached_graph()
2162                .get_edges_between(current.id, *contradiction_id)
2163                .await
2164                .unwrap_or_default()
2165                .into_iter()
2166                .find(|edge| edge.relation == EdgeRelation::Contradicts);
2167
2168            if template.is_none() && resolved_target != *contradiction_id {
2169                template = self
2170                    .cached_graph()
2171                    .get_edges_between(current.id, resolved_target)
2172                    .await
2173                    .unwrap_or_default()
2174                    .into_iter()
2175                    .find(|edge| edge.relation == EdgeRelation::Contradicts);
2176            }
2177
2178            links.push(ContradictionSuccessorLink {
2179                target_id: resolved_target,
2180                weight: template.as_ref().map_or(1.0, |edge| {
2181                    edge.confidence().unwrap_or(edge.weight).clamp(0.0, 1.0)
2182                }),
2183                metadata: template.map_or_else(Metadata::new, |edge| edge.metadata),
2184                skip_graph_edge,
2185            });
2186        }
2187
2188        links
2189    }
2190
2191    async fn attach_contradiction_successor_links(
2192        &self,
2193        current: &SemanticRecord,
2194        next: &mut SemanticRecord,
2195    ) {
2196        if current.contradiction_ids.is_empty() {
2197            next.contradiction_ids.clear();
2198            return;
2199        }
2200
2201        let links = self.contradiction_successor_links(current).await;
2202        let mut attached_ids = Vec::new();
2203
2204        for link in links {
2205            if link.target_id == next.id {
2206                continue;
2207            }
2208
2209            if !link.skip_graph_edge {
2210                match self
2211                    .cached_graph()
2212                    .add_edge(
2213                        next.id,
2214                        link.target_id,
2215                        EdgeRelation::Contradicts,
2216                        link.weight,
2217                        link.metadata,
2218                    )
2219                    .await
2220                {
2221                    Ok(_) => {}
2222                    Err(error) => {
2223                        let target_present = self
2224                            .cached_graph()
2225                            .has_node(link.target_id)
2226                            .await
2227                            .unwrap_or(false);
2228                        if target_present {
2229                            tracing::warn!(
2230                                current_id = %current.id,
2231                                next_id = %next.id,
2232                                target_id = %link.target_id,
2233                                error = %error,
2234                                "failed to carry contradiction edge to semantic successor"
2235                            );
2236                        } else {
2237                            tracing::debug!(
2238                                current_id = %current.id,
2239                                next_id = %next.id,
2240                                target_id = %link.target_id,
2241                                error = %error,
2242                                "carrying contradiction lineage without graph edge for non-live target"
2243                            );
2244                        }
2245                    }
2246                }
2247            }
2248
2249            attached_ids.push(link.target_id);
2250        }
2251
2252        attached_ids.sort();
2253        attached_ids.dedup();
2254        next.contradiction_ids = attached_ids;
2255    }
2256
2257    async fn resolve_contradiction_endpoint(
2258        &self,
2259        id: MemoryId,
2260    ) -> HirnResult<ContradictionEndpoint> {
2261        let record = match self.get_memory(id).await {
2262            Ok(record) => record,
2263            Err(HirnError::NotFound(_)) => {
2264                if self.cached_graph().has_node(id).await.unwrap_or(false) {
2265                    return Ok(ContradictionEndpoint::Other(id));
2266                }
2267                return Err(HirnError::NotFound(format!("memory record {id}")));
2268            }
2269            Err(error) => return Err(error),
2270        };
2271
2272        match record {
2273            MemoryRecord::Semantic(record) => {
2274                let head = self
2275                    .semantic_head_for_logical_id(record.logical_memory_id)
2276                    .await?;
2277                if head.is_retracted() {
2278                    return Err(HirnError::InvalidInput(format!(
2279                        "semantic logical memory {} is retracted",
2280                        head.logical_memory_id
2281                    )));
2282                }
2283                if let Some(merged_into) = head.merged_into {
2284                    return Err(HirnError::InvalidInput(format!(
2285                        "semantic logical memory {} has been merged into {}",
2286                        head.logical_memory_id, merged_into
2287                    )));
2288                }
2289                Ok(ContradictionEndpoint::Semantic(Box::new(head)))
2290            }
2291            MemoryRecord::Episodic(record) => {
2292                let head = self
2293                    .episodic_head_for_logical_id(record.logical_memory_id)
2294                    .await?;
2295                if !head.is_live() {
2296                    return Err(HirnError::InvalidInput(format!(
2297                        "episodic logical memory {} is not live",
2298                        head.logical_memory_id
2299                    )));
2300                }
2301                Ok(ContradictionEndpoint::Other(head.id))
2302            }
2303            MemoryRecord::Procedural(record) => {
2304                let head = self
2305                    .procedural_head_for_logical_id(record.logical_memory_id)
2306                    .await?;
2307                if !head.is_live() {
2308                    return Err(HirnError::InvalidInput(format!(
2309                        "procedural logical memory {} is not live",
2310                        head.logical_memory_id
2311                    )));
2312                }
2313                Ok(ContradictionEndpoint::Other(head.id))
2314            }
2315            MemoryRecord::Working(record) => {
2316                let head = self
2317                    .working_head_for_logical_id(record.logical_memory_id)
2318                    .await?;
2319                if !head.is_live() {
2320                    return Err(HirnError::InvalidInput(format!(
2321                        "working logical memory {} is not live",
2322                        head.logical_memory_id
2323                    )));
2324                }
2325                Ok(ContradictionEndpoint::Other(head.id))
2326            }
2327        }
2328    }
2329
2330    fn prepare_semantic_contradiction_successor(
2331        &self,
2332        current: &SemanticRecord,
2333        next_id: MemoryId,
2334        contradiction_target_id: MemoryId,
2335        now: Timestamp,
2336    ) -> SemanticRecord {
2337        let mut next = current.clone();
2338        next.id = next_id;
2339        next.revision_id = RevisionId::from_memory_id(next_id);
2340        next.version = current.version + 1;
2341        next.revision_operation = RevisionOperation::Correct;
2342        next.revision_reason = Some("contradiction relation updated".to_string());
2343        next.revision_causation_id = Some(contradiction_target_id);
2344        next.created_at = now;
2345        next.updated_at = now;
2346        next.valid_from = current.valid_from;
2347        next.valid_until = None;
2348        next.superseded_by = None;
2349        next.merged_into = None;
2350        next.provenance.created_by = AgentId::well_known("system");
2351
2352        let old_contradictions = format_memory_id_list(&current.contradiction_ids);
2353        let mut new_contradictions = current.contradiction_ids.clone();
2354        new_contradictions.push(contradiction_target_id);
2355        new_contradictions.sort_unstable();
2356        new_contradictions.dedup();
2357
2358        next.provenance
2359            .record_mutation(hirn_core::provenance::Mutation {
2360                timestamp: now,
2361                trigger: MutationTrigger::Manual,
2362                field: "contradiction_ids".to_string(),
2363                old_value: old_contradictions,
2364                new_value: format_memory_id_list(&new_contradictions),
2365                reason: "contradiction relation updated".to_string(),
2366            });
2367
2368        normalize_semantic_record_timestamps(&mut next);
2369        next
2370    }
2371
    /// Ensures `source` and `target` reference each other as contradictions and,
    /// when `edge_spec` is `Some((weight, metadata))`, that a `Contradicts` graph
    /// edge links them.
    ///
    /// Both ids are first resolved through `resolve_contradiction_endpoint`. A
    /// semantic endpoint that does not yet list the other endpoint in its
    /// `contradiction_ids` receives a successor revision with a fresh `MemoryId`;
    /// for a semantic/semantic pair both sides receive one as soon as either side
    /// is missing the reference.
    ///
    /// When no successor is needed, the only possible write is the contradiction
    /// edge (an existing `Contradicts` edge from source to target is reused).
    /// Otherwise the steps run in this order: append the mutation envelope, add
    /// graph nodes for the successors, apply similarity edges, attach successor
    /// links, rewrite `contradiction_ids`, add the contradiction edge, append the
    /// successor records, cache them as heads, remove the predecessor graph
    /// nodes, and finally mark the envelope `Applied`.
    ///
    /// The returned `ContradictionSyncResult` carries the final ids of both
    /// endpoints (successor ids where successors were created) and the edge id,
    /// which is `None` when `edge_spec` is `None`.
    ///
    /// # Errors
    ///
    /// Returns `HirnError::InvalidInput` when both ids resolve to the same record
    /// or to the same semantic logical memory. Errors from endpoint resolution,
    /// envelope build/append, graph writes, similarity edges and the record
    /// append are propagated; once the envelope exists, those failures also
    /// remove the graph nodes added so far and try to mark the envelope `Failed`.
2372    async fn synchronize_contradiction_refs(
2373        &self,
2374        source: MemoryId,
2375        target: MemoryId,
2376        edge_spec: Option<(f32, Metadata)>,
2377    ) -> HirnResult<ContradictionSyncResult> {
2378        let source_endpoint = self.resolve_contradiction_endpoint(source).await?;
2379        let target_endpoint = self.resolve_contradiction_endpoint(target).await?;
        // From here on the resolved endpoint ids are used, not the caller's raw ids.
2380        let source_id = source_endpoint.id();
2381        let target_id = target_endpoint.id();
2382
2383        if source_id == target_id {
2384            return Err(HirnError::InvalidInput(
2385                "a record cannot contradict itself".into(),
2386            ));
2387        }
2388
        // Two different revisions of one logical memory must not contradict each other.
2389        if let (Some(source_record), Some(target_record)) =
2390            (source_endpoint.as_semantic(), target_endpoint.as_semantic())
2391        {
2392            if source_record.logical_memory_id == target_record.logical_memory_id {
2393                return Err(HirnError::InvalidInput(format!(
2394                    "semantic logical memory {} cannot contradict itself",
2395                    source_record.logical_memory_id
2396                )));
2397            }
2398        }
2399
        // Semantic/semantic pair: a missing back-reference on either side forces
        // successors for both sides.
2400        let semantic_pair_requires_successors =
2401            match (source_endpoint.as_semantic(), target_endpoint.as_semantic()) {
2402                (Some(source_record), Some(target_record)) => {
2403                    !source_record.contradiction_ids.contains(&target_id)
2404                        || !target_record.contradiction_ids.contains(&source_id)
2405                }
2406                _ => false,
2407            };
2408
        // Mixed pairs: only the semantic side can get a successor, and only when it
        // does not already reference the other endpoint.
2409        let source_requires_successor =
2410            match (source_endpoint.as_semantic(), target_endpoint.as_semantic()) {
2411                (Some(_), Some(_)) => semantic_pair_requires_successors,
2412                (Some(source_record), None) => {
2413                    !source_record.contradiction_ids.contains(&target_id)
2414                }
2415                _ => false,
2416            };
2417        let target_requires_successor =
2418            match (source_endpoint.as_semantic(), target_endpoint.as_semantic()) {
2419                (Some(_), Some(_)) => semantic_pair_requires_successors,
2420                (None, Some(target_record)) => {
2421                    !target_record.contradiction_ids.contains(&source_id)
2422                }
2423                _ => false,
2424            };
2425
        // Fast path: no new revision is needed, so no envelope is written. The only
        // possible write is the contradiction edge.
2426        if !source_requires_successor && !target_requires_successor {
2427            let edge_id = if let Some((weight, metadata)) = edge_spec {
                // Only the source -> target direction is checked. A failed lookup is
                // treated like "no existing edge" because of `unwrap_or_default`.
2428                let existing = self
2429                    .cached_graph()
2430                    .get_edges_between(source_id, target_id)
2431                    .await
2432                    .unwrap_or_default()
2433                    .into_iter()
2434                    .find(|edge| edge.relation == EdgeRelation::Contradicts)
2435                    .map(|edge| edge.id);
2436
2437                match existing {
2438                    Some(edge_id) => Some(edge_id),
2439                    None => Some(
2440                        self.cached_graph()
2441                            .add_edge(
2442                                source_id,
2443                                target_id,
2444                                EdgeRelation::Contradicts,
2445                                weight,
2446                                metadata,
2447                            )
2448                            .await?,
2449                    ),
2450                }
2451            } else {
2452                None
2453            };
2454
2455            return Ok(ContradictionSyncResult {
2456                source_memory: source_id,
2457                target_memory: target_id,
2458                contradiction_edge: edge_id,
2459            });
2460        }
2461
        // Slow path: allocate the successor ids up front so each prepared successor
        // can reference the other side's final id.
2462        let now = Timestamp::now();
2463        let final_source_id = if source_requires_successor {
2464            MemoryId::new()
2465        } else {
2466            source_id
2467        };
2468        let final_target_id = if target_requires_successor {
2469            MemoryId::new()
2470        } else {
2471            target_id
2472        };
2473
        // `None` for an endpoint that is not semantic or does not need a successor.
2474        let mut source_prepared = source_endpoint
2475            .as_semantic()
2476            .filter(|_| source_requires_successor)
2477            .map(|current| PreparedSemanticContradictionSuccessor {
2478                current: current.clone(),
2479                next: self.prepare_semantic_contradiction_successor(
2480                    current,
2481                    final_source_id,
2482                    final_target_id,
2483                    now,
2484                ),
2485            });
2486        let mut target_prepared = target_endpoint
2487            .as_semantic()
2488            .filter(|_| target_requires_successor)
2489            .map(|current| PreparedSemanticContradictionSuccessor {
2490                current: current.clone(),
2491                next: self.prepare_semantic_contradiction_successor(
2492                    current,
2493                    final_target_id,
2494                    final_source_id,
2495                    now,
2496                ),
2497            });
2498
        // The envelope is built from the predecessor ids (first argument) and the
        // successor ids (second argument) and appended before any graph or record
        // write happens.
2499        let envelope = build_semantic_contradiction_sync_envelope(
2500            [source_prepared.as_ref(), target_prepared.as_ref()]
2501                .into_iter()
2502                .flatten()
2503                .map(|prepared| prepared.current.id)
2504                .collect(),
2505            [source_prepared.as_ref(), target_prepared.as_ref()]
2506                .into_iter()
2507                .flatten()
2508                .map(|prepared| prepared.next.id)
2509                .collect(),
2510        )?;
2511        hirn_storage::append_mutation_envelope(self.storage_backend(), &envelope)
2512            .await
2513            .map_err(HirnError::storage)?;
2514
        // Step 1: graph nodes for the successors. `added_nodes` tracks what has to
        // be rolled back if a later step fails.
2515        let mut added_nodes = Vec::new();
2516        for prepared in [source_prepared.as_ref(), target_prepared.as_ref()]
2517            .into_iter()
2518            .flatten()
2519        {
2520            if let Err(error) = self
2521                .cached_graph()
2522                .add_node(
2523                    prepared.next.id,
2524                    Layer::Semantic,
2525                    prepared.next.confidence,
2526                    prepared.next.created_at,
2527                    prepared.next.namespace,
2528                )
2529                .await
2530            {
                // Best-effort rollback in reverse insertion order; removal errors are
                // ignored. Consuming `added_nodes` is fine because this path returns.
2531                for added_id in added_nodes.into_iter().rev() {
2532                    let _ = self.cached_graph().remove_node(added_id).await;
2533                }
                // A failure to mark the envelope `Failed` is only logged; the original
                // graph error is what the caller receives.
2534                if let Err(update_error) = hirn_storage::update_mutation_envelope_state(
2535                    self.storage_backend(),
2536                    &envelope.id,
2537                    hirn_storage::MutationEnvelopeState::Failed,
2538                    Some(error.to_string()),
2539                )
2540                .await
2541                {
2542                    tracing::warn!(
2543                        source_id = %source_id,
2544                        target_id = %target_id,
2545                        envelope_id = %envelope.id,
2546                        error = %update_error,
2547                        "semantic contradiction sync envelope fail-fast finalize failed after graph add_node error"
2548                    );
2549                }
2550                return Err(error);
2551            }
2552            added_nodes.push(prepared.next.id);
2553        }
2554
        // Step 2: similarity edges, only for successors that carry an embedding.
2555        for prepared in [source_prepared.as_ref(), target_prepared.as_ref()]
2556            .into_iter()
2557            .flatten()
2558        {
2559            if let Some(ref embedding) = prepared.next.embedding {
2560                let candidates = self.find_similarity_candidates(embedding).await;
2561                if let Err(error) = self
2562                    .apply_similarity_edges(prepared.next.id, &candidates)
2563                    .await
2564                {
                    // NOTE(review): only the nodes are removed here; presumably
                    // `remove_node` also drops their incident edges - confirm.
2565                    for added_id in added_nodes.iter().rev().copied() {
2566                        let _ = self.cached_graph().remove_node(added_id).await;
2567                    }
2568                    if let Err(update_error) = hirn_storage::update_mutation_envelope_state(
2569                        self.storage_backend(),
2570                        &envelope.id,
2571                        hirn_storage::MutationEnvelopeState::Failed,
2572                        Some(error.to_string()),
2573                    )
2574                    .await
2575                    {
2576                        tracing::warn!(
2577                            source_id = %source_id,
2578                            target_id = %target_id,
2579                            envelope_id = %envelope.id,
2580                            error = %update_error,
2581                            "semantic contradiction sync envelope fail-fast finalize failed after similarity edge error"
2582                        );
2583                    }
2584                    return Err(error);
2585                }
2586            }
2587        }
2588
        // Step 3: attach successor links, then rewrite each successor's
        // `contradiction_ids`. For a semantic pair the other side's old id is
        // dropped; the other side's final id is always added, and the list is kept
        // sorted and de-duplicated.
2589        if let Some(prepared) = source_prepared.as_mut() {
2590            self.attach_contradiction_successor_links(&prepared.current, &mut prepared.next)
2591                .await;
2592            if semantic_pair_requires_successors {
2593                prepared
2594                    .next
2595                    .contradiction_ids
2596                    .retain(|id| *id != target_id);
2597            }
2598            prepared.next.contradiction_ids.push(final_target_id);
2599            prepared.next.contradiction_ids.sort_unstable();
2600            prepared.next.contradiction_ids.dedup();
2601        }
2602        if let Some(prepared) = target_prepared.as_mut() {
2603            self.attach_contradiction_successor_links(&prepared.current, &mut prepared.next)
2604                .await;
2605            if semantic_pair_requires_successors {
2606                prepared
2607                    .next
2608                    .contradiction_ids
2609                    .retain(|id| *id != source_id);
2610            }
2611            prepared.next.contradiction_ids.push(final_source_id);
2612            prepared.next.contradiction_ids.sort_unstable();
2613            prepared.next.contradiction_ids.dedup();
2614        }
2615
        // Step 4: the contradiction edge is created between the final ids. Unlike
        // the fast path there is no lookup for an existing edge first.
2616        let edge_id = if let Some((weight, metadata)) = edge_spec {
2617            match self
2618                .cached_graph()
2619                .add_edge(
2620                    final_source_id,
2621                    final_target_id,
2622                    EdgeRelation::Contradicts,
2623                    weight,
2624                    metadata,
2625                )
2626                .await
2627            {
2628                Ok(edge_id) => Some(edge_id),
2629                Err(error) => {
2630                    for added_id in added_nodes.iter().rev().copied() {
2631                        let _ = self.cached_graph().remove_node(added_id).await;
2632                    }
2633                    if let Err(update_error) = hirn_storage::update_mutation_envelope_state(
2634                        self.storage_backend(),
2635                        &envelope.id,
2636                        hirn_storage::MutationEnvelopeState::Failed,
2637                        Some(error.to_string()),
2638                    )
2639                    .await
2640                    {
2641                        tracing::warn!(
2642                            source_id = %source_id,
2643                            target_id = %target_id,
2644                            envelope_id = %envelope.id,
2645                            error = %update_error,
2646                            "semantic contradiction sync envelope fail-fast finalize failed after contradiction edge error"
2647                        );
2648                    }
2649                    return Err(error);
2650                }
2651            }
2652        } else {
2653            None
2654        };
2655
        // Step 5: append all successor records in a single call.
2656        let next_records: Vec<SemanticRecord> =
2657            [source_prepared.as_ref(), target_prepared.as_ref()]
2658                .into_iter()
2659                .flatten()
2660                .map(|prepared| prepared.next.clone())
2661                .collect();
2662
2663        if let Err(error) = self.append_semantic_records(&next_records).await {
2664            for added_id in added_nodes.iter().rev().copied() {
2665                let _ = self.cached_graph().remove_node(added_id).await;
2666            }
2667            if let Err(update_error) = hirn_storage::update_mutation_envelope_state(
2668                self.storage_backend(),
2669                &envelope.id,
2670                hirn_storage::MutationEnvelopeState::Failed,
2671                Some(error.to_string()),
2672            )
2673            .await
2674            {
2675                tracing::warn!(
2676                    source_id = %source_id,
2677                    target_id = %target_id,
2678                    envelope_id = %envelope.id,
2679                    error = %update_error,
2680                    "semantic contradiction sync envelope fail-fast finalize failed after semantic append error"
2681                );
2682            }
2683            return Err(error);
2684        }
2685
        // The records are stored; nothing below returns an error. Remaining
        // problems are logged only.
2686        if let Some(prepared) = source_prepared.as_ref() {
2687            self.cache_semantic_head(&prepared.next);
2688        }
2689        if let Some(prepared) = target_prepared.as_ref() {
2690            self.cache_semantic_head(&prepared.next);
2691        }
2692
        // Step 6: remove the predecessor graph nodes. Any failure to inspect or
        // remove one clears `predecessors_removed`.
2693        let mut predecessors_removed = true;
2694
2695        for prepared in [source_prepared.as_ref(), target_prepared.as_ref()]
2696            .into_iter()
2697            .flatten()
2698        {
2699            match self.cached_graph().has_node(prepared.current.id).await {
2700                Ok(true) => {
2701                    if let Err(error) = self.cached_graph().remove_node(prepared.current.id).await {
2702                        tracing::warn!(
2703                            id = %prepared.current.id,
2704                            error = %error,
2705                            "failed to remove superseded semantic graph node after contradiction sync"
2706                        );
2707                        predecessors_removed = false;
2708                    }
2709                }
2710                Ok(false) => {}
2711                Err(error) => {
2712                    tracing::warn!(
2713                        id = %prepared.current.id,
2714                        error = %error,
2715                        "failed to inspect superseded semantic graph node after contradiction sync"
2716                    );
2717                    predecessors_removed = false;
2718                }
2719            }
2720        }
2721
        // The envelope is marked `Applied` only after every predecessor node is
        // gone; otherwise its state is left untouched.
        // NOTE(review): presumably a recovery pass picks up such envelopes - confirm.
2722        if predecessors_removed {
2723            if let Err(error) = hirn_storage::update_mutation_envelope_state(
2724                self.storage_backend(),
2725                &envelope.id,
2726                hirn_storage::MutationEnvelopeState::Applied,
2727                None,
2728            )
2729            .await
2730            {
2731                tracing::warn!(
2732                    source_id = %source_id,
2733                    target_id = %target_id,
2734                    envelope_id = %envelope.id,
2735                    error = %error,
2736                    "semantic contradiction sync envelope finalize failed; recovery will retry predecessor cleanup"
2737                );
2738            }
2739        }
2740
2741        Ok(ContradictionSyncResult {
2742            source_memory: final_source_id,
2743            target_memory: final_target_id,
2744            contradiction_edge: edge_id,
2745        })
2746    }
2747
2748    pub(crate) async fn connect_contradiction(
2749        &self,
2750        source: MemoryId,
2751        target: MemoryId,
2752        weight: f32,
2753        metadata: Metadata,
2754    ) -> HirnResult<crate::graph::EdgeId> {
2755        let synced = self
2756            .synchronize_contradiction_refs(source, target, Some((weight, metadata)))
2757            .await?;
2758
2759        synced.contradiction_edge.ok_or_else(|| {
2760            HirnError::InvalidInput(format!(
2761                "failed to materialize contradiction edge between {} and {}",
2762                synced.source_memory, synced.target_memory
2763            ))
2764        })
2765    }
2766
    /// Appends a new revision of `current` built from `update` and returns it.
    ///
    /// After the policy check for `authorization_action` (actor taken from
    /// `update.actor_id`, namespace from `current`), the successor is cloned from
    /// `current` and given a fresh id, `version + 1`, the supplied `operation`,
    /// the reason and causation id from `update`, and the actor as creator.
    /// `valid_from` is `update.observed_at` when set; otherwise it is
    /// `current.valid_from` if `preserve_valid_from` is true and the current time
    /// if it is false.
    ///
    /// Writes happen in this order: mutation envelope, graph node, similarity
    /// edges (only when the successor has an embedding), contradiction successor
    /// links, record append, head cache, predecessor graph node removal, and
    /// finally marking the envelope `Applied`.
    ///
    /// # Errors
    ///
    /// Propagates errors from the policy check, `apply_semantic_update_fields`,
    /// envelope build/append, the graph `add_node`, similarity edge application
    /// and the record append. Failures after the envelope is appended also try to
    /// mark it `Failed`; failures after the graph node exists additionally remove
    /// that node (best effort).
2767    async fn append_semantic_successor(
2768        &self,
2769        current: &SemanticRecord,
2770        update: SemanticUpdate,
2771        operation: RevisionOperation,
2772        preserve_valid_from: bool,
2773        authorization_action: crate::policy::Action,
2774    ) -> HirnResult<SemanticRecord> {
2775        let actor_id = update.actor_id;
        // Authorization runs before anything is built or written.
2776        self.enforce(
2777            actor_id.as_str(),
2778            authorization_action,
2779            &self.config.default_realm,
2780            current.namespace.as_str(),
2781        )
2782        .await?;
2783
        // Start from a full copy of the current head and overwrite the
        // revision-specific fields.
2784        let now = Timestamp::now();
2785        let mut next = current.clone();
2786        let new_id = MemoryId::new();
2787        next.id = new_id;
2788        next.revision_id = RevisionId::from_memory_id(new_id);
2789        next.version = current.version + 1;
2790        next.revision_operation = operation;
2791        next.revision_reason.clone_from(&update.reason);
2792        next.revision_causation_id = Some(update.causation_id);
2793        next.created_at = now;
2794        next.updated_at = now;
        // An explicit `observed_at` always wins; the flag only picks the fallback.
2795        next.valid_from = if preserve_valid_from {
2796            update.observed_at.unwrap_or(current.valid_from)
2797        } else {
2798            update.observed_at.unwrap_or(now)
2799        };
        // The successor starts as an open, un-superseded, un-merged revision.
2800        next.valid_until = None;
2801        next.superseded_by = None;
2802        next.merged_into = None;
2803        next.provenance.created_by = actor_id;
2804
        // Fallback reason text used for the provenance mutations when the caller
        // gave none. Note that `revision_reason` above keeps the caller's `None`.
2805        let reason = update.reason.clone().unwrap_or_else(|| match operation {
2806            RevisionOperation::Override => "override".to_string(),
2807            RevisionOperation::Supersede => "supersede".to_string(),
2808            _ => "update".to_string(),
2809        });
2810
2811        self.apply_semantic_update_fields(current, &mut next, &update, &reason, now)
2812            .await?;
2813        normalize_semantic_record_timestamps(&mut next);
2814
        // The envelope linking predecessor and successor ids is appended before
        // any graph or record write.
2815        let envelope = build_semantic_successor_envelope(current.id, next.id)?;
2816        hirn_storage::append_mutation_envelope(self.storage_backend(), &envelope)
2817            .await
2818            .map_err(HirnError::storage)?;
2819
2820        if let Err(error) = self
2821            .cached_graph()
2822            .add_node(
2823                next.id,
2824                Layer::Semantic,
2825                next.confidence,
2826                next.created_at,
2827                next.namespace,
2828            )
2829            .await
2830        {
            // Nothing to roll back in the graph yet; only the envelope is marked
            // `Failed`. A failure of that update is logged and the original error
            // is returned.
2831            if let Err(update_error) = hirn_storage::update_mutation_envelope_state(
2832                self.storage_backend(),
2833                &envelope.id,
2834                hirn_storage::MutationEnvelopeState::Failed,
2835                Some(error.to_string()),
2836            )
2837            .await
2838            {
2839                tracing::warn!(
2840                    current_id = %current.id,
2841                    next_id = %next.id,
2842                    envelope_id = %envelope.id,
2843                    error = %update_error,
2844                    "semantic successor mutation envelope fail-fast finalize failed after graph add_node error"
2845                );
2846            }
2847            return Err(error);
2848        }
2849
        // Similarity edges are only computed when the successor has an embedding.
2850        if let Some(ref emb) = next.embedding {
2851            let candidates = self.find_similarity_candidates(emb).await;
2852            if let Err(error) = self.apply_similarity_edges(next.id, &candidates).await {
                // Best-effort removal of the node added above; its result is ignored.
2853                let _ = self.cached_graph().remove_node(next.id).await;
2854                if let Err(update_error) = hirn_storage::update_mutation_envelope_state(
2855                    self.storage_backend(),
2856                    &envelope.id,
2857                    hirn_storage::MutationEnvelopeState::Failed,
2858                    Some(error.to_string()),
2859                )
2860                .await
2861                {
2862                    tracing::warn!(
2863                        current_id = %current.id,
2864                        next_id = %next.id,
2865                        envelope_id = %envelope.id,
2866                        error = %update_error,
2867                        "semantic successor mutation envelope fail-fast finalize failed after similarity edge error"
2868                    );
2869                }
2870                return Err(error);
2871            }
2872        }
2873
        // This call returns nothing that is checked here; it may mutate `next`
        // before the record is appended.
2874        self.attach_contradiction_successor_links(current, &mut next)
2875            .await;
2876
2877        if let Err(error) = self.append_semantic_record(&next).await {
2878            let _ = self.cached_graph().remove_node(next.id).await;
2879            if let Err(update_error) = hirn_storage::update_mutation_envelope_state(
2880                self.storage_backend(),
2881                &envelope.id,
2882                hirn_storage::MutationEnvelopeState::Failed,
2883                Some(error.to_string()),
2884            )
2885            .await
2886            {
2887                tracing::warn!(
2888                    current_id = %current.id,
2889                    next_id = %next.id,
2890                    envelope_id = %envelope.id,
2891                    error = %update_error,
2892                    "semantic successor mutation envelope fail-fast finalize failed after semantic append error"
2893                );
2894            }
2895            return Err(error);
2896        }
2897
        // The record is stored; from here on problems are logged and the
        // successor is still returned.
2898        self.cache_semantic_head(&next);
2899
        // `true` when the predecessor node is gone, either removed now or already
        // absent. `false` when inspecting or removing it failed.
2900        let predecessor_removed = match self.cached_graph().has_node(current.id).await {
2901            Ok(true) => match self.cached_graph().remove_node(current.id).await {
2902                Ok(_) => true,
2903                Err(error) => {
2904                    tracing::warn!(id = %current.id, error = %error, "failed to remove superseded semantic graph node");
2905                    false
2906                }
2907            },
2908            Ok(false) => true,
2909            Err(error) => {
2910                tracing::warn!(id = %current.id, error = %error, "failed to inspect superseded semantic graph node state");
2911                false
2912            }
2913        };
2914
        // The envelope is only marked `Applied` after the predecessor cleanup
        // succeeded; otherwise its state is left untouched.
        // NOTE(review): presumably a recovery pass retries the cleanup - confirm.
2915        if predecessor_removed {
2916            if let Err(error) = hirn_storage::update_mutation_envelope_state(
2917                self.storage_backend(),
2918                &envelope.id,
2919                hirn_storage::MutationEnvelopeState::Applied,
2920                None,
2921            )
2922            .await
2923            {
2924                tracing::warn!(
2925                    current_id = %current.id,
2926                    next_id = %next.id,
2927                    envelope_id = %envelope.id,
2928                    error = %error,
2929                    "semantic successor mutation envelope finalize failed; recovery will retry predecessor cleanup"
2930                );
2931            }
2932        }
2933
2934        Ok(next)
2935    }
2936
2937    async fn apply_semantic_update_fields(
2938        &self,
2939        current: &SemanticRecord,
2940        next: &mut SemanticRecord,
2941        update: &SemanticUpdate,
2942        reason: &str,
2943        now: Timestamp,
2944    ) -> HirnResult<()> {
2945        if let Some(desc) = update.description.clone() {
2946            let mutation = hirn_core::provenance::Mutation {
2947                timestamp: now,
2948                trigger: MutationTrigger::Manual,
2949                field: "description".to_string(),
2950                old_value: current.description.clone(),
2951                new_value: desc.clone(),
2952                reason: reason.to_string(),
2953            };
2954            next.provenance.record_mutation(mutation);
2955            next.description = desc;
2956            next.embedding = Some(self.embed_text(&next.description).await?);
2957        }
2958
2959        if let Some(conf) = update.confidence {
2960            let clamped = conf.clamp(0.0, 1.0);
2961            let mutation = hirn_core::provenance::Mutation {
2962                timestamp: now,
2963                trigger: MutationTrigger::Manual,
2964                field: "confidence".to_string(),
2965                old_value: current.confidence.to_string(),
2966                new_value: clamped.to_string(),
2967                reason: reason.to_string(),
2968            };
2969            next.provenance.record_mutation(mutation);
2970            next.confidence = clamped;
2971        }
2972
2973        if let Some(count) = update.evidence_count {
2974            next.evidence_count = count;
2975        }
2976
2977        Ok(())
2978    }
2979
2980    /// Append a corrected revision for a semantic record.
2981    pub(crate) async fn correct_semantic(
2982        &self,
2983        id: MemoryId,
2984        update: SemanticUpdate,
2985    ) -> HirnResult<SemanticRecord> {
2986        let current = self.resolve_active_semantic_head(id).await?;
2987        let next = self
2988            .append_semantic_successor(
2989                &current,
2990                update,
2991                RevisionOperation::Correct,
2992                true,
2993                crate::policy::Action::Correct,
2994            )
2995            .await?;
2996
2997        self.emit_scoped(
2998            next.namespace.as_str(),
2999            next.provenance.created_by.as_str(),
3000            MemoryEvent::MemoryCorrected {
3001                logical_memory_id: current.logical_memory_id,
3002                old_revision_id: current.revision_id,
3003                new_revision_id: next.revision_id,
3004                reason: next.revision_reason.clone(),
3005            },
3006        )
3007        .await;
3008
3009        Ok(next)
3010    }
3011
3012    /// Append a superseding revision for a semantic record.
3013    pub(crate) async fn supersede_semantic(
3014        &self,
3015        id: MemoryId,
3016        supersession: SemanticSupersession,
3017    ) -> HirnResult<SemanticRecord> {
3018        let current = self.resolve_active_semantic_head(id).await?;
3019        let next = self
3020            .append_semantic_successor(
3021                &current,
3022                supersession.into(),
3023                RevisionOperation::Supersede,
3024                false,
3025                crate::policy::Action::Supersede,
3026            )
3027            .await?;
3028
3029        self.emit_scoped(
3030            next.namespace.as_str(),
3031            next.provenance.created_by.as_str(),
3032            MemoryEvent::MemorySuperseded {
3033                logical_memory_id: current.logical_memory_id,
3034                prior_revision_id: current.revision_id,
3035                new_revision_id: next.revision_id,
3036                reason: next.revision_reason.clone(),
3037            },
3038        )
3039        .await;
3040
3041        Ok(next)
3042    }
3043
3044    /// Append an explicit human/admin override revision for a semantic record.
3045    pub(crate) async fn override_semantic(
3046        &self,
3047        id: MemoryId,
3048        override_request: SemanticOverride,
3049    ) -> HirnResult<SemanticRecord> {
3050        let current = self.resolve_active_semantic_head(id).await?;
3051        let actor_id = override_request.actor_id;
3052        let reason = override_request
3053            .reason
3054            .clone()
3055            .map(|reason| reason.trim().to_string())
3056            .filter(|reason| !reason.is_empty())
3057            .ok_or_else(|| {
3058                HirnError::InvalidInput("semantic override requires a non-empty reason".into())
3059            })?;
3060        let next = self
3061            .append_semantic_successor(
3062                &current,
3063                SemanticUpdate {
3064                    description: override_request.description,
3065                    confidence: override_request.confidence,
3066                    evidence_count: override_request.evidence_count,
3067                    reason: Some(reason.clone()),
3068                    actor_id,
3069                    observed_at: override_request.observed_at,
3070                    causation_id: override_request.causation_id,
3071                },
3072                RevisionOperation::Override,
3073                false,
3074                crate::policy::Action::Admin,
3075            )
3076            .await?;
3077
3078        self.append_audit(
3079            Some(actor_id.clone()),
3080            hirn_core::audit::AuditAction::BeliefOverride {
3081                logical_memory_id: current.logical_memory_id,
3082                prior_revision_id: current.revision_id,
3083                override_revision_id: next.revision_id,
3084                namespace: next.namespace.as_str().to_string(),
3085                reason: reason.clone(),
3086            },
3087        )
3088        .await?;
3089
3090        self.emit_scoped(
3091            next.namespace.as_str(),
3092            actor_id.as_str(),
3093            MemoryEvent::MemoryOverridden {
3094                logical_memory_id: current.logical_memory_id,
3095                prior_revision_id: current.revision_id,
3096                override_revision_id: next.revision_id,
3097                reason: Some(reason),
3098            },
3099        )
3100        .await;
3101
3102        Ok(next)
3103    }
3104
3105    /// Merge one or more semantic logical memories into an active target chain.
3106    pub(crate) async fn merge_semantic(
3107        &self,
3108        target_id: MemoryId,
3109        merge: SemanticMerge,
3110    ) -> HirnResult<SemanticMergeOutcome> {
3111        if merge.source_ids.is_empty() {
3112            return Err(HirnError::InvalidInput(
3113                "MERGE MEMORY requires at least one source memory".into(),
3114            ));
3115        }
3116
3117        let current = self.resolve_active_semantic_head(target_id).await?;
3118        let actor_id = merge.actor_id;
3119        self.enforce(
3120            actor_id.as_str(),
3121            crate::policy::Action::Merge,
3122            &self.config.default_realm,
3123            current.namespace.as_str(),
3124        )
3125        .await?;
3126
3127        let mut source_heads = Vec::with_capacity(merge.source_ids.len());
3128        let mut seen_sources = HashSet::new();
3129        for source_id in &merge.source_ids {
3130            let source = self.resolve_active_semantic_head(*source_id).await?;
3131            if source.logical_memory_id == current.logical_memory_id {
3132                return Err(HirnError::InvalidInput(format!(
3133                    "MERGE MEMORY source '{}' resolves to the target logical memory {}",
3134                    source_id, current.logical_memory_id
3135                )));
3136            }
3137            if !seen_sources.insert(source.logical_memory_id) {
3138                return Err(HirnError::InvalidInput(format!(
3139                    "MERGE MEMORY source '{}' duplicates logical memory {}",
3140                    source_id, source.logical_memory_id
3141                )));
3142            }
3143            if source.namespace != current.namespace {
3144                return Err(HirnError::InvalidInput(format!(
3145                    "MERGE MEMORY source {} is in namespace '{}' but target is in '{}'",
3146                    source.id,
3147                    source.namespace.as_str(),
3148                    current.namespace.as_str()
3149                )));
3150            }
3151            if source.concept != current.concept {
3152                return Err(HirnError::InvalidInput(format!(
3153                    "MERGE MEMORY source {} uses concept '{}' but target uses '{}'",
3154                    source.id, source.concept, current.concept
3155                )));
3156            }
3157            source_heads.push(source);
3158        }
3159
3160        let now = Timestamp::now();
3161        let observed_at = merge.observed_at.unwrap_or(now);
3162        let causation_id = merge.causation_id;
3163        let reason = merge.reason.clone().unwrap_or_else(|| "merge".to_string());
3164
3165        let participants: Vec<&SemanticRecord> = std::iter::once(&current)
3166            .chain(source_heads.iter())
3167            .collect();
3168        let (default_confidence, default_evidence_count) =
3169            merged_confidence_and_evidence(&participants);
3170
3171        let mut target = current.clone();
3172        let new_target_id = MemoryId::new();
3173        target.id = new_target_id;
3174        target.revision_id = RevisionId::from_memory_id(new_target_id);
3175        target.version = current.version + 1;
3176        target.revision_operation = RevisionOperation::Merge;
3177        target.revision_reason.clone_from(&merge.reason);
3178        target.revision_causation_id = Some(causation_id);
3179        target.created_at = now;
3180        target.updated_at = now;
3181        target.valid_from = observed_at;
3182        target.valid_until = None;
3183        target.superseded_by = None;
3184        target.merged_into = None;
3185        target.provenance.created_by = actor_id.clone();
3186
3187        let mut merged_source_episodes = target.source_episodes.clone();
3188        let mut merged_contradictions = target.contradiction_ids.clone();
3189        for source in &source_heads {
3190            merged_source_episodes.extend(source.source_episodes.iter().copied());
3191            merged_contradictions.extend(source.contradiction_ids.iter().copied());
3192        }
3193        merged_source_episodes.sort();
3194        merged_source_episodes.dedup();
3195        merged_contradictions.sort();
3196        merged_contradictions.dedup();
3197        target.source_episodes = merged_source_episodes;
3198        target.contradiction_ids = merged_contradictions;
3199
3200        let target_update = SemanticUpdate {
3201            description: merge.description.clone(),
3202            confidence: merge.confidence.or(Some(default_confidence)),
3203            evidence_count: merge.evidence_count.or(Some(default_evidence_count)),
3204            reason: merge.reason.clone(),
3205            actor_id,
3206            observed_at: Some(observed_at),
3207            causation_id,
3208        };
3209        self.apply_semantic_update_fields(&current, &mut target, &target_update, &reason, now)
3210            .await?;
3211        normalize_semantic_record_timestamps(&mut target);
3212
3213        let mut merged_sources = Vec::with_capacity(source_heads.len());
3214        for source in &source_heads {
3215            let merged_source_id = MemoryId::new();
3216            let mut merged_source = source.clone();
3217            merged_source.id = merged_source_id;
3218            merged_source.revision_id = RevisionId::from_memory_id(merged_source_id);
3219            merged_source.version = source.version + 1;
3220            merged_source.revision_operation = RevisionOperation::Merge;
3221            merged_source.revision_reason.clone_from(&merge.reason);
3222            merged_source.revision_causation_id = Some(target.id);
3223            merged_source.created_at = now;
3224            merged_source.updated_at = now;
3225            merged_source.valid_from = observed_at;
3226            merged_source.valid_until = None;
3227            merged_source.superseded_by = None;
3228            merged_source.merged_into = Some(target.logical_memory_id);
3229            merged_source.provenance.created_by = actor_id.clone();
3230            normalize_semantic_record_timestamps(&mut merged_source);
3231            merged_sources.push(merged_source);
3232        }
3233
3234        let envelope = build_semantic_merge_envelope(
3235            current.id,
3236            target.id,
3237            source_heads.iter().map(|source| source.id).collect(),
3238            merged_sources.iter().map(|source| source.id).collect(),
3239        )?;
3240        hirn_storage::append_mutation_envelope(self.storage_backend(), &envelope)
3241            .await
3242            .map_err(HirnError::storage)?;
3243
3244        let mut added_nodes = Vec::with_capacity(1 + merged_sources.len());
3245        let graph_setup = async {
3246            self.cached_graph()
3247                .add_node(
3248                    target.id,
3249                    Layer::Semantic,
3250                    target.confidence,
3251                    target.created_at,
3252                    target.namespace,
3253                )
3254                .await?;
3255            added_nodes.push(target.id);
3256
3257            for merged_source in &merged_sources {
3258                self.cached_graph()
3259                    .add_node(
3260                        merged_source.id,
3261                        Layer::Semantic,
3262                        merged_source.confidence,
3263                        merged_source.created_at,
3264                        merged_source.namespace,
3265                    )
3266                    .await?;
3267                added_nodes.push(merged_source.id);
3268            }
3269
3270            if let Some(ref emb) = target.embedding {
3271                let candidates = self.find_similarity_candidates(emb).await;
3272                self.apply_similarity_edges(target.id, &candidates).await?;
3273            }
3274
3275            for merged_source in &merged_sources {
3276                self.connect_with(
3277                    target.id,
3278                    merged_source.id,
3279                    EdgeRelation::DerivedFrom,
3280                    1.0,
3281                    Metadata::default(),
3282                )
3283                .await?;
3284            }
3285
3286            HirnResult::Ok(())
3287        }
3288        .await;
3289
3290        if let Err(error) = graph_setup {
3291            for node_id in added_nodes {
3292                let _ = self.cached_graph().remove_node(node_id).await;
3293            }
3294            if let Err(update_error) = hirn_storage::update_mutation_envelope_state(
3295                self.storage_backend(),
3296                &envelope.id,
3297                hirn_storage::MutationEnvelopeState::Failed,
3298                Some(error.to_string()),
3299            )
3300            .await
3301            {
3302                tracing::warn!(
3303                    current_id = %current.id,
3304                    next_id = %target.id,
3305                    envelope_id = %envelope.id,
3306                    error = %update_error,
3307                    "semantic merge mutation envelope fail-fast finalize failed after graph setup error"
3308                );
3309            }
3310            return Err(error);
3311        }
3312
3313        let mut appended_records = Vec::with_capacity(1 + merged_sources.len());
3314        appended_records.push(target.clone());
3315        appended_records.extend(merged_sources.iter().cloned());
3316        if let Err(error) = self.append_semantic_records(&appended_records).await {
3317            for node_id in [target.id]
3318                .into_iter()
3319                .chain(merged_sources.iter().map(|record| record.id))
3320            {
3321                let _ = self.cached_graph().remove_node(node_id).await;
3322            }
3323            if let Err(update_error) = hirn_storage::update_mutation_envelope_state(
3324                self.storage_backend(),
3325                &envelope.id,
3326                hirn_storage::MutationEnvelopeState::Failed,
3327                Some(error.to_string()),
3328            )
3329            .await
3330            {
3331                tracing::warn!(
3332                    current_id = %current.id,
3333                    next_id = %target.id,
3334                    envelope_id = %envelope.id,
3335                    error = %update_error,
3336                    "semantic merge mutation envelope fail-fast finalize failed after semantic append error"
3337                );
3338            }
3339            return Err(error);
3340        }
3341
3342        self.cache_semantic_head(&target);
3343        for merged_source in &merged_sources {
3344            self.cache_semantic_head(merged_source);
3345        }
3346
3347        let mut predecessors_removed = true;
3348
3349        match self.cached_graph().has_node(current.id).await {
3350            Ok(true) => {
3351                if let Err(error) = self.cached_graph().remove_node(current.id).await {
3352                    tracing::warn!(id = %current.id, error = %error, "failed to remove merged target predecessor graph node");
3353                    predecessors_removed = false;
3354                }
3355            }
3356            Ok(false) => {}
3357            Err(error) => {
3358                tracing::warn!(id = %current.id, error = %error, "failed to inspect merged target predecessor graph node state");
3359                predecessors_removed = false;
3360            }
3361        }
3362
3363        for source in &source_heads {
3364            match self.cached_graph().has_node(source.id).await {
3365                Ok(true) => {
3366                    if let Err(error) = self.cached_graph().remove_node(source.id).await {
3367                        tracing::warn!(id = %source.id, error = %error, "failed to remove merged source predecessor graph node");
3368                        predecessors_removed = false;
3369                    }
3370                }
3371                Ok(false) => {}
3372                Err(error) => {
3373                    tracing::warn!(id = %source.id, error = %error, "failed to inspect merged source predecessor graph node state");
3374                    predecessors_removed = false;
3375                }
3376            }
3377        }
3378
3379        if predecessors_removed {
3380            if let Err(error) = hirn_storage::update_mutation_envelope_state(
3381                self.storage_backend(),
3382                &envelope.id,
3383                hirn_storage::MutationEnvelopeState::Applied,
3384                None,
3385            )
3386            .await
3387            {
3388                tracing::warn!(
3389                    current_id = %current.id,
3390                    next_id = %target.id,
3391                    envelope_id = %envelope.id,
3392                    error = %error,
3393                    "semantic merge mutation envelope finalize failed; recovery will retry predecessor cleanup"
3394                );
3395            }
3396        }
3397
3398        self.emit_scoped(
3399            target.namespace.as_str(),
3400            target.provenance.created_by.as_str(),
3401            MemoryEvent::MemoryMerged {
3402                target_logical_memory_id: target.logical_memory_id,
3403                prior_target_revision_id: current.revision_id,
3404                new_target_revision_id: target.revision_id,
3405                source_logical_memory_ids: source_heads
3406                    .iter()
3407                    .map(|source| source.logical_memory_id)
3408                    .collect(),
3409                source_revision_ids: merged_sources
3410                    .iter()
3411                    .map(|source| source.revision_id)
3412                    .collect(),
3413                reason: target.revision_reason.clone(),
3414            },
3415        )
3416        .await;
3417
3418        Ok(SemanticMergeOutcome {
3419            target,
3420            merged_sources,
3421        })
3422    }
3423
3424    /// Retract a semantic record by appending a tombstone revision.
3425    pub(crate) async fn retract_semantic(
3426        &self,
3427        id: MemoryId,
3428        retraction: SemanticRetraction,
3429    ) -> HirnResult<SemanticRecord> {
3430        let rec = self.resolve_active_semantic_head(id).await?;
3431
3432        let actor_id = retraction.actor_id;
3433
3434        self.enforce(
3435            actor_id.as_str(),
3436            crate::policy::Action::Retract,
3437            &self.config.default_realm,
3438            rec.namespace.as_str(),
3439        )
3440        .await?;
3441
3442        let now = Timestamp::now();
3443        let new_id = MemoryId::new();
3444        let mut tombstone = rec.clone();
3445        tombstone.id = new_id;
3446        tombstone.revision_id = RevisionId::from_memory_id(new_id);
3447        tombstone.version = rec.version + 1;
3448        tombstone.revision_operation = RevisionOperation::Retract;
3449        tombstone.revision_reason.clone_from(&retraction.reason);
3450        tombstone.revision_causation_id = Some(retraction.causation_id);
3451        tombstone.created_at = now;
3452        tombstone.updated_at = now;
3453        tombstone.valid_from = retraction.observed_at.unwrap_or(now);
3454        tombstone.valid_until = None;
3455        tombstone.superseded_by = None;
3456        tombstone.merged_into = None;
3457        tombstone.provenance.created_by = actor_id;
3458        normalize_semantic_record_timestamps(&mut tombstone);
3459
3460        let envelope = build_semantic_retract_envelope(rec.id, tombstone.id)?;
3461        hirn_storage::append_mutation_envelope(self.storage_backend(), &envelope)
3462            .await
3463            .map_err(HirnError::storage)?;
3464
3465        self.append_semantic_record(&tombstone).await?;
3466
3467        self.cache_semantic_head(&tombstone);
3468
3469        let node_removed = match self.cached_graph().remove_node(rec.id).await {
3470            Ok(_) => true,
3471            Err(error) => {
3472                tracing::warn!(id = %rec.id, error = %error, "failed to remove retracted semantic graph node");
3473                false
3474            }
3475        };
3476
3477        self.emit_scoped(
3478            tombstone.namespace.as_str(),
3479            tombstone.provenance.created_by.as_str(),
3480            MemoryEvent::MemoryRetracted {
3481                logical_memory_id: rec.logical_memory_id,
3482                prior_revision_id: rec.revision_id,
3483                tombstone_revision_id: tombstone.revision_id,
3484                reason: tombstone.revision_reason.clone(),
3485            },
3486        )
3487        .await;
3488
3489        if node_removed {
3490            if let Err(error) = hirn_storage::update_mutation_envelope_state(
3491                self.storage_backend(),
3492                &envelope.id,
3493                hirn_storage::MutationEnvelopeState::Applied,
3494                None,
3495            )
3496            .await
3497            {
3498                tracing::warn!(
3499                    id = %rec.id,
3500                    envelope_id = %envelope.id,
3501                    error = %error,
3502                    "semantic retract mutation envelope finalize failed; recovery will retry graph cleanup"
3503                );
3504            }
3505        }
3506        Ok(tombstone)
3507    }
3508
3509    pub(crate) async fn reconcile_pending_semantic_create_mutations(&self) -> HirnResult<usize> {
3510        let envelopes = hirn_storage::list_pending_mutation_envelopes(
3511            self.storage_backend(),
3512            Some(SEMANTIC_CREATE_MUTATION_KIND),
3513        )
3514        .await
3515        .map_err(HirnError::storage)?;
3516        let mut reconciled = 0usize;
3517
3518        for envelope in envelopes {
3519            match self
3520                .reconcile_single_pending_semantic_create_mutation(&envelope)
3521                .await
3522            {
3523                Ok(true) => reconciled += 1,
3524                Ok(false) => {}
3525                Err(error) => {
3526                    hirn_storage::update_mutation_envelope_state(
3527                        self.storage_backend(),
3528                        &envelope.id,
3529                        hirn_storage::MutationEnvelopeState::Failed,
3530                        Some(error.to_string()),
3531                    )
3532                    .await
3533                    .map_err(HirnError::storage)?;
3534                }
3535            }
3536        }
3537
3538        Ok(reconciled)
3539    }
3540
3541    pub(crate) async fn reconcile_pending_semantic_retract_mutations(&self) -> HirnResult<usize> {
3542        let envelopes = hirn_storage::list_pending_mutation_envelopes(
3543            self.storage_backend(),
3544            Some(SEMANTIC_RETRACT_MUTATION_KIND),
3545        )
3546        .await
3547        .map_err(HirnError::storage)?;
3548        let mut reconciled = 0usize;
3549
3550        for envelope in envelopes {
3551            match self
3552                .reconcile_single_pending_semantic_retract_mutation(&envelope)
3553                .await
3554            {
3555                Ok(true) => reconciled += 1,
3556                Ok(false) => {}
3557                Err(error) => {
3558                    hirn_storage::update_mutation_envelope_state(
3559                        self.storage_backend(),
3560                        &envelope.id,
3561                        hirn_storage::MutationEnvelopeState::Failed,
3562                        Some(error.to_string()),
3563                    )
3564                    .await
3565                    .map_err(HirnError::storage)?;
3566                }
3567            }
3568        }
3569
3570        Ok(reconciled)
3571    }
3572
3573    pub(crate) async fn reconcile_pending_semantic_successor_mutations(&self) -> HirnResult<usize> {
3574        let envelopes = hirn_storage::list_pending_mutation_envelopes(
3575            self.storage_backend(),
3576            Some(SEMANTIC_SUCCESSOR_MUTATION_KIND),
3577        )
3578        .await
3579        .map_err(HirnError::storage)?;
3580        let mut reconciled = 0usize;
3581
3582        for envelope in envelopes {
3583            match self
3584                .reconcile_single_pending_semantic_successor_mutation(&envelope)
3585                .await
3586            {
3587                Ok(true) => reconciled += 1,
3588                Ok(false) => {}
3589                Err(error) => {
3590                    hirn_storage::update_mutation_envelope_state(
3591                        self.storage_backend(),
3592                        &envelope.id,
3593                        hirn_storage::MutationEnvelopeState::Failed,
3594                        Some(error.to_string()),
3595                    )
3596                    .await
3597                    .map_err(HirnError::storage)?;
3598                }
3599            }
3600        }
3601
3602        Ok(reconciled)
3603    }
3604
3605    pub(crate) async fn reconcile_pending_semantic_merge_mutations(&self) -> HirnResult<usize> {
3606        let envelopes = hirn_storage::list_pending_mutation_envelopes(
3607            self.storage_backend(),
3608            Some(SEMANTIC_MERGE_MUTATION_KIND),
3609        )
3610        .await
3611        .map_err(HirnError::storage)?;
3612        let mut reconciled = 0usize;
3613
3614        for envelope in envelopes {
3615            match self
3616                .reconcile_single_pending_semantic_merge_mutation(&envelope)
3617                .await
3618            {
3619                Ok(true) => reconciled += 1,
3620                Ok(false) => {}
3621                Err(error) => {
3622                    hirn_storage::update_mutation_envelope_state(
3623                        self.storage_backend(),
3624                        &envelope.id,
3625                        hirn_storage::MutationEnvelopeState::Failed,
3626                        Some(error.to_string()),
3627                    )
3628                    .await
3629                    .map_err(HirnError::storage)?;
3630                }
3631            }
3632        }
3633
3634        Ok(reconciled)
3635    }
3636
3637    pub(crate) async fn reconcile_pending_semantic_contradiction_sync_mutations(
3638        &self,
3639    ) -> HirnResult<usize> {
3640        let envelopes = hirn_storage::list_pending_mutation_envelopes(
3641            self.storage_backend(),
3642            Some(SEMANTIC_CONTRADICTION_SYNC_MUTATION_KIND),
3643        )
3644        .await
3645        .map_err(HirnError::storage)?;
3646        let mut reconciled = 0usize;
3647
3648        for envelope in envelopes {
3649            match self
3650                .reconcile_single_pending_semantic_contradiction_sync_mutation(&envelope)
3651                .await
3652            {
3653                Ok(true) => reconciled += 1,
3654                Ok(false) => {}
3655                Err(error) => {
3656                    hirn_storage::update_mutation_envelope_state(
3657                        self.storage_backend(),
3658                        &envelope.id,
3659                        hirn_storage::MutationEnvelopeState::Failed,
3660                        Some(error.to_string()),
3661                    )
3662                    .await
3663                    .map_err(HirnError::storage)?;
3664                }
3665            }
3666        }
3667
3668        Ok(reconciled)
3669    }
3670
3671    pub(crate) async fn reconcile_pending_semantic_purge_mutations(&self) -> HirnResult<usize> {
3672        let envelopes = hirn_storage::list_pending_mutation_envelopes(
3673            self.storage_backend(),
3674            Some(SEMANTIC_PURGE_MUTATION_KIND),
3675        )
3676        .await
3677        .map_err(HirnError::storage)?;
3678        let mut reconciled = 0usize;
3679
3680        for envelope in envelopes {
3681            match self
3682                .reconcile_single_pending_semantic_purge_mutation(&envelope)
3683                .await
3684            {
3685                Ok(true) => reconciled += 1,
3686                Ok(false) => {}
3687                Err(error) => {
3688                    hirn_storage::update_mutation_envelope_state(
3689                        self.storage_backend(),
3690                        &envelope.id,
3691                        hirn_storage::MutationEnvelopeState::Failed,
3692                        Some(error.to_string()),
3693                    )
3694                    .await
3695                    .map_err(HirnError::storage)?;
3696                }
3697            }
3698        }
3699
3700        Ok(reconciled)
3701    }
3702
3703    async fn remove_semantic_graph_nodes_if_present(&self, ids: &[MemoryId]) -> HirnResult<()> {
3704        for &id in ids {
3705            if self.cached_graph().has_node(id).await? {
3706                self.cached_graph().remove_node(id).await?;
3707            }
3708        }
3709        Ok(())
3710    }
3711
3712    async fn reconcile_single_pending_semantic_create_mutation(
3713        &self,
3714        envelope: &hirn_storage::MutationEnvelopeRecord,
3715    ) -> HirnResult<bool> {
3716        let payload = decode_semantic_create_envelope(envelope)?;
3717
3718        match self.read_semantic_record(payload.record_id).await {
3719            Ok(record) => {
3720                if self.semantic_record_is_current_head(&record).await? {
3721                    if !self.cached_graph().has_node(record.id).await? {
3722                        self.cached_graph()
3723                            .add_node(
3724                                record.id,
3725                                Layer::Semantic,
3726                                record.confidence,
3727                                record.created_at,
3728                                record.namespace,
3729                            )
3730                            .await?;
3731                    }
3732                    self.cache_semantic_head(&record);
3733                } else if self.cached_graph().has_node(record.id).await? {
3734                    self.cached_graph().remove_node(record.id).await?;
3735                }
3736
3737                hirn_storage::update_mutation_envelope_state(
3738                    self.storage_backend(),
3739                    &envelope.id,
3740                    hirn_storage::MutationEnvelopeState::Applied,
3741                    None,
3742                )
3743                .await
3744                .map_err(HirnError::storage)?;
3745                Ok(true)
3746            }
3747            Err(HirnError::NotFound(_)) => {
3748                if self.cached_graph().has_node(payload.record_id).await? {
3749                    self.cached_graph().remove_node(payload.record_id).await?;
3750                }
3751                hirn_storage::update_mutation_envelope_state(
3752                    self.storage_backend(),
3753                    &envelope.id,
3754                    hirn_storage::MutationEnvelopeState::Failed,
3755                    Some(format!(
3756                        "semantic create record missing during recovery: {}",
3757                        payload.record_id
3758                    )),
3759                )
3760                .await
3761                .map_err(HirnError::storage)?;
3762                Ok(true)
3763            }
3764            Err(error) => Err(error),
3765        }
3766    }
3767
3768    async fn apply_semantic_purge_storage_delete(
3769        &self,
3770        logical_memory_id: LogicalMemoryId,
3771    ) -> HirnResult<()> {
3772        let exact_filter = Self::semantic_logical_exact_filter(logical_memory_id);
3773        self.storage_runtime
3774            .delete_exact(
3775                hirn_storage::datasets::semantic::DATASET_NAME,
3776                &exact_filter,
3777            )
3778            .await
3779            .map_err(HirnError::storage)?;
3780        self.evict_semantic_head(logical_memory_id);
3781        Ok(())
3782    }
3783
3784    async fn reconcile_single_pending_semantic_successor_mutation(
3785        &self,
3786        envelope: &hirn_storage::MutationEnvelopeRecord,
3787    ) -> HirnResult<bool> {
3788        let payload = decode_semantic_successor_envelope(envelope)?;
3789
3790        match self.read_semantic_record(payload.successor_id).await {
3791            Ok(successor) => {
3792                if self.semantic_record_is_current_head(&successor).await?
3793                    && !self.cached_graph().has_node(payload.successor_id).await?
3794                {
3795                    self.cached_graph()
3796                        .add_node(
3797                            successor.id,
3798                            Layer::Semantic,
3799                            successor.confidence,
3800                            successor.created_at,
3801                            successor.namespace,
3802                        )
3803                        .await?;
3804                }
3805                if self
3806                    .cached_graph()
3807                    .has_node(payload.prior_record_id)
3808                    .await?
3809                {
3810                    self.cached_graph()
3811                        .remove_node(payload.prior_record_id)
3812                        .await?;
3813                }
3814                hirn_storage::update_mutation_envelope_state(
3815                    self.storage_backend(),
3816                    &envelope.id,
3817                    hirn_storage::MutationEnvelopeState::Applied,
3818                    None,
3819                )
3820                .await
3821                .map_err(HirnError::storage)?;
3822                Ok(true)
3823            }
3824            Err(HirnError::NotFound(_)) => {
3825                self.remove_semantic_graph_nodes_if_present(std::slice::from_ref(
3826                    &payload.successor_id,
3827                ))
3828                .await?;
3829                hirn_storage::update_mutation_envelope_state(
3830                    self.storage_backend(),
3831                    &envelope.id,
3832                    hirn_storage::MutationEnvelopeState::Failed,
3833                    Some(format!(
3834                        "semantic successor record missing during recovery: {}",
3835                        payload.successor_id
3836                    )),
3837                )
3838                .await
3839                .map_err(HirnError::storage)?;
3840                Ok(true)
3841            }
3842            Err(error) => Err(error),
3843        }
3844    }
3845
3846    async fn reconcile_single_pending_semantic_merge_mutation(
3847        &self,
3848        envelope: &hirn_storage::MutationEnvelopeRecord,
3849    ) -> HirnResult<bool> {
3850        let payload = decode_semantic_merge_envelope(envelope)?;
3851
3852        let mut merged_node_ids = Vec::with_capacity(1 + payload.merged_source_ids.len());
3853        merged_node_ids.push(payload.merged_target_id);
3854        merged_node_ids.extend(payload.merged_source_ids.iter().copied());
3855
3856        let target = match self.read_semantic_record(payload.merged_target_id).await {
3857            Ok(target) => target,
3858            Err(HirnError::NotFound(_)) => {
3859                self.remove_semantic_graph_nodes_if_present(&merged_node_ids)
3860                    .await?;
3861                hirn_storage::update_mutation_envelope_state(
3862                    self.storage_backend(),
3863                    &envelope.id,
3864                    hirn_storage::MutationEnvelopeState::Failed,
3865                    Some(format!(
3866                        "semantic merge target missing during recovery: {}",
3867                        payload.merged_target_id
3868                    )),
3869                )
3870                .await
3871                .map_err(HirnError::storage)?;
3872                return Ok(true);
3873            }
3874            Err(error) => return Err(error),
3875        };
3876
3877        let mut merged_sources = Vec::with_capacity(payload.merged_source_ids.len());
3878        for merged_source_id in &payload.merged_source_ids {
3879            match self.read_semantic_record(*merged_source_id).await {
3880                Ok(record) => merged_sources.push(record),
3881                Err(HirnError::NotFound(_)) => {
3882                    self.remove_semantic_graph_nodes_if_present(&merged_node_ids)
3883                        .await?;
3884                    hirn_storage::update_mutation_envelope_state(
3885                        self.storage_backend(),
3886                        &envelope.id,
3887                        hirn_storage::MutationEnvelopeState::Failed,
3888                        Some(format!(
3889                            "semantic merge source missing during recovery: {}",
3890                            merged_source_id
3891                        )),
3892                    )
3893                    .await
3894                    .map_err(HirnError::storage)?;
3895                    return Ok(true);
3896                }
3897                Err(error) => return Err(error),
3898            }
3899        }
3900
3901        if self.semantic_record_is_current_head(&target).await?
3902            && !self.cached_graph().has_node(target.id).await?
3903        {
3904            self.cached_graph()
3905                .add_node(
3906                    target.id,
3907                    Layer::Semantic,
3908                    target.confidence,
3909                    target.created_at,
3910                    target.namespace,
3911                )
3912                .await?;
3913        }
3914
3915        for merged_source in &merged_sources {
3916            if self.semantic_record_is_current_head(merged_source).await?
3917                && !self.cached_graph().has_node(merged_source.id).await?
3918            {
3919                self.cached_graph()
3920                    .add_node(
3921                        merged_source.id,
3922                        Layer::Semantic,
3923                        merged_source.confidence,
3924                        merged_source.created_at,
3925                        merged_source.namespace,
3926                    )
3927                    .await?;
3928            }
3929        }
3930
3931        let mut predecessor_ids = Vec::with_capacity(1 + payload.prior_source_ids.len());
3932        predecessor_ids.push(payload.prior_target_id);
3933        predecessor_ids.extend(payload.prior_source_ids.iter().copied());
3934        self.remove_semantic_graph_nodes_if_present(&predecessor_ids)
3935            .await?;
3936
3937        hirn_storage::update_mutation_envelope_state(
3938            self.storage_backend(),
3939            &envelope.id,
3940            hirn_storage::MutationEnvelopeState::Applied,
3941            None,
3942        )
3943        .await
3944        .map_err(HirnError::storage)?;
3945        Ok(true)
3946    }
3947
3948    async fn reconcile_single_pending_semantic_contradiction_sync_mutation(
3949        &self,
3950        envelope: &hirn_storage::MutationEnvelopeRecord,
3951    ) -> HirnResult<bool> {
3952        let payload = decode_semantic_contradiction_sync_envelope(envelope)?;
3953
3954        let mut successors = Vec::with_capacity(payload.successor_ids.len());
3955        for successor_id in &payload.successor_ids {
3956            match self.read_semantic_record(*successor_id).await {
3957                Ok(record) => successors.push(record),
3958                Err(HirnError::NotFound(_)) => {
3959                    self.remove_semantic_graph_nodes_if_present(&payload.successor_ids)
3960                        .await?;
3961                    hirn_storage::update_mutation_envelope_state(
3962                        self.storage_backend(),
3963                        &envelope.id,
3964                        hirn_storage::MutationEnvelopeState::Failed,
3965                        Some(format!(
3966                            "semantic contradiction successor missing during recovery: {}",
3967                            successor_id
3968                        )),
3969                    )
3970                    .await
3971                    .map_err(HirnError::storage)?;
3972                    return Ok(true);
3973                }
3974                Err(error) => return Err(error),
3975            }
3976        }
3977
3978        for successor in &successors {
3979            if self.semantic_record_is_current_head(successor).await?
3980                && !self.cached_graph().has_node(successor.id).await?
3981            {
3982                self.cached_graph()
3983                    .add_node(
3984                        successor.id,
3985                        Layer::Semantic,
3986                        successor.confidence,
3987                        successor.created_at,
3988                        successor.namespace,
3989                    )
3990                    .await?;
3991            }
3992        }
3993
3994        self.remove_semantic_graph_nodes_if_present(&payload.prior_record_ids)
3995            .await?;
3996
3997        hirn_storage::update_mutation_envelope_state(
3998            self.storage_backend(),
3999            &envelope.id,
4000            hirn_storage::MutationEnvelopeState::Applied,
4001            None,
4002        )
4003        .await
4004        .map_err(HirnError::storage)?;
4005        Ok(true)
4006    }
4007
4008    async fn reconcile_single_pending_semantic_purge_mutation(
4009        &self,
4010        envelope: &hirn_storage::MutationEnvelopeRecord,
4011    ) -> HirnResult<bool> {
4012        let payload = decode_semantic_purge_envelope(envelope)?;
4013
4014        self.apply_semantic_purge_storage_delete(payload.logical_memory_id)
4015            .await?;
4016        self.remove_semantic_graph_nodes_if_present(&payload.revision_ids)
4017            .await?;
4018
4019        hirn_storage::update_mutation_envelope_state(
4020            self.storage_backend(),
4021            &envelope.id,
4022            hirn_storage::MutationEnvelopeState::Applied,
4023            None,
4024        )
4025        .await
4026        .map_err(HirnError::storage)?;
4027        Ok(true)
4028    }
4029
4030    async fn reconcile_single_pending_semantic_retract_mutation(
4031        &self,
4032        envelope: &hirn_storage::MutationEnvelopeRecord,
4033    ) -> HirnResult<bool> {
4034        let payload = decode_semantic_retract_envelope(envelope)?;
4035
4036        match self.read_semantic_record(payload.tombstone_id).await {
4037            Ok(tombstone) => {
4038                self.cache_semantic_head(&tombstone);
4039                if self
4040                    .cached_graph()
4041                    .has_node(payload.prior_record_id)
4042                    .await?
4043                {
4044                    self.cached_graph()
4045                        .remove_node(payload.prior_record_id)
4046                        .await?;
4047                }
4048                hirn_storage::update_mutation_envelope_state(
4049                    self.storage_backend(),
4050                    &envelope.id,
4051                    hirn_storage::MutationEnvelopeState::Applied,
4052                    None,
4053                )
4054                .await
4055                .map_err(HirnError::storage)?;
4056                Ok(true)
4057            }
4058            Err(HirnError::NotFound(_)) => {
4059                hirn_storage::update_mutation_envelope_state(
4060                    self.storage_backend(),
4061                    &envelope.id,
4062                    hirn_storage::MutationEnvelopeState::Failed,
4063                    Some(format!(
4064                        "semantic retract tombstone missing during recovery: {}",
4065                        payload.tombstone_id
4066                    )),
4067                )
4068                .await
4069                .map_err(HirnError::storage)?;
4070                Ok(true)
4071            }
4072            Err(error) => Err(error),
4073        }
4074    }
4075
4076    /// Permanently purge all revisions for a semantic logical memory.
4077    pub(crate) async fn purge_semantic(&self, id: MemoryId) -> HirnResult<()> {
4078        self.purge_semantic_as(id, None).await
4079    }
4080
4081    pub(crate) async fn purge_semantic_as(
4082        &self,
4083        id: MemoryId,
4084        actor_id: Option<AgentId>,
4085    ) -> HirnResult<()> {
4086        let rec = self.read_semantic_record(id).await?;
4087        let actor_id = actor_id.unwrap_or_else(|| rec.provenance.created_by.clone());
4088        self.enforce(
4089            actor_id.as_str(),
4090            crate::policy::Action::Purge,
4091            &self.config.default_realm,
4092            rec.namespace.as_str(),
4093        )
4094        .await?;
4095
4096        let history = self.semantic_history(id).await?;
4097        let revision_ids = history
4098            .iter()
4099            .map(|revision| revision.id)
4100            .collect::<Vec<_>>();
4101        let envelope = build_semantic_purge_envelope(rec.logical_memory_id, revision_ids.clone())?;
4102        hirn_storage::append_mutation_envelope(self.storage_backend(), &envelope)
4103            .await
4104            .map_err(HirnError::storage)?;
4105
4106        self.apply_semantic_purge_storage_delete(rec.logical_memory_id)
4107            .await?;
4108
4109        let graph_cleanup_applied = match self
4110            .remove_semantic_graph_nodes_if_present(&revision_ids)
4111            .await
4112        {
4113            Ok(()) => true,
4114            Err(error) => {
4115                tracing::warn!(
4116                    logical_memory_id = %rec.logical_memory_id,
4117                    envelope_id = %envelope.id,
4118                    error = %error,
4119                    "semantic purge graph cleanup incomplete; recovery will retry"
4120                );
4121                false
4122            }
4123        };
4124
4125        self.emit_scoped(
4126            rec.namespace.as_str(),
4127            actor_id.as_str(),
4128            MemoryEvent::Forgotten { id },
4129        )
4130        .await;
4131
4132        if graph_cleanup_applied {
4133            if let Err(error) = hirn_storage::update_mutation_envelope_state(
4134                self.storage_backend(),
4135                &envelope.id,
4136                hirn_storage::MutationEnvelopeState::Applied,
4137                None,
4138            )
4139            .await
4140            {
4141                tracing::warn!(
4142                    logical_memory_id = %rec.logical_memory_id,
4143                    envelope_id = %envelope.id,
4144                    error = %error,
4145                    "semantic purge mutation envelope finalize failed; recovery will retry graph cleanup"
4146                );
4147            }
4148        }
4149        Ok(())
4150    }
4151
4152    // ── Cross-Layer ─────────────────────────────────────────────────────
4153
4154    /// Retrieve any memory record by ID, regardless of layer.
4155    pub(crate) async fn get_memory(&self, id: MemoryId) -> HirnResult<MemoryRecord> {
4156        let exact_filter = hirn_storage::store::ExactMatchFilter::utf8_value("id", id.to_string());
4157        let opts = || hirn_storage::store::ScanOptions {
4158            exact_filter: Some(exact_filter.clone()),
4159            limit: Some(1),
4160            ..Default::default()
4161        };
4162
4163        // Try working memory first.
4164        if let Some(record) = Self::scan_memory_dataset_record(
4165            self.storage_backend(),
4166            "memory lookup",
4167            hirn_storage::datasets::working::DATASET_NAME,
4168            opts(),
4169            hirn_storage::datasets::working::from_batch,
4170        )
4171        .await?
4172        {
4173            return Ok(MemoryRecord::Working(record));
4174        }
4175
4176        // Try episodic.
4177        if let Some(record) = Self::scan_memory_dataset_record(
4178            self.storage_backend(),
4179            "memory lookup",
4180            hirn_storage::datasets::episodic::DATASET_NAME,
4181            opts(),
4182            hirn_storage::datasets::episodic::from_batch,
4183        )
4184        .await?
4185        {
4186            return Ok(MemoryRecord::Episodic(record));
4187        }
4188
4189        // Try semantic.
4190        if let Some(record) = Self::scan_memory_dataset_record(
4191            self.storage_backend(),
4192            "memory lookup",
4193            hirn_storage::datasets::semantic::DATASET_NAME,
4194            opts(),
4195            hirn_storage::datasets::semantic::from_batch,
4196        )
4197        .await?
4198        {
4199            return Ok(MemoryRecord::Semantic(record));
4200        }
4201
4202        // Try procedural.
4203        if let Some(record) = Self::scan_memory_dataset_record(
4204            self.storage_backend(),
4205            "memory lookup",
4206            hirn_storage::datasets::procedural::DATASET_NAME,
4207            opts(),
4208            hirn_storage::datasets::procedural::from_batch,
4209        )
4210        .await?
4211        {
4212            return Ok(MemoryRecord::Procedural(record));
4213        }
4214
4215        Err(HirnError::NotFound(format!("memory record {id}")))
4216    }
4217
4218    /// Batch-fetch multiple records by ID.
4219    ///
4220    /// Issues at most 4 storage queries (one per dataset) regardless of the
4221    /// number of IDs, eliminating the N+1 query anti-pattern.
4222    pub(crate) async fn get_memories_batch(
4223        &self,
4224        ids: &[MemoryId],
4225    ) -> HirnResult<HashMap<MemoryId, MemoryRecord>> {
4226        if ids.is_empty() {
4227            return Ok(HashMap::new());
4228        }
4229
4230        let (working, episodic, semantic, procedural) = tokio::try_join!(
4231            Self::scan_memory_dataset_records_for_ids(
4232                self.storage_backend(),
4233                "recall hydration",
4234                hirn_storage::datasets::working::DATASET_NAME,
4235                ids,
4236                hirn_storage::datasets::working::from_batch,
4237                MemoryRecord::Working,
4238            ),
4239            Self::scan_memory_dataset_records_for_ids(
4240                self.storage_backend(),
4241                "recall hydration",
4242                hirn_storage::datasets::episodic::DATASET_NAME,
4243                ids,
4244                hirn_storage::datasets::episodic::from_batch,
4245                MemoryRecord::Episodic,
4246            ),
4247            Self::scan_memory_dataset_records_for_ids(
4248                self.storage_backend(),
4249                "recall hydration",
4250                hirn_storage::datasets::semantic::DATASET_NAME,
4251                ids,
4252                hirn_storage::datasets::semantic::from_batch,
4253                MemoryRecord::Semantic,
4254            ),
4255            Self::scan_memory_dataset_records_for_ids(
4256                self.storage_backend(),
4257                "recall hydration",
4258                hirn_storage::datasets::procedural::DATASET_NAME,
4259                ids,
4260                hirn_storage::datasets::procedural::from_batch,
4261                MemoryRecord::Procedural,
4262            ),
4263        )?;
4264
4265        let mut result: HashMap<MemoryId, MemoryRecord> = HashMap::with_capacity(ids.len());
4266        result.extend(working);
4267        result.extend(episodic);
4268        result.extend(semantic);
4269        result.extend(procedural);
4270
4271        Ok(result)
4272    }
4273
4274    pub(crate) async fn get_memories_batch_with_hints(
4275        &self,
4276        ids: &[MemoryId],
4277        layer_hints: &HashMap<MemoryId, Layer>,
4278    ) -> HirnResult<HashMap<MemoryId, MemoryRecord>> {
4279        if ids.is_empty() {
4280            return Ok(HashMap::new());
4281        }
4282
4283        let mut working_ids = Vec::new();
4284        let mut episodic_ids = Vec::new();
4285        let mut semantic_ids = Vec::new();
4286        let mut procedural_ids = Vec::new();
4287        let mut unknown_ids = Vec::new();
4288
4289        for &id in ids {
4290            match layer_hints.get(&id).copied() {
4291                Some(Layer::Working) => working_ids.push(id),
4292                Some(Layer::Episodic) => episodic_ids.push(id),
4293                Some(Layer::Semantic) => semantic_ids.push(id),
4294                Some(Layer::Procedural) => procedural_ids.push(id),
4295                None => unknown_ids.push(id),
4296            }
4297        }
4298
4299        let mut working_scan_ids = working_ids;
4300        let mut episodic_scan_ids = episodic_ids;
4301        let mut semantic_scan_ids = semantic_ids;
4302        let mut procedural_scan_ids = procedural_ids;
4303
4304        if !unknown_ids.is_empty() {
4305            working_scan_ids.extend_from_slice(&unknown_ids);
4306            episodic_scan_ids.extend_from_slice(&unknown_ids);
4307            semantic_scan_ids.extend_from_slice(&unknown_ids);
4308            procedural_scan_ids.extend_from_slice(&unknown_ids);
4309        }
4310
4311        let (working, episodic, semantic, procedural) = tokio::try_join!(
4312            Self::scan_memory_dataset_records_for_ids_projected(
4313                self.storage_backend(),
4314                "recall hydration",
4315                hirn_storage::datasets::working::DATASET_NAME,
4316                &working_scan_ids,
4317                None, // working has no embedding column
4318                hirn_storage::datasets::working::from_batch,
4319                MemoryRecord::Working,
4320            ),
4321            Self::scan_memory_dataset_records_for_ids_projected(
4322                self.storage_backend(),
4323                "recall hydration",
4324                hirn_storage::datasets::episodic::DATASET_NAME,
4325                &episodic_scan_ids,
4326                Some(hirn_storage::datasets::episodic::RECALL_HYDRATION_COLUMNS),
4327                hirn_storage::datasets::episodic::from_batch,
4328                MemoryRecord::Episodic,
4329            ),
4330            Self::scan_memory_dataset_records_for_ids_projected(
4331                self.storage_backend(),
4332                "recall hydration",
4333                hirn_storage::datasets::semantic::DATASET_NAME,
4334                &semantic_scan_ids,
4335                Some(hirn_storage::datasets::semantic::RECALL_HYDRATION_COLUMNS),
4336                hirn_storage::datasets::semantic::from_batch,
4337                MemoryRecord::Semantic,
4338            ),
4339            Self::scan_memory_dataset_records_for_ids_projected(
4340                self.storage_backend(),
4341                "recall hydration",
4342                hirn_storage::datasets::procedural::DATASET_NAME,
4343                &procedural_scan_ids,
4344                Some(hirn_storage::datasets::procedural::RECALL_HYDRATION_COLUMNS),
4345                hirn_storage::datasets::procedural::from_batch,
4346                MemoryRecord::Procedural,
4347            ),
4348        )?;
4349
4350        let mut result: HashMap<MemoryId, MemoryRecord> = HashMap::with_capacity(ids.len());
4351        result.extend(working);
4352        result.extend(episodic);
4353        result.extend(semantic);
4354        result.extend(procedural);
4355
4356        Ok(result)
4357    }
4358
4359    fn memory_ids_exact_filter(ids: &[MemoryId]) -> Option<hirn_storage::store::ExactMatchFilter> {
4360        hirn_storage::store::ExactMatchFilter::utf8_values(
4361            "id",
4362            ids.iter().map(ToString::to_string),
4363        )
4364    }
4365
4366    async fn scan_memory_dataset_records_for_ids<T, F>(
4367        storage: &dyn hirn_storage::PhysicalStore,
4368        context: &'static str,
4369        dataset: &'static str,
4370        ids: &[MemoryId],
4371        from_batch: fn(&arrow_array::RecordBatch) -> Result<Vec<T>, hirn_storage::HirnDbError>,
4372        wrap: F,
4373    ) -> HirnResult<HashMap<MemoryId, MemoryRecord>>
4374    where
4375        F: Fn(T) -> MemoryRecord,
4376    {
4377        Self::scan_memory_dataset_records_for_ids_projected(
4378            storage, context, dataset, ids, None, from_batch, wrap,
4379        )
4380        .await
4381    }
4382
4383    /// Same as `scan_memory_dataset_records_for_ids` but with an optional
4384    /// column projection.  Pass `Some(cols)` to read only a subset of columns
4385    /// and avoid loading large fields (e.g. `embedding`) that are not needed
4386    /// for the current operation.
4387    async fn scan_memory_dataset_records_for_ids_projected<T, F>(
4388        storage: &dyn hirn_storage::PhysicalStore,
4389        context: &'static str,
4390        dataset: &'static str,
4391        ids: &[MemoryId],
4392        columns: Option<&[&str]>,
4393        from_batch: fn(&arrow_array::RecordBatch) -> Result<Vec<T>, hirn_storage::HirnDbError>,
4394        wrap: F,
4395    ) -> HirnResult<HashMap<MemoryId, MemoryRecord>>
4396    where
4397        F: Fn(T) -> MemoryRecord,
4398    {
4399        let Some(exact_filter) = Self::memory_ids_exact_filter(ids) else {
4400            return Ok(HashMap::new());
4401        };
4402
4403        Self::scan_memory_dataset_records(
4404            storage,
4405            context,
4406            dataset,
4407            hirn_storage::store::ScanOptions {
4408                exact_filter: Some(exact_filter),
4409                columns: columns.map(|cols| cols.iter().map(|c| (*c).to_string()).collect()),
4410                limit: None,
4411                ..Default::default()
4412            },
4413            from_batch,
4414            wrap,
4415        )
4416        .await
4417    }
4418
4419    async fn scan_memory_dataset_record<T>(
4420        storage: &dyn hirn_storage::PhysicalStore,
4421        context: &'static str,
4422        dataset: &'static str,
4423        options: hirn_storage::store::ScanOptions,
4424        from_batch: fn(&arrow_array::RecordBatch) -> Result<Vec<T>, hirn_storage::HirnDbError>,
4425    ) -> HirnResult<Option<T>> {
4426        let mut stream = storage
4427            .scan_stream(dataset, options)
4428            .await
4429            .map_err(|error| {
4430                HirnError::storage(format!(
4431                    "failed to scan {context} dataset `{dataset}`: {error}"
4432                ))
4433            })?;
4434
4435        while let Some(batch) = stream.try_next().await.map_err(|error| {
4436            HirnError::storage(format!(
4437                "failed to stream {context} dataset `{dataset}`: {error}"
4438            ))
4439        })? {
4440            let records = from_batch(&batch).map_err(|error| {
4441                HirnError::storage(format!(
4442                    "failed to decode {context} dataset `{dataset}`: {error}"
4443                ))
4444            })?;
4445            if let Some(record) = records.into_iter().next() {
4446                return Ok(Some(record));
4447            }
4448        }
4449
4450        Ok(None)
4451    }
4452
4453    async fn scan_memory_dataset_records<T, F>(
4454        storage: &dyn hirn_storage::PhysicalStore,
4455        context: &'static str,
4456        dataset: &'static str,
4457        options: hirn_storage::store::ScanOptions,
4458        from_batch: fn(&arrow_array::RecordBatch) -> Result<Vec<T>, hirn_storage::HirnDbError>,
4459        wrap: F,
4460    ) -> HirnResult<HashMap<MemoryId, MemoryRecord>>
4461    where
4462        F: Fn(T) -> MemoryRecord,
4463    {
4464        let mut stream = storage
4465            .scan_stream(dataset, options)
4466            .await
4467            .map_err(|error| {
4468                HirnError::storage(format!(
4469                    "failed to scan {context} dataset `{dataset}`: {error}"
4470                ))
4471            })?;
4472
4473        let mut records = HashMap::new();
4474        while let Some(batch) = stream.try_next().await.map_err(|error| {
4475            HirnError::storage(format!(
4476                "failed to stream {context} dataset `{dataset}`: {error}"
4477            ))
4478        })? {
4479            let entries = from_batch(&batch).map_err(|error| {
4480                HirnError::storage(format!(
4481                    "failed to decode {context} dataset `{dataset}`: {error}"
4482                ))
4483            })?;
4484            for entry in entries {
4485                let record = wrap(entry);
4486                records.insert(record.id(), record);
4487            }
4488        }
4489
4490        Ok(records)
4491    }
4492    /// Get record counts per layer.
4493    pub(crate) async fn count(&self) -> HirnResult<LayerCounts> {
4494        let storage = self.storage_backend();
4495        let working = self
4496            .storage_backend()
4497            .count(hirn_storage::datasets::working::DATASET_NAME, None)
4498            .await
4499            .map_err(|e| HirnError::storage(e))?;
4500
4501        let episodic = storage
4502            .count(hirn_storage::datasets::episodic::DATASET_NAME, None)
4503            .await
4504            .map_err(|e| HirnError::storage(e))?;
4505
4506        let semantic = storage
4507            .count(hirn_storage::datasets::semantic::DATASET_NAME, None)
4508            .await
4509            .map_err(|e| HirnError::storage(e))?;
4510
4511        let procedural = storage
4512            .count(hirn_storage::datasets::procedural::DATASET_NAME, None)
4513            .await
4514            .map_err(|e| HirnError::storage(e))?;
4515
4516        Ok(LayerCounts {
4517            working,
4518            episodic,
4519            semantic,
4520            procedural,
4521            total: working + episodic + semantic + procedural,
4522        })
4523    }
4524
4525    /// Get database statistics.
4526    pub(crate) async fn stats(&self) -> HirnResult<DbStats> {
4527        let counts = self.count().await?;
4528
4529        let file_size_bytes = self.file_size_bytes();
4530
4531        let edge_count = self.cached_graph().edge_count().await.unwrap_or(0) as u64;
4532
4533        let node_count = self.cached_graph().node_count().await.unwrap_or(0) as u64;
4534
4535        // Emit gauges for observability.
4536        metrics::gauge!(crate::metrics::MEMORY_COUNT).set(counts.total as f64);
4537        metrics::gauge!(crate::metrics::GRAPH_NODE_COUNT).set(node_count as f64);
4538        metrics::gauge!(crate::metrics::GRAPH_EDGES_TOTAL).set(edge_count as f64);
4539
4540        Ok(DbStats {
4541            working_count: counts.working,
4542            episodic_count: counts.episodic,
4543            semantic_count: counts.semantic,
4544            procedural_count: counts.procedural,
4545            total_count: counts.total,
4546            edge_count,
4547            file_size_bytes,
4548        })
4549    }
4550
4551    // ── Temporal Index Queries ───────────────────────────────────────────
4552
4553    /// Retrieve episodic records within a timestamp range (inclusive start,
4554    /// exclusive end), in chronological order.
4555    pub(crate) async fn episodes_in_range(
4556        &self,
4557        after: Timestamp,
4558        before: Timestamp,
4559    ) -> HirnResult<Vec<EpisodicRecord>> {
4560        self.list_episodes(&EpisodicFilter {
4561            after: Some(after),
4562            before: Some(before),
4563            ..Default::default()
4564        })
4565        .await
4566    }
4567
4568    /// Retrieve all episodic records after the given timestamp.
4569    pub(crate) async fn episodes_after(&self, after: Timestamp) -> HirnResult<Vec<EpisodicRecord>> {
4570        self.list_episodes(&EpisodicFilter {
4571            after: Some(after),
4572            ..Default::default()
4573        })
4574        .await
4575    }
4576
4577    /// Retrieve all episodic records before the given timestamp.
4578    pub(crate) async fn episodes_before(
4579        &self,
4580        before: Timestamp,
4581    ) -> HirnResult<Vec<EpisodicRecord>> {
4582        self.list_episodes(&EpisodicFilter {
4583            before: Some(before),
4584            ..Default::default()
4585        })
4586        .await
4587    }
4588
4589    /// List episodic records in reverse chronological order.
4590    pub(crate) async fn episodes_reverse(&self) -> HirnResult<Vec<EpisodicRecord>> {
4591        let mut records = self.list_episodes(&EpisodicFilter::default()).await?;
4592        records.reverse();
4593        Ok(records)
4594    }
4595
4596    // ── Semantic Recall ─────────────────────────────────────────────────
4597
4598    /// Start a recall (semantic search) query.
4599    pub(crate) fn recall(&self, query_embedding: Vec<f32>) -> RecallBuilder<'_> {
4600        RecallBuilder::new(self, query_embedding)
4601    }
4602
4603    /// Execute multiple recall queries concurrently. Returns per-query results.
4604    pub async fn batch_recall<'a>(
4605        &'a self,
4606        builders: Vec<RecallBuilder<'a>>,
4607    ) -> Vec<HirnResult<Vec<RecallResult>>> {
4608        if builders.is_empty() {
4609            return Vec::new();
4610        }
4611
4612        // Cedar: deduplicate enforcement across all builders.
4613        {
4614            let mut checked: HashSet<(String, String)> = HashSet::new();
4615            for b in &builders {
4616                let agent = b.agent_id.as_deref().unwrap_or("anonymous").to_string();
4617                let ns = b
4618                    .namespace
4619                    .as_ref()
4620                    .map_or(String::new(), |n| n.as_str().to_string());
4621                if checked.insert((agent.clone(), ns.clone())) {
4622                    if let Err(e) = self
4623                        .enforce(
4624                            &agent,
4625                            crate::policy::Action::Recall,
4626                            &self.config.default_realm,
4627                            &ns,
4628                        )
4629                        .await
4630                    {
4631                        let msg = format!("{e}");
4632                        return builders
4633                            .iter()
4634                            .map(|_| Err(HirnError::AccessDenied(msg.clone())))
4635                            .collect();
4636                    }
4637                }
4638            }
4639        }
4640
4641        // Execute all queries concurrently.
4642        let futs = builders.into_iter().map(|b| b.execute());
4643        futures::future::join_all(futs).await
4644    }
4645
4646    /// Start a THINK query: recall + context assembly.
4647    pub(crate) fn think(&self, query_embedding: Vec<f32>) -> crate::think::ThinkBuilder<'_> {
4648        crate::think::ThinkBuilder::new(self, query_embedding)
4649    }
4650
4651    /// Start an INSPECT query: metadata, graph neighborhood, and trust for a record.
4652    pub(crate) fn inspect(&self, id: MemoryId) -> crate::inspect::InspectBuilder<'_> {
4653        crate::inspect::InspectBuilder::new(self, id)
4654    }
4655
4656    /// Start a TRACE query: provenance lineage for a specific record.
4657    pub(crate) fn trace(&self, id: MemoryId) -> crate::trace::TraceBuilder<'_> {
4658        crate::trace::TraceBuilder::new(self, id)
4659    }
4660
4661    /// Get the configured embedding dimensions.
4662    pub fn embedding_dims(&self) -> usize {
4663        self.config.embedding_dimensions.as_usize()
4664    }
4665
4666    /// Mark two semantic records as contradicting each other.
4667    pub(crate) async fn mark_contradiction(
4668        &self,
4669        id: MemoryId,
4670        contradicts: MemoryId,
4671    ) -> HirnResult<()> {
4672        let _ = self
4673            .synchronize_contradiction_refs(id, contradicts, None)
4674            .await?;
4675        Ok(())
4676    }
4677
4678    /// F-015: Flush buffered semantic access counts to storage.
4679    /// Called during consolidation and on close/drop.
4680    pub(crate) async fn flush_semantic_access(&self) -> HirnResult<()> {
4681        let pending = self.graph_runtime().drain_semantic_access();
4682
4683        if pending.is_empty() {
4684            return Ok(());
4685        }
4686
4687        for (id, count) in &pending {
4688            if let Ok(mut record) = self.read_semantic_record(*id).await {
4689                for _ in 0..*count {
4690                    record.record_access();
4691                }
4692                let _ = self.overwrite_semantic_record(&record).await;
4693            }
4694        }
4695
4696        Ok(())
4697    }
4698}
4699
4700#[cfg(test)]
4701mod tests {
4702    use std::sync::Arc;
4703
4704    use hirn_core::RevisionOperation;
4705    use hirn_core::Timestamp;
4706    use hirn_core::episodic::EpisodicRecord;
4707    use hirn_core::id::MemoryId;
4708    use hirn_core::record::MemoryRecord;
4709    use hirn_core::revision::{LogicalMemoryId, RevisionId};
4710    use hirn_core::types::{AgentId, EdgeRelation, EventType, Origin};
4711    use hirn_storage::memory_store::MemoryStore;
4712
4713    use super::*;
4714    use crate::retrieval::recall::{RecallPresentation, RecallResult};
4715    use crate::scoring::ScoreBreakdown;
4716
4717    fn agent() -> AgentId {
4718        AgentId::new("semantic_test").unwrap()
4719    }
4720
4721    async fn temp_db() -> HirnDB {
4722        let dir = tempfile::tempdir().unwrap();
4723        let config = HirnConfig::builder()
4724            .db_path(dir.path().join("semantic-db"))
4725            .working_memory_token_limit(1000)
4726            .build()
4727            .unwrap();
4728        HirnDB::open_with_config(config, Arc::new(MemoryStore::new()))
4729            .await
4730            .unwrap()
4731    }
4732
4733    fn semantic_record(
4734        id: MemoryId,
4735        logical_memory_id: LogicalMemoryId,
4736        created_at: Timestamp,
4737        version: u32,
4738    ) -> SemanticRecord {
4739        let mut record = SemanticRecord::builder()
4740            .concept("deploy_status")
4741            .description("deployment status")
4742            .agent_id(agent())
4743            .build()
4744            .unwrap();
4745        record.id = id;
4746        record.logical_memory_id = logical_memory_id;
4747        record.revision_id = RevisionId::from_memory_id(id);
4748        record.version = version;
4749        record.created_at = created_at;
4750        record.updated_at = created_at;
4751        record.last_accessed = created_at;
4752        record.valid_from = created_at;
4753        record.valid_until = None;
4754        record
4755    }
4756
4757    fn recall_result(record: MemoryRecord) -> RecallResult {
4758        RecallResult {
4759            record,
4760            similarity: 1.0,
4761            composite_score: 1.0,
4762            score_breakdown: ScoreBreakdown {
4763                similarity: 1.0,
4764                importance: 1.0,
4765                recency: 1.0,
4766                activation: 0.0,
4767                causal_relevance: 0.0,
4768                surprise: 0.0,
4769                source_reliability: 1.0,
4770            },
4771            revision: None,
4772            resource_evidence: Vec::new(),
4773            resource_preview_packages: Vec::new(),
4774            resource_score_attribution: Vec::new(),
4775            presentation: RecallPresentation::default(),
4776        }
4777    }
4778
4779    #[test]
4780    fn revision_snapshot_preserves_exact_recorded_boundary_when_timestamps_tie() {
4781        let created_at = Timestamp::from_millis(1_700_000_000_000);
4782        let original_id = MemoryId::parse("01ARZ3NDEKTSV4RRFFQ69G5FAW").unwrap();
4783        let successor_id = MemoryId::parse("01ARZ3NDEKTSV4RRFFQ69G5FAV").unwrap();
4784        let logical_memory_id = LogicalMemoryId::from_memory_id(original_id);
4785
4786        let original = semantic_record(original_id, logical_memory_id, created_at, 1);
4787        let mut successor = semantic_record(successor_id, logical_memory_id, created_at, 2);
4788        successor.revision_operation = RevisionOperation::Correct;
4789        successor.revision_reason = Some("post-incident correction".to_string());
4790        successor.revision_causation_id = Some(original.id);
4791
4792        let revision = semantic_snapshot_head_recorded_at_snapshot(
4793            &[original.clone(), successor],
4794            ResolvedRecallSnapshot::Revision {
4795                cutoff: created_at,
4796                revision_id: original.revision_id,
4797                logical_memory_id,
4798                version: original.version,
4799            },
4800        )
4801        .unwrap();
4802
4803        assert_eq!(revision.id, original.id);
4804        assert_eq!(revision.revision_id, original.revision_id);
4805        assert_eq!(revision.version, 1);
4806    }
4807
4808    #[tokio::test(flavor = "multi_thread")]
4809    async fn override_preserves_conflict_group_visibility_via_carried_contradictions() {
4810        let db = temp_db().await;
4811        let id_a = db
4812            .store_semantic(
4813                SemanticRecord::builder()
4814                    .concept("status_a")
4815                    .description("deployment succeeded")
4816                    .origin(Origin::CrossAgent)
4817                    .agent_id(agent())
4818                    .build()
4819                    .unwrap(),
4820            )
4821            .await
4822            .unwrap();
4823        let id_b = db
4824            .store_semantic(
4825                SemanticRecord::builder()
4826                    .concept("status_b")
4827                    .description("deployment failed")
4828                    .origin(Origin::DirectObservation)
4829                    .agent_id(AgentId::new("other_agent").unwrap())
4830                    .build()
4831                    .unwrap(),
4832            )
4833            .await
4834            .unwrap();
4835
4836        db.connect_with(id_a, id_b, EdgeRelation::Contradicts, 0.9, Metadata::new())
4837            .await
4838            .unwrap();
4839        db.mark_contradiction(id_a, id_b).await.unwrap();
4840
4841        let override_head = db
4842            .override_semantic(
4843                id_a,
4844                SemanticOverride {
4845                    reason: Some("operator confirmed the successful rollout".into()),
4846                    ..SemanticOverride::with_metadata(agent(), id_a)
4847                },
4848            )
4849            .await
4850            .unwrap();
4851
4852        let summary = crate::ql::context::detect_conflicts_for_record(
4853            &db,
4854            &MemoryRecord::Semantic(override_head.clone()),
4855            None,
4856        )
4857        .await;
4858
4859        assert_eq!(summary.groups.len(), 1);
4860        assert_eq!(
4861            summary.groups[0].preferred_memory_id,
4862            Some(override_head.id)
4863        );
4864    }
4865
4866    #[tokio::test(flavor = "multi_thread")]
4867    async fn normalize_current_recall_results_batches_episodic_heads() {
4868        let db = temp_db().await;
4869        let now = Timestamp::now();
4870
4871        let first_id = db
4872            .remember(
4873                EpisodicRecord::builder()
4874                    .event_type(EventType::Observation)
4875                    .content("first chain original")
4876                    .summary("first chain original")
4877                    .importance(0.6)
4878                    .timestamp(now)
4879                    .agent_id(agent())
4880                    .build()
4881                    .unwrap(),
4882            )
4883            .await
4884            .unwrap();
4885        let first_original = db.read_episodic_record(first_id).await.unwrap();
4886        let first_head = db
4887            .append_episodic_successor(
4888                &first_original,
4889                RevisionOperation::Correct,
4890                Some("normalize episodic head".to_string()),
4891                |next| {
4892                    next.importance = 0.9;
4893                },
4894            )
4895            .await
4896            .unwrap();
4897
4898        let second_id = db
4899            .remember(
4900                EpisodicRecord::builder()
4901                    .event_type(EventType::Observation)
4902                    .content("second chain current")
4903                    .summary("second chain current")
4904                    .importance(0.5)
4905                    .timestamp(now)
4906                    .agent_id(agent())
4907                    .build()
4908                    .unwrap(),
4909            )
4910            .await
4911            .unwrap();
4912        let second_head = db.read_episodic_record(second_id).await.unwrap();
4913
4914        let normalized = db
4915            .normalize_current_recall_results(vec![
4916                recall_result(MemoryRecord::Episodic(first_original.clone())),
4917                recall_result(MemoryRecord::Episodic(second_head.clone())),
4918                recall_result(MemoryRecord::Episodic(first_head.clone())),
4919            ])
4920            .await
4921            .unwrap();
4922
4923        assert_eq!(normalized.len(), 2);
4924
4925        let normalized_ids = normalized
4926            .iter()
4927            .map(|result| match &result.record {
4928                MemoryRecord::Episodic(record) => record.id,
4929                other => panic!("expected episodic record, got {other:?}"),
4930            })
4931            .collect::<Vec<_>>();
4932
4933        assert!(normalized_ids.contains(&first_head.id));
4934        assert!(normalized_ids.contains(&second_head.id));
4935        assert!(!normalized_ids.contains(&first_original.id));
4936        assert!(normalized.iter().all(|result| result.revision.is_some()));
4937    }
4938
4939    #[tokio::test(flavor = "multi_thread")]
4940    async fn semantic_successors_preserve_nonsemantic_contradictions() {
4941        let db = temp_db().await;
4942        let semantic_id = db
4943            .store_semantic(
4944                SemanticRecord::builder()
4945                    .concept("deploy_health")
4946                    .description("deployment remained healthy")
4947                    .agent_id(agent())
4948                    .build()
4949                    .unwrap(),
4950            )
4951            .await
4952            .unwrap();
4953        let episodic_id = db
4954            .remember_bypass_admission(
4955                EpisodicRecord::builder()
4956                    .event_type(EventType::Observation)
4957                    .content("deployment triggered error spikes")
4958                    .summary("error spikes after deployment")
4959                    .importance(0.9)
4960                    .agent_id(agent())
4961                    .build()
4962                    .unwrap(),
4963            )
4964            .await
4965            .unwrap();
4966
4967        db.connect_with(
4968            semantic_id,
4969            episodic_id,
4970            EdgeRelation::Contradicts,
4971            0.9,
4972            Metadata::new(),
4973        )
4974        .await
4975        .unwrap();
4976
4977        let history_after_connect = db.semantic_history(semantic_id).await.unwrap();
4978        assert_eq!(history_after_connect.len(), 2);
4979        assert!(history_after_connect[0].contradiction_ids.is_empty());
4980        assert_eq!(
4981            history_after_connect[1].contradiction_ids,
4982            vec![episodic_id]
4983        );
4984
4985        let next = db
4986            .correct_semantic(
4987                semantic_id,
4988                SemanticUpdate {
4989                    description: Some("deployment required rollback".into()),
4990                    reason: Some("post-incident correction".into()),
4991                    ..SemanticUpdate::with_metadata(agent(), semantic_id)
4992                },
4993            )
4994            .await
4995            .unwrap();
4996
4997        let current = db.read_semantic_record(next.id).await.unwrap();
4998        assert_eq!(current.contradiction_ids, vec![episodic_id]);
4999
5000        let edges = db
5001            .cached_graph()
5002            .get_edges_between(next.id, episodic_id)
5003            .await
5004            .unwrap();
5005        assert!(
5006            edges
5007                .iter()
5008                .any(|edge| edge.relation == EdgeRelation::Contradicts)
5009        );
5010
5011        let history = db.semantic_history(semantic_id).await.unwrap();
5012        assert_eq!(history.len(), 3);
5013        assert!(history[0].contradiction_ids.is_empty());
5014        assert_eq!(history[1].contradiction_ids, vec![episodic_id]);
5015        assert_eq!(history[2].contradiction_ids, vec![episodic_id]);
5016    }
5017
5018    #[tokio::test(flavor = "multi_thread")]
5019    async fn retract_semantic_records_applied_mutation_envelope() {
5020        let store = Arc::new(MemoryStore::new());
5021        let dir = tempfile::tempdir().unwrap();
5022        let config = HirnConfig::builder()
5023            .db_path(dir.path().join("semantic-retract-envelope"))
5024            .working_memory_token_limit(1000)
5025            .build()
5026            .unwrap();
5027        let db = HirnDB::open_with_config(config, store.clone())
5028            .await
5029            .unwrap();
5030
5031        let id = db
5032            .store_semantic(
5033                SemanticRecord::builder()
5034                    .concept("retract_target")
5035                    .description("retract me")
5036                    .agent_id(agent())
5037                    .build()
5038                    .unwrap(),
5039            )
5040            .await
5041            .unwrap();
5042
5043        let tombstone = db
5044            .retract_semantic(id, SemanticRetraction::with_metadata(agent(), id))
5045            .await
5046            .unwrap();
5047
5048        let envelope = hirn_storage::get_mutation_envelope(
5049            store.as_ref(),
5050            &format!("semantic-retract:{}", tombstone.id),
5051        )
5052        .await
5053        .unwrap()
5054        .unwrap();
5055        assert_eq!(envelope.state, hirn_storage::MutationEnvelopeState::Applied);
5056    }
5057
5058    #[tokio::test(flavor = "multi_thread")]
5059    async fn store_semantic_records_applied_mutation_envelope() {
5060        let store = Arc::new(MemoryStore::new());
5061        let dir = tempfile::tempdir().unwrap();
5062        let config = HirnConfig::builder()
5063            .db_path(dir.path().join("semantic-create-envelope"))
5064            .working_memory_token_limit(1000)
5065            .build()
5066            .unwrap();
5067        let db = HirnDB::open_with_config(config, store.clone())
5068            .await
5069            .unwrap();
5070
5071        let id = db
5072            .store_semantic(
5073                SemanticRecord::builder()
5074                    .concept("create_target")
5075                    .description("create me")
5076                    .agent_id(agent())
5077                    .build()
5078                    .unwrap(),
5079            )
5080            .await
5081            .unwrap();
5082
5083        let envelope =
5084            hirn_storage::get_mutation_envelope(store.as_ref(), &format!("semantic-create:{id}"))
5085                .await
5086                .unwrap()
5087                .unwrap();
5088        assert_eq!(envelope.state, hirn_storage::MutationEnvelopeState::Applied);
5089        assert!(db.cached_graph().has_node(id).await.unwrap());
5090    }
5091
5092    #[tokio::test(flavor = "multi_thread")]
5093    async fn batch_store_semantic_records_applied_mutation_envelopes() {
5094        let store = Arc::new(MemoryStore::new());
5095        let dir = tempfile::tempdir().unwrap();
5096        let config = HirnConfig::builder()
5097            .db_path(dir.path().join("semantic-batch-create-envelope"))
5098            .working_memory_token_limit(1000)
5099            .build()
5100            .unwrap();
5101        let db = HirnDB::open_with_config(config, store.clone())
5102            .await
5103            .unwrap();
5104
5105        let records = vec![
5106            SemanticRecord::builder()
5107                .concept("batch_create_a")
5108                .description("alpha")
5109                .agent_id(agent())
5110                .build()
5111                .unwrap(),
5112            SemanticRecord::builder()
5113                .concept("batch_create_b")
5114                .description("beta")
5115                .agent_id(agent())
5116                .build()
5117                .unwrap(),
5118        ];
5119        let record_ids = records.iter().map(|record| record.id).collect::<Vec<_>>();
5120
5121        let results = db.batch_store_semantic(records).await;
5122
5123        assert!(results.iter().all(|result| result.is_ok()));
5124        for id in record_ids {
5125            let envelope = hirn_storage::get_mutation_envelope(
5126                store.as_ref(),
5127                &format!("semantic-create:{id}"),
5128            )
5129            .await
5130            .unwrap()
5131            .unwrap();
5132            assert_eq!(envelope.state, hirn_storage::MutationEnvelopeState::Applied);
5133            assert!(db.cached_graph().has_node(id).await.unwrap());
5134        }
5135    }
5136
5137    #[tokio::test(flavor = "multi_thread")]
5138    async fn correct_semantic_records_applied_mutation_envelope() {
5139        let store = Arc::new(MemoryStore::new());
5140        let dir = tempfile::tempdir().unwrap();
5141        let config = HirnConfig::builder()
5142            .db_path(dir.path().join("semantic-successor-envelope"))
5143            .working_memory_token_limit(1000)
5144            .build()
5145            .unwrap();
5146        let db = HirnDB::open_with_config(config, store.clone())
5147            .await
5148            .unwrap();
5149
5150        let id = db
5151            .store_semantic(
5152                SemanticRecord::builder()
5153                    .concept("successor_target")
5154                    .description("original description")
5155                    .agent_id(agent())
5156                    .build()
5157                    .unwrap(),
5158            )
5159            .await
5160            .unwrap();
5161
5162        let corrected = db
5163            .correct_semantic(
5164                id,
5165                SemanticUpdate {
5166                    description: Some("corrected description".into()),
5167                    ..SemanticUpdate::with_metadata(agent(), id)
5168                },
5169            )
5170            .await
5171            .unwrap();
5172
5173        let envelope = hirn_storage::get_mutation_envelope(
5174            store.as_ref(),
5175            &format!("semantic-successor:{}", corrected.id),
5176        )
5177        .await
5178        .unwrap()
5179        .unwrap();
5180        assert_eq!(envelope.state, hirn_storage::MutationEnvelopeState::Applied);
5181        assert!(!db.cached_graph().has_node(id).await.unwrap());
5182    }
5183
5184    #[tokio::test(flavor = "multi_thread")]
5185    async fn merge_semantic_records_applied_mutation_envelope() {
5186        let store = Arc::new(MemoryStore::new());
5187        let dir = tempfile::tempdir().unwrap();
5188        let config = HirnConfig::builder()
5189            .db_path(dir.path().join("semantic-merge-envelope"))
5190            .working_memory_token_limit(1000)
5191            .build()
5192            .unwrap();
5193        let db = HirnDB::open_with_config(config, store.clone())
5194            .await
5195            .unwrap();
5196
5197        let target_id = db
5198            .store_semantic(
5199                SemanticRecord::builder()
5200                    .concept("deploy_summary")
5201                    .description("deployment succeeded")
5202                    .agent_id(agent())
5203                    .build()
5204                    .unwrap(),
5205            )
5206            .await
5207            .unwrap();
5208        let source_id = db
5209            .store_semantic(
5210                SemanticRecord::builder()
5211                    .concept("deploy_summary")
5212                    .description("deployment recovered after retry")
5213                    .agent_id(AgentId::new("other_agent").unwrap())
5214                    .build()
5215                    .unwrap(),
5216            )
5217            .await
5218            .unwrap();
5219
5220        let outcome = db
5221            .merge_semantic(
5222                target_id,
5223                SemanticMerge {
5224                    source_ids: vec![source_id],
5225                    description: Some("canonical deployment summary".into()),
5226                    reason: Some("dedupe".into()),
5227                    ..SemanticMerge::with_metadata(agent(), target_id)
5228                },
5229            )
5230            .await
5231            .unwrap();
5232
5233        let envelope = hirn_storage::get_mutation_envelope(
5234            store.as_ref(),
5235            &format!("semantic-merge:{}", outcome.target.id),
5236        )
5237        .await
5238        .unwrap()
5239        .unwrap();
5240        assert_eq!(envelope.state, hirn_storage::MutationEnvelopeState::Applied);
5241        assert!(!db.cached_graph().has_node(target_id).await.unwrap());
5242        assert!(!db.cached_graph().has_node(source_id).await.unwrap());
5243        assert!(db.cached_graph().has_node(outcome.target.id).await.unwrap());
5244        assert!(
5245            db.cached_graph()
5246                .has_node(outcome.merged_sources[0].id)
5247                .await
5248                .unwrap()
5249        );
5250    }
5251
5252    #[tokio::test(flavor = "multi_thread")]
5253    async fn contradiction_connect_records_applied_mutation_envelope() {
5254        let store = Arc::new(MemoryStore::new());
5255        let dir = tempfile::tempdir().unwrap();
5256        let config = HirnConfig::builder()
5257            .db_path(dir.path().join("semantic-contradiction-envelope"))
5258            .working_memory_token_limit(1000)
5259            .build()
5260            .unwrap();
5261        let db = HirnDB::open_with_config(config, store.clone())
5262            .await
5263            .unwrap();
5264
5265        let id_a = db
5266            .store_semantic(
5267                SemanticRecord::builder()
5268                    .concept("deploy_status_a")
5269                    .description("deployment succeeded")
5270                    .agent_id(agent())
5271                    .build()
5272                    .unwrap(),
5273            )
5274            .await
5275            .unwrap();
5276        let id_b = db
5277            .store_semantic(
5278                SemanticRecord::builder()
5279                    .concept("deploy_status_b")
5280                    .description("deployment failed")
5281                    .agent_id(AgentId::new("other_agent").unwrap())
5282                    .build()
5283                    .unwrap(),
5284            )
5285            .await
5286            .unwrap();
5287
5288        db.connect_with(id_a, id_b, EdgeRelation::Contradicts, 0.9, Metadata::new())
5289            .await
5290            .unwrap();
5291
5292        let history_a = db.semantic_history(id_a).await.unwrap();
5293        let history_b = db.semantic_history(id_b).await.unwrap();
5294        let head_a = history_a.last().unwrap();
5295        let head_b = history_b.last().unwrap();
5296        let envelope = build_semantic_contradiction_sync_envelope(
5297            vec![id_a, id_b],
5298            vec![head_a.id, head_b.id],
5299        )
5300        .unwrap();
5301
5302        let stored = hirn_storage::get_mutation_envelope(store.as_ref(), &envelope.id)
5303            .await
5304            .unwrap()
5305            .unwrap();
5306        assert_eq!(stored.state, hirn_storage::MutationEnvelopeState::Applied);
5307        assert!(!db.cached_graph().has_node(id_a).await.unwrap());
5308        assert!(!db.cached_graph().has_node(id_b).await.unwrap());
5309    }
5310
5311    #[tokio::test(flavor = "multi_thread")]
5312    async fn purge_semantic_records_applied_mutation_envelope() {
5313        let store = Arc::new(MemoryStore::new());
5314        let dir = tempfile::tempdir().unwrap();
5315        let config = HirnConfig::builder()
5316            .db_path(dir.path().join("semantic-purge-envelope"))
5317            .working_memory_token_limit(1000)
5318            .build()
5319            .unwrap();
5320        let db = HirnDB::open_with_config(config, store.clone())
5321            .await
5322            .unwrap();
5323
5324        let id = db
5325            .store_semantic(
5326                SemanticRecord::builder()
5327                    .concept("purge_target")
5328                    .description("purge me")
5329                    .agent_id(agent())
5330                    .build()
5331                    .unwrap(),
5332            )
5333            .await
5334            .unwrap();
5335        let corrected = db
5336            .correct_semantic(
5337                id,
5338                SemanticUpdate {
5339                    description: Some("purge me v2".into()),
5340                    ..SemanticUpdate::with_metadata(agent(), id)
5341                },
5342            )
5343            .await
5344            .unwrap();
5345        let logical_memory_id = db
5346            .read_semantic_record(corrected.id)
5347            .await
5348            .unwrap()
5349            .logical_memory_id;
5350
5351        db.purge_semantic(corrected.id).await.unwrap();
5352
5353        let envelope = hirn_storage::get_mutation_envelope(
5354            store.as_ref(),
5355            &format!("semantic-purge:{logical_memory_id}"),
5356        )
5357        .await
5358        .unwrap()
5359        .unwrap();
5360        assert_eq!(envelope.state, hirn_storage::MutationEnvelopeState::Applied);
5361        assert!(!db.cached_graph().has_node(id).await.unwrap());
5362        assert!(!db.cached_graph().has_node(corrected.id).await.unwrap());
5363        assert!(matches!(
5364            db.semantic_head_for_logical_id(logical_memory_id).await,
5365            Err(HirnError::NotFound(_))
5366        ));
5367    }
5368
5369    #[tokio::test(flavor = "multi_thread")]
5370    async fn open_reconciles_pending_semantic_retract_mutations() {
5371        let dir = tempfile::tempdir().unwrap();
5372        let path = dir.path().join("semantic-retract-envelope-recovery");
5373        let store = Arc::new(MemoryStore::new());
5374        let config = HirnConfig::builder()
5375            .db_path(&path)
5376            .working_memory_token_limit(1000)
5377            .build()
5378            .unwrap();
5379
5380        let db = HirnDB::open_with_config(config.clone(), store.clone())
5381            .await
5382            .unwrap();
5383        let id = db
5384            .store_semantic(
5385                SemanticRecord::builder()
5386                    .concept("recovery_target")
5387                    .description("pending retract")
5388                    .agent_id(agent())
5389                    .build()
5390                    .unwrap(),
5391            )
5392            .await
5393            .unwrap();
5394        let current = db.read_semantic_record(id).await.unwrap();
5395
5396        let mut tombstone = current.clone();
5397        let tombstone_id = MemoryId::new();
5398        let now = Timestamp::now();
5399        tombstone.id = tombstone_id;
5400        tombstone.revision_id = RevisionId::from_memory_id(tombstone_id);
5401        tombstone.version = current.version + 1;
5402        tombstone.revision_operation = RevisionOperation::Retract;
5403        tombstone.revision_reason = Some("manual recovery test".into());
5404        tombstone.revision_causation_id = Some(current.id);
5405        tombstone.created_at = now;
5406        tombstone.updated_at = now;
5407        tombstone.valid_from = now;
5408        tombstone.valid_until = None;
5409        tombstone.superseded_by = None;
5410        tombstone.merged_into = None;
5411        normalize_semantic_record_timestamps(&mut tombstone);
5412
5413        db.append_semantic_record(&tombstone).await.unwrap();
5414        let envelope = build_semantic_retract_envelope(current.id, tombstone.id).unwrap();
5415        hirn_storage::append_mutation_envelope(store.as_ref(), &envelope)
5416            .await
5417            .unwrap();
5418
5419        assert!(db.cached_graph().has_node(current.id).await.unwrap());
5420        drop(db);
5421
5422        let reopened = HirnDB::open_with_config(config, store.clone())
5423            .await
5424            .unwrap();
5425
5426        assert!(!reopened.cached_graph().has_node(current.id).await.unwrap());
5427        let stored_envelope = hirn_storage::get_mutation_envelope(store.as_ref(), &envelope.id)
5428            .await
5429            .unwrap()
5430            .unwrap();
5431        assert_eq!(
5432            stored_envelope.state,
5433            hirn_storage::MutationEnvelopeState::Applied
5434        );
5435    }
5436
5437    #[tokio::test(flavor = "multi_thread")]
5438    async fn open_reconciles_pending_semantic_create_mutations_without_resurrecting_stale_heads() {
5439        let dir = tempfile::tempdir().unwrap();
5440        let path = dir
5441            .path()
5442            .join("semantic-create-envelope-recovery-stale-head");
5443        let store = Arc::new(MemoryStore::new());
5444        let config = HirnConfig::builder()
5445            .db_path(&path)
5446            .working_memory_token_limit(1000)
5447            .build()
5448            .unwrap();
5449
5450        let db = HirnDB::open_with_config(config.clone(), store.clone())
5451            .await
5452            .unwrap();
5453        let id = db
5454            .store_semantic(
5455                SemanticRecord::builder()
5456                    .concept("create_recovery_target")
5457                    .description("pending create")
5458                    .agent_id(agent())
5459                    .build()
5460                    .unwrap(),
5461            )
5462            .await
5463            .unwrap();
5464        let create_envelope = build_semantic_create_envelope(id).unwrap();
5465        hirn_storage::append_mutation_envelope(store.as_ref(), &create_envelope)
5466            .await
5467            .unwrap();
5468
5469        let corrected = db
5470            .correct_semantic(
5471                id,
5472                SemanticUpdate {
5473                    description: Some("pending create v2".into()),
5474                    ..SemanticUpdate::with_metadata(agent(), id)
5475                },
5476            )
5477            .await
5478            .unwrap();
5479        let corrected_record = db.read_semantic_record(corrected.id).await.unwrap();
5480
5481        assert!(!db.cached_graph().has_node(id).await.unwrap());
5482        assert!(db.cached_graph().has_node(corrected.id).await.unwrap());
5483        drop(db);
5484
5485        let reopened = HirnDB::open_with_config(config, store.clone())
5486            .await
5487            .unwrap();
5488
5489        assert!(!reopened.cached_graph().has_node(id).await.unwrap());
5490        assert!(
5491            reopened
5492                .cached_graph()
5493                .has_node(corrected.id)
5494                .await
5495                .unwrap()
5496        );
5497        let head = reopened
5498            .semantic_head_for_logical_id(corrected_record.logical_memory_id)
5499            .await
5500            .unwrap();
5501        assert_eq!(head.id, corrected.id);
5502        let stored_envelope =
5503            hirn_storage::get_mutation_envelope(store.as_ref(), &create_envelope.id)
5504                .await
5505                .unwrap()
5506                .unwrap();
5507        assert_eq!(
5508            stored_envelope.state,
5509            hirn_storage::MutationEnvelopeState::Applied
5510        );
5511    }
5512
5513    #[tokio::test(flavor = "multi_thread")]
5514    async fn open_marks_missing_pending_semantic_create_mutations_failed_and_cleans_graph() {
5515        let dir = tempfile::tempdir().unwrap();
5516        let path = dir.path().join("semantic-create-envelope-recovery-missing");
5517        let store = Arc::new(MemoryStore::new());
5518        let config = HirnConfig::builder()
5519            .db_path(&path)
5520            .working_memory_token_limit(1000)
5521            .build()
5522            .unwrap();
5523
5524        let db = HirnDB::open_with_config(config.clone(), store.clone())
5525            .await
5526            .unwrap();
5527        let record = SemanticRecord::builder()
5528            .concept("missing_create_recovery")
5529            .description("orphaned graph node")
5530            .agent_id(agent())
5531            .build()
5532            .unwrap();
5533        let envelope = build_semantic_create_envelope(record.id).unwrap();
5534        hirn_storage::append_mutation_envelope(store.as_ref(), &envelope)
5535            .await
5536            .unwrap();
5537        db.cached_graph()
5538            .add_node(
5539                record.id,
5540                Layer::Semantic,
5541                record.confidence,
5542                record.created_at,
5543                record.namespace,
5544            )
5545            .await
5546            .unwrap();
5547
5548        assert!(db.cached_graph().has_node(record.id).await.unwrap());
5549        drop(db);
5550
5551        let reopened = HirnDB::open_with_config(config, store.clone())
5552            .await
5553            .unwrap();
5554
5555        assert!(!reopened.cached_graph().has_node(record.id).await.unwrap());
5556        let stored_envelope = hirn_storage::get_mutation_envelope(store.as_ref(), &envelope.id)
5557            .await
5558            .unwrap()
5559            .unwrap();
5560        assert_eq!(
5561            stored_envelope.state,
5562            hirn_storage::MutationEnvelopeState::Failed
5563        );
5564    }
5565
5566    #[tokio::test(flavor = "multi_thread")]
5567    async fn open_reconciles_pending_semantic_successor_mutations() {
5568        let dir = tempfile::tempdir().unwrap();
5569        let path = dir.path().join("semantic-successor-envelope-recovery");
5570        let store = Arc::new(MemoryStore::new());
5571        let config = HirnConfig::builder()
5572            .db_path(&path)
5573            .working_memory_token_limit(1000)
5574            .build()
5575            .unwrap();
5576
5577        let db = HirnDB::open_with_config(config.clone(), store.clone())
5578            .await
5579            .unwrap();
5580        let id = db
5581            .store_semantic(
5582                SemanticRecord::builder()
5583                    .concept("successor_recovery_target")
5584                    .description("pending successor")
5585                    .agent_id(agent())
5586                    .build()
5587                    .unwrap(),
5588            )
5589            .await
5590            .unwrap();
5591        let current = db.read_semantic_record(id).await.unwrap();
5592
5593        let mut successor = current.clone();
5594        let successor_id = MemoryId::new();
5595        let now = Timestamp::now();
5596        successor.id = successor_id;
5597        successor.revision_id = RevisionId::from_memory_id(successor_id);
5598        successor.version = current.version + 1;
5599        successor.revision_operation = RevisionOperation::Correct;
5600        successor.revision_reason = Some("manual recovery test".into());
5601        successor.revision_causation_id = Some(current.id);
5602        successor.description = "recovered successor".into();
5603        successor.created_at = now;
5604        successor.updated_at = now;
5605        successor.valid_from = current.valid_from;
5606        successor.valid_until = None;
5607        successor.superseded_by = None;
5608        successor.merged_into = None;
5609        normalize_semantic_record_timestamps(&mut successor);
5610
5611        db.append_semantic_record(&successor).await.unwrap();
5612        let envelope = build_semantic_successor_envelope(current.id, successor.id).unwrap();
5613        hirn_storage::append_mutation_envelope(store.as_ref(), &envelope)
5614            .await
5615            .unwrap();
5616
5617        assert!(db.cached_graph().has_node(current.id).await.unwrap());
5618        drop(db);
5619
5620        let reopened = HirnDB::open_with_config(config, store.clone())
5621            .await
5622            .unwrap();
5623
5624        assert!(!reopened.cached_graph().has_node(current.id).await.unwrap());
5625        assert!(
5626            reopened
5627                .cached_graph()
5628                .has_node(successor.id)
5629                .await
5630                .unwrap()
5631        );
5632        let head = reopened
5633            .semantic_head_for_logical_id(current.logical_memory_id)
5634            .await
5635            .unwrap();
5636        assert_eq!(head.id, successor.id);
5637        let stored_envelope = hirn_storage::get_mutation_envelope(store.as_ref(), &envelope.id)
5638            .await
5639            .unwrap()
5640            .unwrap();
5641        assert_eq!(
5642            stored_envelope.state,
5643            hirn_storage::MutationEnvelopeState::Applied
5644        );
5645    }
5646
5647    #[tokio::test(flavor = "multi_thread")]
5648    async fn open_reconciles_pending_semantic_merge_mutations_without_resurrecting_stale_heads() {
5649        let dir = tempfile::tempdir().unwrap();
5650        let path = dir.path().join("semantic-merge-envelope-recovery");
5651        let store = Arc::new(MemoryStore::new());
5652        let config = HirnConfig::builder()
5653            .db_path(&path)
5654            .working_memory_token_limit(1000)
5655            .build()
5656            .unwrap();
5657
5658        let db = HirnDB::open_with_config(config.clone(), store.clone())
5659            .await
5660            .unwrap();
5661        let target_id = db
5662            .store_semantic(
5663                SemanticRecord::builder()
5664                    .concept("merge_recovery")
5665                    .description("target head")
5666                    .agent_id(agent())
5667                    .build()
5668                    .unwrap(),
5669            )
5670            .await
5671            .unwrap();
5672        let source_id = db
5673            .store_semantic(
5674                SemanticRecord::builder()
5675                    .concept("merge_recovery")
5676                    .description("source head")
5677                    .agent_id(AgentId::new("merge_source_agent").unwrap())
5678                    .build()
5679                    .unwrap(),
5680            )
5681            .await
5682            .unwrap();
5683        let current = db.read_semantic_record(target_id).await.unwrap();
5684        let source = db.read_semantic_record(source_id).await.unwrap();
5685
5686        let successor_id = MemoryId::new();
5687        let successor_now = Timestamp::now();
5688        let mut successor = current.clone();
5689        successor.id = successor_id;
5690        successor.revision_id = RevisionId::from_memory_id(successor_id);
5691        successor.version = current.version + 1;
5692        successor.revision_operation = RevisionOperation::Correct;
5693        successor.revision_reason = Some("manual successor recovery".into());
5694        successor.revision_causation_id = Some(current.id);
5695        successor.description = "intermediate target head".into();
5696        successor.created_at = successor_now;
5697        successor.updated_at = successor_now;
5698        successor.valid_from = current.valid_from;
5699        successor.valid_until = None;
5700        successor.superseded_by = None;
5701        successor.merged_into = None;
5702        normalize_semantic_record_timestamps(&mut successor);
5703
5704        db.append_semantic_record(&successor).await.unwrap();
5705        let successor_envelope =
5706            build_semantic_successor_envelope(current.id, successor.id).unwrap();
5707        hirn_storage::append_mutation_envelope(store.as_ref(), &successor_envelope)
5708            .await
5709            .unwrap();
5710
5711        let merge_now = Timestamp::now();
5712        let merged_target_id = MemoryId::new();
5713        let mut merged_target = successor.clone();
5714        merged_target.id = merged_target_id;
5715        merged_target.revision_id = RevisionId::from_memory_id(merged_target_id);
5716        merged_target.version = successor.version + 1;
5717        merged_target.revision_operation = RevisionOperation::Merge;
5718        merged_target.revision_reason = Some("manual merge recovery".into());
5719        merged_target.revision_causation_id = Some(successor.id);
5720        merged_target.description = "canonical merged target".into();
5721        merged_target.created_at = merge_now;
5722        merged_target.updated_at = merge_now;
5723        merged_target.valid_from = merge_now;
5724        merged_target.valid_until = None;
5725        merged_target.superseded_by = None;
5726        merged_target.merged_into = None;
5727        normalize_semantic_record_timestamps(&mut merged_target);
5728
5729        let merged_source_id = MemoryId::new();
5730        let mut merged_source = source.clone();
5731        merged_source.id = merged_source_id;
5732        merged_source.revision_id = RevisionId::from_memory_id(merged_source_id);
5733        merged_source.version = source.version + 1;
5734        merged_source.revision_operation = RevisionOperation::Merge;
5735        merged_source.revision_reason = Some("manual merge recovery".into());
5736        merged_source.revision_causation_id = Some(merged_target.id);
5737        merged_source.created_at = merge_now;
5738        merged_source.updated_at = merge_now;
5739        merged_source.valid_from = merge_now;
5740        merged_source.valid_until = None;
5741        merged_source.superseded_by = None;
5742        merged_source.merged_into = Some(merged_target.logical_memory_id);
5743        normalize_semantic_record_timestamps(&mut merged_source);
5744
5745        db.append_semantic_records(&[merged_target.clone(), merged_source.clone()])
5746            .await
5747            .unwrap();
5748        let merge_envelope = build_semantic_merge_envelope(
5749            successor.id,
5750            merged_target.id,
5751            vec![source.id],
5752            vec![merged_source.id],
5753        )
5754        .unwrap();
5755        hirn_storage::append_mutation_envelope(store.as_ref(), &merge_envelope)
5756            .await
5757            .unwrap();
5758
5759        assert!(db.cached_graph().has_node(current.id).await.unwrap());
5760        assert!(db.cached_graph().has_node(source.id).await.unwrap());
5761        drop(db);
5762
5763        let reopened = HirnDB::open_with_config(config, store.clone())
5764            .await
5765            .unwrap();
5766
5767        assert!(!reopened.cached_graph().has_node(current.id).await.unwrap());
5768        assert!(!reopened.cached_graph().has_node(source.id).await.unwrap());
5769        assert!(
5770            !reopened
5771                .cached_graph()
5772                .has_node(successor.id)
5773                .await
5774                .unwrap()
5775        );
5776        assert!(
5777            reopened
5778                .cached_graph()
5779                .has_node(merged_target.id)
5780                .await
5781                .unwrap()
5782        );
5783        assert!(
5784            reopened
5785                .cached_graph()
5786                .has_node(merged_source.id)
5787                .await
5788                .unwrap()
5789        );
5790
5791        let target_head = reopened
5792            .semantic_head_for_logical_id(current.logical_memory_id)
5793            .await
5794            .unwrap();
5795        let source_head = reopened
5796            .semantic_head_for_logical_id(source.logical_memory_id)
5797            .await
5798            .unwrap();
5799        assert_eq!(target_head.id, merged_target.id);
5800        assert_eq!(source_head.id, merged_source.id);
5801
5802        let stored_successor_envelope =
5803            hirn_storage::get_mutation_envelope(store.as_ref(), &successor_envelope.id)
5804                .await
5805                .unwrap()
5806                .unwrap();
5807        assert_eq!(
5808            stored_successor_envelope.state,
5809            hirn_storage::MutationEnvelopeState::Applied
5810        );
5811        let stored_merge_envelope =
5812            hirn_storage::get_mutation_envelope(store.as_ref(), &merge_envelope.id)
5813                .await
5814                .unwrap()
5815                .unwrap();
5816        assert_eq!(
5817            stored_merge_envelope.state,
5818            hirn_storage::MutationEnvelopeState::Applied
5819        );
5820    }
5821
5822    #[tokio::test(flavor = "multi_thread")]
5823    async fn open_reconciles_pending_semantic_contradiction_sync_mutations() {
5824        let dir = tempfile::tempdir().unwrap();
5825        let path = dir.path().join("semantic-contradiction-envelope-recovery");
5826        let store = Arc::new(MemoryStore::new());
5827        let config = HirnConfig::builder()
5828            .db_path(&path)
5829            .working_memory_token_limit(1000)
5830            .build()
5831            .unwrap();
5832
5833        let db = HirnDB::open_with_config(config.clone(), store.clone())
5834            .await
5835            .unwrap();
5836        let id_a = db
5837            .store_semantic(
5838                SemanticRecord::builder()
5839                    .concept("contradiction_recovery_a")
5840                    .description("deployment succeeded")
5841                    .agent_id(agent())
5842                    .build()
5843                    .unwrap(),
5844            )
5845            .await
5846            .unwrap();
5847        let id_b = db
5848            .store_semantic(
5849                SemanticRecord::builder()
5850                    .concept("contradiction_recovery_b")
5851                    .description("deployment failed")
5852                    .agent_id(AgentId::new("contradiction_source_agent").unwrap())
5853                    .build()
5854                    .unwrap(),
5855            )
5856            .await
5857            .unwrap();
5858        let current_a = db.read_semantic_record(id_a).await.unwrap();
5859        let current_b = db.read_semantic_record(id_b).await.unwrap();
5860
5861        let now = Timestamp::now();
5862        let successor_a_id = MemoryId::new();
5863        let successor_b_id = MemoryId::new();
5864        let successor_a = db.prepare_semantic_contradiction_successor(
5865            &current_a,
5866            successor_a_id,
5867            successor_b_id,
5868            now,
5869        );
5870        let successor_b = db.prepare_semantic_contradiction_successor(
5871            &current_b,
5872            successor_b_id,
5873            successor_a_id,
5874            now,
5875        );
5876
5877        db.append_semantic_records(&[successor_a.clone(), successor_b.clone()])
5878            .await
5879            .unwrap();
5880        let envelope = build_semantic_contradiction_sync_envelope(
5881            vec![current_a.id, current_b.id],
5882            vec![successor_a.id, successor_b.id],
5883        )
5884        .unwrap();
5885        hirn_storage::append_mutation_envelope(store.as_ref(), &envelope)
5886            .await
5887            .unwrap();
5888
5889        assert!(db.cached_graph().has_node(current_a.id).await.unwrap());
5890        assert!(db.cached_graph().has_node(current_b.id).await.unwrap());
5891        drop(db);
5892
5893        let reopened = HirnDB::open_with_config(config, store.clone())
5894            .await
5895            .unwrap();
5896
5897        assert!(
5898            !reopened
5899                .cached_graph()
5900                .has_node(current_a.id)
5901                .await
5902                .unwrap()
5903        );
5904        assert!(
5905            !reopened
5906                .cached_graph()
5907                .has_node(current_b.id)
5908                .await
5909                .unwrap()
5910        );
5911        assert!(
5912            reopened
5913                .cached_graph()
5914                .has_node(successor_a.id)
5915                .await
5916                .unwrap()
5917        );
5918        assert!(
5919            reopened
5920                .cached_graph()
5921                .has_node(successor_b.id)
5922                .await
5923                .unwrap()
5924        );
5925
5926        let head_a = reopened
5927            .semantic_head_for_logical_id(current_a.logical_memory_id)
5928            .await
5929            .unwrap();
5930        let head_b = reopened
5931            .semantic_head_for_logical_id(current_b.logical_memory_id)
5932            .await
5933            .unwrap();
5934        assert_eq!(head_a.id, successor_a.id);
5935        assert_eq!(head_b.id, successor_b.id);
5936
5937        let stored = hirn_storage::get_mutation_envelope(store.as_ref(), &envelope.id)
5938            .await
5939            .unwrap()
5940            .unwrap();
5941        assert_eq!(stored.state, hirn_storage::MutationEnvelopeState::Applied);
5942    }
5943
5944    #[tokio::test(flavor = "multi_thread")]
5945    async fn open_reconciles_pending_semantic_purge_mutations() {
5946        let dir = tempfile::tempdir().unwrap();
5947        let path = dir.path().join("semantic-purge-envelope-recovery");
5948        let store = Arc::new(MemoryStore::new());
5949        let config = HirnConfig::builder()
5950            .db_path(&path)
5951            .working_memory_token_limit(1000)
5952            .build()
5953            .unwrap();
5954
5955        let db = HirnDB::open_with_config(config.clone(), store.clone())
5956            .await
5957            .unwrap();
5958        let id = db
5959            .store_semantic(
5960                SemanticRecord::builder()
5961                    .concept("purge_recovery_target")
5962                    .description("pending purge")
5963                    .agent_id(agent())
5964                    .build()
5965                    .unwrap(),
5966            )
5967            .await
5968            .unwrap();
5969        let corrected = db
5970            .correct_semantic(
5971                id,
5972                SemanticUpdate {
5973                    description: Some("pending purge v2".into()),
5974                    ..SemanticUpdate::with_metadata(agent(), id)
5975                },
5976            )
5977            .await
5978            .unwrap();
5979        let current = db.read_semantic_record(corrected.id).await.unwrap();
5980        let history = db.semantic_history(corrected.id).await.unwrap();
5981        let revision_ids = history
5982            .iter()
5983            .map(|revision| revision.id)
5984            .collect::<Vec<_>>();
5985        let envelope =
5986            build_semantic_purge_envelope(current.logical_memory_id, revision_ids.clone()).unwrap();
5987        hirn_storage::append_mutation_envelope(store.as_ref(), &envelope)
5988            .await
5989            .unwrap();
5990
5991        assert!(!db.cached_graph().has_node(id).await.unwrap());
5992        assert!(db.cached_graph().has_node(corrected.id).await.unwrap());
5993        drop(db);
5994
5995        let reopened = HirnDB::open_with_config(config, store.clone())
5996            .await
5997            .unwrap();
5998
5999        for revision_id in &revision_ids {
6000            assert!(
6001                !reopened
6002                    .cached_graph()
6003                    .has_node(*revision_id)
6004                    .await
6005                    .unwrap()
6006            );
6007        }
6008        assert!(matches!(
6009            reopened
6010                .semantic_head_for_logical_id(current.logical_memory_id)
6011                .await,
6012            Err(HirnError::NotFound(_))
6013        ));
6014        let stored = hirn_storage::get_mutation_envelope(store.as_ref(), &envelope.id)
6015            .await
6016            .unwrap()
6017            .unwrap();
6018        assert_eq!(stored.state, hirn_storage::MutationEnvelopeState::Applied);
6019    }
6020
6021    #[tokio::test(flavor = "multi_thread")]
6022    async fn contradiction_connect_appends_revision_native_semantic_successors() {
6023        let db = temp_db().await;
6024        let id_a = db
6025            .store_semantic(
6026                SemanticRecord::builder()
6027                    .concept("deploy_status_a")
6028                    .description("deployment succeeded")
6029                    .agent_id(agent())
6030                    .build()
6031                    .unwrap(),
6032            )
6033            .await
6034            .unwrap();
6035        let id_b = db
6036            .store_semantic(
6037                SemanticRecord::builder()
6038                    .concept("deploy_status_b")
6039                    .description("deployment failed")
6040                    .agent_id(AgentId::new("other_agent").unwrap())
6041                    .build()
6042                    .unwrap(),
6043            )
6044            .await
6045            .unwrap();
6046
6047        db.connect_with(id_a, id_b, EdgeRelation::Contradicts, 0.9, Metadata::new())
6048            .await
6049            .unwrap();
6050
6051        let history_a = db.semantic_history(id_a).await.unwrap();
6052        let history_b = db.semantic_history(id_b).await.unwrap();
6053
6054        assert_eq!(history_a.len(), 2);
6055        assert_eq!(history_b.len(), 2);
6056        assert!(history_a[0].contradiction_ids.is_empty());
6057        assert!(history_b[0].contradiction_ids.is_empty());
6058
6059        let head_a = history_a.last().unwrap();
6060        let head_b = history_b.last().unwrap();
6061
6062        assert_eq!(head_a.revision_operation, RevisionOperation::Correct);
6063        assert_eq!(head_b.revision_operation, RevisionOperation::Correct);
6064        assert_eq!(head_a.contradiction_ids, vec![head_b.id]);
6065        assert_eq!(head_b.contradiction_ids, vec![head_a.id]);
6066    }
6067
6068    #[tokio::test(flavor = "multi_thread")]
6069    async fn contradiction_connect_canonicalizes_stale_semantic_revision_ids() {
6070        let db = temp_db().await;
6071        let original_a = db
6072            .store_semantic(
6073                SemanticRecord::builder()
6074                    .concept("deploy_status_a")
6075                    .description("deployment succeeded")
6076                    .agent_id(agent())
6077                    .build()
6078                    .unwrap(),
6079            )
6080            .await
6081            .unwrap();
6082        let corrected_a = db
6083            .correct_semantic(
6084                original_a,
6085                SemanticUpdate {
6086                    description: Some("deployment succeeded after rollback".into()),
6087                    reason: Some("postmortem correction".into()),
6088                    ..SemanticUpdate::with_metadata(agent(), original_a)
6089                },
6090            )
6091            .await
6092            .unwrap();
6093        let id_b = db
6094            .store_semantic(
6095                SemanticRecord::builder()
6096                    .concept("deploy_status_b")
6097                    .description("deployment failed")
6098                    .agent_id(AgentId::new("other_agent").unwrap())
6099                    .build()
6100                    .unwrap(),
6101            )
6102            .await
6103            .unwrap();
6104
6105        db.connect_with(
6106            original_a,
6107            id_b,
6108            EdgeRelation::Contradicts,
6109            0.9,
6110            Metadata::new(),
6111        )
6112        .await
6113        .unwrap();
6114
6115        let history_a = db.semantic_history(original_a).await.unwrap();
6116        let history_b = db.semantic_history(id_b).await.unwrap();
6117
6118        assert_eq!(history_a.len(), 3);
6119        assert_eq!(history_b.len(), 2);
6120        assert!(history_a[0].contradiction_ids.is_empty());
6121        assert!(history_a[1].contradiction_ids.is_empty());
6122        assert!(history_b[0].contradiction_ids.is_empty());
6123
6124        let head_a = history_a.last().unwrap();
6125        let head_b = history_b.last().unwrap();
6126
6127        assert_eq!(history_a[1].id, corrected_a.id);
6128        assert_eq!(head_a.revision_operation, RevisionOperation::Correct);
6129        assert_eq!(head_b.revision_operation, RevisionOperation::Correct);
6130        assert_eq!(head_a.contradiction_ids, vec![head_b.id]);
6131        assert_eq!(head_b.contradiction_ids, vec![head_a.id]);
6132    }
6133
6134    #[test]
6135    fn semantic_filters_use_structured_exact_match() {
6136        let logical_memory_id = LogicalMemoryId::new();
6137        let revision_id = RevisionId::new();
6138
6139        assert_eq!(
6140            HirnDB::semantic_logical_exact_filter(logical_memory_id),
6141            hirn_storage::store::ExactMatchFilter::utf8_value(
6142                "logical_memory_id",
6143                logical_memory_id.to_string(),
6144            )
6145        );
6146        assert_eq!(
6147            HirnDB::semantic_revision_exact_filter(revision_id),
6148            hirn_storage::store::ExactMatchFilter::utf8_value(
6149                "revision_id",
6150                revision_id.to_string(),
6151            )
6152        );
6153    }
6154
6155    #[tokio::test(flavor = "multi_thread")]
6156    async fn semantic_edit_target_rejects_stale_initial_revision_ids() {
6157        let db = temp_db().await;
6158        let initial = db
6159            .store_semantic(
6160                SemanticRecord::builder()
6161                    .concept("deploy_status")
6162                    .description("deployment succeeded")
6163                    .origin(Origin::CrossAgent)
6164                    .agent_id(agent())
6165                    .build()
6166                    .unwrap(),
6167            )
6168            .await
6169            .unwrap();
6170
6171        let corrected = db
6172            .correct_semantic(
6173                initial,
6174                SemanticUpdate {
6175                    description: Some("deployment succeeded after retry".into()),
6176                    reason: Some("postmortem correction".into()),
6177                    ..SemanticUpdate::with_metadata(agent(), initial)
6178                },
6179            )
6180            .await
6181            .unwrap();
6182
6183        let stale = db.semantic_edit_target(initial).await.unwrap_err();
6184        assert!(matches!(stale, HirnError::InvalidInput(_)));
6185
6186        let head = db.semantic_edit_target(corrected.id).await.unwrap();
6187        assert_eq!(head.id, corrected.id);
6188        assert_eq!(head.revision_operation, RevisionOperation::Correct);
6189    }
6190}