1use std::collections::{HashMap, HashSet};
2
3use futures::TryStreamExt;
4use hirn_core::RecallSnapshot;
5use hirn_core::revision::{LogicalMemoryId, RevisionId, RevisionOperation};
6
7use super::*;
8use crate::cached_graph_store::EdgeInsert;
9
/// Mutation-envelope `kind` tag attached to the pending envelopes built by
/// `build_episode_remember_envelope`.
pub(super) const EPISODE_REMEMBER_MUTATION_KIND: &str = "episode_remember";
11
/// Payload of an `episode_remember` mutation envelope, stored as JSON via `serde_json`.
///
/// Built by `build_episode_remember_envelope` and rewritten in place by
/// `update_episode_envelope_temporal_edge`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct EpisodeRememberEnvelope {
    /// Id of the episodic record being remembered.
    record_id: MemoryId,
    /// Namespace of the record.
    namespace: Namespace,
    /// Creator of the record (`provenance.created_by`).
    agent_id: AgentId,
    /// Record importance at the time the envelope was built.
    importance: f32,
    /// Record timestamp in milliseconds; the builder rejects negative timestamps.
    timestamp_ms: u64,
    /// Caller-supplied preview of the record's content.
    content_preview: String,
    /// Graph edges requested for the record.
    edge_requests: Vec<EdgeInsert>,
    /// Optional `TemporalNext` edge request. `None` when the envelope is built; set later by
    /// `update_episode_envelope_temporal_edge`.
    temporal_edge_request: Option<EdgeInsert>,
}
23
24fn encode_episode_remember_envelope(payload: &EpisodeRememberEnvelope) -> HirnResult<Vec<u8>> {
25 serde_json::to_vec(payload)
26 .map_err(|error| HirnError::storage(format!("episode envelope serialize: {error}")))
27}
28
29fn decode_episode_remember_envelope(
30 envelope: &hirn_storage::MutationEnvelopeRecord,
31) -> HirnResult<EpisodeRememberEnvelope> {
32 serde_json::from_slice(&envelope.payload)
33 .map_err(|error| HirnError::storage(format!("episode envelope deserialize: {error}")))
34}
35
36fn update_episode_envelope_temporal_edge(
37 envelope: &mut hirn_storage::MutationEnvelopeRecord,
38 temporal_edge_request: Option<EdgeInsert>,
39) -> HirnResult<()> {
40 let mut payload = decode_episode_remember_envelope(envelope)?;
41 payload.temporal_edge_request = temporal_edge_request;
42 envelope.payload = encode_episode_remember_envelope(&payload)?;
43 envelope.updated_at = Timestamp::now();
44 Ok(())
45}
46
47fn temporal_edge_request_for_arrival(
48 record_id: MemoryId,
49 arrival: &super::write_runtime::TemporalArrival,
50) -> Option<EdgeInsert> {
51 arrival.previous_id.map(|previous_id| EdgeInsert {
52 source: previous_id,
53 target: record_id,
54 relation: EdgeRelation::TemporalNext,
55 weight: 1.0,
56 metadata: temporal_next_metadata(arrival),
57 })
58}
59
60fn target_arrival_sequence(metadata: &Metadata) -> Option<i64> {
61 match metadata.get("target_arrival_sequence") {
62 Some(hirn_core::metadata::MetadataValue::Int(sequence)) => Some(*sequence),
63 _ => None,
64 }
65}
66
67fn temporal_next_metadata(arrival: &super::write_runtime::TemporalArrival) -> Metadata {
68 let mut metadata = Metadata::new();
69 metadata.insert("temporal_basis".into(), "arrival_order".into());
70 metadata.insert("temporal_partition".into(), "namespace".into());
71 if let Some(previous_sequence) = arrival.previous_sequence {
72 metadata.insert("source_arrival_sequence".into(), previous_sequence.into());
73 }
74 metadata.insert("target_arrival_sequence".into(), arrival.sequence.into());
75 metadata
76}
77
78fn apply_admission_decision(
79 record: &mut EpisodicRecord,
80 decision: crate::admission::AdmissionDecision,
81 realm: &str,
82) -> HirnResult<()> {
83 match decision {
84 crate::admission::AdmissionDecision::Accept {
85 importance_override,
86 } => {
87 if let Some(override_val) = importance_override {
88 record.importance = override_val;
89 }
90 Ok(())
91 }
92 crate::admission::AdmissionDecision::Reject { reason } => {
93 metrics::counter!(crate::metrics::ADMISSION_REJECTED_TOTAL, "realm" => realm.to_owned())
94 .increment(1);
95 Err(HirnError::InvalidInput(format!(
96 "admission rejected: {reason}"
97 )))
98 }
99 crate::admission::AdmissionDecision::Defer { until } => Err(HirnError::InvalidInput(
100 format!("admission deferred until {until}"),
101 )),
102 crate::admission::AdmissionDecision::Merge { target } => Err(HirnError::InvalidInput(
103 format!("admission: merge into {target}"),
104 )),
105 }
106}
107
108fn remember_status_for_admission(
109 decision: &crate::admission::AdmissionDecision,
110) -> crate::RememberStatus {
111 match decision {
112 crate::admission::AdmissionDecision::Accept { .. } => crate::RememberStatus::Accepted,
113 crate::admission::AdmissionDecision::Reject { .. } => crate::RememberStatus::Rejected,
114 crate::admission::AdmissionDecision::Defer { .. } => crate::RememberStatus::Deferred,
115 crate::admission::AdmissionDecision::Merge { .. } => crate::RememberStatus::Merged,
116 }
117}
118
119fn build_episode_remember_envelope(
120 record: &EpisodicRecord,
121 content_preview: &str,
122 edge_requests: &[EdgeInsert],
123) -> HirnResult<hirn_storage::MutationEnvelopeRecord> {
124 let timestamp_ms = u64::try_from(record.timestamp.timestamp_ms())
125 .map_err(|_| HirnError::storage("episode timestamp_ms was negative"))?;
126 let payload = EpisodeRememberEnvelope {
127 record_id: record.id,
128 namespace: record.namespace,
129 agent_id: record.provenance.created_by,
130 importance: record.importance,
131 timestamp_ms,
132 content_preview: content_preview.to_string(),
133 edge_requests: edge_requests.to_vec(),
134 temporal_edge_request: None,
135 };
136 let payload = encode_episode_remember_envelope(&payload)?;
137
138 Ok(hirn_storage::MutationEnvelopeRecord::pending(
139 format!("episode-remember:{}", record.id),
140 EPISODE_REMEMBER_MUTATION_KIND,
141 payload,
142 ))
143}
144
145fn build_episodic_scan_filter(filter: &EpisodicFilter) -> Option<String> {
146 let mut parts: Vec<String> = Vec::new();
147
148 if let Some(after) = &filter.after {
149 parts.push(format!("timestamp_ms > {}", after.timestamp_ms()));
150 }
151 if let Some(before) = &filter.before {
152 parts.push(format!("timestamp_ms < {}", before.timestamp_ms()));
153 }
154 if let Some(ns) = &filter.namespace {
155 parts.push(format!("namespace = '{}'", ns.as_str().replace('\'', "''")));
156 }
157 if let Some(event_type) = &filter.event_type {
158 parts.push(format!("event_type = '{event_type:?}'"));
159 }
160 if let Some(min_importance) = filter.min_importance {
161 parts.push(format!("importance >= {min_importance}"));
162 }
163 if let Some(t) = &filter.valid_at {
166 let t_ms = t.timestamp_ms();
167 parts.push(format!("timestamp_ms <= {t_ms}"));
168 parts.push(format!(
169 "(valid_until_ms IS NULL OR valid_until_ms > {t_ms})"
170 ));
171 }
172
173 if parts.is_empty() {
174 None
175 } else {
176 Some(parts.join(" AND "))
177 }
178}
179
180pub(super) fn episodic_revision_is_newer(
181 candidate: &EpisodicRecord,
182 current: &EpisodicRecord,
183) -> bool {
184 candidate.version > current.version
185 || (candidate.version == current.version
186 && (candidate.created_at > current.created_at
187 || (candidate.created_at == current.created_at
188 && candidate.revision_id > current.revision_id)))
189}
190
191pub(super) fn collapse_episodic_heads(
192 records: impl IntoIterator<Item = EpisodicRecord>,
193) -> HashMap<LogicalMemoryId, EpisodicRecord> {
194 let mut heads = HashMap::new();
195 for record in records {
196 heads
197 .entry(record.logical_memory_id)
198 .and_modify(|current| {
199 if episodic_revision_is_newer(&record, current) {
200 *current = record.clone();
201 }
202 })
203 .or_insert(record);
204 }
205 heads
206}
207
208pub(super) fn episodic_snapshot_head_as_of(
209 history: &[EpisodicRecord],
210 cutoff: Timestamp,
211) -> Option<EpisodicRecord> {
212 history
213 .iter()
214 .filter(|record| record.timestamp <= cutoff)
215 .max_by(|left, right| {
216 left.version
217 .cmp(&right.version)
218 .then_with(|| left.created_at.cmp(&right.created_at))
219 .then_with(|| left.revision_id.cmp(&right.revision_id))
220 })
221 .cloned()
222}
223
224pub(super) fn episodic_snapshot_head_recorded_at_snapshot(
225 history: &[EpisodicRecord],
226 snapshot: super::semantic::ResolvedRecallSnapshot,
227) -> Option<EpisodicRecord> {
228 history
229 .iter()
230 .filter(|record| {
231 snapshot.contains_recorded_revision_for_chain(
232 record.logical_memory_id,
233 record.version,
234 record.created_at,
235 record.revision_id,
236 )
237 })
238 .max_by(|left, right| {
239 left.created_at
240 .cmp(&right.created_at)
241 .then_with(|| left.version.cmp(&right.version))
242 .then_with(|| left.revision_id.cmp(&right.revision_id))
243 })
244 .cloned()
245}
246
247impl HirnDB {
248 pub(super) async fn read_episodic_record(&self, id: MemoryId) -> HirnResult<EpisodicRecord> {
252 let opts = hirn_storage::store::ScanOptions {
253 exact_filter: Some(hirn_storage::store::ExactMatchFilter::utf8_value(
254 "id",
255 id.to_string(),
256 )),
257 limit: Some(1),
258 ..Default::default()
259 };
260 let batches = self
261 .storage_runtime
262 .scan(hirn_storage::datasets::episodic::DATASET_NAME, opts)
263 .await
264 .map_err(|e| HirnError::storage(e))?;
265 for batch in &batches {
266 let recs = hirn_storage::datasets::episodic::from_batch(batch)
267 .map_err(|e| HirnError::storage(e))?;
268 if let Some(rec) = recs.into_iter().next() {
269 return Ok(rec);
270 }
271 }
272 Err(HirnError::NotFound(format!("episodic record {id}")))
273 }
274
275 pub(super) async fn read_episodic_records_batch(
276 &self,
277 ids: &[MemoryId],
278 ) -> HirnResult<HashMap<MemoryId, EpisodicRecord>> {
279 if ids.is_empty() {
280 return Ok(HashMap::new());
281 }
282
283 const ID_SCAN_CHUNK: usize = 256;
284 let mut records = HashMap::with_capacity(ids.len());
285
286 for chunk in ids.chunks(ID_SCAN_CHUNK) {
287 let exact_filter = hirn_storage::store::ExactMatchFilter::utf8_values(
288 "id",
289 chunk.iter().map(ToString::to_string),
290 )
291 .expect("episodic batch chunks are non-empty");
292 let opts = hirn_storage::store::ScanOptions {
293 exact_filter: Some(exact_filter),
294 ..Default::default()
295 };
296
297 let mut batches = self
298 .storage_runtime
299 .scan_stream(hirn_storage::datasets::episodic::DATASET_NAME, opts)
300 .await
301 .map_err(HirnError::storage)?;
302
303 while let Some(batch) = batches.try_next().await.map_err(HirnError::storage)? {
304 let recs = hirn_storage::datasets::episodic::from_batch(&batch)
305 .map_err(HirnError::storage)?;
306 for rec in recs {
307 records.insert(rec.id, rec);
308 }
309 }
310 }
311
312 Ok(records)
313 }
314
315 pub(super) fn episodic_logical_exact_filter(
316 logical_memory_id: LogicalMemoryId,
317 ) -> hirn_storage::store::ExactMatchFilter {
318 hirn_storage::store::ExactMatchFilter::utf8_value(
319 "logical_memory_id",
320 logical_memory_id.to_string(),
321 )
322 }
323
324 pub(super) fn episodic_logical_exact_filter_many(
325 logical_memory_ids: &[LogicalMemoryId],
326 ) -> Option<hirn_storage::store::ExactMatchFilter> {
327 hirn_storage::store::ExactMatchFilter::utf8_values(
328 "logical_memory_id",
329 logical_memory_ids
330 .iter()
331 .map(ToString::to_string)
332 .collect::<Vec<_>>(),
333 )
334 }
335
    /// Returns the episodic head cache's entry for `logical_memory_id`, if any.
    ///
    /// Thin wrapper over `episodic_head_cache_get`. The returned head may be non-live; callers
    /// decide whether to use it.
    fn cached_episodic_head(&self, logical_memory_id: LogicalMemoryId) -> Option<EpisodicRecord> {
        self.episodic_head_cache_get(logical_memory_id)
    }
339
340 fn cache_episodic_head(&self, record: &EpisodicRecord) {
341 if let Some(current) = self.cached_episodic_head(record.logical_memory_id) {
342 if !episodic_revision_is_newer(record, ¤t) {
343 return;
344 }
345 }
346
347 self.episodic_head_cache_put(record.clone());
348 }
349
    /// Drops any cached head for `logical_memory_id` (via `episodic_head_cache_evict`), so the
    /// next `episodic_head_for_logical_id` call for it reloads from storage.
    fn evict_episodic_head(&self, logical_memory_id: LogicalMemoryId) {
        self.episodic_head_cache_evict(logical_memory_id);
    }
353
354 pub(super) async fn read_episodic_history(
355 &self,
356 logical_memory_id: LogicalMemoryId,
357 ) -> HirnResult<Vec<EpisodicRecord>> {
358 let mut batches = self
359 .storage_runtime
360 .scan_stream(
361 hirn_storage::datasets::episodic::DATASET_NAME,
362 hirn_storage::store::ScanOptions {
363 exact_filter: Some(Self::episodic_logical_exact_filter(logical_memory_id)),
364 order_by: Some(vec![
365 hirn_storage::store::ScanOrdering::desc("version"),
366 hirn_storage::store::ScanOrdering::desc("created_at_ms"),
367 hirn_storage::store::ScanOrdering::desc("revision_id"),
368 ]),
369 ..Default::default()
370 },
371 )
372 .await
373 .map_err(HirnError::storage)?;
374
375 let mut history = Vec::new();
376 while let Some(batch) = batches.try_next().await.map_err(HirnError::storage)? {
377 let recs =
378 hirn_storage::datasets::episodic::from_batch(&batch).map_err(HirnError::storage)?;
379 history.extend(recs);
380 }
381
382 Ok(history)
383 }
384
385 async fn load_episodic_head_from_storage(
386 &self,
387 logical_memory_id: LogicalMemoryId,
388 ) -> HirnResult<EpisodicRecord> {
389 let mut batches = self
390 .storage_runtime
391 .scan_stream(
392 hirn_storage::datasets::episodic::DATASET_NAME,
393 hirn_storage::store::ScanOptions {
394 exact_filter: Some(Self::episodic_logical_exact_filter(logical_memory_id)),
395 order_by: Some(vec![
396 hirn_storage::store::ScanOrdering::desc("version"),
397 hirn_storage::store::ScanOrdering::desc("created_at_ms"),
398 hirn_storage::store::ScanOrdering::desc("revision_id"),
399 ]),
400 limit: Some(1),
401 ..Default::default()
402 },
403 )
404 .await
405 .map_err(HirnError::storage)?;
406
407 while let Some(batch) = batches.try_next().await.map_err(HirnError::storage)? {
408 let recs =
409 hirn_storage::datasets::episodic::from_batch(&batch).map_err(HirnError::storage)?;
410 if let Some(record) = recs.into_iter().next() {
411 return Ok(record);
412 }
413 }
414
415 Err(HirnError::NotFound(format!(
416 "episodic logical memory {logical_memory_id}"
417 )))
418 }
419
420 pub(super) async fn episodic_head_for_logical_id(
421 &self,
422 logical_memory_id: LogicalMemoryId,
423 ) -> HirnResult<EpisodicRecord> {
424 if let Some(record) = self.cached_episodic_head(logical_memory_id) {
425 return Ok(record);
426 }
427
428 match self
429 .load_episodic_head_from_storage(logical_memory_id)
430 .await
431 {
432 Ok(record) => {
433 self.cache_episodic_head(&record);
434 Ok(record)
435 }
436 Err(HirnError::NotFound(_)) => {
437 self.evict_episodic_head(logical_memory_id);
438 Err(HirnError::NotFound(format!(
439 "episodic logical memory {logical_memory_id}"
440 )))
441 }
442 Err(error) => Err(error),
443 }
444 }
445
446 pub(super) async fn live_episodic_heads_for_logical_ids(
447 &self,
448 logical_memory_ids: &[LogicalMemoryId],
449 ) -> HirnResult<HashMap<LogicalMemoryId, EpisodicRecord>> {
450 let mut heads = HashMap::with_capacity(logical_memory_ids.len());
451 let mut missing = Vec::new();
452
453 for &logical_memory_id in logical_memory_ids {
454 if let Some(record) = self.cached_episodic_head(logical_memory_id) {
455 if record.is_live() {
456 heads.insert(logical_memory_id, record);
457 }
458 } else {
459 missing.push(logical_memory_id);
460 }
461 }
462
463 let Some(exact_filter) = Self::episodic_logical_exact_filter_many(&missing) else {
464 return Ok(heads);
465 };
466
467 let mut batches = self
468 .storage_runtime
469 .scan_stream(
470 hirn_storage::datasets::episodic::DATASET_NAME,
471 hirn_storage::store::ScanOptions {
472 exact_filter: Some(exact_filter),
473 order_by: Some(vec![
474 hirn_storage::store::ScanOrdering::desc("version"),
475 hirn_storage::store::ScanOrdering::desc("created_at_ms"),
476 hirn_storage::store::ScanOrdering::desc("revision_id"),
477 ]),
478 ..Default::default()
479 },
480 )
481 .await
482 .map_err(|error| {
483 HirnError::storage(format!(
484 "failed to scan current episodic heads dataset `episodic`: {error}"
485 ))
486 })?;
487
488 let mut loaded = HashMap::with_capacity(missing.len());
489 while let Some(batch) = batches.try_next().await.map_err(|error| {
490 HirnError::storage(format!(
491 "failed to stream current episodic heads dataset `episodic`: {error}"
492 ))
493 })? {
494 let recs = hirn_storage::datasets::episodic::from_batch(&batch).map_err(|error| {
495 HirnError::storage(format!(
496 "failed to decode current episodic heads dataset `episodic`: {error}"
497 ))
498 })?;
499 for record in recs {
500 loaded
501 .entry(record.logical_memory_id)
502 .and_modify(|current| {
503 if episodic_revision_is_newer(&record, current) {
504 *current = record.clone();
505 }
506 })
507 .or_insert(record);
508 }
509 }
510
511 for (logical_memory_id, record) in loaded {
512 self.cache_episodic_head(&record);
513 if record.is_live() {
514 heads.insert(logical_memory_id, record);
515 }
516 }
517
518 for &logical_memory_id in &missing {
519 if !heads.contains_key(&logical_memory_id) {
520 self.evict_episodic_head(logical_memory_id);
521 }
522 }
523
524 Ok(heads)
525 }
526
527 async fn current_episodic_heads(&self) -> HirnResult<HashMap<LogicalMemoryId, EpisodicRecord>> {
528 let mut batches = self
529 .storage_runtime
530 .scan_stream(
531 hirn_storage::datasets::episodic::DATASET_NAME,
532 hirn_storage::store::ScanOptions::default(),
533 )
534 .await
535 .map_err(HirnError::storage)?;
536
537 let mut records = Vec::new();
538 while let Some(batch) = batches.try_next().await.map_err(HirnError::storage)? {
539 let recs =
540 hirn_storage::datasets::episodic::from_batch(&batch).map_err(HirnError::storage)?;
541 records.extend(recs);
542 }
543
544 Ok(collapse_episodic_heads(records))
545 }
546
547 pub(super) async fn episodic_revision_for_logical_id_at_snapshot(
548 &self,
549 logical_memory_id: LogicalMemoryId,
550 snapshot: RecallSnapshot,
551 ) -> HirnResult<Option<EpisodicRecord>> {
552 let history = self.read_episodic_history(logical_memory_id).await?;
553 if history.is_empty() {
554 return Ok(None);
555 }
556
557 let resolved_snapshot = self.resolve_recall_snapshot(snapshot).await?;
558 let revision = match resolved_snapshot {
559 super::semantic::ResolvedRecallSnapshot::Observed(cutoff) => {
560 episodic_snapshot_head_as_of(&history, cutoff)
561 }
562 recorded_snapshot => {
563 episodic_snapshot_head_recorded_at_snapshot(&history, recorded_snapshot)
564 }
565 };
566
567 Ok(revision)
568 }
569
570 pub(super) async fn episodic_edit_target(&self, id: MemoryId) -> HirnResult<EpisodicRecord> {
571 let record = self.read_episodic_record(id).await?;
572 let head = self
573 .episodic_head_for_logical_id(record.logical_memory_id)
574 .await?;
575
576 if head.is_live() {
577 Ok(head)
578 } else {
579 Err(HirnError::InvalidInput(format!(
580 "episodic logical memory {} is not live",
581 head.logical_memory_id
582 )))
583 }
584 }
585
586 pub(super) async fn append_episodic_record(&self, record: &EpisodicRecord) -> HirnResult<()> {
587 let dims = self.config.embedding_dimensions.as_usize();
588 let batch = hirn_storage::datasets::episodic::to_batch(std::slice::from_ref(record), dims)
589 .map_err(HirnError::storage)?;
590 self.storage_runtime
591 .append(hirn_storage::datasets::episodic::DATASET_NAME, batch)
592 .await
593 .map_err(HirnError::storage)?;
594 self.cache_episodic_head(record);
595 Ok(())
596 }
597
    /// Creates and persists the next revision of `current`, returning it.
    ///
    /// Steps:
    /// 1. Clone `current` into a successor with a fresh `MemoryId`/`RevisionId`, `version + 1`,
    ///    the given `operation`/`reason`, causation pointing at `current.id`, all timestamps set
    ///    to now, and `superseded_by` cleared.
    /// 2. Run `update` on the successor. It runs after the bookkeeping above, so it can
    ///    override any of those fields.
    /// 3. Re-embed if `content` or `multi_content` changed.
    /// 4. Add the successor's graph node and rebind `current`'s edges to it (except
    ///    `SimilarTo`). If the successor is live and has an embedding, recompute similarity edges.
    /// 5. Append the successor to storage, then remove `current`'s graph node (best effort).
    ///
    /// # Errors
    /// Propagates embedding, graph, similarity-edge and storage failures. On any failure after
    /// the successor's node was added, that node is removed again (best effort, errors ignored).
    ///
    /// NOTE(review): the failure paths do not undo `rebind_graph_edges_excluding`. If rebinding
    /// moves edges rather than copying them, a later failure would drop `current`'s edges
    /// together with the successor node. Confirm the rebind semantics.
    pub(super) async fn append_episodic_successor<F>(
        &self,
        current: &EpisodicRecord,
        operation: RevisionOperation,
        reason: Option<String>,
        update: F,
    ) -> HirnResult<EpisodicRecord>
    where
        F: FnOnce(&mut EpisodicRecord),
    {
        let now = Timestamp::now();
        let new_id = MemoryId::new();

        // Revision bookkeeping: a new physical row chained to `current`.
        let mut next = current.clone();
        next.id = new_id;
        next.revision_id = RevisionId::from_memory_id(new_id);
        next.version = current.version + 1;
        next.revision_operation = operation;
        next.revision_reason = reason;
        next.revision_causation_id = Some(current.id);
        next.created_at = now;
        next.updated_at = now;
        next.last_accessed = now;
        next.superseded_by = None;

        update(&mut next);

        // Only pay for a new embedding when the embedded text can have changed.
        if next.content != current.content || next.multi_content != current.multi_content {
            let embedding_text = next
                .multi_content
                .as_ref()
                .map(|content| content.text_for_embedding().into_owned())
                .unwrap_or_else(|| next.content.clone());
            next.embedding = Some(self.embed_text(&embedding_text).await?);
        }

        self.cached_graph()
            .add_node(
                next.id,
                Layer::Episodic,
                next.importance,
                next.timestamp,
                next.namespace,
            )
            .await?;

        // SimilarTo edges are excluded here; they are recomputed below from the new embedding.
        if let Err(error) = self
            .rebind_graph_edges_excluding(current.id, next.id, &[EdgeRelation::SimilarTo])
            .await
        {
            let _ = self.cached_graph().remove_node(next.id).await;
            return Err(error);
        }

        if next.is_live() {
            if let Some(ref emb) = next.embedding {
                let candidates = self.find_similarity_candidates(emb).await;
                if let Err(error) = self.apply_similarity_edges(next.id, &candidates).await {
                    let _ = self.cached_graph().remove_node(next.id).await;
                    return Err(error);
                }
            }
        }

        if let Err(error) = self.append_episodic_record(&next).await {
            let _ = self.cached_graph().remove_node(next.id).await;
            return Err(error);
        }

        // The successor is durable at this point; failing to drop the old node is only logged.
        if let Err(error) = self.cached_graph().remove_node(current.id).await {
            tracing::warn!(id = %current.id, error = %error, "failed to remove superseded episodic graph node");
        }

        Ok(next)
    }
673
674 pub(super) async fn write_episodic_record(&self, record: &EpisodicRecord) -> HirnResult<()> {
676 let id = record.id;
677 let exact_filter = hirn_storage::store::ExactMatchFilter::utf8_value("id", id.to_string());
678 self.storage_runtime
679 .delete_exact(
680 hirn_storage::datasets::episodic::DATASET_NAME,
681 &exact_filter,
682 )
683 .await
684 .map_err(|e| HirnError::storage(e))?;
685 let dims = self.config.embedding_dimensions.as_usize();
686 let batch = hirn_storage::datasets::episodic::to_batch(std::slice::from_ref(record), dims)
687 .map_err(|e| HirnError::storage(e))?;
688 self.storage_runtime
689 .append(hirn_storage::datasets::episodic::DATASET_NAME, batch)
690 .await
691 .map_err(|e| HirnError::storage(e))?;
692 self.evict_episodic_head(record.logical_memory_id);
693 Ok(())
694 }
695
696 pub(crate) async fn remember(&self, record: EpisodicRecord) -> HirnResult<MemoryId> {
705 self.remember_with_explanation(record)
706 .await
707 .map(|(id, _)| id)
708 .map_err(|failure| failure.error)
709 }
710
711 pub(crate) async fn remember_with_explanation(
712 &self,
713 record: EpisodicRecord,
714 ) -> Result<(MemoryId, crate::RememberExplanation), crate::RememberFailure> {
715 let realm = self.config.default_realm.clone();
716 let event_namespace = record.namespace;
717 let event_agent = record.provenance.created_by;
718 let start = std::time::Instant::now();
719 match self.remember_inner_with_explanation(record, false).await {
720 Ok((id, explanation)) => {
721 metrics::counter!(crate::metrics::REMEMBER_TOTAL, "realm" => realm.clone(), "status" => "success").increment(1);
722 metrics::histogram!(crate::metrics::STORE_DURATION_SECONDS, "realm" => realm)
723 .record(start.elapsed().as_secs_f64());
724 Ok((id, explanation))
725 }
726 Err(failure) => {
727 let status = if matches!(
728 failure.error,
729 HirnError::AccessDenied(_) | HirnError::InvalidInput(_)
730 ) {
731 "client_error"
732 } else {
733 "server_error"
734 };
735 metrics::counter!(crate::metrics::REMEMBER_TOTAL, "realm" => realm.clone(), "status" => status).increment(1);
736 self.emit_scoped(
737 event_namespace.as_str(),
738 event_agent.as_str(),
739 crate::event::MemoryEvent::Error {
740 operation: "remember".to_string(),
741 message: failure.error.to_string(),
742 },
743 )
744 .await;
745 Err(failure)
746 }
747 }
748 }
749
750 pub async fn remember_bypass_admission(&self, record: EpisodicRecord) -> HirnResult<MemoryId> {
754 self.remember_inner(record, true).await
755 }
756
757 pub(crate) async fn batch_remember(
764 &self,
765 records: Vec<EpisodicRecord>,
766 ) -> Vec<HirnResult<MemoryId>> {
767 if records.is_empty() {
768 return Vec::new();
769 }
770
771 let n = records.len();
772 let realm = self.config.default_realm.clone();
773 let record_batch_stage = |stage: &'static str, elapsed: std::time::Duration| {
774 metrics::histogram!(
775 crate::metrics::BATCH_REMEMBER_STAGE_DURATION_SECONDS,
776 "realm" => realm.clone(),
777 "stage" => stage,
778 )
779 .record(elapsed.as_secs_f64());
780 };
781
782 let agent_id = records[0].provenance.created_by;
784 for rec in records.iter().skip(1) {
785 if rec.provenance.created_by != agent_id {
786 return (0..n)
787 .map(|_| {
788 Err(HirnError::InvalidInput(
789 "batch_remember: all records must have the same agent_id".into(),
790 ))
791 })
792 .collect();
793 }
794 }
795
796 {
798 let stage_start = std::time::Instant::now();
799 let mut checked_namespaces = HashSet::new();
800 for rec in &records {
801 if checked_namespaces.insert(rec.namespace) {
802 if let Err(e) = self
803 .enforce(
804 agent_id.as_str(),
805 crate::policy::Action::Remember,
806 &realm,
807 rec.namespace.as_str(),
808 )
809 .await
810 {
811 let msg = format!("{e}");
812 return (0..n)
813 .map(|_| Err(HirnError::AccessDenied(msg.clone())))
814 .collect();
815 }
816 }
817 }
818 record_batch_stage("authorize", stage_start.elapsed());
819 }
820
821 let mut results: Vec<Option<HirnResult<MemoryId>>> = (0..n).map(|_| None).collect();
823
824 let stage_start = std::time::Instant::now();
826 let mut admitted: Vec<(usize, EpisodicRecord)> = Vec::with_capacity(n);
827 for (idx, mut rec) in records.into_iter().enumerate() {
828 match self.admission_runtime().evaluate_record(&rec).await {
829 Ok(Some(result)) => {
830 let candidate = crate::admission::MemoryCandidate::from_record(&rec);
831 let controllers_consulted: Vec<String> = result
832 .verdicts
833 .iter()
834 .map(|v| v.controller.clone())
835 .collect();
836 let decision_str = format!("{:?}", result.decision);
837 self.emit_scoped(
838 rec.namespace.as_str(),
839 rec.provenance.created_by.as_str(),
840 MemoryEvent::AdmissionEvaluated {
841 candidate_id: candidate.id,
842 decision: decision_str,
843 controllers_consulted,
844 },
845 )
846 .await;
847
848 match apply_admission_decision(&mut rec, result.decision, &realm) {
849 Ok(()) => admitted.push((idx, rec)),
850 Err(error) => results[idx] = Some(Err(error)),
851 }
852 }
853 Ok(None) => admitted.push((idx, rec)),
854 Err(error) => results[idx] = Some(Err(error)),
855 }
856 }
857 record_batch_stage("admission", stage_start.elapsed());
858
859 if admitted.is_empty() {
860 return results
861 .into_iter()
862 .map(|r| r.unwrap_or_else(|| Err(HirnError::InvalidInput("unreachable".into()))))
863 .collect();
864 }
865
866 let stage_start = std::time::Instant::now();
868 let mut need_embed: Vec<(usize, String)> = Vec::new();
870 for (i, (_, rec)) in admitted.iter().enumerate() {
871 if rec.embedding.is_none() {
872 if let Some(ref mc) = rec.multi_content {
873 need_embed.push((i, mc.text_for_embedding().into_owned()));
874 }
875 }
876 }
877
878 if !need_embed.is_empty() {
879 let texts: Vec<&str> = need_embed.iter().map(|(_, t)| t.as_str()).collect();
880 let embedding_slots = if let Some(embedder) = self.provider_runtime().embedder() {
881 match embedder.embed(&texts).await {
882 Ok(embs) => embs.into_iter().map(Some).collect(),
883 Err(e) => {
884 let error_display = e.to_string();
885 if let Some(partial) = e.into_partial_embedding_batch() {
886 let mut failures = 0u64;
887
888 tracing::warn!(
889 completed = partial.completed(),
890 failed = partial.failed(),
891 count = texts.len(),
892 "batch_remember: preserving partial embed successes and requeuing only failed items"
893 );
894
895 for (local_idx, maybe_embedding) in
896 partial.embeddings.iter().enumerate()
897 {
898 if maybe_embedding.is_none() {
899 failures += 1;
900 let admitted_idx = need_embed[local_idx].0;
901 self.write_runtime()
902 .enqueue_pending_embed(admitted[admitted_idx].1.id);
903 }
904 }
905
906 if failures > 0 {
907 metrics::counter!(
908 crate::metrics::PROVIDER_FALLBACK_TOTAL,
909 "realm" => realm.clone(),
910 "provider_type" => "embed"
911 )
912 .increment(failures);
913 }
914
915 partial.embeddings
916 } else {
917 tracing::warn!(
919 error = %error_display,
920 count = texts.len(),
921 "batch_remember: embed failed, storing without embeddings (provider fallback)"
922 );
923 metrics::counter!(
924 crate::metrics::PROVIDER_FALLBACK_TOTAL,
925 "realm" => realm.clone(),
926 "provider_type" => "embed"
927 )
928 .increment(texts.len() as u64);
929 for (idx, _) in &need_embed {
931 self.write_runtime()
932 .enqueue_pending_embed(admitted[*idx].1.id);
933 }
934 vec![None; texts.len()]
935 }
936 }
937 }
938 } else {
939 let pseudo = hirn_provider::PseudoEmbedder::new(self.embedding_dims());
940 match pseudo.embed(&texts).await {
941 Ok(embs) => embs.into_iter().map(Some).collect(),
942 Err(_) => vec![None; texts.len()],
943 }
944 };
945
946 for (i, (admitted_idx, _)) in need_embed.iter().enumerate() {
947 if let Some(Some(emb)) = embedding_slots.get(i) {
948 admitted[*admitted_idx].1.embedding = Some(emb.vector.clone());
949 }
950 }
951 }
952 record_batch_stage("embedding", stage_start.elapsed());
953
954 let stage_start = std::time::Instant::now();
956 struct WritePathInfo {
959 content: Option<String>, max_similarity: Option<f32>, namespace: hirn_core::types::Namespace,
962 }
963 let mut write_path_infos: Vec<Option<WritePathInfo>> = (0..n).map(|_| None).collect();
964
965 if self.config.rpe_enabled {
966 let mut partition_stats: HashMap<
967 super::write_path::RpePartitionKey,
968 super::write_path::RunningRpeStats,
969 > = HashMap::new();
970 let mut partition_deltas: HashMap<
971 super::write_path::RpePartitionKey,
972 super::write_path::RunningRpeStats,
973 > = HashMap::new();
974
975 let indexed_embeddings: Vec<(usize, usize, Vec<f32>)> = admitted
977 .iter()
978 .enumerate()
979 .filter_map(|(ai, (orig_idx, rec))| {
980 rec.embedding.as_ref().map(|e| (ai, *orig_idx, e.clone()))
981 })
982 .collect();
983
984 let batch_breakers: Vec<(usize, std::sync::Arc<super::write_path::RpeCircuitBreaker>)> =
987 indexed_embeddings
988 .iter()
989 .map(|(ai, _, _)| {
990 let ns = admitted[*ai].1.namespace;
991 let key = self.write_runtime().rpe_partition_key(
992 ns,
993 &self.rpe_model_id(),
994 hirn_core::types::Layer::Episodic,
995 );
996 (*ai, self.write_runtime().rpe_circuit_breaker_for(&key))
997 })
998 .collect();
999 let any_circuit_open = batch_breakers.iter().any(|(_, b)| b.is_open());
1000
1001 let max_sims: Vec<f32> = if indexed_embeddings.is_empty() {
1002 Vec::new()
1003 } else if any_circuit_open {
1004 tracing::warn!(
1005 count = indexed_embeddings.len(),
1006 "RPE circuit open — batch vector search skipped, defaulting to max similarity"
1007 );
1008 vec![1.0_f32; indexed_embeddings.len()]
1009 } else {
1010 let embeddings: Vec<Vec<f32>> =
1011 indexed_embeddings.iter().map(|(_, _, e)| e.clone()).collect();
1012 match super::write_path::batch_vector_search_max_sim(
1013 self.storage_backend(),
1014 &embeddings,
1015 self.config.rpe_similarity_search_limit,
1016 )
1017 .await
1018 {
1019 None => Vec::new(),
1020 Some(result) => {
1021 for (_, breaker) in &batch_breakers {
1023 if result.had_storage_error {
1024 breaker.record_failure(super::write_path::RPE_CIRCUIT_OPEN_SECS);
1025 } else {
1026 breaker.record_success();
1027 }
1028 }
1029 result.max_sims
1030 }
1031 }
1032 };
1033
1034 for ((ai, orig_idx, _), max_sim) in
1036 indexed_embeddings.iter().zip(max_sims.iter())
1037 {
1038 let (_, rec) = &mut admitted[*ai];
1039 let key = self
1040 .write_runtime()
1041 .rpe_partition_key(rec.namespace, &self.rpe_model_id(), hirn_core::types::Layer::Episodic);
1042 let stats = partition_stats
1043 .entry(key.clone())
1044 .or_insert_with(|| self.write_runtime().snapshot_rpe_stats(&key));
1045
1046 let max_sim = *max_sim;
1047 let distance = 1.0_f32 - max_sim;
1048 let z_score = stats.z_score(f64::from(distance)) as f32;
1049 stats.update(f64::from(distance));
1051 let rpe_score = (distance * (1.0 + z_score)).clamp(0.0, 2.0);
1052 let is_fast_path = rpe_score < self.config.rpe_fast_path_threshold;
1053
1054 let rpe = super::write_path::RpeResult {
1055 score: rpe_score,
1056 max_similarity: max_sim,
1057 is_fast_path,
1058 };
1059 self.write_runtime().record_rpe_routing_metric(
1060 &key,
1061 &rpe,
1062 self.config.rpe_fast_path_threshold,
1063 );
1064 partition_deltas
1065 .entry(key)
1066 .or_default()
1067 .update(f64::from(distance));
1068 if is_fast_path {
1069 rec.importance = 0.3 + 0.2 * rpe_score;
1070 tracing::debug!(id = %rec.id, rpe_score, "batch RPE fast-path");
1071 }
1072 write_path_infos[*orig_idx] = Some(WritePathInfo {
1073 content: if is_fast_path {
1074 None
1075 } else {
1076 Some(rec.content.clone())
1077 },
1078 max_similarity: Some(max_sim),
1079 namespace: rec.namespace,
1080 });
1081 }
1082 for (key, delta) in partition_deltas {
1083 self.write_runtime().merge_rpe_stats(&key, &delta);
1084 }
1085 } else {
1086 for (orig_idx, rec) in &admitted {
1088 write_path_infos[*orig_idx] = Some(WritePathInfo {
1089 content: Some(rec.content.clone()),
1090 max_similarity: None,
1091 namespace: rec.namespace,
1092 });
1093 }
1094 }
1095
1096 for (orig_idx, rec) in &admitted {
1098 if let Some(ref emb) = rec.embedding {
1099 if emb.len() != self.config.embedding_dimensions.as_usize() {
1100 results[*orig_idx] = Some(Err(HirnError::InvalidInput(format!(
1101 "embedding dimension mismatch: expected {}, got {}",
1102 self.config.embedding_dimensions.as_usize(),
1103 emb.len()
1104 ))));
1105 }
1106 }
1107 }
1108
1109 let mut admitted: Vec<(usize, EpisodicRecord)> = admitted
1111 .into_iter()
1112 .filter(|(idx, _)| results[*idx].is_none())
1113 .collect();
1114
1115 for (_orig_idx, rec) in &mut admitted {
1117 match self.config.text_retention {
1118 hirn_core::TextRetention::Full => {}
1119 hirn_core::TextRetention::SummaryOnly => {
1120 rec.content = String::new();
1121 }
1122 hirn_core::TextRetention::None => {
1123 rec.content = String::new();
1124 rec.summary = String::new();
1125 }
1126 }
1127 }
1128
1129 if admitted.is_empty() {
1130 return results
1131 .into_iter()
1132 .map(|r| r.unwrap_or_else(|| Err(HirnError::InvalidInput("unreachable".into()))))
1133 .collect();
1134 }
1135 record_batch_stage("prepare", stage_start.elapsed());
1136
1137 struct PreparedRecord {
1139 idx: usize,
1140 record: EpisodicRecord,
1141 content_preview: String,
1142 edge_requests: Vec<EdgeInsert>,
1143 episode_envelope: hirn_storage::MutationEnvelopeRecord,
1144 }
1145 let mut prepared: Vec<PreparedRecord> = Vec::with_capacity(admitted.len());
1146 let stage_start = std::time::Instant::now();
1147 let fallback_entity_candidates = if admitted
1148 .iter()
1149 .any(|(_, rec)| rec.embedding.is_none() && !rec.entities.is_empty())
1150 {
1151 match self.fetch_recent_entity_candidate_records().await {
1152 Ok(records) => Some(records),
1153 Err(error) => {
1154 tracing::warn!(error = %error, "batch_remember: failed to prefetch entity-edge candidates; falling back to per-record scan");
1155 None
1156 }
1157 }
1158 } else {
1159 None
1160 };
1161 let mut unique_embeddings: Vec<Vec<f32>> = Vec::new();
1162 let mut unique_embedding_result_indices: HashMap<Vec<u32>, usize> = HashMap::new();
1163 let mut record_embedding_result_indices: HashMap<MemoryId, usize> = HashMap::new();
1164 let mut embedded_candidate_ids = Vec::new();
1165 let mut embedded_candidate_id_set = HashSet::new();
1166 for (_idx, rec) in &admitted {
1167 let Some(embedding) = rec.embedding.as_deref() else {
1168 continue;
1169 };
1170 let embedding_key: Vec<u32> = embedding.iter().map(|value| value.to_bits()).collect();
1171 let result_index =
1172 if let Some(&index) = unique_embedding_result_indices.get(&embedding_key) {
1173 index
1174 } else {
1175 let index = unique_embeddings.len();
1176 unique_embeddings.push(embedding.to_vec());
1177 unique_embedding_result_indices.insert(embedding_key, index);
1178 index
1179 };
1180 record_embedding_result_indices.insert(rec.id, result_index);
1181 }
1182 let embedded_auto_edge_candidate_results = self
1183 .find_auto_edge_candidates_many(&unique_embeddings)
1184 .await;
1185 for (_idx, rec) in &admitted {
1186 let Some(&result_index) = record_embedding_result_indices.get(&rec.id) else {
1187 continue;
1188 };
1189 let Some(candidates) = embedded_auto_edge_candidate_results.get(result_index) else {
1190 continue;
1191 };
1192 for &(uid, _sim) in candidates {
1193 let candidate_id = MemoryId::from_ulid(ulid::Ulid(uid));
1194 if candidate_id != rec.id && embedded_candidate_id_set.insert(candidate_id) {
1195 embedded_candidate_ids.push(candidate_id);
1196 }
1197 }
1198 }
1199 let prefetched_embedded_candidate_records = if embedded_candidate_ids.is_empty() {
1200 None
1201 } else {
1202 let include_entities = admitted.iter().any(|(_, rec)| !rec.entities.is_empty());
1203 match self
1204 .fetch_hydrated_candidate_records_by_ids(&embedded_candidate_ids, include_entities)
1205 .await
1206 {
1207 Ok(records_by_id) => Some(records_by_id),
1208 Err(error) => {
1209 tracing::warn!(error = %error, "batch_remember: failed to prefetch embedded auto-edge candidate records; falling back to per-record hydration");
1210 None
1211 }
1212 }
1213 };
1214 record_batch_stage("auto_edge_prefetch", stage_start.elapsed());
1215
1216 let stage_start = std::time::Instant::now();
1217
1218 for (idx, mut rec) in admitted {
1219 let namespace = rec.namespace;
1220 let content_preview = rec.content.chars().take(120).collect::<String>();
1221 let entities: Vec<String> = rec.entities.iter().map(|e| e.name.clone()).collect();
1222
1223 if let Some(ref mc) = rec.multi_content {
1225 match self
1226 .extract_and_store_resources(namespace, rec.provenance.created_by, mc)
1227 .await
1228 {
1229 Ok(extracted) => {
1230 rec.multi_content = Some(extracted.content);
1231 rec.provenance
1232 .evidence_links
1233 .extend(extracted.evidence_links);
1234 }
1235 Err(e) => {
1236 results[idx] = Some(Err(e));
1237 continue;
1238 }
1239 }
1240 }
1241 let edge_requests = match self
1242 .plan_auto_episode_edge_requests(
1243 rec.id,
1244 rec.namespace,
1245 rec.embedding.as_deref(),
1246 &rec.content,
1247 &entities,
1248 record_embedding_result_indices
1249 .get(&rec.id)
1250 .and_then(|&result_index| {
1251 embedded_auto_edge_candidate_results
1252 .get(result_index)
1253 .map(Vec::as_slice)
1254 }),
1255 prefetched_embedded_candidate_records.as_ref(),
1256 fallback_entity_candidates.as_deref(),
1257 )
1258 .await
1259 {
1260 Ok(edge_requests) => edge_requests,
1261 Err(error) => {
1262 results[idx] = Some(Err(error));
1263 continue;
1264 }
1265 };
1266 let episode_envelope =
1267 match build_episode_remember_envelope(&rec, &content_preview, &edge_requests) {
1268 Ok(envelope) => envelope,
1269 Err(error) => {
1270 results[idx] = Some(Err(error));
1271 continue;
1272 }
1273 };
1274
1275 prepared.push(PreparedRecord {
1276 idx,
1277 record: rec,
1278 content_preview,
1279 edge_requests,
1280 episode_envelope,
1281 });
1282 }
1283
1284 let envelope_list: Vec<_> = prepared.iter().map(|r| r.episode_envelope.clone()).collect();
1285 let graph_nodes = prepared
1286 .iter()
1287 .map(|prepared_record| crate::graph::GraphNodeData {
1288 id: prepared_record.record.id,
1289 layer: Layer::Episodic,
1290 importance: prepared_record.record.importance,
1291 created_at: prepared_record.record.timestamp,
1292 namespace: prepared_record.record.namespace,
1293 access_count: 0,
1294 })
1295 .collect::<Vec<_>>();
1296
1297 let (envelope_result, node_result) = tokio::join!(
1301 hirn_storage::append_mutation_envelopes(self.storage_backend(), &envelope_list),
1302 self.cached_graph().add_nodes(&graph_nodes),
1303 );
1304 match (envelope_result, node_result) {
1305 (Ok(()), Ok(())) => {}
1306 (Err(error), Ok(())) => {
1307 for node in &graph_nodes {
1309 let _ = self.cached_graph().remove_node(node.id).await;
1310 }
1311 let message = error.to_string();
1312 for record in &prepared {
1313 results[record.idx] = Some(Err(HirnError::storage(message.clone())));
1314 }
1315 record_batch_stage("graph_prepare", stage_start.elapsed());
1316 return results
1317 .into_iter()
1318 .map(|r| {
1319 r.unwrap_or_else(|| {
1320 Err(HirnError::storage("episode envelope append failed"))
1321 })
1322 })
1323 .collect();
1324 }
1325 (Ok(()), Err(error)) => {
1326 let message = error.to_string();
1327 for prepared_record in &prepared {
1328 results[prepared_record.idx] =
1329 Some(Err(HirnError::storage(message.clone())));
1330 }
1331 record_batch_stage("graph_prepare", stage_start.elapsed());
1332 return results
1333 .into_iter()
1334 .map(|r| {
1335 r.unwrap_or_else(|| {
1336 Err(HirnError::storage("graph node batch persist failed"))
1337 })
1338 })
1339 .collect();
1340 }
1341 (Err(env_err), Err(node_err)) => {
1342 tracing::warn!(
1343 node_error = %node_err,
1344 "batch_remember: add_nodes also failed during envelope error"
1345 );
1346 let message = env_err.to_string();
1347 for record in &prepared {
1348 results[record.idx] = Some(Err(HirnError::storage(message.clone())));
1349 }
1350 record_batch_stage("graph_prepare", stage_start.elapsed());
1351 return results
1352 .into_iter()
1353 .map(|r| {
1354 r.unwrap_or_else(|| {
1355 Err(HirnError::storage("episode envelope append failed"))
1356 })
1357 })
1358 .collect();
1359 }
1360 }
1361
1362 let edge_request_batches = prepared
1363 .iter()
1364 .map(|prepared_record| {
1365 (
1366 prepared_record.record.namespace,
1367 prepared_record.record.provenance.created_by,
1368 prepared_record.edge_requests.as_slice(),
1369 )
1370 })
1371 .collect::<Vec<_>>();
1372 if let Err(error) = self
1373 .apply_episode_edge_request_batches(&edge_request_batches)
1374 .await
1375 {
1376 let message = error.to_string();
1377 for prepared_record in &prepared {
1378 if let Err(cleanup_err) = self
1379 .cached_graph()
1380 .remove_node(prepared_record.record.id)
1381 .await
1382 {
1383 tracing::warn!(
1384 id = %prepared_record.record.id,
1385 error = %cleanup_err,
1386 "batch_remember: failed to remove graph node after batched edge application error"
1387 );
1388 }
1389 results[prepared_record.idx] =
1390 Some(Err(HirnError::StorageError(message.clone().into())));
1391 }
1392 record_batch_stage("graph_prepare", stage_start.elapsed());
1393 return results
1394 .into_iter()
1395 .map(|r| {
1396 r.unwrap_or_else(|| {
1397 Err(HirnError::storage(
1398 "batched episode edge application failed",
1399 ))
1400 })
1401 })
1402 .collect();
1403 }
1404 let mut prepared = prepared;
1405 record_batch_stage("graph_prepare", stage_start.elapsed());
1406
1407 let stage_start = std::time::Instant::now();
1409 if !prepared.is_empty() {
1410 let lance_records = prepared
1411 .iter()
1412 .map(|record| record.record.clone())
1413 .collect::<Vec<_>>();
1414 let dims = self.config.embedding_dimensions.as_usize();
1415 match hirn_storage::datasets::episodic::to_batch(&lance_records, dims) {
1416 Ok(batch) => {
1417 if let Err(e) = self
1418 .storage_runtime
1419 .append(hirn_storage::datasets::episodic::DATASET_NAME, batch)
1420 .await
1421 {
1422 tracing::error!(
1423 count = lance_records.len(),
1424 error = %e,
1425 "batch_remember: LanceDB batch append failed"
1426 );
1427 for record in &prepared {
1429 if let Err(cleanup_err) =
1430 self.cached_graph().remove_node(record.record.id).await
1431 {
1432 tracing::warn!(
1433 id = %record.record.id,
1434 error = %cleanup_err,
1435 "batch_remember: failed to remove orphaned graph node"
1436 );
1437 }
1438 }
1439 let msg = format!("{e}");
1440 for record in &prepared {
1441 results[record.idx] =
1442 Some(Err(HirnError::StorageError(msg.clone().into())));
1443 }
1444 record_batch_stage("append", stage_start.elapsed());
1445 return results
1446 .into_iter()
1447 .map(|r| {
1448 r.unwrap_or_else(|| {
1449 Err(HirnError::storage("LanceDB append failed"))
1450 })
1451 })
1452 .collect();
1453 }
1454 }
1455 Err(e) => {
1456 tracing::error!(
1457 count = lance_records.len(),
1458 error = %e,
1459 "batch_remember: LanceDB to_batch failed"
1460 );
1461 for record in &prepared {
1463 if let Err(cleanup_err) =
1464 self.cached_graph().remove_node(record.record.id).await
1465 {
1466 tracing::warn!(
1467 id = %record.record.id,
1468 error = %cleanup_err,
1469 "batch_remember: failed to remove orphaned graph node"
1470 );
1471 }
1472 }
1473 let msg = format!("{e}");
1474 for record in &prepared {
1475 results[record.idx] =
1476 Some(Err(HirnError::StorageError(msg.clone().into())));
1477 }
1478 record_batch_stage("append", stage_start.elapsed());
1479 return results
1480 .into_iter()
1481 .map(|r| {
1482 r.unwrap_or_else(|| Err(HirnError::storage("LanceDB to_batch failed")))
1483 })
1484 .collect();
1485 }
1486 }
1487 }
1488 record_batch_stage("append", stage_start.elapsed());
1489
1490 let stage_start = std::time::Instant::now();
1494 let mut temporal_edge_requests = Vec::with_capacity(prepared.len().saturating_sub(1));
1495 let mut temporal_envelope_updates = Vec::new();
1496 for prepared_record in &mut prepared {
1497 let arrival = self
1498 .write_runtime()
1499 .record_arrival(prepared_record.record.namespace, prepared_record.record.id);
1500 let temporal_edge_request =
1501 temporal_edge_request_for_arrival(prepared_record.record.id, &arrival);
1502 if temporal_edge_request.is_some() {
1503 match update_episode_envelope_temporal_edge(
1504 &mut prepared_record.episode_envelope,
1505 temporal_edge_request.clone(),
1506 ) {
1507 Ok(()) => {
1508 temporal_envelope_updates.push(prepared_record.episode_envelope.clone());
1509 }
1510 Err(error) => {
1511 tracing::warn!(
1512 id = %prepared_record.record.id,
1513 error = %error,
1514 "batch_remember: failed to encode TemporalNext episode envelope update"
1515 );
1516 }
1517 }
1518 }
1519 if let Some(request) = temporal_edge_request {
1520 temporal_edge_requests.push(request);
1521 }
1522 }
1523 let (envelope_result, _) = tokio::join!(
1526 hirn_storage::replace_mutation_envelopes(
1527 self.storage_backend(),
1528 &temporal_envelope_updates,
1529 ),
1530 self.cached_graph().add_edges_best_effort(&temporal_edge_requests),
1531 );
1532 if let Err(error) = envelope_result {
1533 tracing::warn!(
1534 count = temporal_envelope_updates.len(),
1535 error = %error,
1536 "batch_remember: failed to persist TemporalNext episode envelope updates"
1537 );
1538 }
1539 record_batch_stage("temporal_next", stage_start.elapsed());
1540
1541 let stage_start = std::time::Instant::now();
1543 let mut finalized_envelopes = Vec::new();
1544 let mut group_start = 0usize;
1545 while group_start < prepared.len() {
1546 let group_namespace = prepared[group_start].record.namespace;
1547 let group_agent_id = prepared[group_start].record.provenance.created_by;
1548 let mut group_end = group_start + 1;
1549 while group_end < prepared.len()
1550 && prepared[group_end].record.namespace == group_namespace
1551 && prepared[group_end].record.provenance.created_by == group_agent_id
1552 {
1553 group_end += 1;
1554 }
1555
1556 let events = prepared[group_start..group_end]
1557 .iter()
1558 .map(|prepared_record| MemoryEvent::EpisodeCreated {
1559 id: prepared_record.record.id,
1560 content_preview: prepared_record.content_preview.clone(),
1561 })
1562 .collect::<Vec<_>>();
1563
1564 match self
1565 .event_runtime()
1566 .emit_checked_batch(
1567 &self.config.default_realm,
1568 group_namespace.as_str(),
1569 group_agent_id.as_str(),
1570 events,
1571 )
1572 .await
1573 {
1574 Ok(()) => {
1575 for prepared_record in &mut prepared[group_start..group_end] {
1576 results[prepared_record.idx] = Some(Ok(prepared_record.record.id));
1577 prepared_record.episode_envelope.state =
1578 hirn_storage::MutationEnvelopeState::Applied;
1579 prepared_record.episode_envelope.last_error = None;
1580 prepared_record.episode_envelope.updated_at = Timestamp::now();
1581 finalized_envelopes.push(prepared_record.episode_envelope.clone());
1582 }
1583 }
1584 Err(error) => {
1585 let message = error.to_string();
1586 for prepared_record in &prepared[group_start..group_end] {
1587 results[prepared_record.idx] =
1588 Some(Err(HirnError::storage(message.clone())));
1589 }
1590 }
1591 }
1592
1593 group_start = group_end;
1594 }
1595 if let Err(error) =
1596 hirn_storage::replace_mutation_envelopes(self.storage_backend(), &finalized_envelopes)
1597 .await
1598 {
1599 tracing::warn!(
1600 count = finalized_envelopes.len(),
1601 error = %error,
1602 "batch_remember: episode mutation envelope finalize failed; recovery will retry cleanup"
1603 );
1604 }
1605 for prepared_record in &prepared {
1606 if results[prepared_record.idx]
1607 .as_ref()
1608 .is_some_and(Result::is_ok)
1609 {
1610 self.cache_episodic_head(&prepared_record.record);
1611 }
1612 }
1613 record_batch_stage("events", stage_start.elapsed());
1614
1615 let stage_start = std::time::Instant::now();
1617 let mut prospective_batches = Vec::new();
1618 let mut prospective_rows = 0usize;
1619 let mut svo_batches = Vec::new();
1620 let mut svo_rows = 0usize;
1621
1622 for p in &prepared {
1623 let info = write_path_infos[p.idx].take();
1624 if let Some(info) = info {
1625 if let Some(ref content) = info.content {
1627 if self.config.prospective_indexing_enabled {
1628 if let Some(embedder) = self.provider_runtime().embedder() {
1629 if let Some(batch) =
1630 super::write_path::prepare_prospective_implications_batch(
1631 &*embedder,
1632 p.record.id,
1633 content,
1634 self.config.prospective_indexing_num_questions,
1635 self.config.prospective_indexing_timeout_secs,
1636 &self.config.prospective_indexing_templates,
1637 info.namespace.as_str(),
1638 )
1639 .await
1640 {
1641 let count = batch.num_rows();
1642 prospective_rows += count;
1643 prospective_batches.push(batch);
1644 tracing::debug!(id = %p.record.id, count, "batch: prospective implications prepared");
1645 }
1646 }
1647 }
1648
1649 if self.config.svo_extraction_enabled {
1650 if let Some(batch) = super::write_path::prepare_svo_events_batch(
1651 p.record.id,
1652 content,
1653 self.config.svo_confidence_threshold,
1654 info.namespace.as_str(),
1655 self.config.embedding_dimensions.as_usize(),
1656 ) {
1657 let count = batch.num_rows();
1658 svo_rows += count;
1659 svo_batches.push(batch);
1660 tracing::debug!(id = %p.record.id, count, "batch: SVO events prepared");
1661 }
1662 }
1663 }
1664
1665 if let Some(max_sim) = info.max_similarity {
1667 let interference =
1668 super::write_path::interference_score_from_similarity(max_sim);
1669 if interference > 0.0 {
1670 let action = self.write_runtime().accumulate_interference(
1671 interference,
1672 info.namespace,
1673 self.config.interference_consolidation_threshold,
1674 self.config.interference_consolidation_cooldown_secs,
1675 );
1676 match action {
1677 super::write_path::InterferenceAction::TriggerConsolidation {
1678 namespaces,
1679 backlog_score,
1680 cause,
1681 } => {
1682 tracing::info!(
1683 namespace_count = namespaces.len(),
1684 backlog_score,
1685 cause = cause.as_str(),
1686 "batch: interference threshold exceeded, consolidation requested"
1687 );
1688 }
1689 super::write_path::InterferenceAction::Suppressed {
1690 reason,
1691 backlog_score,
1692 } => {
1693 tracing::debug!(
1694 reason = reason.as_str(),
1695 backlog_score,
1696 "batch: interference request suppressed"
1697 );
1698 }
1699 super::write_path::InterferenceAction::None => {}
1700 }
1701 }
1702 }
1703 }
1704 }
1705
1706 if !prospective_batches.is_empty() {
1707 let batch_count = prospective_batches.len();
1708 if let Err(e) = self
1709 .storage_runtime
1710 .append_batches(
1711 hirn_storage::datasets::prospective_implications::DATASET_NAME,
1712 prospective_batches,
1713 )
1714 .await
1715 {
1716 tracing::warn!(error = %e, "batch: failed to write prospective implications");
1717 } else {
1718 tracing::debug!(
1719 batch_count,
1720 count = prospective_rows,
1721 "batch: prospective implications stored"
1722 );
1723 }
1724 }
1725
1726 if !svo_batches.is_empty() {
1727 let batch_count = svo_batches.len();
1728 if let Err(e) = self
1729 .storage_runtime
1730 .append_batches("svo_events", svo_batches)
1731 .await
1732 {
1733 tracing::warn!(error = %e, "batch: failed to write SVO events");
1734 } else {
1735 tracing::debug!(batch_count, count = svo_rows, "batch: SVO events stored");
1736 }
1737 }
1738 record_batch_stage("slow_path", stage_start.elapsed());
1739
1740 let success_count = results
1742 .iter()
1743 .filter(|result| matches!(result, Some(Ok(_))))
1744 .count() as u64;
1745 let fail_count = (n as u64).saturating_sub(success_count);
1746 if success_count > 0 {
1747 metrics::counter!(crate::metrics::REMEMBER_TOTAL, "realm" => realm.clone(), "status" => "success")
1748 .increment(success_count);
1749 }
1750 if fail_count > 0 {
1751 metrics::counter!(crate::metrics::REMEMBER_TOTAL, "realm" => realm, "status" => "client_error")
1752 .increment(fail_count);
1753 }
1754
1755 self.write_runtime().flush_rpe_stats_if_due(self.path());
1757
1758 results
1759 .into_iter()
1760 .map(|r| r.unwrap_or_else(|| Err(HirnError::InvalidInput("unreachable".into()))))
1761 .collect()
1762 }
1763
1764 async fn remember_inner(
1766 &self,
1767 record: EpisodicRecord,
1768 bypass_admission: bool,
1769 ) -> HirnResult<MemoryId> {
1770 self.remember_inner_with_explanation(record, bypass_admission)
1771 .await
1772 .map(|(id, _)| id)
1773 .map_err(|failure| failure.error)
1774 }
1775
1776 async fn remember_inner_with_explanation(
1777 &self,
1778 mut record: EpisodicRecord,
1779 bypass_admission: bool,
1780 ) -> Result<(MemoryId, crate::RememberExplanation), crate::RememberFailure> {
1781 let actor_id = record.provenance.created_by;
1782 let namespace = record.namespace;
1783 let initial_embedding = if record.embedding.is_some() {
1784 crate::EmbeddingDisposition::Provided
1785 } else {
1786 crate::EmbeddingDisposition::Missing
1787 };
1788 let mut explanation = crate::RememberExplanation::new(
1789 actor_id,
1790 namespace,
1791 bypass_admission,
1792 initial_embedding,
1793 self.config.text_retention,
1794 );
1795
1796 if let Err(error) = self
1798 .enforce(
1799 record.provenance.created_by.as_str(),
1800 crate::policy::Action::Remember,
1801 &self.config.default_realm,
1802 record.namespace.as_str(),
1803 )
1804 .await
1805 {
1806 explanation.status = crate::RememberStatus::Rejected;
1807 explanation.error = Some(error.to_string());
1808 return Err(crate::RememberFailure::new(error, explanation));
1809 }
1810
1811 if !bypass_admission {
1813 let admission_result = self
1814 .admission_runtime()
1815 .evaluate_record(&record)
1816 .await
1817 .map_err(|error| {
1818 explanation.status = crate::RememberStatus::Failed;
1819 explanation.error = Some(error.to_string());
1820 crate::RememberFailure::new(error, explanation.clone())
1821 })?;
1822 if let Some(result) = admission_result {
1823 let candidate = crate::admission::MemoryCandidate::from_record(&record);
1824 let controllers_consulted: Vec<String> = result
1825 .verdicts
1826 .iter()
1827 .map(|v| v.controller.clone())
1828 .collect();
1829 let decision = result.decision.clone();
1830 let decision_str = format!("{:?}", decision);
1831 self.emit_scoped(
1832 record.namespace.as_str(),
1833 record.provenance.created_by.as_str(),
1834 MemoryEvent::AdmissionEvaluated {
1835 candidate_id: candidate.id,
1836 decision: decision_str,
1837 controllers_consulted: controllers_consulted.clone(),
1838 },
1839 )
1840 .await;
1841
1842 explanation.admission = Some(crate::AdmissionExplanation {
1843 decision: decision.clone(),
1844 controllers_consulted,
1845 });
1846 if let Err(error) = apply_admission_decision(
1847 &mut record,
1848 result.decision,
1849 &self.config.default_realm,
1850 ) {
1851 explanation.status = remember_status_for_admission(&decision);
1852 explanation.error = Some(error.to_string());
1853 return Err(crate::RememberFailure::new(error, explanation));
1854 }
1855 }
1856 }
1857
1858 if record.embedding.is_none() {
1861 if let Some(ref mc) = record.multi_content {
1862 match self.embed_content(mc).await {
1863 Ok(emb) => {
1864 record.embedding = Some(emb);
1865 explanation.embedding = crate::EmbeddingDisposition::Generated;
1866 }
1867 Err(e) => {
1868 tracing::warn!(error = %e, id = %record.id, "Embed failed, storing without embedding (provider fallback)");
1869 metrics::counter!(
1870 crate::metrics::PROVIDER_FALLBACK_TOTAL,
1871 "realm" => self.config.default_realm.clone(),
1872 "provider_type" => "embed"
1873 )
1874 .increment(1);
1875 self.write_runtime().enqueue_pending_embed(record.id);
1876 explanation.embedding = crate::EmbeddingDisposition::PendingRetry;
1877 }
1878 }
1879 }
1880 }
1881
1882 let (is_fast_path, rpe_max_similarity) = if self.config.rpe_enabled {
1884 if let Some(ref emb) = record.embedding {
1885 let key = self
1886 .write_runtime()
1887 .rpe_partition_key(record.namespace, &self.rpe_model_id(), hirn_core::types::Layer::Episodic);
1888 let mut stats_snapshot = self.write_runtime().snapshot_rpe_stats(&key);
1889 let rpe = super::write_path::compute_rpe(
1890 self.storage_backend(),
1891 emb,
1892 self.config.rpe_fast_path_threshold,
1893 self.config.rpe_similarity_search_limit,
1894 &mut stats_snapshot,
1895 &self.write_runtime().rpe_circuit_breaker_for(&key),
1896 )
1897 .await;
1898 self.write_runtime()
1899 .record_rpe_distance(&key, f64::from(1.0 - rpe.max_similarity));
1900 self.write_runtime().record_rpe_routing_metric(
1901 &key,
1902 &rpe,
1903 self.config.rpe_fast_path_threshold,
1904 );
1905 tracing::debug!(
1906 rpe_score = rpe.score,
1907 max_similarity = rpe.max_similarity,
1908 fast_path = rpe.is_fast_path,
1909 "RPE admission score"
1910 );
1911 explanation.rpe = Some(crate::RpeExplanation {
1912 enabled: true,
1913 score: Some(rpe.score),
1914 max_similarity: Some(rpe.max_similarity),
1915 threshold: self.config.rpe_fast_path_threshold,
1916 is_fast_path: rpe.is_fast_path,
1917 });
1918 if rpe.is_fast_path {
1919 record.importance = 0.3 + 0.2 * rpe.score;
1920 tracing::info!(id = %record.id, rpe_score = rpe.score, "RPE fast-path: skipping LLM analysis");
1921 }
1922 (rpe.is_fast_path, Some(rpe.max_similarity))
1923 } else {
1924 explanation.rpe = Some(crate::RpeExplanation {
1925 enabled: true,
1926 score: None,
1927 max_similarity: None,
1928 threshold: self.config.rpe_fast_path_threshold,
1929 is_fast_path: false,
1930 });
1931 (false, None)
1932 }
1933 } else {
1934 (false, None)
1935 };
1936
1937 if let Some(ref emb) = record.embedding {
1938 if emb.len() != self.config.embedding_dimensions.as_usize() {
1939 let error = HirnError::InvalidInput(format!(
1940 "embedding dimension mismatch: expected {}, got {}",
1941 self.config.embedding_dimensions.as_usize(),
1942 emb.len()
1943 ));
1944 explanation.status = crate::RememberStatus::Failed;
1945 explanation.error = Some(error.to_string());
1946 return Err(crate::RememberFailure::new(error, explanation));
1947 }
1948 }
1949
1950 let content_for_write_path = if !is_fast_path {
1951 Some(record.content.clone())
1952 } else {
1953 None
1954 };
1955
1956 match self.config.text_retention {
1957 hirn_core::TextRetention::Full => {}
1958 hirn_core::TextRetention::SummaryOnly => {
1959 record.content = String::new();
1960 }
1961 hirn_core::TextRetention::None => {
1962 record.content = String::new();
1963 record.summary = String::new();
1964 }
1965 }
1966
1967 let id = record.id;
1968 let importance = record.importance;
1969 let timestamp = record.timestamp;
1970 let namespace = record.namespace;
1971 let content_preview = record.content.chars().take(120).collect::<String>();
1972 let entities: Vec<String> = record.entities.iter().map(|e| e.name.clone()).collect();
1973
1974 if let Some(ref mc) = record.multi_content {
1975 let extracted = self
1976 .extract_and_store_resources(namespace, record.provenance.created_by, mc)
1977 .await
1978 .map_err(|error| {
1979 explanation.status = crate::RememberStatus::Failed;
1980 explanation.error = Some(error.to_string());
1981 crate::RememberFailure::new(error, explanation.clone())
1982 })?;
1983 record.multi_content = Some(extracted.content);
1984 record
1985 .provenance
1986 .evidence_links
1987 .extend(extracted.evidence_links);
1988 explanation.resources_extracted = true;
1989 }
1990
1991 let edge_requests = self
1992 .plan_auto_episode_edge_requests(
1993 id,
1994 record.namespace,
1995 record.embedding.as_deref(),
1996 &record.content,
1997 &entities,
1998 None,
1999 None,
2000 None,
2001 )
2002 .await
2003 .map_err(|error| {
2004 explanation.status = crate::RememberStatus::Failed;
2005 explanation.error = Some(error.to_string());
2006 crate::RememberFailure::new(error, explanation.clone())
2007 })?;
2008 let episode_envelope =
2009 build_episode_remember_envelope(&record, &content_preview, &edge_requests).map_err(
2010 |error| {
2011 explanation.status = crate::RememberStatus::Failed;
2012 explanation.error = Some(error.to_string());
2013 crate::RememberFailure::new(error, explanation.clone())
2014 },
2015 )?;
2016 hirn_storage::append_mutation_envelope(self.storage_backend(), &episode_envelope)
2017 .await
2018 .map_err(HirnError::storage)
2019 .map_err(|error| {
2020 explanation.status = crate::RememberStatus::Failed;
2021 explanation.error = Some(error.to_string());
2022 crate::RememberFailure::new(error, explanation.clone())
2023 })?;
2024
2025 self.cached_graph()
2026 .add_node(id, Layer::Episodic, importance, timestamp, namespace)
2027 .await
2028 .map_err(|error| {
2029 explanation.status = crate::RememberStatus::Failed;
2030 explanation.error = Some(error.to_string());
2031 crate::RememberFailure::new(error, explanation.clone())
2032 })?;
2033
2034 if let Err(e) = self
2035 .apply_episode_edge_requests(
2036 record.namespace,
2037 record.provenance.created_by,
2038 &edge_requests,
2039 )
2040 .await
2041 {
2042 let _ = self.cached_graph().remove_node(id).await;
2043 explanation.status = crate::RememberStatus::Failed;
2044 explanation.error = Some(e.to_string());
2045 return Err(crate::RememberFailure::new(e, explanation));
2046 }
2047
2048 {
2049 let dims = self.config.embedding_dimensions.as_usize();
2050 let batch =
2051 hirn_storage::datasets::episodic::to_batch(std::slice::from_ref(&record), dims)
2052 .map_err(|e| HirnError::storage(e))
2053 .map_err(|error| {
2054 explanation.status = crate::RememberStatus::Failed;
2055 explanation.error = Some(error.to_string());
2056 crate::RememberFailure::new(error, explanation.clone())
2057 })?;
2058 self.storage_runtime
2059 .append(hirn_storage::datasets::episodic::DATASET_NAME, batch)
2060 .await
2061 .map_err(|e| HirnError::storage(e))
2062 .map_err(|error| {
2063 explanation.status = crate::RememberStatus::Failed;
2064 explanation.error = Some(error.to_string());
2065 crate::RememberFailure::new(error, explanation.clone())
2066 })?;
2067 }
2068
2069 let arrival = self.write_runtime().record_arrival(namespace, id);
2070 explanation.arrival_sequence = u64::try_from(arrival.sequence).ok();
2071 if let Some(prev_id) = arrival.previous_id {
2072 let _ = self
2073 .cached_graph()
2074 .add_edge(
2075 prev_id,
2076 id,
2077 EdgeRelation::TemporalNext,
2078 1.0,
2079 temporal_next_metadata(&arrival),
2080 )
2081 .await;
2082 }
2083
2084 if let Some(ref content) = content_for_write_path {
2085 if self.config.prospective_indexing_enabled {
2086 if let Some(embedder) = self.provider_runtime().embedder() {
2087 let count = super::write_path::store_prospective_implications(
2088 self.storage_backend(),
2089 &*embedder,
2090 id,
2091 content,
2092 self.config.prospective_indexing_num_questions,
2093 self.config.prospective_indexing_timeout_secs,
2094 &self.config.prospective_indexing_templates,
2095 namespace.as_str(),
2096 )
2097 .await;
2098 explanation.prospective_indexing =
2099 crate::WritePathOperationExplanation::applied(count);
2100 if count > 0 {
2101 tracing::debug!(id = %id, count, "Prospective implications stored");
2102 }
2103 } else {
2104 explanation.prospective_indexing =
2105 crate::WritePathOperationExplanation::unavailable();
2106 }
2107 } else {
2108 explanation.prospective_indexing = crate::WritePathOperationExplanation::disabled();
2109 }
2110
2111 if self.config.svo_extraction_enabled {
2112 let count = super::write_path::extract_and_store_svo_events(
2113 self.storage_backend(),
2114 id,
2115 content,
2116 self.config.svo_confidence_threshold,
2117 namespace.as_str(),
2118 self.config.embedding_dimensions.as_usize(),
2119 )
2120 .await;
2121 explanation.svo_extraction = crate::WritePathOperationExplanation::applied(count);
2122 if count > 0 {
2123 tracing::debug!(id = %id, count, "SVO events extracted and stored");
2124 }
2125 } else {
2126 explanation.svo_extraction = crate::WritePathOperationExplanation::disabled();
2127 }
2128 } else {
2129 explanation.prospective_indexing =
2130 crate::WritePathOperationExplanation::skipped_fast_path();
2131 explanation.svo_extraction = crate::WritePathOperationExplanation::skipped_fast_path();
2132 }
2133
2134 if let Some(max_sim) = rpe_max_similarity {
2135 let interference = super::write_path::interference_score_from_similarity(max_sim);
2136 let disposition = if interference > 0.0 {
2137 let action = self.write_runtime().accumulate_interference(
2138 interference,
2139 namespace,
2140 self.config.interference_consolidation_threshold,
2141 self.config.interference_consolidation_cooldown_secs,
2142 );
2143 match action {
2144 super::write_path::InterferenceAction::TriggerConsolidation {
2145 namespaces,
2146 backlog_score,
2147 cause,
2148 } => {
2149 tracing::info!(
2150 namespace_count = namespaces.len(),
2151 backlog_score,
2152 cause = cause.as_str(),
2153 "Interference threshold exceeded, consolidation requested"
2154 );
2155 crate::InterferenceDisposition::TriggerConsolidation {
2156 namespaces,
2157 backlog_score,
2158 cause: cause.as_str(),
2159 }
2160 }
2161 super::write_path::InterferenceAction::Suppressed {
2162 reason,
2163 backlog_score,
2164 } => {
2165 tracing::debug!(
2166 reason = reason.as_str(),
2167 backlog_score,
2168 "Interference request suppressed"
2169 );
2170 crate::InterferenceDisposition::Suppressed {
2171 reason: reason.as_str(),
2172 backlog_score,
2173 }
2174 }
2175 super::write_path::InterferenceAction::None => {
2176 crate::InterferenceDisposition::None
2177 }
2178 }
2179 } else {
2180 crate::InterferenceDisposition::None
2181 };
2182 explanation.interference = Some(crate::InterferenceExplanation {
2183 score: interference,
2184 disposition,
2185 });
2186 }
2187
2188 self.emit_scoped_checked(
2189 namespace.as_str(),
2190 record.provenance.created_by.as_str(),
2191 MemoryEvent::EpisodeCreated {
2192 id,
2193 content_preview,
2194 },
2195 )
2196 .await
2197 .map_err(|error| {
2198 explanation.status = crate::RememberStatus::Failed;
2199 explanation.error = Some(error.to_string());
2200 crate::RememberFailure::new(error, explanation.clone())
2201 })?;
2202 if let Err(error) = hirn_storage::update_mutation_envelope_state(
2203 self.storage_backend(),
2204 &episode_envelope.id,
2205 hirn_storage::MutationEnvelopeState::Applied,
2206 None,
2207 )
2208 .await
2209 {
2210 tracing::warn!(
2211 id = %id,
2212 error = %error,
2213 "episode mutation envelope finalize failed; recovery will retry cleanup"
2214 );
2215 }
2216 explanation.status = crate::RememberStatus::Accepted;
2217 explanation.memory_id = Some(id);
2218
2219 match self.config.evolution_mode {
2223 hirn_core::EvolutionMode::Async { .. } => {
2224 let target = hirn_core::offline::OfflineJobTarget {
2225 namespace: Some(namespace),
2226 memory_ids: vec![id],
2227 ..Default::default()
2228 };
2229 let job = hirn_core::CognitiveJob::new(hirn_core::CognitiveJobKind::Evolve, target);
2230 if let Err(e) = self.offline_scheduler_runtime().submit_job(job).await {
2231 tracing::warn!(id = %id, error = %e, "backward evolution job enqueue failed; skipping");
2232 }
2233 }
2234 hirn_core::EvolutionMode::Synchronous { max_neighbors } => {
2235 let evo_config = crate::consolidation::EvolutionConfig {
2236 evolution_top_k: max_neighbors,
2237 ..Default::default()
2238 };
2239 if let Err(e) = crate::consolidation::evolve_on_new_memory(
2240 self,
2241 &record,
2242 &evo_config,
2243 )
2244 .await
2245 {
2246 tracing::warn!(id = %id, error = %e, "synchronous backward evolution failed; skipping");
2247 }
2248 }
2249 hirn_core::EvolutionMode::None => {}
2250 }
2251
2252 Ok((id, explanation))
2253 }
2254
2255 pub(crate) async fn reconcile_pending_episode_mutations(&self) -> HirnResult<usize> {
2256 let envelopes = hirn_storage::list_pending_mutation_envelopes(
2257 self.storage_backend(),
2258 Some(EPISODE_REMEMBER_MUTATION_KIND),
2259 )
2260 .await
2261 .map_err(HirnError::storage)?;
2262 let mut reconciled = 0usize;
2263
2264 for envelope in envelopes {
2265 match self
2266 .reconcile_single_pending_episode_mutation(&envelope)
2267 .await
2268 {
2269 Ok(true) => reconciled += 1,
2270 Ok(false) => {}
2271 Err(error) => {
2272 hirn_storage::update_mutation_envelope_state(
2273 self.storage_backend(),
2274 &envelope.id,
2275 hirn_storage::MutationEnvelopeState::Failed,
2276 Some(error.to_string()),
2277 )
2278 .await
2279 .map_err(HirnError::storage)?;
2280 }
2281 }
2282 }
2283
2284 Ok(reconciled)
2285 }
2286
    /// Re-applies the side effects recorded in one pending `episode_remember` envelope.
    ///
    /// The envelope payload (`EpisodeRememberEnvelope`) is decoded first, then the episodic
    /// record it names is looked up:
    ///
    /// * Record present: the graph node is added if missing, the recorded edge requests and
    ///   the optional `TemporalNext` edge request are applied, an `EpisodeCreated` event is
    ///   emitted unless `episode_created_event_logged` finds one already, and the envelope
    ///   is marked `Applied`.
    /// * Record missing (`HirnError::NotFound`): any orphaned graph node is removed
    ///   (best-effort, errors ignored) and the envelope is marked `Failed` with a message
    ///   naming the missing record.
    ///
    /// Returns `Ok(true)` in both handled cases; this function never returns `Ok(false)`.
    ///
    /// # Errors
    ///
    /// Propagates payload decode failures, read errors other than `NotFound`, graph and
    /// event-emission failures, and storage errors while updating the envelope state.
    ///
    /// NOTE(review): edge requests are re-applied unconditionally on every recovery attempt;
    /// this assumes `apply_episode_edge_requests` tolerates edges that already exist —
    /// confirm against its implementation.
    async fn reconcile_single_pending_episode_mutation(
        &self,
        envelope: &hirn_storage::MutationEnvelopeRecord,
    ) -> HirnResult<bool> {
        let payload = decode_episode_remember_envelope(envelope)?;

        match self.read_episodic_record(payload.record_id).await {
            Ok(_record) => {
                // Guarded so a node that was already materialised is not re-added.
                if !self.cached_graph().has_node(payload.record_id).await? {
                    self.cached_graph()
                        .add_node(
                            payload.record_id,
                            Layer::Episodic,
                            payload.importance,
                            Timestamp::from_millis(payload.timestamp_ms),
                            payload.namespace,
                        )
                        .await?;
                }

                self.apply_episode_edge_requests(
                    payload.namespace,
                    payload.agent_id,
                    &payload.edge_requests,
                )
                .await?;
                // The temporal edge is stored separately in the envelope because it is filled
                // in after the envelope is first written (see
                // `update_episode_envelope_temporal_edge`).
                if let Some(temporal_edge_request) = payload.temporal_edge_request.as_ref() {
                    self.apply_episode_edge_requests(
                        payload.namespace,
                        payload.agent_id,
                        std::slice::from_ref(temporal_edge_request),
                    )
                    .await?;
                }

                // Avoid emitting a duplicate EpisodeCreated event if the original write got
                // as far as logging it.
                if !self.episode_created_event_logged(&payload).await? {
                    self.emit_scoped_checked(
                        payload.namespace.as_str(),
                        payload.agent_id.as_str(),
                        MemoryEvent::EpisodeCreated {
                            id: payload.record_id,
                            content_preview: payload.content_preview.clone(),
                        },
                    )
                    .await?;
                }

                hirn_storage::update_mutation_envelope_state(
                    self.storage_backend(),
                    &envelope.id,
                    hirn_storage::MutationEnvelopeState::Applied,
                    None,
                )
                .await
                .map_err(HirnError::storage)?;
                Ok(true)
            }
            Err(HirnError::NotFound(_)) => {
                // The record never became durable: drop any graph node that points at it.
                if self.cached_graph().has_node(payload.record_id).await? {
                    let _ = self.cached_graph().remove_node(payload.record_id).await;
                }
                hirn_storage::update_mutation_envelope_state(
                    self.storage_backend(),
                    &envelope.id,
                    hirn_storage::MutationEnvelopeState::Failed,
                    Some(format!(
                        "episode record missing during recovery: {}",
                        payload.record_id
                    )),
                )
                .await
                .map_err(HirnError::storage)?;
                Ok(true)
            }
            Err(error) => Err(error),
        }
    }
