Skip to main content

hirn_engine/db/
episodic.rs

1use std::collections::{HashMap, HashSet};
2
3use futures::TryStreamExt;
4use hirn_core::RecallSnapshot;
5use hirn_core::revision::{LogicalMemoryId, RevisionId, RevisionOperation};
6
7use super::*;
8use crate::cached_graph_store::EdgeInsert;
9
10pub(super) const EPISODE_REMEMBER_MUTATION_KIND: &str = "episode_remember";
11
12#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
13struct EpisodeRememberEnvelope {
14    record_id: MemoryId,
15    namespace: Namespace,
16    agent_id: AgentId,
17    importance: f32,
18    timestamp_ms: u64,
19    content_preview: String,
20    edge_requests: Vec<EdgeInsert>,
21    temporal_edge_request: Option<EdgeInsert>,
22}
23
24fn encode_episode_remember_envelope(payload: &EpisodeRememberEnvelope) -> HirnResult<Vec<u8>> {
25    serde_json::to_vec(payload)
26        .map_err(|error| HirnError::storage(format!("episode envelope serialize: {error}")))
27}
28
29fn decode_episode_remember_envelope(
30    envelope: &hirn_storage::MutationEnvelopeRecord,
31) -> HirnResult<EpisodeRememberEnvelope> {
32    serde_json::from_slice(&envelope.payload)
33        .map_err(|error| HirnError::storage(format!("episode envelope deserialize: {error}")))
34}
35
36fn update_episode_envelope_temporal_edge(
37    envelope: &mut hirn_storage::MutationEnvelopeRecord,
38    temporal_edge_request: Option<EdgeInsert>,
39) -> HirnResult<()> {
40    let mut payload = decode_episode_remember_envelope(envelope)?;
41    payload.temporal_edge_request = temporal_edge_request;
42    envelope.payload = encode_episode_remember_envelope(&payload)?;
43    envelope.updated_at = Timestamp::now();
44    Ok(())
45}
46
47fn temporal_edge_request_for_arrival(
48    record_id: MemoryId,
49    arrival: &super::write_runtime::TemporalArrival,
50) -> Option<EdgeInsert> {
51    arrival.previous_id.map(|previous_id| EdgeInsert {
52        source: previous_id,
53        target: record_id,
54        relation: EdgeRelation::TemporalNext,
55        weight: 1.0,
56        metadata: temporal_next_metadata(arrival),
57    })
58}
59
60fn target_arrival_sequence(metadata: &Metadata) -> Option<i64> {
61    match metadata.get("target_arrival_sequence") {
62        Some(hirn_core::metadata::MetadataValue::Int(sequence)) => Some(*sequence),
63        _ => None,
64    }
65}
66
67fn temporal_next_metadata(arrival: &super::write_runtime::TemporalArrival) -> Metadata {
68    let mut metadata = Metadata::new();
69    metadata.insert("temporal_basis".into(), "arrival_order".into());
70    metadata.insert("temporal_partition".into(), "namespace".into());
71    if let Some(previous_sequence) = arrival.previous_sequence {
72        metadata.insert("source_arrival_sequence".into(), previous_sequence.into());
73    }
74    metadata.insert("target_arrival_sequence".into(), arrival.sequence.into());
75    metadata
76}
77
78fn apply_admission_decision(
79    record: &mut EpisodicRecord,
80    decision: crate::admission::AdmissionDecision,
81    realm: &str,
82) -> HirnResult<()> {
83    match decision {
84        crate::admission::AdmissionDecision::Accept {
85            importance_override,
86        } => {
87            if let Some(override_val) = importance_override {
88                record.importance = override_val;
89            }
90            Ok(())
91        }
92        crate::admission::AdmissionDecision::Reject { reason } => {
93            metrics::counter!(crate::metrics::ADMISSION_REJECTED_TOTAL, "realm" => realm.to_owned())
94                .increment(1);
95            Err(HirnError::InvalidInput(format!(
96                "admission rejected: {reason}"
97            )))
98        }
99        crate::admission::AdmissionDecision::Defer { until } => Err(HirnError::InvalidInput(
100            format!("admission deferred until {until}"),
101        )),
102        crate::admission::AdmissionDecision::Merge { target } => Err(HirnError::InvalidInput(
103            format!("admission: merge into {target}"),
104        )),
105    }
106}
107
108fn remember_status_for_admission(
109    decision: &crate::admission::AdmissionDecision,
110) -> crate::RememberStatus {
111    match decision {
112        crate::admission::AdmissionDecision::Accept { .. } => crate::RememberStatus::Accepted,
113        crate::admission::AdmissionDecision::Reject { .. } => crate::RememberStatus::Rejected,
114        crate::admission::AdmissionDecision::Defer { .. } => crate::RememberStatus::Deferred,
115        crate::admission::AdmissionDecision::Merge { .. } => crate::RememberStatus::Merged,
116    }
117}
118
119fn build_episode_remember_envelope(
120    record: &EpisodicRecord,
121    content_preview: &str,
122    edge_requests: &[EdgeInsert],
123) -> HirnResult<hirn_storage::MutationEnvelopeRecord> {
124    let timestamp_ms = u64::try_from(record.timestamp.timestamp_ms())
125        .map_err(|_| HirnError::storage("episode timestamp_ms was negative"))?;
126    let payload = EpisodeRememberEnvelope {
127        record_id: record.id,
128        namespace: record.namespace,
129        agent_id: record.provenance.created_by,
130        importance: record.importance,
131        timestamp_ms,
132        content_preview: content_preview.to_string(),
133        edge_requests: edge_requests.to_vec(),
134        temporal_edge_request: None,
135    };
136    let payload = encode_episode_remember_envelope(&payload)?;
137
138    Ok(hirn_storage::MutationEnvelopeRecord::pending(
139        format!("episode-remember:{}", record.id),
140        EPISODE_REMEMBER_MUTATION_KIND,
141        payload,
142    ))
143}
144
145fn build_episodic_scan_filter(filter: &EpisodicFilter) -> Option<String> {
146    let mut parts: Vec<String> = Vec::new();
147
148    if let Some(after) = &filter.after {
149        parts.push(format!("timestamp_ms > {}", after.timestamp_ms()));
150    }
151    if let Some(before) = &filter.before {
152        parts.push(format!("timestamp_ms < {}", before.timestamp_ms()));
153    }
154    if let Some(ns) = &filter.namespace {
155        parts.push(format!("namespace = '{}'", ns.as_str().replace('\'', "''")));
156    }
157    if let Some(event_type) = &filter.event_type {
158        parts.push(format!("event_type = '{event_type:?}'"));
159    }
160    if let Some(min_importance) = filter.min_importance {
161        parts.push(format!("importance >= {min_importance}"));
162    }
163    // Bi-temporal valid-time filter: event must have started at or before `valid_at`
164    // and must not have been superseded yet (valid_until IS NULL OR valid_until > valid_at).
165    if let Some(t) = &filter.valid_at {
166        let t_ms = t.timestamp_ms();
167        parts.push(format!("timestamp_ms <= {t_ms}"));
168        parts.push(format!(
169            "(valid_until_ms IS NULL OR valid_until_ms > {t_ms})"
170        ));
171    }
172
173    if parts.is_empty() {
174        None
175    } else {
176        Some(parts.join(" AND "))
177    }
178}
179
180pub(super) fn episodic_revision_is_newer(
181    candidate: &EpisodicRecord,
182    current: &EpisodicRecord,
183) -> bool {
184    candidate.version > current.version
185        || (candidate.version == current.version
186            && (candidate.created_at > current.created_at
187                || (candidate.created_at == current.created_at
188                    && candidate.revision_id > current.revision_id)))
189}
190
191pub(super) fn collapse_episodic_heads(
192    records: impl IntoIterator<Item = EpisodicRecord>,
193) -> HashMap<LogicalMemoryId, EpisodicRecord> {
194    let mut heads = HashMap::new();
195    for record in records {
196        heads
197            .entry(record.logical_memory_id)
198            .and_modify(|current| {
199                if episodic_revision_is_newer(&record, current) {
200                    *current = record.clone();
201                }
202            })
203            .or_insert(record);
204    }
205    heads
206}
207
208pub(super) fn episodic_snapshot_head_as_of(
209    history: &[EpisodicRecord],
210    cutoff: Timestamp,
211) -> Option<EpisodicRecord> {
212    history
213        .iter()
214        .filter(|record| record.timestamp <= cutoff)
215        .max_by(|left, right| {
216            left.version
217                .cmp(&right.version)
218                .then_with(|| left.created_at.cmp(&right.created_at))
219                .then_with(|| left.revision_id.cmp(&right.revision_id))
220        })
221        .cloned()
222}
223
224pub(super) fn episodic_snapshot_head_recorded_at_snapshot(
225    history: &[EpisodicRecord],
226    snapshot: super::semantic::ResolvedRecallSnapshot,
227) -> Option<EpisodicRecord> {
228    history
229        .iter()
230        .filter(|record| {
231            snapshot.contains_recorded_revision_for_chain(
232                record.logical_memory_id,
233                record.version,
234                record.created_at,
235                record.revision_id,
236            )
237        })
238        .max_by(|left, right| {
239            left.created_at
240                .cmp(&right.created_at)
241                .then_with(|| left.version.cmp(&right.version))
242                .then_with(|| left.revision_id.cmp(&right.revision_id))
243        })
244        .cloned()
245}
246
247impl HirnDB {
248    // ── Episodic Memory ─────────────────────────────────────────────────
249
250    /// Read a single episodic record from LanceDB by ID.
251    pub(super) async fn read_episodic_record(&self, id: MemoryId) -> HirnResult<EpisodicRecord> {
252        let opts = hirn_storage::store::ScanOptions {
253            exact_filter: Some(hirn_storage::store::ExactMatchFilter::utf8_value(
254                "id",
255                id.to_string(),
256            )),
257            limit: Some(1),
258            ..Default::default()
259        };
260        let batches = self
261            .storage_runtime
262            .scan(hirn_storage::datasets::episodic::DATASET_NAME, opts)
263            .await
264            .map_err(|e| HirnError::storage(e))?;
265        for batch in &batches {
266            let recs = hirn_storage::datasets::episodic::from_batch(batch)
267                .map_err(|e| HirnError::storage(e))?;
268            if let Some(rec) = recs.into_iter().next() {
269                return Ok(rec);
270            }
271        }
272        Err(HirnError::NotFound(format!("episodic record {id}")))
273    }
274
275    pub(super) async fn read_episodic_records_batch(
276        &self,
277        ids: &[MemoryId],
278    ) -> HirnResult<HashMap<MemoryId, EpisodicRecord>> {
279        if ids.is_empty() {
280            return Ok(HashMap::new());
281        }
282
283        const ID_SCAN_CHUNK: usize = 256;
284        let mut records = HashMap::with_capacity(ids.len());
285
286        for chunk in ids.chunks(ID_SCAN_CHUNK) {
287            let exact_filter = hirn_storage::store::ExactMatchFilter::utf8_values(
288                "id",
289                chunk.iter().map(ToString::to_string),
290            )
291            .expect("episodic batch chunks are non-empty");
292            let opts = hirn_storage::store::ScanOptions {
293                exact_filter: Some(exact_filter),
294                ..Default::default()
295            };
296
297            let mut batches = self
298                .storage_runtime
299                .scan_stream(hirn_storage::datasets::episodic::DATASET_NAME, opts)
300                .await
301                .map_err(HirnError::storage)?;
302
303            while let Some(batch) = batches.try_next().await.map_err(HirnError::storage)? {
304                let recs = hirn_storage::datasets::episodic::from_batch(&batch)
305                    .map_err(HirnError::storage)?;
306                for rec in recs {
307                    records.insert(rec.id, rec);
308                }
309            }
310        }
311
312        Ok(records)
313    }
314
315    pub(super) fn episodic_logical_exact_filter(
316        logical_memory_id: LogicalMemoryId,
317    ) -> hirn_storage::store::ExactMatchFilter {
318        hirn_storage::store::ExactMatchFilter::utf8_value(
319            "logical_memory_id",
320            logical_memory_id.to_string(),
321        )
322    }
323
324    pub(super) fn episodic_logical_exact_filter_many(
325        logical_memory_ids: &[LogicalMemoryId],
326    ) -> Option<hirn_storage::store::ExactMatchFilter> {
327        hirn_storage::store::ExactMatchFilter::utf8_values(
328            "logical_memory_id",
329            logical_memory_ids
330                .iter()
331                .map(ToString::to_string)
332                .collect::<Vec<_>>(),
333        )
334    }
335
336    fn cached_episodic_head(&self, logical_memory_id: LogicalMemoryId) -> Option<EpisodicRecord> {
337        self.episodic_head_cache_get(logical_memory_id)
338    }
339
340    fn cache_episodic_head(&self, record: &EpisodicRecord) {
341        if let Some(current) = self.cached_episodic_head(record.logical_memory_id) {
342            if !episodic_revision_is_newer(record, &current) {
343                return;
344            }
345        }
346
347        self.episodic_head_cache_put(record.clone());
348    }
349
350    fn evict_episodic_head(&self, logical_memory_id: LogicalMemoryId) {
351        self.episodic_head_cache_evict(logical_memory_id);
352    }
353
354    pub(super) async fn read_episodic_history(
355        &self,
356        logical_memory_id: LogicalMemoryId,
357    ) -> HirnResult<Vec<EpisodicRecord>> {
358        let mut batches = self
359            .storage_runtime
360            .scan_stream(
361                hirn_storage::datasets::episodic::DATASET_NAME,
362                hirn_storage::store::ScanOptions {
363                    exact_filter: Some(Self::episodic_logical_exact_filter(logical_memory_id)),
364                    order_by: Some(vec![
365                        hirn_storage::store::ScanOrdering::desc("version"),
366                        hirn_storage::store::ScanOrdering::desc("created_at_ms"),
367                        hirn_storage::store::ScanOrdering::desc("revision_id"),
368                    ]),
369                    ..Default::default()
370                },
371            )
372            .await
373            .map_err(HirnError::storage)?;
374
375        let mut history = Vec::new();
376        while let Some(batch) = batches.try_next().await.map_err(HirnError::storage)? {
377            let recs =
378                hirn_storage::datasets::episodic::from_batch(&batch).map_err(HirnError::storage)?;
379            history.extend(recs);
380        }
381
382        Ok(history)
383    }
384
385    async fn load_episodic_head_from_storage(
386        &self,
387        logical_memory_id: LogicalMemoryId,
388    ) -> HirnResult<EpisodicRecord> {
389        let mut batches = self
390            .storage_runtime
391            .scan_stream(
392                hirn_storage::datasets::episodic::DATASET_NAME,
393                hirn_storage::store::ScanOptions {
394                    exact_filter: Some(Self::episodic_logical_exact_filter(logical_memory_id)),
395                    order_by: Some(vec![
396                        hirn_storage::store::ScanOrdering::desc("version"),
397                        hirn_storage::store::ScanOrdering::desc("created_at_ms"),
398                        hirn_storage::store::ScanOrdering::desc("revision_id"),
399                    ]),
400                    limit: Some(1),
401                    ..Default::default()
402                },
403            )
404            .await
405            .map_err(HirnError::storage)?;
406
407        while let Some(batch) = batches.try_next().await.map_err(HirnError::storage)? {
408            let recs =
409                hirn_storage::datasets::episodic::from_batch(&batch).map_err(HirnError::storage)?;
410            if let Some(record) = recs.into_iter().next() {
411                return Ok(record);
412            }
413        }
414
415        Err(HirnError::NotFound(format!(
416            "episodic logical memory {logical_memory_id}"
417        )))
418    }
419
420    pub(super) async fn episodic_head_for_logical_id(
421        &self,
422        logical_memory_id: LogicalMemoryId,
423    ) -> HirnResult<EpisodicRecord> {
424        if let Some(record) = self.cached_episodic_head(logical_memory_id) {
425            return Ok(record);
426        }
427
428        match self
429            .load_episodic_head_from_storage(logical_memory_id)
430            .await
431        {
432            Ok(record) => {
433                self.cache_episodic_head(&record);
434                Ok(record)
435            }
436            Err(HirnError::NotFound(_)) => {
437                self.evict_episodic_head(logical_memory_id);
438                Err(HirnError::NotFound(format!(
439                    "episodic logical memory {logical_memory_id}"
440                )))
441            }
442            Err(error) => Err(error),
443        }
444    }
445
446    pub(super) async fn live_episodic_heads_for_logical_ids(
447        &self,
448        logical_memory_ids: &[LogicalMemoryId],
449    ) -> HirnResult<HashMap<LogicalMemoryId, EpisodicRecord>> {
450        let mut heads = HashMap::with_capacity(logical_memory_ids.len());
451        let mut missing = Vec::new();
452
453        for &logical_memory_id in logical_memory_ids {
454            if let Some(record) = self.cached_episodic_head(logical_memory_id) {
455                if record.is_live() {
456                    heads.insert(logical_memory_id, record);
457                }
458            } else {
459                missing.push(logical_memory_id);
460            }
461        }
462
463        let Some(exact_filter) = Self::episodic_logical_exact_filter_many(&missing) else {
464            return Ok(heads);
465        };
466
467        let mut batches = self
468            .storage_runtime
469            .scan_stream(
470                hirn_storage::datasets::episodic::DATASET_NAME,
471                hirn_storage::store::ScanOptions {
472                    exact_filter: Some(exact_filter),
473                    order_by: Some(vec![
474                        hirn_storage::store::ScanOrdering::desc("version"),
475                        hirn_storage::store::ScanOrdering::desc("created_at_ms"),
476                        hirn_storage::store::ScanOrdering::desc("revision_id"),
477                    ]),
478                    ..Default::default()
479                },
480            )
481            .await
482            .map_err(|error| {
483                HirnError::storage(format!(
484                    "failed to scan current episodic heads dataset `episodic`: {error}"
485                ))
486            })?;
487
488        let mut loaded = HashMap::with_capacity(missing.len());
489        while let Some(batch) = batches.try_next().await.map_err(|error| {
490            HirnError::storage(format!(
491                "failed to stream current episodic heads dataset `episodic`: {error}"
492            ))
493        })? {
494            let recs = hirn_storage::datasets::episodic::from_batch(&batch).map_err(|error| {
495                HirnError::storage(format!(
496                    "failed to decode current episodic heads dataset `episodic`: {error}"
497                ))
498            })?;
499            for record in recs {
500                loaded
501                    .entry(record.logical_memory_id)
502                    .and_modify(|current| {
503                        if episodic_revision_is_newer(&record, current) {
504                            *current = record.clone();
505                        }
506                    })
507                    .or_insert(record);
508            }
509        }
510
511        for (logical_memory_id, record) in loaded {
512            self.cache_episodic_head(&record);
513            if record.is_live() {
514                heads.insert(logical_memory_id, record);
515            }
516        }
517
518        for &logical_memory_id in &missing {
519            if !heads.contains_key(&logical_memory_id) {
520                self.evict_episodic_head(logical_memory_id);
521            }
522        }
523
524        Ok(heads)
525    }
526
527    async fn current_episodic_heads(&self) -> HirnResult<HashMap<LogicalMemoryId, EpisodicRecord>> {
528        let mut batches = self
529            .storage_runtime
530            .scan_stream(
531                hirn_storage::datasets::episodic::DATASET_NAME,
532                hirn_storage::store::ScanOptions::default(),
533            )
534            .await
535            .map_err(HirnError::storage)?;
536
537        let mut records = Vec::new();
538        while let Some(batch) = batches.try_next().await.map_err(HirnError::storage)? {
539            let recs =
540                hirn_storage::datasets::episodic::from_batch(&batch).map_err(HirnError::storage)?;
541            records.extend(recs);
542        }
543
544        Ok(collapse_episodic_heads(records))
545    }
546
547    pub(super) async fn episodic_revision_for_logical_id_at_snapshot(
548        &self,
549        logical_memory_id: LogicalMemoryId,
550        snapshot: RecallSnapshot,
551    ) -> HirnResult<Option<EpisodicRecord>> {
552        let history = self.read_episodic_history(logical_memory_id).await?;
553        if history.is_empty() {
554            return Ok(None);
555        }
556
557        let resolved_snapshot = self.resolve_recall_snapshot(snapshot).await?;
558        let revision = match resolved_snapshot {
559            super::semantic::ResolvedRecallSnapshot::Observed(cutoff) => {
560                episodic_snapshot_head_as_of(&history, cutoff)
561            }
562            recorded_snapshot => {
563                episodic_snapshot_head_recorded_at_snapshot(&history, recorded_snapshot)
564            }
565        };
566
567        Ok(revision)
568    }
569
570    pub(super) async fn episodic_edit_target(&self, id: MemoryId) -> HirnResult<EpisodicRecord> {
571        let record = self.read_episodic_record(id).await?;
572        let head = self
573            .episodic_head_for_logical_id(record.logical_memory_id)
574            .await?;
575
576        if head.is_live() {
577            Ok(head)
578        } else {
579            Err(HirnError::InvalidInput(format!(
580                "episodic logical memory {} is not live",
581                head.logical_memory_id
582            )))
583        }
584    }
585
586    pub(super) async fn append_episodic_record(&self, record: &EpisodicRecord) -> HirnResult<()> {
587        let dims = self.config.embedding_dimensions.as_usize();
588        let batch = hirn_storage::datasets::episodic::to_batch(std::slice::from_ref(record), dims)
589            .map_err(HirnError::storage)?;
590        self.storage_runtime
591            .append(hirn_storage::datasets::episodic::DATASET_NAME, batch)
592            .await
593            .map_err(HirnError::storage)?;
594        self.cache_episodic_head(record);
595        Ok(())
596    }
597
598    pub(super) async fn append_episodic_successor<F>(
599        &self,
600        current: &EpisodicRecord,
601        operation: RevisionOperation,
602        reason: Option<String>,
603        update: F,
604    ) -> HirnResult<EpisodicRecord>
605    where
606        F: FnOnce(&mut EpisodicRecord),
607    {
608        let now = Timestamp::now();
609        let new_id = MemoryId::new();
610
611        let mut next = current.clone();
612        next.id = new_id;
613        next.revision_id = RevisionId::from_memory_id(new_id);
614        next.version = current.version + 1;
615        next.revision_operation = operation;
616        next.revision_reason = reason;
617        next.revision_causation_id = Some(current.id);
618        next.created_at = now;
619        next.updated_at = now;
620        next.last_accessed = now;
621        next.superseded_by = None;
622
623        update(&mut next);
624
625        if next.content != current.content || next.multi_content != current.multi_content {
626            let embedding_text = next
627                .multi_content
628                .as_ref()
629                .map(|content| content.text_for_embedding().into_owned())
630                .unwrap_or_else(|| next.content.clone());
631            next.embedding = Some(self.embed_text(&embedding_text).await?);
632        }
633
634        self.cached_graph()
635            .add_node(
636                next.id,
637                Layer::Episodic,
638                next.importance,
639                next.timestamp,
640                next.namespace,
641            )
642            .await?;
643
644        if let Err(error) = self
645            .rebind_graph_edges_excluding(current.id, next.id, &[EdgeRelation::SimilarTo])
646            .await
647        {
648            let _ = self.cached_graph().remove_node(next.id).await;
649            return Err(error);
650        }
651
652        if next.is_live() {
653            if let Some(ref emb) = next.embedding {
654                let candidates = self.find_similarity_candidates(emb).await;
655                if let Err(error) = self.apply_similarity_edges(next.id, &candidates).await {
656                    let _ = self.cached_graph().remove_node(next.id).await;
657                    return Err(error);
658                }
659            }
660        }
661
662        if let Err(error) = self.append_episodic_record(&next).await {
663            let _ = self.cached_graph().remove_node(next.id).await;
664            return Err(error);
665        }
666
667        if let Err(error) = self.cached_graph().remove_node(current.id).await {
668            tracing::warn!(id = %current.id, error = %error, "failed to remove superseded episodic graph node");
669        }
670
671        Ok(next)
672    }
673
674    /// Delete + re-append an episodic record (LanceDB update pattern).
675    pub(super) async fn write_episodic_record(&self, record: &EpisodicRecord) -> HirnResult<()> {
676        let id = record.id;
677        let exact_filter = hirn_storage::store::ExactMatchFilter::utf8_value("id", id.to_string());
678        self.storage_runtime
679            .delete_exact(
680                hirn_storage::datasets::episodic::DATASET_NAME,
681                &exact_filter,
682            )
683            .await
684            .map_err(|e| HirnError::storage(e))?;
685        let dims = self.config.embedding_dimensions.as_usize();
686        let batch = hirn_storage::datasets::episodic::to_batch(std::slice::from_ref(record), dims)
687            .map_err(|e| HirnError::storage(e))?;
688        self.storage_runtime
689            .append(hirn_storage::datasets::episodic::DATASET_NAME, batch)
690            .await
691            .map_err(|e| HirnError::storage(e))?;
692        self.evict_episodic_head(record.logical_memory_id);
693        Ok(())
694    }
695
696    /// Store an episodic record. Returns the assigned ID.
697    ///
698    /// If the record has an embedding, it is also inserted into the HNSW index.
699    /// Also adds a node in the property graph and detects auto-edges.
700    ///
701    /// When an admission pipeline is configured, the record is evaluated before
702    /// writing. Rejected candidates return `HirnError::InvalidInput` with the
703    /// rejection reason.
704    pub(crate) async fn remember(&self, record: EpisodicRecord) -> HirnResult<MemoryId> {
705        self.remember_with_explanation(record)
706            .await
707            .map(|(id, _)| id)
708            .map_err(|failure| failure.error)
709    }
710
711    pub(crate) async fn remember_with_explanation(
712        &self,
713        record: EpisodicRecord,
714    ) -> Result<(MemoryId, crate::RememberExplanation), crate::RememberFailure> {
715        let realm = self.config.default_realm.clone();
716        let event_namespace = record.namespace;
717        let event_agent = record.provenance.created_by;
718        let start = std::time::Instant::now();
719        match self.remember_inner_with_explanation(record, false).await {
720            Ok((id, explanation)) => {
721                metrics::counter!(crate::metrics::REMEMBER_TOTAL, "realm" => realm.clone(), "status" => "success").increment(1);
722                metrics::histogram!(crate::metrics::STORE_DURATION_SECONDS, "realm" => realm)
723                    .record(start.elapsed().as_secs_f64());
724                Ok((id, explanation))
725            }
726            Err(failure) => {
727                let status = if matches!(
728                    failure.error,
729                    HirnError::AccessDenied(_) | HirnError::InvalidInput(_)
730                ) {
731                    "client_error"
732                } else {
733                    "server_error"
734                };
735                metrics::counter!(crate::metrics::REMEMBER_TOTAL, "realm" => realm.clone(), "status" => status).increment(1);
736                self.emit_scoped(
737                    event_namespace.as_str(),
738                    event_agent.as_str(),
739                    crate::event::MemoryEvent::Error {
740                        operation: "remember".to_string(),
741                        message: failure.error.to_string(),
742                    },
743                )
744                .await;
745                Err(failure)
746            }
747        }
748    }
749
750    /// Store an episodic record bypassing admission control.
751    ///
752    /// Useful for data migration, admin writes, or replaying events.
753    pub async fn remember_bypass_admission(&self, record: EpisodicRecord) -> HirnResult<MemoryId> {
754        self.remember_inner(record, true).await
755    }
756
757    /// Store multiple episodic records in a single batch. Returns per-record results.
758    ///
759    /// All records must belong to the same agent (Cedar authorization is checked
760    /// once per unique namespace, not per record). Admission is evaluated
761    /// per-record — some may be rejected while others succeed. Embedding,
762    /// LanceDB append, and graph updates are batched for throughput.
763    pub(crate) async fn batch_remember(
764        &self,
765        records: Vec<EpisodicRecord>,
766    ) -> Vec<HirnResult<MemoryId>> {
767        if records.is_empty() {
768            return Vec::new();
769        }
770
771        let n = records.len();
772        let realm = self.config.default_realm.clone();
773        let record_batch_stage = |stage: &'static str, elapsed: std::time::Duration| {
774            metrics::histogram!(
775                crate::metrics::BATCH_REMEMBER_STAGE_DURATION_SECONDS,
776                "realm" => realm.clone(),
777                "stage" => stage,
778            )
779            .record(elapsed.as_secs_f64());
780        };
781
782        // ── 1. Validate all records share the same agent_id ─────────────
783        let agent_id = records[0].provenance.created_by;
784        for rec in records.iter().skip(1) {
785            if rec.provenance.created_by != agent_id {
786                return (0..n)
787                    .map(|_| {
788                        Err(HirnError::InvalidInput(
789                            "batch_remember: all records must have the same agent_id".into(),
790                        ))
791                    })
792                    .collect();
793            }
794        }
795
796        // ── 2. Cedar enforce once per unique namespace ──────────────────
797        {
798            let stage_start = std::time::Instant::now();
799            let mut checked_namespaces = HashSet::new();
800            for rec in &records {
801                if checked_namespaces.insert(rec.namespace) {
802                    if let Err(e) = self
803                        .enforce(
804                            agent_id.as_str(),
805                            crate::policy::Action::Remember,
806                            &realm,
807                            rec.namespace.as_str(),
808                        )
809                        .await
810                    {
811                        let msg = format!("{e}");
812                        return (0..n)
813                            .map(|_| Err(HirnError::AccessDenied(msg.clone())))
814                            .collect();
815                    }
816                }
817            }
818            record_batch_stage("authorize", stage_start.elapsed());
819        }
820
821        // Per-record result slots.
822        let mut results: Vec<Option<HirnResult<MemoryId>>> = (0..n).map(|_| None).collect();
823
824        // ── 3. Admission per record ─────────────────────────────────────
825        let stage_start = std::time::Instant::now();
826        let mut admitted: Vec<(usize, EpisodicRecord)> = Vec::with_capacity(n);
827        for (idx, mut rec) in records.into_iter().enumerate() {
828            match self.admission_runtime().evaluate_record(&rec).await {
829                Ok(Some(result)) => {
830                    let candidate = crate::admission::MemoryCandidate::from_record(&rec);
831                    let controllers_consulted: Vec<String> = result
832                        .verdicts
833                        .iter()
834                        .map(|v| v.controller.clone())
835                        .collect();
836                    let decision_str = format!("{:?}", result.decision);
837                    self.emit_scoped(
838                        rec.namespace.as_str(),
839                        rec.provenance.created_by.as_str(),
840                        MemoryEvent::AdmissionEvaluated {
841                            candidate_id: candidate.id,
842                            decision: decision_str,
843                            controllers_consulted,
844                        },
845                    )
846                    .await;
847
848                    match apply_admission_decision(&mut rec, result.decision, &realm) {
849                        Ok(()) => admitted.push((idx, rec)),
850                        Err(error) => results[idx] = Some(Err(error)),
851                    }
852                }
853                Ok(None) => admitted.push((idx, rec)),
854                Err(error) => results[idx] = Some(Err(error)),
855            }
856        }
857        record_batch_stage("admission", stage_start.elapsed());
858
859        if admitted.is_empty() {
860            return results
861                .into_iter()
862                .map(|r| r.unwrap_or_else(|| Err(HirnError::InvalidInput("unreachable".into()))))
863                .collect();
864        }
865
866        // ── 4. Batch auto-embed ─────────────────────────────────────────
867        let stage_start = std::time::Instant::now();
868        // Collect indices (into `admitted`) and text for records needing embedding.
869        let mut need_embed: Vec<(usize, String)> = Vec::new();
870        for (i, (_, rec)) in admitted.iter().enumerate() {
871            if rec.embedding.is_none() {
872                if let Some(ref mc) = rec.multi_content {
873                    need_embed.push((i, mc.text_for_embedding().into_owned()));
874                }
875            }
876        }
877
878        if !need_embed.is_empty() {
879            let texts: Vec<&str> = need_embed.iter().map(|(_, t)| t.as_str()).collect();
880            let embedding_slots = if let Some(embedder) = self.provider_runtime().embedder() {
881                match embedder.embed(&texts).await {
882                    Ok(embs) => embs.into_iter().map(Some).collect(),
883                    Err(e) => {
884                        let error_display = e.to_string();
885                        if let Some(partial) = e.into_partial_embedding_batch() {
886                            let mut failures = 0u64;
887
888                            tracing::warn!(
889                                completed = partial.completed(),
890                                failed = partial.failed(),
891                                count = texts.len(),
892                                "batch_remember: preserving partial embed successes and requeuing only failed items"
893                            );
894
895                            for (local_idx, maybe_embedding) in
896                                partial.embeddings.iter().enumerate()
897                            {
898                                if maybe_embedding.is_none() {
899                                    failures += 1;
900                                    let admitted_idx = need_embed[local_idx].0;
901                                    self.write_runtime()
902                                        .enqueue_pending_embed(admitted[admitted_idx].1.id);
903                                }
904                            }
905
906                            if failures > 0 {
907                                metrics::counter!(
908                                    crate::metrics::PROVIDER_FALLBACK_TOTAL,
909                                    "realm" => realm.clone(),
910                                    "provider_type" => "embed"
911                                )
912                                .increment(failures);
913                            }
914
915                            partial.embeddings
916                        } else {
917                            // Provider fallback: log warning and continue without embeddings.
918                            tracing::warn!(
919                                error = %error_display,
920                                count = texts.len(),
921                                "batch_remember: embed failed, storing without embeddings (provider fallback)"
922                            );
923                            metrics::counter!(
924                                crate::metrics::PROVIDER_FALLBACK_TOTAL,
925                                "realm" => realm.clone(),
926                                "provider_type" => "embed"
927                            )
928                            .increment(texts.len() as u64);
929                            // Enqueue all records needing embedding for background retry.
930                            for (idx, _) in &need_embed {
931                                self.write_runtime()
932                                    .enqueue_pending_embed(admitted[*idx].1.id);
933                            }
934                            vec![None; texts.len()]
935                        }
936                    }
937                }
938            } else {
939                let pseudo = hirn_provider::PseudoEmbedder::new(self.embedding_dims());
940                match pseudo.embed(&texts).await {
941                    Ok(embs) => embs.into_iter().map(Some).collect(),
942                    Err(_) => vec![None; texts.len()],
943                }
944            };
945
946            for (i, (admitted_idx, _)) in need_embed.iter().enumerate() {
947                if let Some(Some(emb)) = embedding_slots.get(i) {
948                    admitted[*admitted_idx].1.embedding = Some(emb.vector.clone());
949                }
950            }
951        }
952        record_batch_stage("embedding", stage_start.elapsed());
953
954        // ── 5. RPE gating + text retention + dimension validation ────
955        let stage_start = std::time::Instant::now();
956        // Compute RPE per-record before text retention (need content for slow path).
957        // Capture content for slow-path write intelligence before text is stripped.
958        struct WritePathInfo {
959            content: Option<String>,     // Some = slow path, None = fast path
960            max_similarity: Option<f32>, // For interference tracking
961            namespace: hirn_core::types::Namespace,
962        }
963        let mut write_path_infos: Vec<Option<WritePathInfo>> = (0..n).map(|_| None).collect();
964
965        if self.config.rpe_enabled {
966            let mut partition_stats: HashMap<
967                super::write_path::RpePartitionKey,
968                super::write_path::RunningRpeStats,
969            > = HashMap::new();
970            let mut partition_deltas: HashMap<
971                super::write_path::RpePartitionKey,
972                super::write_path::RunningRpeStats,
973            > = HashMap::new();
974
975            // Step 1: collect (admitted_index, orig_idx, embedding) — immutable borrow.
976            let indexed_embeddings: Vec<(usize, usize, Vec<f32>)> = admitted
977                .iter()
978                .enumerate()
979                .filter_map(|(ai, (orig_idx, rec))| {
980                    rec.embedding.as_ref().map(|e| (ai, *orig_idx, e.clone()))
981                })
982                .collect();
983
984            // Step 2: batch vector search — 3 calls total instead of 3×N serial calls.
985            // Collect per-partition circuit breakers; skip the batch if any is open.
986            let batch_breakers: Vec<(usize, std::sync::Arc<super::write_path::RpeCircuitBreaker>)> =
987                indexed_embeddings
988                    .iter()
989                    .map(|(ai, _, _)| {
990                        let ns = admitted[*ai].1.namespace;
991                        let key = self.write_runtime().rpe_partition_key(
992                            ns,
993                            &self.rpe_model_id(),
994                            hirn_core::types::Layer::Episodic,
995                        );
996                        (*ai, self.write_runtime().rpe_circuit_breaker_for(&key))
997                    })
998                    .collect();
999            let any_circuit_open = batch_breakers.iter().any(|(_, b)| b.is_open());
1000
1001            let max_sims: Vec<f32> = if indexed_embeddings.is_empty() {
1002                Vec::new()
1003            } else if any_circuit_open {
1004                tracing::warn!(
1005                    count = indexed_embeddings.len(),
1006                    "RPE circuit open — batch vector search skipped, defaulting to max similarity"
1007                );
1008                vec![1.0_f32; indexed_embeddings.len()]
1009            } else {
1010                let embeddings: Vec<Vec<f32>> =
1011                    indexed_embeddings.iter().map(|(_, _, e)| e.clone()).collect();
1012                match super::write_path::batch_vector_search_max_sim(
1013                    self.storage_backend(),
1014                    &embeddings,
1015                    self.config.rpe_similarity_search_limit,
1016                )
1017                .await
1018                {
1019                    None => Vec::new(),
1020                    Some(result) => {
1021                        // Update per-partition circuit breakers from the batch outcome.
1022                        for (_, breaker) in &batch_breakers {
1023                            if result.had_storage_error {
1024                                breaker.record_failure(super::write_path::RPE_CIRCUIT_OPEN_SECS);
1025                            } else {
1026                                breaker.record_success();
1027                            }
1028                        }
1029                        result.max_sims
1030                    }
1031                }
1032            };
1033
1034            // Step 3: compute z-score per record and apply RPE decisions — mutable borrow.
1035            for ((ai, orig_idx, _), max_sim) in
1036                indexed_embeddings.iter().zip(max_sims.iter())
1037            {
1038                let (_, rec) = &mut admitted[*ai];
1039                let key = self
1040                    .write_runtime()
1041                    .rpe_partition_key(rec.namespace, &self.rpe_model_id(), hirn_core::types::Layer::Episodic);
1042                let stats = partition_stats
1043                    .entry(key.clone())
1044                    .or_insert_with(|| self.write_runtime().snapshot_rpe_stats(&key));
1045
1046                let max_sim = *max_sim;
1047                let distance = 1.0_f32 - max_sim;
1048                let z_score = stats.z_score(f64::from(distance)) as f32;
1049                // Feed current distance into running population (same ordering as serial path).
1050                stats.update(f64::from(distance));
1051                let rpe_score = (distance * (1.0 + z_score)).clamp(0.0, 2.0);
1052                let is_fast_path = rpe_score < self.config.rpe_fast_path_threshold;
1053
1054                let rpe = super::write_path::RpeResult {
1055                    score: rpe_score,
1056                    max_similarity: max_sim,
1057                    is_fast_path,
1058                };
1059                self.write_runtime().record_rpe_routing_metric(
1060                    &key,
1061                    &rpe,
1062                    self.config.rpe_fast_path_threshold,
1063                );
1064                partition_deltas
1065                    .entry(key)
1066                    .or_default()
1067                    .update(f64::from(distance));
1068                if is_fast_path {
1069                    rec.importance = 0.3 + 0.2 * rpe_score;
1070                    tracing::debug!(id = %rec.id, rpe_score, "batch RPE fast-path");
1071                }
1072                write_path_infos[*orig_idx] = Some(WritePathInfo {
1073                    content: if is_fast_path {
1074                        None
1075                    } else {
1076                        Some(rec.content.clone())
1077                    },
1078                    max_similarity: Some(max_sim),
1079                    namespace: rec.namespace,
1080                });
1081            }
1082            for (key, delta) in partition_deltas {
1083                self.write_runtime().merge_rpe_stats(&key, &delta);
1084            }
1085        } else {
1086            // RPE disabled → always slow path (same as remember_inner behavior).
1087            for (orig_idx, rec) in &admitted {
1088                write_path_infos[*orig_idx] = Some(WritePathInfo {
1089                    content: Some(rec.content.clone()),
1090                    max_similarity: None,
1091                    namespace: rec.namespace,
1092                });
1093            }
1094        }
1095
1096        // Validate embedding dimensions before mutating records.
1097        for (orig_idx, rec) in &admitted {
1098            if let Some(ref emb) = rec.embedding {
1099                if emb.len() != self.config.embedding_dimensions.as_usize() {
1100                    results[*orig_idx] = Some(Err(HirnError::InvalidInput(format!(
1101                        "embedding dimension mismatch: expected {}, got {}",
1102                        self.config.embedding_dimensions.as_usize(),
1103                        emb.len()
1104                    ))));
1105                }
1106            }
1107        }
1108
1109        // Filter out records that failed dimension validation.
1110        let mut admitted: Vec<(usize, EpisodicRecord)> = admitted
1111            .into_iter()
1112            .filter(|(idx, _)| results[*idx].is_none())
1113            .collect();
1114
1115        // Apply text retention policy (after validation, before storage).
1116        for (_orig_idx, rec) in &mut admitted {
1117            match self.config.text_retention {
1118                hirn_core::TextRetention::Full => {}
1119                hirn_core::TextRetention::SummaryOnly => {
1120                    rec.content = String::new();
1121                }
1122                hirn_core::TextRetention::None => {
1123                    rec.content = String::new();
1124                    rec.summary = String::new();
1125                }
1126            }
1127        }
1128
1129        if admitted.is_empty() {
1130            return results
1131                .into_iter()
1132                .map(|r| r.unwrap_or_else(|| Err(HirnError::InvalidInput("unreachable".into()))))
1133                .collect();
1134        }
1135        record_batch_stage("prepare", stage_start.elapsed());
1136
1137        // ── 6. Per-record processing (graph, blobs, edges) ─────────────
1138        struct PreparedRecord {
1139            idx: usize,
1140            record: EpisodicRecord,
1141            content_preview: String,
1142            edge_requests: Vec<EdgeInsert>,
1143            episode_envelope: hirn_storage::MutationEnvelopeRecord,
1144        }
1145        let mut prepared: Vec<PreparedRecord> = Vec::with_capacity(admitted.len());
1146        let stage_start = std::time::Instant::now();
1147        let fallback_entity_candidates = if admitted
1148            .iter()
1149            .any(|(_, rec)| rec.embedding.is_none() && !rec.entities.is_empty())
1150        {
1151            match self.fetch_recent_entity_candidate_records().await {
1152                Ok(records) => Some(records),
1153                Err(error) => {
1154                    tracing::warn!(error = %error, "batch_remember: failed to prefetch entity-edge candidates; falling back to per-record scan");
1155                    None
1156                }
1157            }
1158        } else {
1159            None
1160        };
1161        let mut unique_embeddings: Vec<Vec<f32>> = Vec::new();
1162        let mut unique_embedding_result_indices: HashMap<Vec<u32>, usize> = HashMap::new();
1163        let mut record_embedding_result_indices: HashMap<MemoryId, usize> = HashMap::new();
1164        let mut embedded_candidate_ids = Vec::new();
1165        let mut embedded_candidate_id_set = HashSet::new();
1166        for (_idx, rec) in &admitted {
1167            let Some(embedding) = rec.embedding.as_deref() else {
1168                continue;
1169            };
1170            let embedding_key: Vec<u32> = embedding.iter().map(|value| value.to_bits()).collect();
1171            let result_index =
1172                if let Some(&index) = unique_embedding_result_indices.get(&embedding_key) {
1173                    index
1174                } else {
1175                    let index = unique_embeddings.len();
1176                    unique_embeddings.push(embedding.to_vec());
1177                    unique_embedding_result_indices.insert(embedding_key, index);
1178                    index
1179                };
1180            record_embedding_result_indices.insert(rec.id, result_index);
1181        }
1182        let embedded_auto_edge_candidate_results = self
1183            .find_auto_edge_candidates_many(&unique_embeddings)
1184            .await;
1185        for (_idx, rec) in &admitted {
1186            let Some(&result_index) = record_embedding_result_indices.get(&rec.id) else {
1187                continue;
1188            };
1189            let Some(candidates) = embedded_auto_edge_candidate_results.get(result_index) else {
1190                continue;
1191            };
1192            for &(uid, _sim) in candidates {
1193                let candidate_id = MemoryId::from_ulid(ulid::Ulid(uid));
1194                if candidate_id != rec.id && embedded_candidate_id_set.insert(candidate_id) {
1195                    embedded_candidate_ids.push(candidate_id);
1196                }
1197            }
1198        }
1199        let prefetched_embedded_candidate_records = if embedded_candidate_ids.is_empty() {
1200            None
1201        } else {
1202            let include_entities = admitted.iter().any(|(_, rec)| !rec.entities.is_empty());
1203            match self
1204                .fetch_hydrated_candidate_records_by_ids(&embedded_candidate_ids, include_entities)
1205                .await
1206            {
1207                Ok(records_by_id) => Some(records_by_id),
1208                Err(error) => {
1209                    tracing::warn!(error = %error, "batch_remember: failed to prefetch embedded auto-edge candidate records; falling back to per-record hydration");
1210                    None
1211                }
1212            }
1213        };
1214        record_batch_stage("auto_edge_prefetch", stage_start.elapsed());
1215
1216        let stage_start = std::time::Instant::now();
1217
1218        for (idx, mut rec) in admitted {
1219            let namespace = rec.namespace;
1220            let content_preview = rec.content.chars().take(120).collect::<String>();
1221            let entities: Vec<String> = rec.entities.iter().map(|e| e.name.clone()).collect();
1222
1223            // Blob extraction.
1224            if let Some(ref mc) = rec.multi_content {
1225                match self
1226                    .extract_and_store_resources(namespace, rec.provenance.created_by, mc)
1227                    .await
1228                {
1229                    Ok(extracted) => {
1230                        rec.multi_content = Some(extracted.content);
1231                        rec.provenance
1232                            .evidence_links
1233                            .extend(extracted.evidence_links);
1234                    }
1235                    Err(e) => {
1236                        results[idx] = Some(Err(e));
1237                        continue;
1238                    }
1239                }
1240            }
1241            let edge_requests = match self
1242                .plan_auto_episode_edge_requests(
1243                    rec.id,
1244                    rec.namespace,
1245                    rec.embedding.as_deref(),
1246                    &rec.content,
1247                    &entities,
1248                    record_embedding_result_indices
1249                        .get(&rec.id)
1250                        .and_then(|&result_index| {
1251                            embedded_auto_edge_candidate_results
1252                                .get(result_index)
1253                                .map(Vec::as_slice)
1254                        }),
1255                    prefetched_embedded_candidate_records.as_ref(),
1256                    fallback_entity_candidates.as_deref(),
1257                )
1258                .await
1259            {
1260                Ok(edge_requests) => edge_requests,
1261                Err(error) => {
1262                    results[idx] = Some(Err(error));
1263                    continue;
1264                }
1265            };
1266            let episode_envelope =
1267                match build_episode_remember_envelope(&rec, &content_preview, &edge_requests) {
1268                    Ok(envelope) => envelope,
1269                    Err(error) => {
1270                        results[idx] = Some(Err(error));
1271                        continue;
1272                    }
1273                };
1274
1275            prepared.push(PreparedRecord {
1276                idx,
1277                record: rec,
1278                content_preview,
1279                edge_requests,
1280                episode_envelope,
1281            });
1282        }
1283
1284        let envelope_list: Vec<_> = prepared.iter().map(|r| r.episode_envelope.clone()).collect();
1285        let graph_nodes = prepared
1286            .iter()
1287            .map(|prepared_record| crate::graph::GraphNodeData {
1288                id: prepared_record.record.id,
1289                layer: Layer::Episodic,
1290                importance: prepared_record.record.importance,
1291                created_at: prepared_record.record.timestamp,
1292                namespace: prepared_record.record.namespace,
1293                access_count: 0,
1294            })
1295            .collect::<Vec<_>>();
1296
1297        // PERF-4: append_mutation_envelopes (mutation_envelopes dataset) and
1298        // add_nodes (graph_nodes dataset) are independent — run them concurrently.
1299        // Both must succeed before we proceed to the episodic append (stage 7).
1300        let (envelope_result, node_result) = tokio::join!(
1301            hirn_storage::append_mutation_envelopes(self.storage_backend(), &envelope_list),
1302            self.cached_graph().add_nodes(&graph_nodes),
1303        );
1304        match (envelope_result, node_result) {
1305            (Ok(()), Ok(())) => {}
1306            (Err(error), Ok(())) => {
1307                // Envelope write failed; roll back graph nodes we just added.
1308                for node in &graph_nodes {
1309                    let _ = self.cached_graph().remove_node(node.id).await;
1310                }
1311                let message = error.to_string();
1312                for record in &prepared {
1313                    results[record.idx] = Some(Err(HirnError::storage(message.clone())));
1314                }
1315                record_batch_stage("graph_prepare", stage_start.elapsed());
1316                return results
1317                    .into_iter()
1318                    .map(|r| {
1319                        r.unwrap_or_else(|| {
1320                            Err(HirnError::storage("episode envelope append failed"))
1321                        })
1322                    })
1323                    .collect();
1324            }
1325            (Ok(()), Err(error)) => {
1326                let message = error.to_string();
1327                for prepared_record in &prepared {
1328                    results[prepared_record.idx] =
1329                        Some(Err(HirnError::storage(message.clone())));
1330                }
1331                record_batch_stage("graph_prepare", stage_start.elapsed());
1332                return results
1333                    .into_iter()
1334                    .map(|r| {
1335                        r.unwrap_or_else(|| {
1336                            Err(HirnError::storage("graph node batch persist failed"))
1337                        })
1338                    })
1339                    .collect();
1340            }
1341            (Err(env_err), Err(node_err)) => {
1342                tracing::warn!(
1343                    node_error = %node_err,
1344                    "batch_remember: add_nodes also failed during envelope error"
1345                );
1346                let message = env_err.to_string();
1347                for record in &prepared {
1348                    results[record.idx] = Some(Err(HirnError::storage(message.clone())));
1349                }
1350                record_batch_stage("graph_prepare", stage_start.elapsed());
1351                return results
1352                    .into_iter()
1353                    .map(|r| {
1354                        r.unwrap_or_else(|| {
1355                            Err(HirnError::storage("episode envelope append failed"))
1356                        })
1357                    })
1358                    .collect();
1359            }
1360        }
1361
1362        let edge_request_batches = prepared
1363            .iter()
1364            .map(|prepared_record| {
1365                (
1366                    prepared_record.record.namespace,
1367                    prepared_record.record.provenance.created_by,
1368                    prepared_record.edge_requests.as_slice(),
1369                )
1370            })
1371            .collect::<Vec<_>>();
1372        if let Err(error) = self
1373            .apply_episode_edge_request_batches(&edge_request_batches)
1374            .await
1375        {
1376            let message = error.to_string();
1377            for prepared_record in &prepared {
1378                if let Err(cleanup_err) = self
1379                    .cached_graph()
1380                    .remove_node(prepared_record.record.id)
1381                    .await
1382                {
1383                    tracing::warn!(
1384                        id = %prepared_record.record.id,
1385                        error = %cleanup_err,
1386                        "batch_remember: failed to remove graph node after batched edge application error"
1387                    );
1388                }
1389                results[prepared_record.idx] =
1390                    Some(Err(HirnError::StorageError(message.clone().into())));
1391            }
1392            record_batch_stage("graph_prepare", stage_start.elapsed());
1393            return results
1394                .into_iter()
1395                .map(|r| {
1396                    r.unwrap_or_else(|| {
1397                        Err(HirnError::storage(
1398                            "batched episode edge application failed",
1399                        ))
1400                    })
1401                })
1402                .collect();
1403        }
1404        let mut prepared = prepared;
1405        record_batch_stage("graph_prepare", stage_start.elapsed());
1406
1407        // ── 7. Single LanceDB append ────────────────────────────────────
1408        let stage_start = std::time::Instant::now();
1409        if !prepared.is_empty() {
1410            let lance_records = prepared
1411                .iter()
1412                .map(|record| record.record.clone())
1413                .collect::<Vec<_>>();
1414            let dims = self.config.embedding_dimensions.as_usize();
1415            match hirn_storage::datasets::episodic::to_batch(&lance_records, dims) {
1416                Ok(batch) => {
1417                    if let Err(e) = self
1418                        .storage_runtime
1419                        .append(hirn_storage::datasets::episodic::DATASET_NAME, batch)
1420                        .await
1421                    {
1422                        tracing::error!(
1423                            count = lance_records.len(),
1424                            error = %e,
1425                            "batch_remember: LanceDB batch append failed"
1426                        );
1427                        // Best-effort cleanup: remove orphaned graph nodes.
1428                        for record in &prepared {
1429                            if let Err(cleanup_err) =
1430                                self.cached_graph().remove_node(record.record.id).await
1431                            {
1432                                tracing::warn!(
1433                                    id = %record.record.id,
1434                                    error = %cleanup_err,
1435                                    "batch_remember: failed to remove orphaned graph node"
1436                                );
1437                            }
1438                        }
1439                        let msg = format!("{e}");
1440                        for record in &prepared {
1441                            results[record.idx] =
1442                                Some(Err(HirnError::StorageError(msg.clone().into())));
1443                        }
1444                        record_batch_stage("append", stage_start.elapsed());
1445                        return results
1446                            .into_iter()
1447                            .map(|r| {
1448                                r.unwrap_or_else(|| {
1449                                    Err(HirnError::storage("LanceDB append failed"))
1450                                })
1451                            })
1452                            .collect();
1453                    }
1454                }
1455                Err(e) => {
1456                    tracing::error!(
1457                        count = lance_records.len(),
1458                        error = %e,
1459                        "batch_remember: LanceDB to_batch failed"
1460                    );
1461                    // Best-effort cleanup: remove orphaned graph nodes.
1462                    for record in &prepared {
1463                        if let Err(cleanup_err) =
1464                            self.cached_graph().remove_node(record.record.id).await
1465                        {
1466                            tracing::warn!(
1467                                id = %record.record.id,
1468                                error = %cleanup_err,
1469                                "batch_remember: failed to remove orphaned graph node"
1470                            );
1471                        }
1472                    }
1473                    let msg = format!("{e}");
1474                    for record in &prepared {
1475                        results[record.idx] =
1476                            Some(Err(HirnError::StorageError(msg.clone().into())));
1477                    }
1478                    record_batch_stage("append", stage_start.elapsed());
1479                    return results
1480                        .into_iter()
1481                        .map(|r| {
1482                            r.unwrap_or_else(|| Err(HirnError::storage("LanceDB to_batch failed")))
1483                        })
1484                        .collect();
1485                }
1486            }
1487        }
1488        record_batch_stage("append", stage_start.elapsed());
1489
1490        // ── 8. TemporalNext edges ───────────────────────────────────────
1491        // Capture namespace-local write order immediately after the durable
1492        // append so optional slow-path work cannot reorder the chain.
1493        let stage_start = std::time::Instant::now();
1494        let mut temporal_edge_requests = Vec::with_capacity(prepared.len().saturating_sub(1));
1495        let mut temporal_envelope_updates = Vec::new();
1496        for prepared_record in &mut prepared {
1497            let arrival = self
1498                .write_runtime()
1499                .record_arrival(prepared_record.record.namespace, prepared_record.record.id);
1500            let temporal_edge_request =
1501                temporal_edge_request_for_arrival(prepared_record.record.id, &arrival);
1502            if temporal_edge_request.is_some() {
1503                match update_episode_envelope_temporal_edge(
1504                    &mut prepared_record.episode_envelope,
1505                    temporal_edge_request.clone(),
1506                ) {
1507                    Ok(()) => {
1508                        temporal_envelope_updates.push(prepared_record.episode_envelope.clone());
1509                    }
1510                    Err(error) => {
1511                        tracing::warn!(
1512                            id = %prepared_record.record.id,
1513                            error = %error,
1514                            "batch_remember: failed to encode TemporalNext episode envelope update"
1515                        );
1516                    }
1517                }
1518            }
1519            if let Some(request) = temporal_edge_request {
1520                temporal_edge_requests.push(request);
1521            }
1522        }
1523        // PERF-5: envelope update and graph edge flush write to different
1524        // datasets — run them concurrently to halve the critical-path latency.
1525        let (envelope_result, _) = tokio::join!(
1526            hirn_storage::replace_mutation_envelopes(
1527                self.storage_backend(),
1528                &temporal_envelope_updates,
1529            ),
1530            self.cached_graph().add_edges_best_effort(&temporal_edge_requests),
1531        );
1532        if let Err(error) = envelope_result {
1533            tracing::warn!(
1534                count = temporal_envelope_updates.len(),
1535                error = %error,
1536                "batch_remember: failed to persist TemporalNext episode envelope updates"
1537            );
1538        }
1539        record_batch_stage("temporal_next", stage_start.elapsed());
1540
1541        // ── 9. Events ────────────────────────────────────────────────────
1542        let stage_start = std::time::Instant::now();
1543        let mut finalized_envelopes = Vec::new();
1544        let mut group_start = 0usize;
1545        while group_start < prepared.len() {
1546            let group_namespace = prepared[group_start].record.namespace;
1547            let group_agent_id = prepared[group_start].record.provenance.created_by;
1548            let mut group_end = group_start + 1;
1549            while group_end < prepared.len()
1550                && prepared[group_end].record.namespace == group_namespace
1551                && prepared[group_end].record.provenance.created_by == group_agent_id
1552            {
1553                group_end += 1;
1554            }
1555
1556            let events = prepared[group_start..group_end]
1557                .iter()
1558                .map(|prepared_record| MemoryEvent::EpisodeCreated {
1559                    id: prepared_record.record.id,
1560                    content_preview: prepared_record.content_preview.clone(),
1561                })
1562                .collect::<Vec<_>>();
1563
1564            match self
1565                .event_runtime()
1566                .emit_checked_batch(
1567                    &self.config.default_realm,
1568                    group_namespace.as_str(),
1569                    group_agent_id.as_str(),
1570                    events,
1571                )
1572                .await
1573            {
1574                Ok(()) => {
1575                    for prepared_record in &mut prepared[group_start..group_end] {
1576                        results[prepared_record.idx] = Some(Ok(prepared_record.record.id));
1577                        prepared_record.episode_envelope.state =
1578                            hirn_storage::MutationEnvelopeState::Applied;
1579                        prepared_record.episode_envelope.last_error = None;
1580                        prepared_record.episode_envelope.updated_at = Timestamp::now();
1581                        finalized_envelopes.push(prepared_record.episode_envelope.clone());
1582                    }
1583                }
1584                Err(error) => {
1585                    let message = error.to_string();
1586                    for prepared_record in &prepared[group_start..group_end] {
1587                        results[prepared_record.idx] =
1588                            Some(Err(HirnError::storage(message.clone())));
1589                    }
1590                }
1591            }
1592
1593            group_start = group_end;
1594        }
1595        if let Err(error) =
1596            hirn_storage::replace_mutation_envelopes(self.storage_backend(), &finalized_envelopes)
1597                .await
1598        {
1599            tracing::warn!(
1600                count = finalized_envelopes.len(),
1601                error = %error,
1602                "batch_remember: episode mutation envelope finalize failed; recovery will retry cleanup"
1603            );
1604        }
1605        for prepared_record in &prepared {
1606            if results[prepared_record.idx]
1607                .as_ref()
1608                .is_some_and(Result::is_ok)
1609            {
1610                self.cache_episodic_head(&prepared_record.record);
1611            }
1612        }
1613        record_batch_stage("events", stage_start.elapsed());
1614
1615        // ── 10. Slow-path write intelligence ────────────────────────────
1616        let stage_start = std::time::Instant::now();
1617        let mut prospective_batches = Vec::new();
1618        let mut prospective_rows = 0usize;
1619        let mut svo_batches = Vec::new();
1620        let mut svo_rows = 0usize;
1621
1622        for p in &prepared {
1623            let info = write_path_infos[p.idx].take();
1624            if let Some(info) = info {
1625                // Slow-path: prospective indexing + SVO extraction.
1626                if let Some(ref content) = info.content {
1627                    if self.config.prospective_indexing_enabled {
1628                        if let Some(embedder) = self.provider_runtime().embedder() {
1629                            if let Some(batch) =
1630                                super::write_path::prepare_prospective_implications_batch(
1631                                    &*embedder,
1632                                    p.record.id,
1633                                    content,
1634                                    self.config.prospective_indexing_num_questions,
1635                                    self.config.prospective_indexing_timeout_secs,
1636                                    &self.config.prospective_indexing_templates,
1637                                    info.namespace.as_str(),
1638                                )
1639                                .await
1640                            {
1641                                let count = batch.num_rows();
1642                                prospective_rows += count;
1643                                prospective_batches.push(batch);
1644                                tracing::debug!(id = %p.record.id, count, "batch: prospective implications prepared");
1645                            }
1646                        }
1647                    }
1648
1649                    if self.config.svo_extraction_enabled {
1650                        if let Some(batch) = super::write_path::prepare_svo_events_batch(
1651                            p.record.id,
1652                            content,
1653                            self.config.svo_confidence_threshold,
1654                            info.namespace.as_str(),
1655                            self.config.embedding_dimensions.as_usize(),
1656                        ) {
1657                            let count = batch.num_rows();
1658                            svo_rows += count;
1659                            svo_batches.push(batch);
1660                            tracing::debug!(id = %p.record.id, count, "batch: SVO events prepared");
1661                        }
1662                    }
1663                }
1664
1665                // Interference tracking (reuses RPE max_similarity).
1666                if let Some(max_sim) = info.max_similarity {
1667                    let interference =
1668                        super::write_path::interference_score_from_similarity(max_sim);
1669                    if interference > 0.0 {
1670                        let action = self.write_runtime().accumulate_interference(
1671                            interference,
1672                            info.namespace,
1673                            self.config.interference_consolidation_threshold,
1674                            self.config.interference_consolidation_cooldown_secs,
1675                        );
1676                        match action {
1677                            super::write_path::InterferenceAction::TriggerConsolidation {
1678                                namespaces,
1679                                backlog_score,
1680                                cause,
1681                            } => {
1682                                tracing::info!(
1683                                    namespace_count = namespaces.len(),
1684                                    backlog_score,
1685                                    cause = cause.as_str(),
1686                                    "batch: interference threshold exceeded, consolidation requested"
1687                                );
1688                            }
1689                            super::write_path::InterferenceAction::Suppressed {
1690                                reason,
1691                                backlog_score,
1692                            } => {
1693                                tracing::debug!(
1694                                    reason = reason.as_str(),
1695                                    backlog_score,
1696                                    "batch: interference request suppressed"
1697                                );
1698                            }
1699                            super::write_path::InterferenceAction::None => {}
1700                        }
1701                    }
1702                }
1703            }
1704        }
1705
1706        if !prospective_batches.is_empty() {
1707            let batch_count = prospective_batches.len();
1708            if let Err(e) = self
1709                .storage_runtime
1710                .append_batches(
1711                    hirn_storage::datasets::prospective_implications::DATASET_NAME,
1712                    prospective_batches,
1713                )
1714                .await
1715            {
1716                tracing::warn!(error = %e, "batch: failed to write prospective implications");
1717            } else {
1718                tracing::debug!(
1719                    batch_count,
1720                    count = prospective_rows,
1721                    "batch: prospective implications stored"
1722                );
1723            }
1724        }
1725
1726        if !svo_batches.is_empty() {
1727            let batch_count = svo_batches.len();
1728            if let Err(e) = self
1729                .storage_runtime
1730                .append_batches("svo_events", svo_batches)
1731                .await
1732            {
1733                tracing::warn!(error = %e, "batch: failed to write SVO events");
1734            } else {
1735                tracing::debug!(batch_count, count = svo_rows, "batch: SVO events stored");
1736            }
1737        }
1738        record_batch_stage("slow_path", stage_start.elapsed());
1739
1740        // ── 11. Metrics ─────────────────────────────────────────────────
1741        let success_count = results
1742            .iter()
1743            .filter(|result| matches!(result, Some(Ok(_))))
1744            .count() as u64;
1745        let fail_count = (n as u64).saturating_sub(success_count);
1746        if success_count > 0 {
1747            metrics::counter!(crate::metrics::REMEMBER_TOTAL, "realm" => realm.clone(), "status" => "success")
1748                .increment(success_count);
1749        }
1750        if fail_count > 0 {
1751            metrics::counter!(crate::metrics::REMEMBER_TOTAL, "realm" => realm, "status" => "client_error")
1752                .increment(fail_count);
1753        }
1754
1755        // Periodically persist RPE population stats so calibration survives restarts.
1756        self.write_runtime().flush_rpe_stats_if_due(self.path());
1757
1758        results
1759            .into_iter()
1760            .map(|r| r.unwrap_or_else(|| Err(HirnError::InvalidInput("unreachable".into()))))
1761            .collect()
1762    }
1763
1764    /// Internal remember implementation with optional admission bypass.
1765    async fn remember_inner(
1766        &self,
1767        record: EpisodicRecord,
1768        bypass_admission: bool,
1769    ) -> HirnResult<MemoryId> {
1770        self.remember_inner_with_explanation(record, bypass_admission)
1771            .await
1772            .map(|(id, _)| id)
1773            .map_err(|failure| failure.error)
1774    }
1775
1776    async fn remember_inner_with_explanation(
1777        &self,
1778        mut record: EpisodicRecord,
1779        bypass_admission: bool,
1780    ) -> Result<(MemoryId, crate::RememberExplanation), crate::RememberFailure> {
1781        let actor_id = record.provenance.created_by;
1782        let namespace = record.namespace;
1783        let initial_embedding = if record.embedding.is_some() {
1784            crate::EmbeddingDisposition::Provided
1785        } else {
1786            crate::EmbeddingDisposition::Missing
1787        };
1788        let mut explanation = crate::RememberExplanation::new(
1789            actor_id,
1790            namespace,
1791            bypass_admission,
1792            initial_embedding,
1793            self.config.text_retention,
1794        );
1795
1796        // ── Cedar policy enforcement ──
1797        if let Err(error) = self
1798            .enforce(
1799                record.provenance.created_by.as_str(),
1800                crate::policy::Action::Remember,
1801                &self.config.default_realm,
1802                record.namespace.as_str(),
1803            )
1804            .await
1805        {
1806            explanation.status = crate::RememberStatus::Rejected;
1807            explanation.error = Some(error.to_string());
1808            return Err(crate::RememberFailure::new(error, explanation));
1809        }
1810
1811        // ── Admission Control ──
1812        if !bypass_admission {
1813            let admission_result = self
1814                .admission_runtime()
1815                .evaluate_record(&record)
1816                .await
1817                .map_err(|error| {
1818                    explanation.status = crate::RememberStatus::Failed;
1819                    explanation.error = Some(error.to_string());
1820                    crate::RememberFailure::new(error, explanation.clone())
1821                })?;
1822            if let Some(result) = admission_result {
1823                let candidate = crate::admission::MemoryCandidate::from_record(&record);
1824                let controllers_consulted: Vec<String> = result
1825                    .verdicts
1826                    .iter()
1827                    .map(|v| v.controller.clone())
1828                    .collect();
1829                let decision = result.decision.clone();
1830                let decision_str = format!("{:?}", decision);
1831                self.emit_scoped(
1832                    record.namespace.as_str(),
1833                    record.provenance.created_by.as_str(),
1834                    MemoryEvent::AdmissionEvaluated {
1835                        candidate_id: candidate.id,
1836                        decision: decision_str,
1837                        controllers_consulted: controllers_consulted.clone(),
1838                    },
1839                )
1840                .await;
1841
1842                explanation.admission = Some(crate::AdmissionExplanation {
1843                    decision: decision.clone(),
1844                    controllers_consulted,
1845                });
1846                if let Err(error) = apply_admission_decision(
1847                    &mut record,
1848                    result.decision,
1849                    &self.config.default_realm,
1850                ) {
1851                    explanation.status = remember_status_for_admission(&decision);
1852                    explanation.error = Some(error.to_string());
1853                    return Err(crate::RememberFailure::new(error, explanation));
1854                }
1855            }
1856        }
1857
1858        // Auto-embed from multi_content when no embedding is provided.
1859        // On embed failure: fall back to storing without embedding (provider fallback).
1860        if record.embedding.is_none() {
1861            if let Some(ref mc) = record.multi_content {
1862                match self.embed_content(mc).await {
1863                    Ok(emb) => {
1864                        record.embedding = Some(emb);
1865                        explanation.embedding = crate::EmbeddingDisposition::Generated;
1866                    }
1867                    Err(e) => {
1868                        tracing::warn!(error = %e, id = %record.id, "Embed failed, storing without embedding (provider fallback)");
1869                        metrics::counter!(
1870                            crate::metrics::PROVIDER_FALLBACK_TOTAL,
1871                            "realm" => self.config.default_realm.clone(),
1872                            "provider_type" => "embed"
1873                        )
1874                        .increment(1);
1875                        self.write_runtime().enqueue_pending_embed(record.id);
1876                        explanation.embedding = crate::EmbeddingDisposition::PendingRetry;
1877                    }
1878                }
1879            }
1880        }
1881
1882        // ── RPE-gated fast/slow path ──
1883        let (is_fast_path, rpe_max_similarity) = if self.config.rpe_enabled {
1884            if let Some(ref emb) = record.embedding {
1885                let key = self
1886                    .write_runtime()
1887                    .rpe_partition_key(record.namespace, &self.rpe_model_id(), hirn_core::types::Layer::Episodic);
1888                let mut stats_snapshot = self.write_runtime().snapshot_rpe_stats(&key);
1889                let rpe = super::write_path::compute_rpe(
1890                    self.storage_backend(),
1891                    emb,
1892                    self.config.rpe_fast_path_threshold,
1893                    self.config.rpe_similarity_search_limit,
1894                    &mut stats_snapshot,
1895                    &self.write_runtime().rpe_circuit_breaker_for(&key),
1896                )
1897                .await;
1898                self.write_runtime()
1899                    .record_rpe_distance(&key, f64::from(1.0 - rpe.max_similarity));
1900                self.write_runtime().record_rpe_routing_metric(
1901                    &key,
1902                    &rpe,
1903                    self.config.rpe_fast_path_threshold,
1904                );
1905                tracing::debug!(
1906                    rpe_score = rpe.score,
1907                    max_similarity = rpe.max_similarity,
1908                    fast_path = rpe.is_fast_path,
1909                    "RPE admission score"
1910                );
1911                explanation.rpe = Some(crate::RpeExplanation {
1912                    enabled: true,
1913                    score: Some(rpe.score),
1914                    max_similarity: Some(rpe.max_similarity),
1915                    threshold: self.config.rpe_fast_path_threshold,
1916                    is_fast_path: rpe.is_fast_path,
1917                });
1918                if rpe.is_fast_path {
1919                    record.importance = 0.3 + 0.2 * rpe.score;
1920                    tracing::info!(id = %record.id, rpe_score = rpe.score, "RPE fast-path: skipping LLM analysis");
1921                }
1922                (rpe.is_fast_path, Some(rpe.max_similarity))
1923            } else {
1924                explanation.rpe = Some(crate::RpeExplanation {
1925                    enabled: true,
1926                    score: None,
1927                    max_similarity: None,
1928                    threshold: self.config.rpe_fast_path_threshold,
1929                    is_fast_path: false,
1930                });
1931                (false, None)
1932            }
1933        } else {
1934            (false, None)
1935        };
1936
1937        if let Some(ref emb) = record.embedding {
1938            if emb.len() != self.config.embedding_dimensions.as_usize() {
1939                let error = HirnError::InvalidInput(format!(
1940                    "embedding dimension mismatch: expected {}, got {}",
1941                    self.config.embedding_dimensions.as_usize(),
1942                    emb.len()
1943                ));
1944                explanation.status = crate::RememberStatus::Failed;
1945                explanation.error = Some(error.to_string());
1946                return Err(crate::RememberFailure::new(error, explanation));
1947            }
1948        }
1949
1950        let content_for_write_path = if !is_fast_path {
1951            Some(record.content.clone())
1952        } else {
1953            None
1954        };
1955
1956        match self.config.text_retention {
1957            hirn_core::TextRetention::Full => {}
1958            hirn_core::TextRetention::SummaryOnly => {
1959                record.content = String::new();
1960            }
1961            hirn_core::TextRetention::None => {
1962                record.content = String::new();
1963                record.summary = String::new();
1964            }
1965        }
1966
1967        let id = record.id;
1968        let importance = record.importance;
1969        let timestamp = record.timestamp;
1970        let namespace = record.namespace;
1971        let content_preview = record.content.chars().take(120).collect::<String>();
1972        let entities: Vec<String> = record.entities.iter().map(|e| e.name.clone()).collect();
1973
1974        if let Some(ref mc) = record.multi_content {
1975            let extracted = self
1976                .extract_and_store_resources(namespace, record.provenance.created_by, mc)
1977                .await
1978                .map_err(|error| {
1979                    explanation.status = crate::RememberStatus::Failed;
1980                    explanation.error = Some(error.to_string());
1981                    crate::RememberFailure::new(error, explanation.clone())
1982                })?;
1983            record.multi_content = Some(extracted.content);
1984            record
1985                .provenance
1986                .evidence_links
1987                .extend(extracted.evidence_links);
1988            explanation.resources_extracted = true;
1989        }
1990
1991        let edge_requests = self
1992            .plan_auto_episode_edge_requests(
1993                id,
1994                record.namespace,
1995                record.embedding.as_deref(),
1996                &record.content,
1997                &entities,
1998                None,
1999                None,
2000                None,
2001            )
2002            .await
2003            .map_err(|error| {
2004                explanation.status = crate::RememberStatus::Failed;
2005                explanation.error = Some(error.to_string());
2006                crate::RememberFailure::new(error, explanation.clone())
2007            })?;
2008        let episode_envelope =
2009            build_episode_remember_envelope(&record, &content_preview, &edge_requests).map_err(
2010                |error| {
2011                    explanation.status = crate::RememberStatus::Failed;
2012                    explanation.error = Some(error.to_string());
2013                    crate::RememberFailure::new(error, explanation.clone())
2014                },
2015            )?;
2016        hirn_storage::append_mutation_envelope(self.storage_backend(), &episode_envelope)
2017            .await
2018            .map_err(HirnError::storage)
2019            .map_err(|error| {
2020                explanation.status = crate::RememberStatus::Failed;
2021                explanation.error = Some(error.to_string());
2022                crate::RememberFailure::new(error, explanation.clone())
2023            })?;
2024
2025        self.cached_graph()
2026            .add_node(id, Layer::Episodic, importance, timestamp, namespace)
2027            .await
2028            .map_err(|error| {
2029                explanation.status = crate::RememberStatus::Failed;
2030                explanation.error = Some(error.to_string());
2031                crate::RememberFailure::new(error, explanation.clone())
2032            })?;
2033
2034        if let Err(e) = self
2035            .apply_episode_edge_requests(
2036                record.namespace,
2037                record.provenance.created_by,
2038                &edge_requests,
2039            )
2040            .await
2041        {
2042            let _ = self.cached_graph().remove_node(id).await;
2043            explanation.status = crate::RememberStatus::Failed;
2044            explanation.error = Some(e.to_string());
2045            return Err(crate::RememberFailure::new(e, explanation));
2046        }
2047
2048        {
2049            let dims = self.config.embedding_dimensions.as_usize();
2050            let batch =
2051                hirn_storage::datasets::episodic::to_batch(std::slice::from_ref(&record), dims)
2052                    .map_err(|e| HirnError::storage(e))
2053                    .map_err(|error| {
2054                        explanation.status = crate::RememberStatus::Failed;
2055                        explanation.error = Some(error.to_string());
2056                        crate::RememberFailure::new(error, explanation.clone())
2057                    })?;
2058            self.storage_runtime
2059                .append(hirn_storage::datasets::episodic::DATASET_NAME, batch)
2060                .await
2061                .map_err(|e| HirnError::storage(e))
2062                .map_err(|error| {
2063                    explanation.status = crate::RememberStatus::Failed;
2064                    explanation.error = Some(error.to_string());
2065                    crate::RememberFailure::new(error, explanation.clone())
2066                })?;
2067        }
2068
2069        let arrival = self.write_runtime().record_arrival(namespace, id);
2070        explanation.arrival_sequence = u64::try_from(arrival.sequence).ok();
2071        if let Some(prev_id) = arrival.previous_id {
2072            let _ = self
2073                .cached_graph()
2074                .add_edge(
2075                    prev_id,
2076                    id,
2077                    EdgeRelation::TemporalNext,
2078                    1.0,
2079                    temporal_next_metadata(&arrival),
2080                )
2081                .await;
2082        }
2083
2084        if let Some(ref content) = content_for_write_path {
2085            if self.config.prospective_indexing_enabled {
2086                if let Some(embedder) = self.provider_runtime().embedder() {
2087                    let count = super::write_path::store_prospective_implications(
2088                        self.storage_backend(),
2089                        &*embedder,
2090                        id,
2091                        content,
2092                        self.config.prospective_indexing_num_questions,
2093                        self.config.prospective_indexing_timeout_secs,
2094                        &self.config.prospective_indexing_templates,
2095                        namespace.as_str(),
2096                    )
2097                    .await;
2098                    explanation.prospective_indexing =
2099                        crate::WritePathOperationExplanation::applied(count);
2100                    if count > 0 {
2101                        tracing::debug!(id = %id, count, "Prospective implications stored");
2102                    }
2103                } else {
2104                    explanation.prospective_indexing =
2105                        crate::WritePathOperationExplanation::unavailable();
2106                }
2107            } else {
2108                explanation.prospective_indexing = crate::WritePathOperationExplanation::disabled();
2109            }
2110
2111            if self.config.svo_extraction_enabled {
2112                let count = super::write_path::extract_and_store_svo_events(
2113                    self.storage_backend(),
2114                    id,
2115                    content,
2116                    self.config.svo_confidence_threshold,
2117                    namespace.as_str(),
2118                    self.config.embedding_dimensions.as_usize(),
2119                )
2120                .await;
2121                explanation.svo_extraction = crate::WritePathOperationExplanation::applied(count);
2122                if count > 0 {
2123                    tracing::debug!(id = %id, count, "SVO events extracted and stored");
2124                }
2125            } else {
2126                explanation.svo_extraction = crate::WritePathOperationExplanation::disabled();
2127            }
2128        } else {
2129            explanation.prospective_indexing =
2130                crate::WritePathOperationExplanation::skipped_fast_path();
2131            explanation.svo_extraction = crate::WritePathOperationExplanation::skipped_fast_path();
2132        }
2133
2134        if let Some(max_sim) = rpe_max_similarity {
2135            let interference = super::write_path::interference_score_from_similarity(max_sim);
2136            let disposition = if interference > 0.0 {
2137                let action = self.write_runtime().accumulate_interference(
2138                    interference,
2139                    namespace,
2140                    self.config.interference_consolidation_threshold,
2141                    self.config.interference_consolidation_cooldown_secs,
2142                );
2143                match action {
2144                    super::write_path::InterferenceAction::TriggerConsolidation {
2145                        namespaces,
2146                        backlog_score,
2147                        cause,
2148                    } => {
2149                        tracing::info!(
2150                            namespace_count = namespaces.len(),
2151                            backlog_score,
2152                            cause = cause.as_str(),
2153                            "Interference threshold exceeded, consolidation requested"
2154                        );
2155                        crate::InterferenceDisposition::TriggerConsolidation {
2156                            namespaces,
2157                            backlog_score,
2158                            cause: cause.as_str(),
2159                        }
2160                    }
2161                    super::write_path::InterferenceAction::Suppressed {
2162                        reason,
2163                        backlog_score,
2164                    } => {
2165                        tracing::debug!(
2166                            reason = reason.as_str(),
2167                            backlog_score,
2168                            "Interference request suppressed"
2169                        );
2170                        crate::InterferenceDisposition::Suppressed {
2171                            reason: reason.as_str(),
2172                            backlog_score,
2173                        }
2174                    }
2175                    super::write_path::InterferenceAction::None => {
2176                        crate::InterferenceDisposition::None
2177                    }
2178                }
2179            } else {
2180                crate::InterferenceDisposition::None
2181            };
2182            explanation.interference = Some(crate::InterferenceExplanation {
2183                score: interference,
2184                disposition,
2185            });
2186        }
2187
2188        self.emit_scoped_checked(
2189            namespace.as_str(),
2190            record.provenance.created_by.as_str(),
2191            MemoryEvent::EpisodeCreated {
2192                id,
2193                content_preview,
2194            },
2195        )
2196        .await
2197        .map_err(|error| {
2198            explanation.status = crate::RememberStatus::Failed;
2199            explanation.error = Some(error.to_string());
2200            crate::RememberFailure::new(error, explanation.clone())
2201        })?;
2202        if let Err(error) = hirn_storage::update_mutation_envelope_state(
2203            self.storage_backend(),
2204            &episode_envelope.id,
2205            hirn_storage::MutationEnvelopeState::Applied,
2206            None,
2207        )
2208        .await
2209        {
2210            tracing::warn!(
2211                id = %id,
2212                error = %error,
2213                "episode mutation envelope finalize failed; recovery will retry cleanup"
2214            );
2215        }
2216        explanation.status = crate::RememberStatus::Accepted;
2217        explanation.memory_id = Some(id);
2218
2219        // ── Backward Memory Evolution (A-MEM) ──────────────────────────────
2220        // Async: enqueue an offline Evolve job referencing the new memory.
2221        // Synchronous: run evolution inline (best-effort, never fails the write).
2222        match self.config.evolution_mode {
2223            hirn_core::EvolutionMode::Async { .. } => {
2224                let target = hirn_core::offline::OfflineJobTarget {
2225                    namespace: Some(namespace),
2226                    memory_ids: vec![id],
2227                    ..Default::default()
2228                };
2229                let job = hirn_core::CognitiveJob::new(hirn_core::CognitiveJobKind::Evolve, target);
2230                if let Err(e) = self.offline_scheduler_runtime().submit_job(job).await {
2231                    tracing::warn!(id = %id, error = %e, "backward evolution job enqueue failed; skipping");
2232                }
2233            }
2234            hirn_core::EvolutionMode::Synchronous { max_neighbors } => {
2235                let evo_config = crate::consolidation::EvolutionConfig {
2236                    evolution_top_k: max_neighbors,
2237                    ..Default::default()
2238                };
2239                if let Err(e) = crate::consolidation::evolve_on_new_memory(
2240                    self,
2241                    &record,
2242                    &evo_config,
2243                )
2244                .await
2245                {
2246                    tracing::warn!(id = %id, error = %e, "synchronous backward evolution failed; skipping");
2247                }
2248            }
2249            hirn_core::EvolutionMode::None => {}
2250        }
2251
2252        Ok((id, explanation))
2253    }
2254
2255    pub(crate) async fn reconcile_pending_episode_mutations(&self) -> HirnResult<usize> {
2256        let envelopes = hirn_storage::list_pending_mutation_envelopes(
2257            self.storage_backend(),
2258            Some(EPISODE_REMEMBER_MUTATION_KIND),
2259        )
2260        .await
2261        .map_err(HirnError::storage)?;
2262        let mut reconciled = 0usize;
2263
2264        for envelope in envelopes {
2265            match self
2266                .reconcile_single_pending_episode_mutation(&envelope)
2267                .await
2268            {
2269                Ok(true) => reconciled += 1,
2270                Ok(false) => {}
2271                Err(error) => {
2272                    hirn_storage::update_mutation_envelope_state(
2273                        self.storage_backend(),
2274                        &envelope.id,
2275                        hirn_storage::MutationEnvelopeState::Failed,
2276                        Some(error.to_string()),
2277                    )
2278                    .await
2279                    .map_err(HirnError::storage)?;
2280                }
2281            }
2282        }
2283
2284        Ok(reconciled)
2285    }
2286
2287    async fn reconcile_single_pending_episode_mutation(
2288        &self,
2289        envelope: &hirn_storage::MutationEnvelopeRecord,
2290    ) -> HirnResult<bool> {
2291        let payload = decode_episode_remember_envelope(envelope)?;
2292
2293        match self.read_episodic_record(payload.record_id).await {
2294            Ok(_record) => {
2295                if !self.cached_graph().has_node(payload.record_id).await? {
2296                    self.cached_graph()
2297                        .add_node(
2298                            payload.record_id,
2299                            Layer::Episodic,
2300                            payload.importance,
2301                            Timestamp::from_millis(payload.timestamp_ms),
2302                            payload.namespace,
2303                        )
2304                        .await?;
2305                }
2306
2307                self.apply_episode_edge_requests(
2308                    payload.namespace,
2309                    payload.agent_id,
2310                    &payload.edge_requests,
2311                )
2312                .await?;
2313                if let Some(temporal_edge_request) = payload.temporal_edge_request.as_ref() {
2314                    self.apply_episode_edge_requests(
2315                        payload.namespace,
2316                        payload.agent_id,
2317                        std::slice::from_ref(temporal_edge_request),
2318                    )
2319                    .await?;
2320                }
2321
2322                if !self.episode_created_event_logged(&payload).await? {
2323                    self.emit_scoped_checked(
2324                        payload.namespace.as_str(),
2325                        payload.agent_id.as_str(),
2326                        MemoryEvent::EpisodeCreated {
2327                            id: payload.record_id,
2328                            content_preview: payload.content_preview.clone(),
2329                        },
2330                    )
2331                    .await?;
2332                }
2333
2334                hirn_storage::update_mutation_envelope_state(
2335                    self.storage_backend(),
2336                    &envelope.id,
2337                    hirn_storage::MutationEnvelopeState::Applied,
2338                    None,
2339                )
2340                .await
2341                .map_err(HirnError::storage)?;
2342                Ok(true)
2343            }
2344            Err(HirnError::NotFound(_)) => {
2345                if self.cached_graph().has_node(payload.record_id).await? {
2346                    let _ = self.cached_graph().remove_node(payload.record_id).await;
2347                }
2348                hirn_storage::update_mutation_envelope_state(
2349                    self.storage_backend(),
2350                    &envelope.id,
2351                    hirn_storage::MutationEnvelopeState::Failed,
2352                    Some(format!(
2353                        "episode record missing during recovery: {}",
2354                        payload.record_id
2355                    )),
2356                )
2357                .await
2358                .map_err(HirnError::storage)?;
2359                Ok(true)
2360            }
2361            Err(error) => Err(error),
2362        }
2363    }
2364
2365    async fn episode_created_event_logged(
2366        &self,
2367        payload: &EpisodeRememberEnvelope,
2368    ) -> HirnResult<bool> {
2369        let Some(event_log) = self.event_log() else {
2370            return Ok(false);
2371        };
2372
2373        let events = event_log
2374            .read_with_filter(&crate::event_log::EventFilter {
2375                realm: Some(self.config.default_realm.clone()),
2376                namespace: Some(payload.namespace.as_str().to_string()),
2377                event_type: Some("episode_created".to_string()),
2378                agent_id: Some(payload.agent_id.as_str().to_string()),
2379                after_us: None,
2380                before_us: None,
2381            })
2382            .await?;
2383
2384        Ok(events.into_iter().any(|env| {
2385            matches!(
2386                env.event,
2387                MemoryEvent::EpisodeCreated { id, .. } if id == payload.record_id
2388            )
2389        }))
2390    }
2391
2392    pub(crate) async fn hydrate_temporal_arrival_cursors(&self) -> HirnResult<()> {
2393        let mut latest_by_namespace: HashMap<Namespace, (MemoryId, i64)> = HashMap::new();
2394
2395        for edge in self.cached_graph().all_edges().await? {
2396            if edge.relation != EdgeRelation::TemporalNext {
2397                continue;
2398            }
2399            let Some(sequence) = target_arrival_sequence(&edge.metadata) else {
2400                continue;
2401            };
2402            latest_by_namespace
2403                .entry(edge.namespace)
2404                .and_modify(|current| {
2405                    if sequence > current.1 || (sequence == current.1 && edge.target > current.0) {
2406                        *current = (edge.target, sequence);
2407                    }
2408                })
2409                .or_insert((edge.target, sequence));
2410        }
2411
2412        let mut seed_records_by_namespace: HashMap<Namespace, Vec<EpisodicRecord>> = HashMap::new();
2413        for record in self.list_episode_arrival_seed_records().await? {
2414            seed_records_by_namespace
2415                .entry(record.namespace)
2416                .or_default()
2417                .push(record);
2418        }
2419
2420        for (namespace, mut records) in seed_records_by_namespace {
2421            latest_by_namespace.entry(namespace).or_insert_with(|| {
2422                records.sort_by(|left, right| {
2423                    left.created_at
2424                        .cmp(&right.created_at)
2425                        .then_with(|| left.revision_id.cmp(&right.revision_id))
2426                });
2427                let last = records
2428                    .last()
2429                    .expect("namespace seed records should be non-empty");
2430                (last.id, i64::try_from(records.len()).unwrap_or(i64::MAX))
2431            });
2432        }
2433
2434        for (namespace, (id, sequence)) in latest_by_namespace {
2435            self.write_runtime().seed_arrival(namespace, id, sequence);
2436        }
2437
2438        Ok(())
2439    }
2440
2441    async fn list_episode_arrival_seed_records(&self) -> HirnResult<Vec<EpisodicRecord>> {
2442        let mut batches = self
2443            .storage_runtime
2444            .scan_stream(
2445                hirn_storage::datasets::episodic::DATASET_NAME,
2446                hirn_storage::store::ScanOptions {
2447                    filter: Some("version = 1".to_string()),
2448                    ..Default::default()
2449                },
2450            )
2451            .await
2452            .map_err(HirnError::storage)?;
2453        let mut records = Vec::new();
2454
2455        while let Some(batch) = batches.try_next().await.map_err(HirnError::storage)? {
2456            records.extend(
2457                hirn_storage::datasets::episodic::from_batch(&batch).map_err(HirnError::storage)?,
2458            );
2459        }
2460
2461        Ok(records)
2462    }
2463
2464    /// Retrieve a single episodic record by ID.
2465    pub(crate) async fn get_episode(&self, id: MemoryId) -> HirnResult<EpisodicRecord> {
2466        self.read_episodic_record(id).await
2467    }
2468
2469    async fn apply_episode_access_counts(
2470        &self,
2471        counts: HashMap<MemoryId, usize>,
2472    ) -> HirnResult<()> {
2473        if counts.is_empty() {
2474            return Ok(());
2475        }
2476
2477        let unique_ids: Vec<MemoryId> = counts.keys().copied().collect();
2478        let records = self.read_episodic_records_batch(&unique_ids).await?;
2479        let mut updated = Vec::with_capacity(unique_ids.len());
2480
2481        for id in unique_ids {
2482            let Some(mut record) = records.get(&id).cloned() else {
2483                continue;
2484            };
2485            for _ in 0..counts[&id] {
2486                record.record_access();
2487            }
2488            updated.push(record);
2489        }
2490
2491        if updated.is_empty() {
2492            return Ok(());
2493        }
2494
2495        const UPDATE_CHUNK: usize = 256;
2496        for chunk in updated.chunks(UPDATE_CHUNK) {
2497            let in_list = chunk
2498                .iter()
2499                .map(|record| format!("'{}'", record.id.to_string().replace('\'', "''")))
2500                .collect::<Vec<_>>()
2501                .join(", ");
2502            self.storage_runtime
2503                .delete(
2504                    hirn_storage::datasets::episodic::DATASET_NAME,
2505                    &format!("id IN ({in_list})"),
2506                )
2507                .await
2508                .map_err(HirnError::storage)?;
2509        }
2510
2511        let dims = self.config.embedding_dimensions.as_usize();
2512        let batch = hirn_storage::datasets::episodic::to_batch(&updated, dims)
2513            .map_err(HirnError::storage)?;
2514        self.storage_runtime
2515            .append(hirn_storage::datasets::episodic::DATASET_NAME, batch)
2516            .await
2517            .map_err(HirnError::storage)?;
2518
2519        Ok(())
2520    }
2521
2522    pub(crate) async fn flush_episodic_access(&self) -> HirnResult<()> {
2523        self.apply_episode_access_counts(self.graph_runtime().drain_episodic_access())
2524            .await
2525    }
2526
2527    pub(crate) fn buffer_episode_access(&self, id: MemoryId) {
2528        self.graph_runtime().buffer_episodic_access(id);
2529    }
2530
2531    /// Update access stats for a single episodic record.
2532    /// Called periodically or on important reads (e.g., from recall).
2533    pub async fn record_episode_access(&self, id: MemoryId) -> HirnResult<()> {
2534        self.apply_episode_access_counts(HashMap::from([(id, 1)]))
2535            .await
2536    }
2537
2538    /// List episodic records matching the filter. Records are returned in
2539    /// temporal order (oldest first).
2540    pub(crate) async fn list_episodes(
2541        &self,
2542        filter: &EpisodicFilter,
2543    ) -> HirnResult<Vec<EpisodicRecord>> {
2544        let now = Timestamp::now();
2545        let requested_limit = filter.limit.unwrap_or(usize::MAX);
2546        if requested_limit == 0 {
2547            return Ok(Vec::new());
2548        }
2549
2550        let mut batches = self
2551            .storage_runtime
2552            .scan_stream(
2553                hirn_storage::datasets::episodic::DATASET_NAME,
2554                hirn_storage::store::ScanOptions {
2555                    filter: build_episodic_scan_filter(filter),
2556                    order_by: Some(vec![
2557                        hirn_storage::store::ScanOrdering::asc("timestamp_ms"),
2558                        hirn_storage::store::ScanOrdering::asc("id"),
2559                    ]),
2560                    ..Default::default()
2561                },
2562            )
2563            .await
2564            .map_err(HirnError::storage)?;
2565
2566        let mut heads = HashMap::new();
2567
2568        while let Some(batch) = batches.try_next().await.map_err(HirnError::storage)? {
2569            let recs =
2570                hirn_storage::datasets::episodic::from_batch(&batch).map_err(HirnError::storage)?;
2571            for rec in recs {
2572                heads
2573                    .entry(rec.logical_memory_id)
2574                    .and_modify(|current| {
2575                        if episodic_revision_is_newer(&rec, current) {
2576                            *current = rec.clone();
2577                        }
2578                    })
2579                    .or_insert(rec);
2580            }
2581        }
2582
2583        let mut results = heads
2584            .into_values()
2585            .filter(|rec| self.episode_matches_filter_at(rec, filter, now))
2586            .collect::<Vec<_>>();
2587        results.sort_by(|left, right| {
2588            left.timestamp
2589                .cmp(&right.timestamp)
2590                .then_with(|| left.id.cmp(&right.id))
2591        });
2592
2593        let offset = filter.offset.unwrap_or(0);
2594        if offset >= results.len() {
2595            return Ok(Vec::new());
2596        }
2597        let end = offset.saturating_add(requested_limit).min(results.len());
2598        results = results[offset..end].to_vec();
2599
2600        Ok(results)
2601    }
2602
2603    fn episode_matches_filter_at(
2604        &self,
2605        rec: &EpisodicRecord,
2606        filter: &EpisodicFilter,
2607        now: Timestamp,
2608    ) -> bool {
2609        if !filter.include_archived && rec.archived {
2610            return false;
2611        }
2612        // Automatically exclude expired records (TTL).
2613        if rec.is_expired(now) {
2614            return false;
2615        }
2616        if let Some(et) = &filter.event_type {
2617            if rec.event_type != *et {
2618                return false;
2619            }
2620        }
2621        if let Some(after) = &filter.after {
2622            if rec.timestamp <= *after {
2623                return false;
2624            }
2625        }
2626        if let Some(before) = &filter.before {
2627            if rec.timestamp >= *before {
2628                return false;
2629            }
2630        }
2631        if let Some(min_imp) = filter.min_importance {
2632            if rec.importance < min_imp {
2633                return false;
2634            }
2635        }
2636        if let Some(entity) = &filter.entity_name {
2637            if !rec.entities.iter().any(|e| e.name == *entity) {
2638                return false;
2639            }
2640        }
2641        if let Some(ns) = &filter.namespace {
2642            if rec.namespace != *ns {
2643                return false;
2644            }
2645        }
2646        true
2647    }
2648
2649    /// Hard-delete an episodic record and remove it from the graph.
2650    pub(crate) async fn delete_episode(&self, id: MemoryId) -> HirnResult<()> {
2651        let record = self.read_episodic_record(id).await?;
2652        let head = self
2653            .episodic_head_for_logical_id(record.logical_memory_id)
2654            .await?;
2655
2656        // ── Cedar policy enforcement ──
2657        self.enforce(
2658            head.provenance.created_by.as_str(),
2659            crate::policy::Action::Forget,
2660            &self.config.default_realm,
2661            head.namespace.as_str(),
2662        )
2663        .await?;
2664
2665        let history = self.read_episodic_history(record.logical_memory_id).await?;
2666        for revision in &history {
2667            let _ = self.cached_graph().remove_node(revision.id).await;
2668        }
2669
2670        // Delete the full logical chain from LanceDB.
2671        let exact_filter = Self::episodic_logical_exact_filter(record.logical_memory_id);
2672        self.storage_runtime
2673            .delete_exact(
2674                hirn_storage::datasets::episodic::DATASET_NAME,
2675                &exact_filter,
2676            )
2677            .await
2678            .map_err(|e| HirnError::storage(e))?;
2679
2680        self.emit_scoped(
2681            head.namespace.as_str(),
2682            head.provenance.created_by.as_str(),
2683            MemoryEvent::Forgotten { id },
2684        )
2685        .await;
2686        Ok(())
2687    }
2688
2689    /// Soft-delete: mark an episodic record as archived.
2690    pub(crate) async fn archive_episode(&self, id: MemoryId) -> HirnResult<()> {
2691        let record = self.episodic_edit_target(id).await?;
2692
2693        // ── Cedar policy enforcement ──
2694        self.enforce(
2695            record.provenance.created_by.as_str(),
2696            crate::policy::Action::Forget,
2697            &self.config.default_realm,
2698            record.namespace.as_str(),
2699        )
2700        .await?;
2701
2702        let archived = self
2703            .append_episodic_successor(
2704                &record,
2705                RevisionOperation::Retract,
2706                Some("episodic record archived".to_string()),
2707                |next| {
2708                    next.archived = true;
2709                },
2710            )
2711            .await?;
2712        self.emit_scoped(
2713            archived.namespace.as_str(),
2714            archived.provenance.created_by.as_str(),
2715            MemoryEvent::Archived { id: archived.id },
2716        )
2717        .await;
2718        Ok(())
2719    }
2720
2721    // ── Memory Decay & TTL ──────────────────────────────────
2722
2723    /// Apply time-based importance decay to all non-archived episodic records.
2724    ///
2725    /// Formula: `importance *= decay_factor ^ (hours_since_last_access / half_life_hours)`
2726    ///
2727    /// Records that fall below `memory_min_importance` are automatically archived
2728    /// via append-only successor revisions.
2729    /// Returns the number of records that were archived due to decay.
2730    pub(crate) async fn decay_memories(&self) -> HirnResult<usize> {
2731        let decay_factor = self.config.memory_decay_factor;
2732        let half_life_hours = self.config.memory_half_life_hours;
2733        let min_importance = self.config.memory_min_importance;
2734        let now = Timestamp::now();
2735        let now_dt = now.as_datetime();
2736        let mut archived_count = 0;
2737
2738        for record in self.current_episodic_heads().await?.into_values() {
2739            if !record.is_live() || record.is_expired(now) {
2740                continue;
2741            }
2742
2743            let last_dt = record.last_accessed.as_datetime();
2744            let hours_elapsed = (now_dt - last_dt).num_seconds().max(0) as f64 / 3600.0;
2745
2746            let hours_since_creation = now_dt
2747                .signed_duration_since(record.timestamp.as_datetime())
2748                .num_hours() as f64;
2749            if hours_since_creation < 1.0 {
2750                continue;
2751            }
2752
2753            let exponent = hours_elapsed / half_life_hours as f64;
2754            let new_importance = record.importance * (decay_factor as f64).powf(exponent) as f32;
2755
2756            if new_importance < min_importance {
2757                let archived = self
2758                    .append_episodic_successor(
2759                        &record,
2760                        RevisionOperation::Retract,
2761                        Some("episodic importance decayed below archival threshold".to_string()),
2762                        |next| {
2763                            next.importance = new_importance;
2764                            next.archived = true;
2765                        },
2766                    )
2767                    .await?;
2768                archived_count += 1;
2769                self.emit_scoped(
2770                    archived.namespace.as_str(),
2771                    archived.provenance.created_by.as_str(),
2772                    MemoryEvent::Archived { id: archived.id },
2773                )
2774                .await;
2775            } else if new_importance < record.importance * 0.999 {
2776                let _ = self
2777                    .append_episodic_successor(
2778                        &record,
2779                        RevisionOperation::Correct,
2780                        Some("episodic importance decayed".to_string()),
2781                        |next| {
2782                            next.importance = new_importance;
2783                        },
2784                    )
2785                    .await?;
2786            }
2787        }
2788
2789        Ok(archived_count)
2790    }
2791
2792    /// Hard-delete all episodic records whose `expires_at` has passed.
2793    ///
2794    /// Returns the number of records purged.
2795    pub(crate) async fn purge_expired(&self) -> HirnResult<usize> {
2796        let now = Timestamp::now();
2797        let expired_ids: Vec<MemoryId> = self
2798            .current_episodic_heads()
2799            .await?
2800            .into_values()
2801            .filter(|record| record.is_expired(now))
2802            .map(|record| record.id)
2803            .collect();
2804
2805        // Delete each expired record via the existing delete_episode path.
2806        let count = expired_ids.len();
2807        for id in expired_ids {
2808            // Best-effort: skip records that were already deleted (race with manual delete).
2809            let _ = self.delete_episode(id).await;
2810        }
2811
2812        Ok(count)
2813    }
2814
2815    /// Start a background task that periodically runs decay + TTL purge.
2816    ///
2817    /// Returns a `JoinHandle` that can be awaited on shutdown.
2818    /// The task runs at `interval` cadence and stops when the returned
2819    /// handle is aborted or the runtime shuts down.
2820    pub fn start_decay_task(
2821        self: &std::sync::Arc<Self>,
2822        interval: std::time::Duration,
2823    ) -> tokio::task::JoinHandle<()> {
2824        let db = std::sync::Arc::clone(self);
2825        tokio::spawn(async move {
2826            let mut ticker = tokio::time::interval(interval);
2827            ticker.tick().await; // consume the immediate first tick
2828            loop {
2829                ticker.tick().await;
2830                if let Err(e) = db.decay_memories().await {
2831                    tracing::warn!(error = %e, "memory decay task failed");
2832                }
2833                if let Err(e) = db.purge_expired().await {
2834                    tracing::warn!(error = %e, "TTL purge task failed");
2835                }
2836            }
2837        })
2838    }
2839}
2840
2841#[cfg(test)]
2842mod tests {
2843    use std::sync::Arc;
2844
2845    use super::*;
2846    use hirn_core::HirnConfig;
2847    use hirn_core::id::MemoryId;
2848    use hirn_core::revision::{LogicalMemoryId, RevisionId};
2849    use hirn_core::types::{AgentId, EdgeRelation, EventType};
2850    use hirn_storage::memory_store::MemoryStore;
2851
2852    fn agent() -> AgentId {
2853        AgentId::new("episodic-tests").unwrap()
2854    }
2855
2856    async fn temp_db() -> (HirnDB, tempfile::TempDir) {
2857        let dir = tempfile::tempdir().unwrap();
2858        let path = dir.path().join("episodic-tests");
2859        let config = HirnConfig::builder()
2860            .db_path(&path)
2861            .embedding_dimensions(4)
2862            .working_memory_token_limit(1000)
2863            .memory_decay_factor(0.5)
2864            .memory_half_life_hours(1)
2865            .memory_min_importance(0.05)
2866            .build()
2867            .unwrap();
2868        let db = HirnDB::open_with_config(config, Arc::new(MemoryStore::new()))
2869            .await
2870            .unwrap();
2871        (db, dir)
2872    }
2873
2874    fn episodic_record(
2875        id: MemoryId,
2876        logical_memory_id: LogicalMemoryId,
2877        created_at: Timestamp,
2878        version: u32,
2879    ) -> EpisodicRecord {
2880        let mut record = EpisodicRecord::builder()
2881            .event_type(EventType::Observation)
2882            .content("deployment note")
2883            .summary("deployment note")
2884            .importance(0.7)
2885            .agent_id(agent())
2886            .build()
2887            .unwrap();
2888        record.id = id;
2889        record.logical_memory_id = logical_memory_id;
2890        record.revision_id = RevisionId::from_memory_id(id);
2891        record.version = version;
2892        record.timestamp = created_at;
2893        record.created_at = created_at;
2894        record.updated_at = created_at;
2895        record.last_accessed = created_at;
2896        record
2897    }
2898
2899    #[test]
2900    fn revision_snapshot_preserves_exact_recorded_boundary_when_timestamps_tie() {
2901        let created_at = Timestamp::from_millis(1_700_000_000_000);
2902        let original_id = MemoryId::parse("01ARZ3NDEKTSV4RRFFQ69G5FAW").unwrap();
2903        let successor_id = MemoryId::parse("01ARZ3NDEKTSV4RRFFQ69G5FAV").unwrap();
2904        let logical_memory_id = LogicalMemoryId::from_memory_id(original_id);
2905
2906        let original = episodic_record(original_id, logical_memory_id, created_at, 1);
2907        let mut successor = episodic_record(successor_id, logical_memory_id, created_at, 2);
2908        successor.revision_operation = RevisionOperation::Correct;
2909        successor.revision_reason = Some("content refined".to_string());
2910        successor.revision_causation_id = Some(original.id);
2911
2912        let revision = episodic_snapshot_head_recorded_at_snapshot(
2913            &[original.clone(), successor],
2914            super::super::semantic::ResolvedRecallSnapshot::Revision {
2915                cutoff: created_at,
2916                revision_id: original.revision_id,
2917                logical_memory_id,
2918                version: original.version,
2919            },
2920        )
2921        .unwrap();
2922
2923        assert_eq!(revision.id, original.id);
2924        assert_eq!(revision.revision_id, original.revision_id);
2925        assert_eq!(revision.version, 1);
2926    }
2927
2928    #[tokio::test(flavor = "multi_thread")]
2929    async fn decay_advances_only_the_active_revision_head() {
2930        let (db, _dir) = temp_db().await;
2931        let stale_timestamp =
2932            Timestamp::from_millis(Timestamp::now().millis() - (6 * 60 * 60 * 1000));
2933
2934        let id = db
2935            .remember(
2936                EpisodicRecord::builder()
2937                    .event_type(EventType::Observation)
2938                    .content("stale chain")
2939                    .summary("stale chain")
2940                    .importance(0.8)
2941                    .timestamp(stale_timestamp)
2942                    .agent_id(agent())
2943                    .build()
2944                    .unwrap(),
2945            )
2946            .await
2947            .unwrap();
2948
2949        let original = db.read_episodic_record(id).await.unwrap();
2950        let active = db
2951            .append_episodic_successor(
2952                &original,
2953                RevisionOperation::Correct,
2954                Some("prepare stale active head".to_string()),
2955                |next| {
2956                    next.importance = 0.8;
2957                },
2958            )
2959            .await
2960            .unwrap();
2961
2962        let mut stale_active = db.read_episodic_record(active.id).await.unwrap();
2963        stale_active.last_accessed = stale_timestamp;
2964        stale_active.updated_at = stale_timestamp;
2965        db.write_episodic_record(&stale_active).await.unwrap();
2966
2967        let archived = db.decay_memories().await.unwrap();
2968        assert_eq!(archived, 1);
2969
2970        let history = db
2971            .read_episodic_history(original.logical_memory_id)
2972            .await
2973            .unwrap();
2974        assert_eq!(history.len(), 3);
2975
2976        let original_after = db.read_episodic_record(id).await.unwrap();
2977        assert_eq!(original_after.version, 1);
2978        assert_eq!(original_after.importance, 0.8);
2979        assert!(original_after.superseded_by.is_none());
2980
2981        let head = db
2982            .episodic_head_for_logical_id(original.logical_memory_id)
2983            .await
2984            .unwrap();
2985        assert_eq!(head.version, 3);
2986        assert_eq!(head.revision_operation, RevisionOperation::Retract);
2987        assert!(head.archived);
2988    }
2989
2990    #[tokio::test(flavor = "multi_thread")]
2991    async fn batch_remember_primes_episodic_head_cache() {
2992        let (db, _dir) = temp_db().await;
2993
2994        let records = vec![
2995            EpisodicRecord::builder()
2996                .event_type(EventType::Observation)
2997                .content("first cached episode")
2998                .summary("first cached episode")
2999                .importance(0.7)
3000                .agent_id(agent())
3001                .build()
3002                .unwrap(),
3003            EpisodicRecord::builder()
3004                .event_type(EventType::Observation)
3005                .content("second cached episode")
3006                .summary("second cached episode")
3007                .importance(0.6)
3008                .agent_id(agent())
3009                .build()
3010                .unwrap(),
3011        ];
3012
3013        let logical_ids = records
3014            .iter()
3015            .map(|record| record.logical_memory_id)
3016            .collect::<Vec<_>>();
3017
3018        let results = db.batch_remember(records).await;
3019        assert!(results.iter().all(Result::is_ok));
3020
3021        for logical_memory_id in logical_ids {
3022            let cached = db.cached_episodic_head(logical_memory_id);
3023            assert!(
3024                cached.is_some(),
3025                "expected cached head for {logical_memory_id}"
3026            );
3027        }
3028    }
3029
3030    #[tokio::test(flavor = "multi_thread")]
3031    async fn open_reconciles_pending_episode_mutation_envelopes() {
3032        let dir = tempfile::tempdir().unwrap();
3033        let path = dir.path().join("episodic-envelope-recovery");
3034        let store = Arc::new(MemoryStore::new());
3035        let config = HirnConfig::builder()
3036            .db_path(&path)
3037            .embedding_dimensions(4)
3038            .working_memory_token_limit(1000)
3039            .build()
3040            .unwrap();
3041
3042        let db = HirnDB::open_with_config(config.clone(), store.clone())
3043            .await
3044            .unwrap();
3045        let record = episodic_record(MemoryId::new(), LogicalMemoryId::new(), Timestamp::now(), 1);
3046        let preview = record.content.chars().take(120).collect::<String>();
3047        let envelope = build_episode_remember_envelope(&record, &preview, &[]).unwrap();
3048
3049        let batch = hirn_storage::datasets::episodic::to_batch(
3050            std::slice::from_ref(&record),
3051            db.config.embedding_dimensions.as_usize(),
3052        )
3053        .unwrap();
3054        db.storage_runtime
3055            .append(hirn_storage::datasets::episodic::DATASET_NAME, batch)
3056            .await
3057            .unwrap();
3058        hirn_storage::append_mutation_envelope(store.as_ref(), &envelope)
3059            .await
3060            .unwrap();
3061
3062        assert!(!db.cached_graph().has_node(record.id).await.unwrap());
3063        drop(db);
3064
3065        let reopened = HirnDB::open_with_config(config, store.clone())
3066            .await
3067            .unwrap();
3068
3069        assert!(reopened.cached_graph().has_node(record.id).await.unwrap());
3070
3071        let events = reopened.event_log().unwrap().read_all().await.unwrap();
3072        assert!(events.into_iter().any(|env| {
3073            matches!(env.event, MemoryEvent::EpisodeCreated { id, .. } if id == record.id)
3074        }));
3075
3076        let stored_envelope = hirn_storage::get_mutation_envelope(store.as_ref(), &envelope.id)
3077            .await
3078            .unwrap()
3079            .unwrap();
3080        assert_eq!(
3081            stored_envelope.state,
3082            hirn_storage::MutationEnvelopeState::Applied
3083        );
3084    }
3085
3086    #[tokio::test(flavor = "multi_thread")]
3087    async fn open_reconciles_pending_episode_mutation_temporal_next_edges() {
3088        let dir = tempfile::tempdir().unwrap();
3089        let path = dir.path().join("episodic-envelope-temporal-recovery");
3090        let store = Arc::new(MemoryStore::new());
3091        let config = HirnConfig::builder()
3092            .db_path(&path)
3093            .embedding_dimensions(4)
3094            .working_memory_token_limit(1000)
3095            .build()
3096            .unwrap();
3097
3098        let db = HirnDB::open_with_config(config.clone(), store.clone())
3099            .await
3100            .unwrap();
3101        let previous_id = db
3102            .remember(
3103                EpisodicRecord::builder()
3104                    .event_type(EventType::Observation)
3105                    .content("first recovery edge")
3106                    .summary("first recovery edge")
3107                    .importance(0.7)
3108                    .agent_id(agent())
3109                    .build()
3110                    .unwrap(),
3111            )
3112            .await
3113            .unwrap();
3114        let record = episodic_record(MemoryId::new(), LogicalMemoryId::new(), Timestamp::now(), 1);
3115        let preview = record.content.chars().take(120).collect::<String>();
3116        let mut envelope = build_episode_remember_envelope(&record, &preview, &[]).unwrap();
3117        let arrival = super::write_runtime::TemporalArrival {
3118            previous_id: Some(previous_id),
3119            previous_sequence: Some(1),
3120            sequence: 2,
3121        };
3122        update_episode_envelope_temporal_edge(
3123            &mut envelope,
3124            temporal_edge_request_for_arrival(record.id, &arrival),
3125        )
3126        .unwrap();
3127
3128        let batch = hirn_storage::datasets::episodic::to_batch(
3129            std::slice::from_ref(&record),
3130            db.config.embedding_dimensions.as_usize(),
3131        )
3132        .unwrap();
3133        db.storage_runtime
3134            .append(hirn_storage::datasets::episodic::DATASET_NAME, batch)
3135            .await
3136            .unwrap();
3137        hirn_storage::append_mutation_envelope(store.as_ref(), &envelope)
3138            .await
3139            .unwrap();
3140
3141        drop(db);
3142
3143        let reopened = HirnDB::open_with_config(config, store.clone())
3144            .await
3145            .unwrap();
3146
3147        let temporal_edges = reopened
3148            .cached_graph()
3149            .get_edges_between(previous_id, record.id)
3150            .await
3151            .unwrap();
3152        let edge = temporal_edges
3153            .into_iter()
3154            .find(|edge| edge.relation == EdgeRelation::TemporalNext)
3155            .expect("pending episode recovery should restore TemporalNext edge");
3156        assert_eq!(target_arrival_sequence(&edge.metadata), Some(2));
3157
3158        let stored_envelope = hirn_storage::get_mutation_envelope(store.as_ref(), &envelope.id)
3159            .await
3160            .unwrap()
3161            .unwrap();
3162        assert_eq!(
3163            stored_envelope.state,
3164            hirn_storage::MutationEnvelopeState::Applied
3165        );
3166    }
3167
3168    #[tokio::test(flavor = "multi_thread")]
3169    async fn restart_hydrates_temporal_arrival_cursor_for_next_remember() {
3170        let dir = tempfile::tempdir().unwrap();
3171        let path = dir.path().join("episodic-temporal-restart");
3172        let store = Arc::new(MemoryStore::new());
3173        let config = HirnConfig::builder()
3174            .db_path(&path)
3175            .embedding_dimensions(4)
3176            .working_memory_token_limit(1000)
3177            .build()
3178            .unwrap();
3179
3180        let first_id = {
3181            let db = HirnDB::open_with_config(config.clone(), store.clone())
3182                .await
3183                .unwrap();
3184            let first_id = db
3185                .remember(
3186                    EpisodicRecord::builder()
3187                        .event_type(EventType::Observation)
3188                        .content("first after boot")
3189                        .summary("first after boot")
3190                        .importance(0.7)
3191                        .agent_id(agent())
3192                        .build()
3193                        .unwrap(),
3194                )
3195                .await
3196                .unwrap();
3197            drop(db);
3198            first_id
3199        };
3200
3201        let reopened = HirnDB::open_with_config(config, store).await.unwrap();
3202        let second_id = reopened
3203            .remember(
3204                EpisodicRecord::builder()
3205                    .event_type(EventType::Observation)
3206                    .content("second after restart")
3207                    .summary("second after restart")
3208                    .importance(0.8)
3209                    .agent_id(agent())
3210                    .build()
3211                    .unwrap(),
3212            )
3213            .await
3214            .unwrap();
3215
3216        let temporal_edges = reopened
3217            .cached_graph()
3218            .get_edges_of_type(second_id, EdgeRelation::TemporalNext)
3219            .await
3220            .unwrap();
3221        let edge = temporal_edges
3222            .into_iter()
3223            .find(|edge| edge.source == first_id && edge.target == second_id)
3224            .expect("restarted remember should continue the TemporalNext chain");
3225        assert_eq!(target_arrival_sequence(&edge.metadata), Some(2));
3226    }
3227
3228    #[tokio::test(flavor = "multi_thread")]
3229    async fn purge_expired_only_considers_the_active_revision_head() {
3230        let (db, _dir) = temp_db().await;
3231        let now = Timestamp::now();
3232        let expired_timestamp = Timestamp::from_millis(now.millis().saturating_sub(1_000));
3233        let future_timestamp = Timestamp::from_millis(now.millis() + (60 * 60 * 1000));
3234
3235        let id = db
3236            .remember(
3237                EpisodicRecord::builder()
3238                    .event_type(EventType::Observation)
3239                    .content("ttl chain")
3240                    .summary("ttl chain")
3241                    .expires_at(expired_timestamp)
3242                    .agent_id(agent())
3243                    .build()
3244                    .unwrap(),
3245            )
3246            .await
3247            .unwrap();
3248
3249        let original = db.read_episodic_record(id).await.unwrap();
3250        let active = db
3251            .append_episodic_successor(
3252                &original,
3253                RevisionOperation::Correct,
3254                Some("extend ttl on active head".to_string()),
3255                |next| {
3256                    next.expires_at = Some(future_timestamp);
3257                },
3258            )
3259            .await
3260            .unwrap();
3261
3262        let purged = db.purge_expired().await.unwrap();
3263        assert_eq!(purged, 0);
3264
3265        let history = db
3266            .read_episodic_history(original.logical_memory_id)
3267            .await
3268            .unwrap();
3269        assert_eq!(history.len(), 2);
3270        assert!(history.iter().any(|record| record.id == id));
3271        assert!(history.iter().any(|record| record.id == active.id));
3272    }
3273
3274    #[tokio::test(flavor = "multi_thread")]
3275    async fn archive_canonicalizes_stale_revision_ids_to_the_live_head() {
3276        let (db, _dir) = temp_db().await;
3277
3278        let id = db
3279            .remember(
3280                EpisodicRecord::builder()
3281                    .event_type(EventType::Observation)
3282                    .content("draft note")
3283                    .summary("draft note")
3284                    .importance(0.7)
3285                    .agent_id(agent())
3286                    .build()
3287                    .unwrap(),
3288            )
3289            .await
3290            .unwrap();
3291
3292        let original = db.read_episodic_record(id).await.unwrap();
3293        let current = db
3294            .append_episodic_successor(
3295                &original,
3296                RevisionOperation::Correct,
3297                Some("refresh content".to_string()),
3298                |next| {
3299                    next.content = "fresh note".to_string();
3300                    next.summary = "fresh note".to_string();
3301                },
3302            )
3303            .await
3304            .unwrap();
3305
3306        db.archive_episode(id).await.unwrap();
3307
3308        let history = db
3309            .read_episodic_history(original.logical_memory_id)
3310            .await
3311            .unwrap();
3312        assert_eq!(history.len(), 3);
3313
3314        let original_after = db.read_episodic_record(id).await.unwrap();
3315        assert_eq!(original_after.version, 1);
3316        assert!(!original_after.archived);
3317
3318        let current_after = db.read_episodic_record(current.id).await.unwrap();
3319        assert_eq!(current_after.version, 2);
3320        assert!(!current_after.archived);
3321
3322        let archived = db
3323            .episodic_head_for_logical_id(original.logical_memory_id)
3324            .await
3325            .unwrap();
3326        assert_eq!(archived.version, 3);
3327        assert!(archived.archived);
3328        assert_eq!(archived.revision_operation, RevisionOperation::Retract);
3329    }
3330
3331    #[tokio::test(flavor = "multi_thread")]
3332    async fn batch_remember_preserves_auto_edges_against_existing_records() {
3333        let (db, _dir) = temp_db().await;
3334        let embedding = vec![1.0, 0.0, 0.0, 0.0];
3335
3336        let existing_id = db
3337            .remember(
3338                EpisodicRecord::builder()
3339                    .event_type(EventType::Observation)
3340                    .content("deploy service alpha")
3341                    .summary("deploy service alpha")
3342                    .embedding(embedding.clone())
3343                    .entity("deploy", "topic")
3344                    .entity("service", "topic")
3345                    .agent_id(agent())
3346                    .build()
3347                    .unwrap(),
3348            )
3349            .await
3350            .unwrap();
3351
3352        let new_id = db
3353            .batch_remember(vec![
3354                EpisodicRecord::builder()
3355                    .event_type(EventType::Observation)
3356                    .content("deploy service beta")
3357                    .summary("deploy service beta")
3358                    .embedding(embedding)
3359                    .entity("deploy", "topic")
3360                    .entity("service", "topic")
3361                    .agent_id(agent())
3362                    .build()
3363                    .unwrap(),
3364            ])
3365            .await
3366            .into_iter()
3367            .next()
3368            .unwrap()
3369            .unwrap();
3370
3371        let edges = db
3372            .cached_graph()
3373            .get_edges_between(existing_id, new_id)
3374            .await
3375            .unwrap();
3376
3377        assert!(
3378            edges
3379                .iter()
3380                .any(|edge| edge.relation == EdgeRelation::SimilarTo),
3381            "expected SimilarTo edge between existing and batched record"
3382        );
3383        assert!(
3384            edges
3385                .iter()
3386                .any(|edge| edge.relation == EdgeRelation::RelatedTo),
3387            "expected RelatedTo edge between existing and batched record"
3388        );
3389    }
3390
3391    #[tokio::test(flavor = "multi_thread")]
3392    async fn max_auto_edges_zero_disables_episode_auto_edges() {
3393        let dir = tempfile::tempdir().unwrap();
3394        let path = dir.path().join("episodic-tests-no-auto-edges");
3395        let config = HirnConfig::builder()
3396            .db_path(&path)
3397            .embedding_dimensions(4)
3398            .working_memory_token_limit(1000)
3399            .max_auto_edges_per_record(0)
3400            .entity_overlap_threshold(1)
3401            .build()
3402            .unwrap();
3403        let db = HirnDB::open_with_config(config, Arc::new(MemoryStore::new()))
3404            .await
3405            .unwrap();
3406
3407        let embedding = vec![1.0, 0.0, 0.0, 0.0];
3408        let existing_id = db
3409            .remember(
3410                EpisodicRecord::builder()
3411                    .event_type(EventType::Observation)
3412                    .content("deploy service alpha")
3413                    .summary("deploy service alpha")
3414                    .embedding(embedding.clone())
3415                    .entity("deploy", "topic")
3416                    .agent_id(agent())
3417                    .build()
3418                    .unwrap(),
3419            )
3420            .await
3421            .unwrap();
3422
3423        let new_id = db
3424            .remember(
3425                EpisodicRecord::builder()
3426                    .event_type(EventType::Observation)
3427                    .content("deploy service beta")
3428                    .summary("deploy service beta")
3429                    .embedding(embedding)
3430                    .entity("deploy", "topic")
3431                    .agent_id(agent())
3432                    .build()
3433                    .unwrap(),
3434            )
3435            .await
3436            .unwrap();
3437
3438        let edges = db
3439            .cached_graph()
3440            .get_edges_between(existing_id, new_id)
3441            .await
3442            .unwrap();
3443        assert!(
3444            !edges.iter().any(|edge| {
3445                matches!(
3446                    edge.relation,
3447                    EdgeRelation::SimilarTo
3448                        | EdgeRelation::RelatedTo
3449                        | EdgeRelation::ParticipatesIn
3450                        | EdgeRelation::Contradicts
3451                )
3452            }),
3453            "expected no auto-edge relations when max_auto_edges_per_record is zero"
3454        );
3455    }
3456
3457    #[tokio::test(flavor = "multi_thread")]
3458    async fn batch_remember_entity_only_records_preserve_auto_edges_against_existing_records() {
3459        let (db, _dir) = temp_db().await;
3460
3461        let existing_id = db
3462            .remember(
3463                EpisodicRecord::builder()
3464                    .event_type(EventType::Observation)
3465                    .content("deploy service alpha")
3466                    .summary("deploy service alpha")
3467                    .entity("deploy", "topic")
3468                    .entity("service", "topic")
3469                    .agent_id(agent())
3470                    .build()
3471                    .unwrap(),
3472            )
3473            .await
3474            .unwrap();
3475
3476        let new_id = db
3477            .batch_remember(vec![
3478                EpisodicRecord::builder()
3479                    .event_type(EventType::Observation)
3480                    .content("deploy service beta")
3481                    .summary("deploy service beta")
3482                    .entity("deploy", "topic")
3483                    .entity("service", "topic")
3484                    .agent_id(agent())
3485                    .build()
3486                    .unwrap(),
3487            ])
3488            .await
3489            .into_iter()
3490            .next()
3491            .unwrap()
3492            .unwrap();
3493
3494        let edges = db
3495            .cached_graph()
3496            .get_edges_between(existing_id, new_id)
3497            .await
3498            .unwrap();
3499
3500        assert!(
3501            edges
3502                .iter()
3503                .any(|edge| edge.relation == EdgeRelation::RelatedTo),
3504            "expected RelatedTo edge between existing and batched entity-only record"
3505        );
3506    }
3507
3508    #[tokio::test(flavor = "multi_thread")]
3509    async fn batch_remember_multiple_embedded_records_preserve_auto_edges_against_existing_records()
3510    {
3511        let (db, _dir) = temp_db().await;
3512        let embedding = vec![1.0, 0.0, 0.0, 0.0];
3513
3514        let existing_id = db
3515            .remember(
3516                EpisodicRecord::builder()
3517                    .event_type(EventType::Observation)
3518                    .content("deploy service alpha")
3519                    .summary("deploy service alpha")
3520                    .embedding(embedding.clone())
3521                    .entity("deploy", "topic")
3522                    .entity("service", "topic")
3523                    .agent_id(agent())
3524                    .build()
3525                    .unwrap(),
3526            )
3527            .await
3528            .unwrap();
3529
3530        let batched_ids = db
3531            .batch_remember(vec![
3532                EpisodicRecord::builder()
3533                    .event_type(EventType::Observation)
3534                    .content("deploy service beta")
3535                    .summary("deploy service beta")
3536                    .embedding(embedding.clone())
3537                    .entity("deploy", "topic")
3538                    .entity("service", "topic")
3539                    .agent_id(agent())
3540                    .build()
3541                    .unwrap(),
3542                EpisodicRecord::builder()
3543                    .event_type(EventType::Observation)
3544                    .content("deploy service gamma")
3545                    .summary("deploy service gamma")
3546                    .embedding(embedding)
3547                    .entity("deploy", "topic")
3548                    .entity("service", "topic")
3549                    .agent_id(agent())
3550                    .build()
3551                    .unwrap(),
3552            ])
3553            .await
3554            .into_iter()
3555            .map(Result::unwrap)
3556            .collect::<Vec<_>>();
3557
3558        for new_id in batched_ids {
3559            let edges = db
3560                .cached_graph()
3561                .get_edges_between(existing_id, new_id)
3562                .await
3563                .unwrap();
3564
3565            assert!(
3566                edges
3567                    .iter()
3568                    .any(|edge| edge.relation == EdgeRelation::SimilarTo),
3569                "expected SimilarTo edge between existing and batched embedded record"
3570            );
3571            assert!(
3572                edges
3573                    .iter()
3574                    .any(|edge| edge.relation == EdgeRelation::RelatedTo),
3575                "expected RelatedTo edge between existing and batched embedded record"
3576            );
3577        }
3578    }
3579
3580    #[tokio::test(flavor = "multi_thread")]
3581    async fn batch_remember_distinct_embedded_records_preserve_auto_edges_against_matching_existing_records()
3582     {
3583        let (db, _dir) = temp_db().await;
3584        let deploy_embedding = vec![1.0, 0.0, 0.0, 0.0];
3585        let cache_embedding = vec![0.0, 1.0, 0.0, 0.0];
3586
3587        let deploy_existing_id = db
3588            .remember(
3589                EpisodicRecord::builder()
3590                    .event_type(EventType::Observation)
3591                    .content("deploy service alpha")
3592                    .summary("deploy service alpha")
3593                    .embedding(deploy_embedding.clone())
3594                    .entity("deploy", "topic")
3595                    .entity("service", "topic")
3596                    .agent_id(agent())
3597                    .build()
3598                    .unwrap(),
3599            )
3600            .await
3601            .unwrap();
3602
3603        let cache_existing_id = db
3604            .remember(
3605                EpisodicRecord::builder()
3606                    .event_type(EventType::Observation)
3607                    .content("cache index warmup")
3608                    .summary("cache index warmup")
3609                    .embedding(cache_embedding.clone())
3610                    .entity("cache", "topic")
3611                    .entity("index", "topic")
3612                    .agent_id(agent())
3613                    .build()
3614                    .unwrap(),
3615            )
3616            .await
3617            .unwrap();
3618
3619        let batched_ids = db
3620            .batch_remember(vec![
3621                EpisodicRecord::builder()
3622                    .event_type(EventType::Observation)
3623                    .content("deploy service beta")
3624                    .summary("deploy service beta")
3625                    .embedding(deploy_embedding)
3626                    .entity("deploy", "topic")
3627                    .entity("service", "topic")
3628                    .agent_id(agent())
3629                    .build()
3630                    .unwrap(),
3631                EpisodicRecord::builder()
3632                    .event_type(EventType::Observation)
3633                    .content("cache index refresh")
3634                    .summary("cache index refresh")
3635                    .embedding(cache_embedding)
3636                    .entity("cache", "topic")
3637                    .entity("index", "topic")
3638                    .agent_id(agent())
3639                    .build()
3640                    .unwrap(),
3641            ])
3642            .await
3643            .into_iter()
3644            .map(Result::unwrap)
3645            .collect::<Vec<_>>();
3646
3647        let deploy_edges = db
3648            .cached_graph()
3649            .get_edges_between(deploy_existing_id, batched_ids[0])
3650            .await
3651            .unwrap();
3652        assert!(
3653            deploy_edges
3654                .iter()
3655                .any(|edge| edge.relation == EdgeRelation::SimilarTo),
3656            "expected SimilarTo edge between deploy records"
3657        );
3658        assert!(
3659            deploy_edges
3660                .iter()
3661                .any(|edge| edge.relation == EdgeRelation::RelatedTo),
3662            "expected RelatedTo edge between deploy records"
3663        );
3664
3665        let cache_edges = db
3666            .cached_graph()
3667            .get_edges_between(cache_existing_id, batched_ids[1])
3668            .await
3669            .unwrap();
3670        assert!(
3671            cache_edges
3672                .iter()
3673                .any(|edge| edge.relation == EdgeRelation::SimilarTo),
3674            "expected SimilarTo edge between cache records"
3675        );
3676        assert!(
3677            cache_edges
3678                .iter()
3679                .any(|edge| edge.relation == EdgeRelation::RelatedTo),
3680            "expected RelatedTo edge between cache records"
3681        );
3682    }
3683}