2364
2365 async fn episode_created_event_logged(
2366 &self,
2367 payload: &EpisodeRememberEnvelope,
2368 ) -> HirnResult<bool> {
2369 let Some(event_log) = self.event_log() else {
2370 return Ok(false);
2371 };
2372
2373 let events = event_log
2374 .read_with_filter(&crate::event_log::EventFilter {
2375 realm: Some(self.config.default_realm.clone()),
2376 namespace: Some(payload.namespace.as_str().to_string()),
2377 event_type: Some("episode_created".to_string()),
2378 agent_id: Some(payload.agent_id.as_str().to_string()),
2379 after_us: None,
2380 before_us: None,
2381 })
2382 .await?;
2383
2384 Ok(events.into_iter().any(|env| {
2385 matches!(
2386 env.event,
2387 MemoryEvent::EpisodeCreated { id, .. } if id == payload.record_id
2388 )
2389 }))
2390 }
2391
2392 pub(crate) async fn hydrate_temporal_arrival_cursors(&self) -> HirnResult<()> {
2393 let mut latest_by_namespace: HashMap<Namespace, (MemoryId, i64)> = HashMap::new();
2394
2395 for edge in self.cached_graph().all_edges().await? {
2396 if edge.relation != EdgeRelation::TemporalNext {
2397 continue;
2398 }
2399 let Some(sequence) = target_arrival_sequence(&edge.metadata) else {
2400 continue;
2401 };
2402 latest_by_namespace
2403 .entry(edge.namespace)
2404 .and_modify(|current| {
2405 if sequence > current.1 || (sequence == current.1 && edge.target > current.0) {
2406 *current = (edge.target, sequence);
2407 }
2408 })
2409 .or_insert((edge.target, sequence));
2410 }
2411
2412 let mut seed_records_by_namespace: HashMap<Namespace, Vec<EpisodicRecord>> = HashMap::new();
2413 for record in self.list_episode_arrival_seed_records().await? {
2414 seed_records_by_namespace
2415 .entry(record.namespace)
2416 .or_default()
2417 .push(record);
2418 }
2419
2420 for (namespace, mut records) in seed_records_by_namespace {
2421 latest_by_namespace.entry(namespace).or_insert_with(|| {
2422 records.sort_by(|left, right| {
2423 left.created_at
2424 .cmp(&right.created_at)
2425 .then_with(|| left.revision_id.cmp(&right.revision_id))
2426 });
2427 let last = records
2428 .last()
2429 .expect("namespace seed records should be non-empty");
2430 (last.id, i64::try_from(records.len()).unwrap_or(i64::MAX))
2431 });
2432 }
2433
2434 for (namespace, (id, sequence)) in latest_by_namespace {
2435 self.write_runtime().seed_arrival(namespace, id, sequence);
2436 }
2437
2438 Ok(())
2439 }
2440
2441 async fn list_episode_arrival_seed_records(&self) -> HirnResult<Vec<EpisodicRecord>> {
2442 let mut batches = self
2443 .storage_runtime
2444 .scan_stream(
2445 hirn_storage::datasets::episodic::DATASET_NAME,
2446 hirn_storage::store::ScanOptions {
2447 filter: Some("version = 1".to_string()),
2448 ..Default::default()
2449 },
2450 )
2451 .await
2452 .map_err(HirnError::storage)?;
2453 let mut records = Vec::new();
2454
2455 while let Some(batch) = batches.try_next().await.map_err(HirnError::storage)? {
2456 records.extend(
2457 hirn_storage::datasets::episodic::from_batch(&batch).map_err(HirnError::storage)?,
2458 );
2459 }
2460
2461 Ok(records)
2462 }
2463
2464 pub(crate) async fn get_episode(&self, id: MemoryId) -> HirnResult<EpisodicRecord> {
2466 self.read_episodic_record(id).await
2467 }
2468
2469 async fn apply_episode_access_counts(
2470 &self,
2471 counts: HashMap<MemoryId, usize>,
2472 ) -> HirnResult<()> {
2473 if counts.is_empty() {
2474 return Ok(());
2475 }
2476
2477 let unique_ids: Vec<MemoryId> = counts.keys().copied().collect();
2478 let records = self.read_episodic_records_batch(&unique_ids).await?;
2479 let mut updated = Vec::with_capacity(unique_ids.len());
2480
2481 for id in unique_ids {
2482 let Some(mut record) = records.get(&id).cloned() else {
2483 continue;
2484 };
2485 for _ in 0..counts[&id] {
2486 record.record_access();
2487 }
2488 updated.push(record);
2489 }
2490
2491 if updated.is_empty() {
2492 return Ok(());
2493 }
2494
2495 const UPDATE_CHUNK: usize = 256;
2496 for chunk in updated.chunks(UPDATE_CHUNK) {
2497 let in_list = chunk
2498 .iter()
2499 .map(|record| format!("'{}'", record.id.to_string().replace('\'', "''")))
2500 .collect::<Vec<_>>()
2501 .join(", ");
2502 self.storage_runtime
2503 .delete(
2504 hirn_storage::datasets::episodic::DATASET_NAME,
2505 &format!("id IN ({in_list})"),
2506 )
2507 .await
2508 .map_err(HirnError::storage)?;
2509 }
2510
2511 let dims = self.config.embedding_dimensions.as_usize();
2512 let batch = hirn_storage::datasets::episodic::to_batch(&updated, dims)
2513 .map_err(HirnError::storage)?;
2514 self.storage_runtime
2515 .append(hirn_storage::datasets::episodic::DATASET_NAME, batch)
2516 .await
2517 .map_err(HirnError::storage)?;
2518
2519 Ok(())
2520 }
2521
2522 pub(crate) async fn flush_episodic_access(&self) -> HirnResult<()> {
2523 self.apply_episode_access_counts(self.graph_runtime().drain_episodic_access())
2524 .await
2525 }
2526
2527 pub(crate) fn buffer_episode_access(&self, id: MemoryId) {
2528 self.graph_runtime().buffer_episodic_access(id);
2529 }
2530
2531 pub async fn record_episode_access(&self, id: MemoryId) -> HirnResult<()> {
2534 self.apply_episode_access_counts(HashMap::from([(id, 1)]))
2535 .await
2536 }
2537
2538 pub(crate) async fn list_episodes(
2541 &self,
2542 filter: &EpisodicFilter,
2543 ) -> HirnResult<Vec<EpisodicRecord>> {
2544 let now = Timestamp::now();
2545 let requested_limit = filter.limit.unwrap_or(usize::MAX);
2546 if requested_limit == 0 {
2547 return Ok(Vec::new());
2548 }
2549
2550 let mut batches = self
2551 .storage_runtime
2552 .scan_stream(
2553 hirn_storage::datasets::episodic::DATASET_NAME,
2554 hirn_storage::store::ScanOptions {
2555 filter: build_episodic_scan_filter(filter),
2556 order_by: Some(vec![
2557 hirn_storage::store::ScanOrdering::asc("timestamp_ms"),
2558 hirn_storage::store::ScanOrdering::asc("id"),
2559 ]),
2560 ..Default::default()
2561 },
2562 )
2563 .await
2564 .map_err(HirnError::storage)?;
2565
2566 let mut heads = HashMap::new();
2567
2568 while let Some(batch) = batches.try_next().await.map_err(HirnError::storage)? {
2569 let recs =
2570 hirn_storage::datasets::episodic::from_batch(&batch).map_err(HirnError::storage)?;
2571 for rec in recs {
2572 heads
2573 .entry(rec.logical_memory_id)
2574 .and_modify(|current| {
2575 if episodic_revision_is_newer(&rec, current) {
2576 *current = rec.clone();
2577 }
2578 })
2579 .or_insert(rec);
2580 }
2581 }
2582
2583 let mut results = heads
2584 .into_values()
2585 .filter(|rec| self.episode_matches_filter_at(rec, filter, now))
2586 .collect::<Vec<_>>();
2587 results.sort_by(|left, right| {
2588 left.timestamp
2589 .cmp(&right.timestamp)
2590 .then_with(|| left.id.cmp(&right.id))
2591 });
2592
2593 let offset = filter.offset.unwrap_or(0);
2594 if offset >= results.len() {
2595 return Ok(Vec::new());
2596 }
2597 let end = offset.saturating_add(requested_limit).min(results.len());
2598 results = results[offset..end].to_vec();
2599
2600 Ok(results)
2601 }
2602
2603 fn episode_matches_filter_at(
2604 &self,
2605 rec: &EpisodicRecord,
2606 filter: &EpisodicFilter,
2607 now: Timestamp,
2608 ) -> bool {
2609 if !filter.include_archived && rec.archived {
2610 return false;
2611 }
2612 if rec.is_expired(now) {
2614 return false;
2615 }
2616 if let Some(et) = &filter.event_type {
2617 if rec.event_type != *et {
2618 return false;
2619 }
2620 }
2621 if let Some(after) = &filter.after {
2622 if rec.timestamp <= *after {
2623 return false;
2624 }
2625 }
2626 if let Some(before) = &filter.before {
2627 if rec.timestamp >= *before {
2628 return false;
2629 }
2630 }
2631 if let Some(min_imp) = filter.min_importance {
2632 if rec.importance < min_imp {
2633 return false;
2634 }
2635 }
2636 if let Some(entity) = &filter.entity_name {
2637 if !rec.entities.iter().any(|e| e.name == *entity) {
2638 return false;
2639 }
2640 }
2641 if let Some(ns) = &filter.namespace {
2642 if rec.namespace != *ns {
2643 return false;
2644 }
2645 }
2646 true
2647 }
2648
2649 pub(crate) async fn delete_episode(&self, id: MemoryId) -> HirnResult<()> {
2651 let record = self.read_episodic_record(id).await?;
2652 let head = self
2653 .episodic_head_for_logical_id(record.logical_memory_id)
2654 .await?;
2655
2656 self.enforce(
2658 head.provenance.created_by.as_str(),
2659 crate::policy::Action::Forget,
2660 &self.config.default_realm,
2661 head.namespace.as_str(),
2662 )
2663 .await?;
2664
2665 let history = self.read_episodic_history(record.logical_memory_id).await?;
2666 for revision in &history {
2667 let _ = self.cached_graph().remove_node(revision.id).await;
2668 }
2669
2670 let exact_filter = Self::episodic_logical_exact_filter(record.logical_memory_id);
2672 self.storage_runtime
2673 .delete_exact(
2674 hirn_storage::datasets::episodic::DATASET_NAME,
2675 &exact_filter,
2676 )
2677 .await
2678 .map_err(|e| HirnError::storage(e))?;
2679
2680 self.emit_scoped(
2681 head.namespace.as_str(),
2682 head.provenance.created_by.as_str(),
2683 MemoryEvent::Forgotten { id },
2684 )
2685 .await;
2686 Ok(())
2687 }
2688
2689 pub(crate) async fn archive_episode(&self, id: MemoryId) -> HirnResult<()> {
2691 let record = self.episodic_edit_target(id).await?;
2692
2693 self.enforce(
2695 record.provenance.created_by.as_str(),
2696 crate::policy::Action::Forget,
2697 &self.config.default_realm,
2698 record.namespace.as_str(),
2699 )
2700 .await?;
2701
2702 let archived = self
2703 .append_episodic_successor(
2704 &record,
2705 RevisionOperation::Retract,
2706 Some("episodic record archived".to_string()),
2707 |next| {
2708 next.archived = true;
2709 },
2710 )
2711 .await?;
2712 self.emit_scoped(
2713 archived.namespace.as_str(),
2714 archived.provenance.created_by.as_str(),
2715 MemoryEvent::Archived { id: archived.id },
2716 )
2717 .await;
2718 Ok(())
2719 }
2720
    /// Applies time-based importance decay to every live, unexpired episodic head.
    ///
    /// Each head's importance is scaled by
    /// `memory_decay_factor ^ (hours since last_accessed / memory_half_life_hours)`.
    /// Heads whose `timestamp` is less than one whole hour in the past are skipped.
    ///
    /// There are two outcomes:
    ///
    /// - If the decayed importance falls below `memory_min_importance`, a `Retract`
    ///   successor with the new importance and `archived = true` is appended, and an
    ///   `Archived` event is emitted.
    /// - Otherwise, if importance dropped by more than 0.1%, a `Correct` successor carrying
    ///   only the new importance is appended.
    ///
    /// Returns the number of heads archived by this pass.
    ///
    /// # Errors
    ///
    /// Propagates failures from loading heads or appending successors. The pass stops at
    /// the first failed append, and later heads are left untouched for this run.
    ///
    /// NOTE(review): the exponent is measured from `last_accessed`, and the successor
    /// closures here only change `importance`/`archived`. If `append_episodic_successor`
    /// carries `last_accessed` over unchanged, each run re-applies decay for the full
    /// elapsed time on top of the already-decayed importance. Decay would then compound
    /// with the number of runs — confirm against `append_episodic_successor`.
    pub(crate) async fn decay_memories(&self) -> HirnResult<usize> {
        let decay_factor = self.config.memory_decay_factor;
        let half_life_hours = self.config.memory_half_life_hours;
        let min_importance = self.config.memory_min_importance;
        // One `now` for the whole pass so every head is judged against the same instant.
        let now = Timestamp::now();
        let now_dt = now.as_datetime();
        let mut archived_count = 0;

        for record in self.current_episodic_heads().await?.into_values() {
            if !record.is_live() || record.is_expired(now) {
                continue;
            }

            // Fractional hours since last access; a last_accessed in the future counts as 0.
            let last_dt = record.last_accessed.as_datetime();
            let hours_elapsed = (now_dt - last_dt).num_seconds().max(0) as f64 / 3600.0;

            // `num_hours` truncates, so this skips anything younger than one full hour
            // (and anything timestamped in the future).
            let hours_since_creation = now_dt
                .signed_duration_since(record.timestamp.as_datetime())
                .num_hours() as f64;
            if hours_since_creation < 1.0 {
                continue;
            }

            let exponent = hours_elapsed / half_life_hours as f64;
            let new_importance = record.importance * (decay_factor as f64).powf(exponent) as f32;

            if new_importance < min_importance {
                let archived = self
                    .append_episodic_successor(
                        &record,
                        RevisionOperation::Retract,
                        Some("episodic importance decayed below archival threshold".to_string()),
                        |next| {
                            next.importance = new_importance;
                            next.archived = true;
                        },
                    )
                    .await?;
                archived_count += 1;
                self.emit_scoped(
                    archived.namespace.as_str(),
                    archived.provenance.created_by.as_str(),
                    MemoryEvent::Archived { id: archived.id },
                )
                .await;
            } else if new_importance < record.importance * 0.999 {
                // Skip negligible (<0.1%) changes so every pass does not append a revision.
                let _ = self
                    .append_episodic_successor(
                        &record,
                        RevisionOperation::Correct,
                        Some("episodic importance decayed".to_string()),
                        |next| {
                            next.importance = new_importance;
                        },
                    )
                    .await?;
            }
        }

        Ok(archived_count)
    }
2791
2792 pub(crate) async fn purge_expired(&self) -> HirnResult<usize> {
2796 let now = Timestamp::now();
2797 let expired_ids: Vec<MemoryId> = self
2798 .current_episodic_heads()
2799 .await?
2800 .into_values()
2801 .filter(|record| record.is_expired(now))
2802 .map(|record| record.id)
2803 .collect();
2804
2805 let count = expired_ids.len();
2807 for id in expired_ids {
2808 let _ = self.delete_episode(id).await;
2810 }
2811
2812 Ok(count)
2813 }
2814
2815 pub fn start_decay_task(
2821 self: &std::sync::Arc<Self>,
2822 interval: std::time::Duration,
2823 ) -> tokio::task::JoinHandle<()> {
2824 let db = std::sync::Arc::clone(self);
2825 tokio::spawn(async move {
2826 let mut ticker = tokio::time::interval(interval);
2827 ticker.tick().await; loop {
2829 ticker.tick().await;
2830 if let Err(e) = db.decay_memories().await {
2831 tracing::warn!(error = %e, "memory decay task failed");
2832 }
2833 if let Err(e) = db.purge_expired().await {
2834 tracing::warn!(error = %e, "TTL purge task failed");
2835 }
2836 }
2837 })
2838 }
2839}
2840
2841#[cfg(test)]
2842mod tests {
2843 use std::sync::Arc;
2844
2845 use super::*;
2846 use hirn_core::HirnConfig;
2847 use hirn_core::id::MemoryId;
2848 use hirn_core::revision::{LogicalMemoryId, RevisionId};
2849 use hirn_core::types::{AgentId, EdgeRelation, EventType};
2850 use hirn_storage::memory_store::MemoryStore;
2851
2852 fn agent() -> AgentId {
2853 AgentId::new("episodic-tests").unwrap()
2854 }
2855
2856 async fn temp_db() -> (HirnDB, tempfile::TempDir) {
2857 let dir = tempfile::tempdir().unwrap();
2858 let path = dir.path().join("episodic-tests");
2859 let config = HirnConfig::builder()
2860 .db_path(&path)
2861 .embedding_dimensions(4)
2862 .working_memory_token_limit(1000)
2863 .memory_decay_factor(0.5)
2864 .memory_half_life_hours(1)
2865 .memory_min_importance(0.05)
2866 .build()
2867 .unwrap();
2868 let db = HirnDB::open_with_config(config, Arc::new(MemoryStore::new()))
2869 .await
2870 .unwrap();
2871 (db, dir)
2872 }
2873
2874 fn episodic_record(
2875 id: MemoryId,
2876 logical_memory_id: LogicalMemoryId,
2877 created_at: Timestamp,
2878 version: u32,
2879 ) -> EpisodicRecord {
2880 let mut record = EpisodicRecord::builder()
2881 .event_type(EventType::Observation)
2882 .content("deployment note")
2883 .summary("deployment note")
2884 .importance(0.7)
2885 .agent_id(agent())
2886 .build()
2887 .unwrap();
2888 record.id = id;
2889 record.logical_memory_id = logical_memory_id;
2890 record.revision_id = RevisionId::from_memory_id(id);
2891 record.version = version;
2892 record.timestamp = created_at;
2893 record.created_at = created_at;
2894 record.updated_at = created_at;
2895 record.last_accessed = created_at;
2896 record
2897 }
2898
2899 #[test]
2900 fn revision_snapshot_preserves_exact_recorded_boundary_when_timestamps_tie() {
2901 let created_at = Timestamp::from_millis(1_700_000_000_000);
2902 let original_id = MemoryId::parse("01ARZ3NDEKTSV4RRFFQ69G5FAW").unwrap();
2903 let successor_id = MemoryId::parse("01ARZ3NDEKTSV4RRFFQ69G5FAV").unwrap();
2904 let logical_memory_id = LogicalMemoryId::from_memory_id(original_id);
2905
2906 let original = episodic_record(original_id, logical_memory_id, created_at, 1);
2907 let mut successor = episodic_record(successor_id, logical_memory_id, created_at, 2);
2908 successor.revision_operation = RevisionOperation::Correct;
2909 successor.revision_reason = Some("content refined".to_string());
2910 successor.revision_causation_id = Some(original.id);
2911
2912 let revision = episodic_snapshot_head_recorded_at_snapshot(
2913 &[original.clone(), successor],
2914 super::super::semantic::ResolvedRecallSnapshot::Revision {
2915 cutoff: created_at,
2916 revision_id: original.revision_id,
2917 logical_memory_id,
2918 version: original.version,
2919 },
2920 )
2921 .unwrap();
2922
2923 assert_eq!(revision.id, original.id);
2924 assert_eq!(revision.revision_id, original.revision_id);
2925 assert_eq!(revision.version, 1);
2926 }
2927
2928 #[tokio::test(flavor = "multi_thread")]
2929 async fn decay_advances_only_the_active_revision_head() {
2930 let (db, _dir) = temp_db().await;
2931 let stale_timestamp =
2932 Timestamp::from_millis(Timestamp::now().millis() - (6 * 60 * 60 * 1000));
2933
2934 let id = db
2935 .remember(
2936 EpisodicRecord::builder()
2937 .event_type(EventType::Observation)
2938 .content("stale chain")
2939 .summary("stale chain")
2940 .importance(0.8)
2941 .timestamp(stale_timestamp)
2942 .agent_id(agent())
2943 .build()
2944 .unwrap(),
2945 )
2946 .await
2947 .unwrap();
2948
2949 let original = db.read_episodic_record(id).await.unwrap();
2950 let active = db
2951 .append_episodic_successor(
2952 &original,
2953 RevisionOperation::Correct,
2954 Some("prepare stale active head".to_string()),
2955 |next| {
2956 next.importance = 0.8;
2957 },
2958 )
2959 .await
2960 .unwrap();
2961
2962 let mut stale_active = db.read_episodic_record(active.id).await.unwrap();
2963 stale_active.last_accessed = stale_timestamp;
2964 stale_active.updated_at = stale_timestamp;
2965 db.write_episodic_record(&stale_active).await.unwrap();
2966
2967 let archived = db.decay_memories().await.unwrap();
2968 assert_eq!(archived, 1);
2969
2970 let history = db
2971 .read_episodic_history(original.logical_memory_id)
2972 .await
2973 .unwrap();
2974 assert_eq!(history.len(), 3);
2975
2976 let original_after = db.read_episodic_record(id).await.unwrap();
2977 assert_eq!(original_after.version, 1);
2978 assert_eq!(original_after.importance, 0.8);
2979 assert!(original_after.superseded_by.is_none());
2980
2981 let head = db
2982 .episodic_head_for_logical_id(original.logical_memory_id)
2983 .await
2984 .unwrap();
2985 assert_eq!(head.version, 3);
2986 assert_eq!(head.revision_operation, RevisionOperation::Retract);
2987 assert!(head.archived);
2988 }
2989
2990 #[tokio::test(flavor = "multi_thread")]
2991 async fn batch_remember_primes_episodic_head_cache() {
2992 let (db, _dir) = temp_db().await;
2993
2994 let records = vec![
2995 EpisodicRecord::builder()
2996 .event_type(EventType::Observation)
2997 .content("first cached episode")
2998 .summary("first cached episode")
2999 .importance(0.7)
3000 .agent_id(agent())
3001 .build()
3002 .unwrap(),
3003 EpisodicRecord::builder()
3004 .event_type(EventType::Observation)
3005 .content("second cached episode")
3006 .summary("second cached episode")
3007 .importance(0.6)
3008 .agent_id(agent())
3009 .build()
3010 .unwrap(),
3011 ];
3012
3013 let logical_ids = records
3014 .iter()
3015 .map(|record| record.logical_memory_id)
3016 .collect::<Vec<_>>();
3017
3018 let results = db.batch_remember(records).await;
3019 assert!(results.iter().all(Result::is_ok));
3020
3021 for logical_memory_id in logical_ids {
3022 let cached = db.cached_episodic_head(logical_memory_id);
3023 assert!(
3024 cached.is_some(),
3025 "expected cached head for {logical_memory_id}"
3026 );
3027 }
3028 }
3029
3030 #[tokio::test(flavor = "multi_thread")]
3031 async fn open_reconciles_pending_episode_mutation_envelopes() {
3032 let dir = tempfile::tempdir().unwrap();
3033 let path = dir.path().join("episodic-envelope-recovery");
3034 let store = Arc::new(MemoryStore::new());
3035 let config = HirnConfig::builder()
3036 .db_path(&path)
3037 .embedding_dimensions(4)
3038 .working_memory_token_limit(1000)
3039 .build()
3040 .unwrap();
3041
3042 let db = HirnDB::open_with_config(config.clone(), store.clone())
3043 .await
3044 .unwrap();
3045 let record = episodic_record(MemoryId::new(), LogicalMemoryId::new(), Timestamp::now(), 1);
3046 let preview = record.content.chars().take(120).collect::<String>();
3047 let envelope = build_episode_remember_envelope(&record, &preview, &[]).unwrap();
3048
3049 let batch = hirn_storage::datasets::episodic::to_batch(
3050 std::slice::from_ref(&record),
3051 db.config.embedding_dimensions.as_usize(),
3052 )
3053 .unwrap();
3054 db.storage_runtime
3055 .append(hirn_storage::datasets::episodic::DATASET_NAME, batch)
3056 .await
3057 .unwrap();
3058 hirn_storage::append_mutation_envelope(store.as_ref(), &envelope)
3059 .await
3060 .unwrap();
3061
3062 assert!(!db.cached_graph().has_node(record.id).await.unwrap());
3063 drop(db);
3064
3065 let reopened = HirnDB::open_with_config(config, store.clone())
3066 .await
3067 .unwrap();
3068
3069 assert!(reopened.cached_graph().has_node(record.id).await.unwrap());
3070
3071 let events = reopened.event_log().unwrap().read_all().await.unwrap();
3072 assert!(events.into_iter().any(|env| {
3073 matches!(env.event, MemoryEvent::EpisodeCreated { id, .. } if id == record.id)
3074 }));
3075
3076 let stored_envelope = hirn_storage::get_mutation_envelope(store.as_ref(), &envelope.id)
3077 .await
3078 .unwrap()
3079 .unwrap();
3080 assert_eq!(
3081 stored_envelope.state,
3082 hirn_storage::MutationEnvelopeState::Applied
3083 );
3084 }
3085
3086 #[tokio::test(flavor = "multi_thread")]
3087 async fn open_reconciles_pending_episode_mutation_temporal_next_edges() {
3088 let dir = tempfile::tempdir().unwrap();
3089 let path = dir.path().join("episodic-envelope-temporal-recovery");
3090 let store = Arc::new(MemoryStore::new());
3091 let config = HirnConfig::builder()
3092 .db_path(&path)
3093 .embedding_dimensions(4)
3094 .working_memory_token_limit(1000)
3095 .build()
3096 .unwrap();
3097
3098 let db = HirnDB::open_with_config(config.clone(), store.clone())
3099 .await
3100 .unwrap();
3101 let previous_id = db
3102 .remember(
3103 EpisodicRecord::builder()
3104 .event_type(EventType::Observation)
3105 .content("first recovery edge")
3106 .summary("first recovery edge")
3107 .importance(0.7)
3108 .agent_id(agent())
3109 .build()
3110 .unwrap(),
3111 )
3112 .await
3113 .unwrap();
3114 let record = episodic_record(MemoryId::new(), LogicalMemoryId::new(), Timestamp::now(), 1);
3115 let preview = record.content.chars().take(120).collect::<String>();
3116 let mut envelope = build_episode_remember_envelope(&record, &preview, &[]).unwrap();
3117 let arrival = super::write_runtime::TemporalArrival {
3118 previous_id: Some(previous_id),
3119 previous_sequence: Some(1),
3120 sequence: 2,
3121 };
3122 update_episode_envelope_temporal_edge(
3123 &mut envelope,
3124 temporal_edge_request_for_arrival(record.id, &arrival),
3125 )
3126 .unwrap();
3127
3128 let batch = hirn_storage::datasets::episodic::to_batch(
3129 std::slice::from_ref(&record),
3130 db.config.embedding_dimensions.as_usize(),
3131 )
3132 .unwrap();
3133 db.storage_runtime
3134 .append(hirn_storage::datasets::episodic::DATASET_NAME, batch)
3135 .await
3136 .unwrap();
3137 hirn_storage::append_mutation_envelope(store.as_ref(), &envelope)
3138 .await
3139 .unwrap();
3140
3141 drop(db);
3142
3143 let reopened = HirnDB::open_with_config(config, store.clone())
3144 .await
3145 .unwrap();
3146
3147 let temporal_edges = reopened
3148 .cached_graph()
3149 .get_edges_between(previous_id, record.id)
3150 .await
3151 .unwrap();
3152 let edge = temporal_edges
3153 .into_iter()
3154 .find(|edge| edge.relation == EdgeRelation::TemporalNext)
3155 .expect("pending episode recovery should restore TemporalNext edge");
3156 assert_eq!(target_arrival_sequence(&edge.metadata), Some(2));
3157
3158 let stored_envelope = hirn_storage::get_mutation_envelope(store.as_ref(), &envelope.id)
3159 .await
3160 .unwrap()
3161 .unwrap();
3162 assert_eq!(
3163 stored_envelope.state,
3164 hirn_storage::MutationEnvelopeState::Applied
3165 );
3166 }
3167
3168 #[tokio::test(flavor = "multi_thread")]
3169 async fn restart_hydrates_temporal_arrival_cursor_for_next_remember() {
3170 let dir = tempfile::tempdir().unwrap();
3171 let path = dir.path().join("episodic-temporal-restart");
3172 let store = Arc::new(MemoryStore::new());
3173 let config = HirnConfig::builder()
3174 .db_path(&path)
3175 .embedding_dimensions(4)
3176 .working_memory_token_limit(1000)
3177 .build()
3178 .unwrap();
3179
3180 let first_id = {
3181 let db = HirnDB::open_with_config(config.clone(), store.clone())
3182 .await
3183 .unwrap();
3184 let first_id = db
3185 .remember(
3186 EpisodicRecord::builder()
3187 .event_type(EventType::Observation)
3188 .content("first after boot")
3189 .summary("first after boot")
3190 .importance(0.7)
3191 .agent_id(agent())
3192 .build()
3193 .unwrap(),
3194 )
3195 .await
3196 .unwrap();
3197 drop(db);
3198 first_id
3199 };
3200
3201 let reopened = HirnDB::open_with_config(config, store).await.unwrap();
3202 let second_id = reopened
3203 .remember(
3204 EpisodicRecord::builder()
3205 .event_type(EventType::Observation)
3206 .content("second after restart")
3207 .summary("second after restart")
3208 .importance(0.8)
3209 .agent_id(agent())
3210 .build()
3211 .unwrap(),
3212 )
3213 .await
3214 .unwrap();
3215
3216 let temporal_edges = reopened
3217 .cached_graph()
3218 .get_edges_of_type(second_id, EdgeRelation::TemporalNext)
3219 .await
3220 .unwrap();
3221 let edge = temporal_edges
3222 .into_iter()
3223 .find(|edge| edge.source == first_id && edge.target == second_id)
3224 .expect("restarted remember should continue the TemporalNext chain");
3225 assert_eq!(target_arrival_sequence(&edge.metadata), Some(2));
3226 }
3227
3228 #[tokio::test(flavor = "multi_thread")]
3229 async fn purge_expired_only_considers_the_active_revision_head() {
3230 let (db, _dir) = temp_db().await;
3231 let now = Timestamp::now();
3232 let expired_timestamp = Timestamp::from_millis(now.millis().saturating_sub(1_000));
3233 let future_timestamp = Timestamp::from_millis(now.millis() + (60 * 60 * 1000));
3234
3235 let id = db
3236 .remember(
3237 EpisodicRecord::builder()
3238 .event_type(EventType::Observation)
3239 .content("ttl chain")
3240 .summary("ttl chain")
3241 .expires_at(expired_timestamp)
3242 .agent_id(agent())
3243 .build()
3244 .unwrap(),
3245 )
3246 .await
3247 .unwrap();
3248
3249 let original = db.read_episodic_record(id).await.unwrap();
3250 let active = db
3251 .append_episodic_successor(
3252 &original,
3253 RevisionOperation::Correct,
3254 Some("extend ttl on active head".to_string()),
3255 |next| {
3256 next.expires_at = Some(future_timestamp);
3257 },
3258 )
3259 .await
3260 .unwrap();
3261
3262 let purged = db.purge_expired().await.unwrap();
3263 assert_eq!(purged, 0);
3264
3265 let history = db
3266 .read_episodic_history(original.logical_memory_id)
3267 .await
3268 .unwrap();
3269 assert_eq!(history.len(), 2);
3270 assert!(history.iter().any(|record| record.id == id));
3271 assert!(history.iter().any(|record| record.id == active.id));
3272 }
3273
3274 #[tokio::test(flavor = "multi_thread")]
3275 async fn archive_canonicalizes_stale_revision_ids_to_the_live_head() {
3276 let (db, _dir) = temp_db().await;
3277
3278 let id = db
3279 .remember(
3280 EpisodicRecord::builder()
3281 .event_type(EventType::Observation)
3282 .content("draft note")
3283 .summary("draft note")
3284 .importance(0.7)
3285 .agent_id(agent())
3286 .build()
3287 .unwrap(),
3288 )
3289 .await
3290 .unwrap();
3291
3292 let original = db.read_episodic_record(id).await.unwrap();
3293 let current = db
3294 .append_episodic_successor(
3295 &original,
3296 RevisionOperation::Correct,
3297 Some("refresh content".to_string()),
3298 |next| {
3299 next.content = "fresh note".to_string();
3300 next.summary = "fresh note".to_string();
3301 },
3302 )
3303 .await
3304 .unwrap();
3305
3306 db.archive_episode(id).await.unwrap();
3307
3308 let history = db
3309 .read_episodic_history(original.logical_memory_id)
3310 .await
3311 .unwrap();
3312 assert_eq!(history.len(), 3);
3313
3314 let original_after = db.read_episodic_record(id).await.unwrap();
3315 assert_eq!(original_after.version, 1);
3316 assert!(!original_after.archived);
3317
3318 let current_after = db.read_episodic_record(current.id).await.unwrap();
3319 assert_eq!(current_after.version, 2);
3320 assert!(!current_after.archived);
3321
3322 let archived = db
3323 .episodic_head_for_logical_id(original.logical_memory_id)
3324 .await
3325 .unwrap();
3326 assert_eq!(archived.version, 3);
3327 assert!(archived.archived);
3328 assert_eq!(archived.revision_operation, RevisionOperation::Retract);
3329 }
3330
3331 #[tokio::test(flavor = "multi_thread")]
3332 async fn batch_remember_preserves_auto_edges_against_existing_records() {
3333 let (db, _dir) = temp_db().await;
3334 let embedding = vec![1.0, 0.0, 0.0, 0.0];
3335
3336 let existing_id = db
3337 .remember(
3338 EpisodicRecord::builder()
3339 .event_type(EventType::Observation)
3340 .content("deploy service alpha")
3341 .summary("deploy service alpha")
3342 .embedding(embedding.clone())
3343 .entity("deploy", "topic")
3344 .entity("service", "topic")
3345 .agent_id(agent())
3346 .build()
3347 .unwrap(),
3348 )
3349 .await
3350 .unwrap();
3351
3352 let new_id = db
3353 .batch_remember(vec![
3354 EpisodicRecord::builder()
3355 .event_type(EventType::Observation)
3356 .content("deploy service beta")
3357 .summary("deploy service beta")
3358 .embedding(embedding)
3359 .entity("deploy", "topic")
3360 .entity("service", "topic")
3361 .agent_id(agent())
3362 .build()
3363 .unwrap(),
3364 ])
3365 .await
3366 .into_iter()
3367 .next()
3368 .unwrap()
3369 .unwrap();
3370
3371 let edges = db
3372 .cached_graph()
3373 .get_edges_between(existing_id, new_id)
3374 .await
3375 .unwrap();
3376
3377 assert!(
3378 edges
3379 .iter()
3380 .any(|edge| edge.relation == EdgeRelation::SimilarTo),
3381 "expected SimilarTo edge between existing and batched record"
3382 );
3383 assert!(
3384 edges
3385 .iter()
3386 .any(|edge| edge.relation == EdgeRelation::RelatedTo),
3387 "expected RelatedTo edge between existing and batched record"
3388 );
3389 }
3390
3391 #[tokio::test(flavor = "multi_thread")]
3392 async fn max_auto_edges_zero_disables_episode_auto_edges() {
3393 let dir = tempfile::tempdir().unwrap();
3394 let path = dir.path().join("episodic-tests-no-auto-edges");
3395 let config = HirnConfig::builder()
3396 .db_path(&path)
3397 .embedding_dimensions(4)
3398 .working_memory_token_limit(1000)
3399 .max_auto_edges_per_record(0)
3400 .entity_overlap_threshold(1)
3401 .build()
3402 .unwrap();
3403 let db = HirnDB::open_with_config(config, Arc::new(MemoryStore::new()))
3404 .await
3405 .unwrap();
3406
3407 let embedding = vec![1.0, 0.0, 0.0, 0.0];
3408 let existing_id = db
3409 .remember(
3410 EpisodicRecord::builder()
3411 .event_type(EventType::Observation)
3412 .content("deploy service alpha")
3413 .summary("deploy service alpha")
3414 .embedding(embedding.clone())
3415 .entity("deploy", "topic")
3416 .agent_id(agent())
3417 .build()
3418 .unwrap(),
3419 )
3420 .await
3421 .unwrap();
3422
3423 let new_id = db
3424 .remember(
3425 EpisodicRecord::builder()
3426 .event_type(EventType::Observation)
3427 .content("deploy service beta")
3428 .summary("deploy service beta")
3429 .embedding(embedding)
3430 .entity("deploy", "topic")
3431 .agent_id(agent())
3432 .build()
3433 .unwrap(),
3434 )
3435 .await
3436 .unwrap();
3437
3438 let edges = db
3439 .cached_graph()
3440 .get_edges_between(existing_id, new_id)
3441 .await
3442 .unwrap();
3443 assert!(
3444 !edges.iter().any(|edge| {
3445 matches!(
3446 edge.relation,
3447 EdgeRelation::SimilarTo
3448 | EdgeRelation::RelatedTo
3449 | EdgeRelation::ParticipatesIn
3450 | EdgeRelation::Contradicts
3451 )
3452 }),
3453 "expected no auto-edge relations when max_auto_edges_per_record is zero"
3454 );
3455 }
3456
3457 #[tokio::test(flavor = "multi_thread")]
3458 async fn batch_remember_entity_only_records_preserve_auto_edges_against_existing_records() {
3459 let (db, _dir) = temp_db().await;
3460
3461 let existing_id = db
3462 .remember(
3463 EpisodicRecord::builder()
3464 .event_type(EventType::Observation)
3465 .content("deploy service alpha")
3466 .summary("deploy service alpha")
3467 .entity("deploy", "topic")
3468 .entity("service", "topic")
3469 .agent_id(agent())
3470 .build()
3471 .unwrap(),
3472 )
3473 .await
3474 .unwrap();
3475
3476 let new_id = db
3477 .batch_remember(vec![
3478 EpisodicRecord::builder()
3479 .event_type(EventType::Observation)
3480 .content("deploy service beta")
3481 .summary("deploy service beta")
3482 .entity("deploy", "topic")
3483 .entity("service", "topic")
3484 .agent_id(agent())
3485 .build()
3486 .unwrap(),
3487 ])
3488 .await
3489 .into_iter()
3490 .next()
3491 .unwrap()
3492 .unwrap();
3493
3494 let edges = db
3495 .cached_graph()
3496 .get_edges_between(existing_id, new_id)
3497 .await
3498 .unwrap();
3499
3500 assert!(
3501 edges
3502 .iter()
3503 .any(|edge| edge.relation == EdgeRelation::RelatedTo),
3504 "expected RelatedTo edge between existing and batched entity-only record"
3505 );
3506 }
3507
3508 #[tokio::test(flavor = "multi_thread")]
3509 async fn batch_remember_multiple_embedded_records_preserve_auto_edges_against_existing_records()
3510 {
3511 let (db, _dir) = temp_db().await;
3512 let embedding = vec![1.0, 0.0, 0.0, 0.0];
3513
3514 let existing_id = db
3515 .remember(
3516 EpisodicRecord::builder()
3517 .event_type(EventType::Observation)
3518 .content("deploy service alpha")
3519 .summary("deploy service alpha")
3520 .embedding(embedding.clone())
3521 .entity("deploy", "topic")
3522 .entity("service", "topic")
3523 .agent_id(agent())
3524 .build()
3525 .unwrap(),
3526 )
3527 .await
3528 .unwrap();
3529
3530 let batched_ids = db
3531 .batch_remember(vec![
3532 EpisodicRecord::builder()
3533 .event_type(EventType::Observation)
3534 .content("deploy service beta")
3535 .summary("deploy service beta")
3536 .embedding(embedding.clone())
3537 .entity("deploy", "topic")
3538 .entity("service", "topic")
3539 .agent_id(agent())
3540 .build()
3541 .unwrap(),
3542 EpisodicRecord::builder()
3543 .event_type(EventType::Observation)
3544 .content("deploy service gamma")
3545 .summary("deploy service gamma")
3546 .embedding(embedding)
3547 .entity("deploy", "topic")
3548 .entity("service", "topic")
3549 .agent_id(agent())
3550 .build()
3551 .unwrap(),
3552 ])
3553 .await
3554 .into_iter()
3555 .map(Result::unwrap)
3556 .collect::<Vec<_>>();
3557
3558 for new_id in batched_ids {
3559 let edges = db
3560 .cached_graph()
3561 .get_edges_between(existing_id, new_id)
3562 .await
3563 .unwrap();
3564
3565 assert!(
3566 edges
3567 .iter()
3568 .any(|edge| edge.relation == EdgeRelation::SimilarTo),
3569 "expected SimilarTo edge between existing and batched embedded record"
3570 );
3571 assert!(
3572 edges
3573 .iter()
3574 .any(|edge| edge.relation == EdgeRelation::RelatedTo),
3575 "expected RelatedTo edge between existing and batched embedded record"
3576 );
3577 }
3578 }
3579
3580 #[tokio::test(flavor = "multi_thread")]
3581 async fn batch_remember_distinct_embedded_records_preserve_auto_edges_against_matching_existing_records()
3582 {
3583 let (db, _dir) = temp_db().await;
3584 let deploy_embedding = vec![1.0, 0.0, 0.0, 0.0];
3585 let cache_embedding = vec![0.0, 1.0, 0.0, 0.0];
3586
3587 let deploy_existing_id = db
3588 .remember(
3589 EpisodicRecord::builder()
3590 .event_type(EventType::Observation)
3591 .content("deploy service alpha")
3592 .summary("deploy service alpha")
3593 .embedding(deploy_embedding.clone())
3594 .entity("deploy", "topic")
3595 .entity("service", "topic")
3596 .agent_id(agent())
3597 .build()
3598 .unwrap(),
3599 )
3600 .await
3601 .unwrap();
3602
3603 let cache_existing_id = db
3604 .remember(
3605 EpisodicRecord::builder()
3606 .event_type(EventType::Observation)
3607 .content("cache index warmup")
3608 .summary("cache index warmup")
3609 .embedding(cache_embedding.clone())
3610 .entity("cache", "topic")
3611 .entity("index", "topic")
3612 .agent_id(agent())
3613 .build()
3614 .unwrap(),
3615 )
3616 .await
3617 .unwrap();
3618
3619 let batched_ids = db
3620 .batch_remember(vec![
3621 EpisodicRecord::builder()
3622 .event_type(EventType::Observation)
3623 .content("deploy service beta")
3624 .summary("deploy service beta")
3625 .embedding(deploy_embedding)
3626 .entity("deploy", "topic")
3627 .entity("service", "topic")
3628 .agent_id(agent())
3629 .build()
3630 .unwrap(),
3631 EpisodicRecord::builder()
3632 .event_type(EventType::Observation)
3633 .content("cache index refresh")
3634 .summary("cache index refresh")
3635 .embedding(cache_embedding)
3636 .entity("cache", "topic")
3637 .entity("index", "topic")
3638 .agent_id(agent())
3639 .build()
3640 .unwrap(),
3641 ])
3642 .await
3643 .into_iter()
3644 .map(Result::unwrap)
3645 .collect::<Vec<_>>();
3646
3647 let deploy_edges = db
3648 .cached_graph()
3649 .get_edges_between(deploy_existing_id, batched_ids[0])
3650 .await
3651 .unwrap();
3652 assert!(
3653 deploy_edges
3654 .iter()
3655 .any(|edge| edge.relation == EdgeRelation::SimilarTo),
3656 "expected SimilarTo edge between deploy records"
3657 );
3658 assert!(
3659 deploy_edges
3660 .iter()
3661 .any(|edge| edge.relation == EdgeRelation::RelatedTo),
3662 "expected RelatedTo edge between deploy records"
3663 );
3664
3665 let cache_edges = db
3666 .cached_graph()
3667 .get_edges_between(cache_existing_id, batched_ids[1])
3668 .await
3669 .unwrap();
3670 assert!(
3671 cache_edges
3672 .iter()
3673 .any(|edge| edge.relation == EdgeRelation::SimilarTo),
3674 "expected SimilarTo edge between cache records"
3675 );
3676 assert!(
3677 cache_edges
3678 .iter()
3679 .any(|edge| edge.relation == EdgeRelation::RelatedTo),
3680 "expected RelatedTo edge between cache records"
3681 );
3682 }
3683}