1use std::collections::{BTreeMap, HashMap, HashSet};
14use std::sync::Arc;
15
/// Default weight (0.7) for the "batch sequential" setting.
// NOTE(review): no consumer of this constant is visible in this part of the
// file — confirm where and how the weight is applied.
const DEFAULT_BATCH_SEQUENTIAL_WEIGHT: f64 = 0.7;

/// Every `StoreType` variant handled by `dispatch_store!`, in the order used
/// when a routine has to walk all stores.
const ALL_STORES: [StoreType; 5] = [
    StoreType::Episodic,
    StoreType::Semantic,
    StoreType::Procedural,
    StoreType::Emotional,
    StoreType::Working,
];
27
/// Scope timer: remembers when it was created and, when dropped, records the
/// elapsed seconds into the histogram named `name`.
///
/// Bind the guard to a named local (`let _timer = ...`) so it lives until the
/// end of the scope being measured.
#[must_use = "an unbound TimerGuard is dropped immediately and records a near-zero duration"]
struct TimerGuard {
    /// Name of the histogram the elapsed time is recorded under.
    name: &'static str,
    /// Instant captured when the guard was constructed.
    start: std::time::Instant,
}
33
/// A batch of raw journal records that are handled together.
// NOTE(review): the field descriptions below are inferred from names and
// types only; the code that builds and consumes `DreamGroup` is outside this
// part of the file — confirm before relying on them.
struct DreamGroup {
    /// Session the grouped records belong to (presumably their shared session id).
    session_id: String,
    /// Topic identifier taken from the raw records, if any.
    raw_topic_id: Option<String>,
    /// Human-readable topic hint, if any.
    topic_hint: Option<String>,
    /// Presumably `true` when the topic was inferred rather than supplied — TODO confirm.
    inferred_topic: bool,
    /// The raw journal records in this group.
    records: Vec<RawJournalRecord>,
}
41
42impl TimerGuard {
43 fn new(name: &'static str) -> Self {
44 Self {
45 name,
46 start: std::time::Instant::now(),
47 }
48 }
49}
50
51impl Drop for TimerGuard {
52 fn drop(&mut self) {
53 metrics::histogram!(self.name).record(self.start.elapsed().as_secs_f64());
54 }
55}
56
/// Calls `$method(args...)` on the store field of `$self` selected by
/// `$store_type` and awaits the result.
///
/// Expands to a `match` over all five `StoreType` variants, so it must be
/// used inside an `async` context and every store must provide `$method`
/// with a compatible signature. The argument expressions are repeated in
/// every arm; only the selected arm is evaluated.
macro_rules! dispatch_store {
    ($self:expr, $store_type:expr, $method:ident ( $($arg:expr),* )) => {
        match $store_type {
            StoreType::Episodic => $self.episodic.$method($($arg),*).await,
            StoreType::Semantic => $self.semantic.$method($($arg),*).await,
            StoreType::Procedural => $self.procedural.$method($($arg),*).await,
            StoreType::Emotional => $self.emotional.$method($($arg),*).await,
            StoreType::Working => $self.working.$method($($arg),*).await,
        }
    };
}
69
70use chrono::Utc;
71use tracing::{info, warn};
72use uuid::Uuid;
73
74use cerememory_association::SpreadingActivationEngine;
75use cerememory_core::error::CerememoryError;
76use cerememory_core::protocol::*;
77use cerememory_core::traits::*;
78use cerememory_core::types::*;
79use cerememory_decay::{DecayParams, PowerLawDecayEngine};
80use cerememory_evolution::EvolutionEngine;
81use cerememory_index::text_index::TextIndex;
82use cerememory_index::vector_index::VectorIndex;
83use cerememory_index::HippocampalCoordinator;
84use cerememory_store_emotional::EmotionalStore;
85use cerememory_store_episodic::EpisodicStore;
86use cerememory_store_procedural::ProceduralStore;
87use cerememory_store_raw::RawJournalStore;
88use cerememory_store_semantic::SemanticStore;
89use cerememory_store_working::WorkingMemoryStore;
90
/// Configuration consumed by `CerememoryEngine::new`.
///
/// For every `*_path` field, `None` makes the engine create that component in
/// memory instead of opening it from the given path.
pub struct EngineConfig {
    /// Location of the raw journal store.
    pub raw_journal_path: Option<String>,
    /// Location of the episodic store.
    pub episodic_path: Option<String>,
    /// Location of the semantic store.
    pub semantic_path: Option<String>,
    /// Location of the procedural store.
    pub procedural_path: Option<String>,
    /// Location of the emotional store.
    pub emotional_path: Option<String>,
    /// Capacity of the working-memory store; the engine raises values below 1 to 1.
    pub working_capacity: usize,
    /// Parameters handed to the power-law decay engine.
    pub decay_params: DecayParams,
    /// Initial recall mode of the engine.
    pub recall_mode: RecallMode,
    /// Location of the text index.
    pub index_path: Option<String>,
    /// Location of the vector index.
    pub vector_index_path: Option<String>,
    /// Interval, in seconds, between background decay ticks; `None` disables
    /// the background decay task.
    pub background_decay_interval_secs: Option<u64>,
    /// Interval, in seconds, between background dream ticks; `None` disables
    /// the background dream task.
    pub background_dream_interval_secs: Option<u64>,
    /// Threshold passed to the vector index when it is opened.
    // NOTE(review): presumably the record count at which HNSW search is
    // activated — confirm against `cerememory_index::vector_index`.
    pub hnsw_threshold: usize,
    /// Optional LLM provider used for embeddings and audio transcription.
    pub llm_provider: Option<Arc<dyn LLMProvider>>,
}
116
impl Default for EngineConfig {
    /// Fully in-memory configuration: no paths, working capacity of 7,
    /// default decay parameters, `RecallMode::Human`, no background tasks,
    /// the index crate's default HNSW threshold, and no LLM provider.
    fn default() -> Self {
        Self {
            raw_journal_path: None,
            episodic_path: None,
            semantic_path: None,
            procedural_path: None,
            emotional_path: None,
            working_capacity: 7,
            decay_params: DecayParams::default(),
            recall_mode: RecallMode::Human,
            index_path: None,
            vector_index_path: None,
            background_decay_interval_secs: None,
            background_dream_interval_secs: None,
            hnsw_threshold: cerememory_index::vector_index::DEFAULT_HNSW_THRESHOLD,
            llm_provider: None,
        }
    }
}
137
/// The memory engine: owns every store, the indexes, the coordinator and the
/// state of the optional background tasks.
pub struct CerememoryEngine {
    /// Raw journal store (kept separate from the five `StoreType` stores).
    raw_journal: RawJournalStore,
    /// Store backing `StoreType::Episodic`.
    episodic: EpisodicStore,
    /// Store backing `StoreType::Semantic`.
    semantic: SemanticStore,
    /// Store backing `StoreType::Procedural`.
    procedural: ProceduralStore,
    /// Store backing `StoreType::Emotional`.
    emotional: EmotionalStore,
    /// Store backing `StoreType::Working`.
    working: WorkingMemoryStore,

    /// Decay engine built from `EngineConfig::decay_params`.
    decay: PowerLawDecayEngine,
    /// Spreading-activation engine sharing the coordinator below.
    activation: SpreadingActivationEngine<HippocampalCoordinator>,
    /// Evolution engine.
    evolution: EvolutionEngine,

    /// Coordinator tracking which store holds each record and its associations.
    coordinator: Arc<HippocampalCoordinator>,

    /// Full-text index over searchable record text and summaries.
    text_index: TextIndex,
    /// Vector index over record embeddings.
    vector_index: VectorIndex,

    /// Current recall mode.
    recall_mode: tokio::sync::RwLock<RecallMode>,

    /// Optional LLM provider for embeddings and transcription.
    llm_provider: Option<Arc<dyn LLMProvider>>,

    /// Configured interval for the background decay task, if any.
    background_decay_interval_secs: Option<u64>,
    /// Handle and shutdown channel of the running decay task; `None` when stopped.
    decay_state: tokio::sync::Mutex<Option<BackgroundDecayState>>,
    /// Configured interval for the background dream task, if any.
    background_dream_interval_secs: Option<u64>,
    /// Handle and shutdown channel of the running dream task; `None` when stopped.
    dream_state: tokio::sync::Mutex<Option<BackgroundDreamState>>,
}
172
/// Bookkeeping for a running background decay task.
struct BackgroundDecayState {
    /// Sending `true` asks the task to stop.
    shutdown_tx: tokio::sync::watch::Sender<bool>,
    /// Join handle of the supervising task.
    handle: tokio::task::JoinHandle<()>,
}
178
/// Bookkeeping for a running background dream task.
struct BackgroundDreamState {
    /// Sending `true` asks the task to stop.
    shutdown_tx: tokio::sync::watch::Sender<bool>,
    /// Join handle of the supervising task.
    handle: tokio::task::JoinHandle<()>,
}
184
185impl CerememoryEngine {
186 pub fn new(config: EngineConfig) -> Result<Self, CerememoryError> {
188 let raw_journal = match &config.raw_journal_path {
189 Some(p) => RawJournalStore::open(p)?,
190 None => RawJournalStore::open_in_memory()?,
191 };
192
193 let episodic = match &config.episodic_path {
194 Some(p) => EpisodicStore::open(p)?,
195 None => EpisodicStore::open_in_memory()?,
196 };
197
198 let semantic = match &config.semantic_path {
199 Some(p) => SemanticStore::open(p)?,
200 None => SemanticStore::open_in_memory()?,
201 };
202
203 let text_index = match &config.index_path {
204 Some(p) => match TextIndex::open(p) {
205 Ok(idx) => idx,
206 Err(e) => {
207 warn!(
208 error = %e,
209 path = %p,
210 "Corrupted text index detected, recreating (data will be repopulated via rebuild_coordinator)"
211 );
212 let _ = std::fs::remove_dir_all(p);
214 TextIndex::open(p).map_err(|e2| {
215 CerememoryError::Storage(format!(
216 "Failed to recreate text index after corruption: {e2}"
217 ))
218 })?
219 }
220 },
221 None => TextIndex::open_in_memory()?,
222 };
223
224 let vector_index = match &config.vector_index_path {
225 Some(p) => VectorIndex::open_with_threshold(p, config.hnsw_threshold)?,
226 None => VectorIndex::open_in_memory_with_threshold(config.hnsw_threshold)?,
227 };
228
229 let procedural = match &config.procedural_path {
230 Some(p) => ProceduralStore::open(p)?,
231 None => ProceduralStore::open_in_memory()?,
232 };
233
234 let emotional = match &config.emotional_path {
235 Some(p) => EmotionalStore::open(p)?,
236 None => EmotionalStore::open_in_memory()?,
237 };
238
239 let coordinator = Arc::new(HippocampalCoordinator::new());
240 let activation = SpreadingActivationEngine::new(Arc::clone(&coordinator));
241
242 Ok(Self {
243 raw_journal,
244 episodic,
245 semantic,
246 procedural,
247 emotional,
248 working: WorkingMemoryStore::with_capacity(config.working_capacity.max(1)),
249 decay: PowerLawDecayEngine::new(config.decay_params),
250 activation,
251 evolution: EvolutionEngine::new(),
252 coordinator,
253 text_index,
254 vector_index,
255 recall_mode: tokio::sync::RwLock::new(config.recall_mode),
256 llm_provider: config.llm_provider,
257 background_decay_interval_secs: config.background_decay_interval_secs,
258 decay_state: tokio::sync::Mutex::new(None),
259 background_dream_interval_secs: config.background_dream_interval_secs,
260 dream_state: tokio::sync::Mutex::new(None),
261 })
262 }
263
264 pub fn in_memory() -> Result<Self, CerememoryError> {
266 Self::new(EngineConfig::default())
267 }
268
269 pub fn start_background_decay(self: &Arc<Self>) {
272 let Some(interval_secs) = self.background_decay_interval_secs else {
273 return;
274 };
275
276 let mut guard = match self.decay_state.try_lock() {
278 Ok(g) => g,
279 Err(_) => return,
280 };
281 if guard.is_some() {
282 return; }
284
285 let (tx, rx) = tokio::sync::watch::channel(false);
286 let engine = Arc::clone(self);
287
288 let handle = tokio::spawn(async move {
289 let mut backoff_secs = 1u64;
290 const MAX_BACKOFF_SECS: u64 = 60;
291
292 loop {
293 let engine_ref = Arc::clone(&engine);
294 let mut rx_ref = rx.clone();
295
296 let result = tokio::spawn(async move {
297 let mut interval =
298 tokio::time::interval(std::time::Duration::from_secs(interval_secs));
299 interval.tick().await; loop {
302 tokio::select! {
303 _ = interval.tick() => {
304 let req = DecayTickRequest {
305 header: None,
306 tick_duration_seconds: Some(interval_secs.min(u32::MAX as u64) as u32),
307 };
308 if let Err(e) = engine_ref.lifecycle_decay_tick(req).await {
309 warn!(error = %e, "Background decay tick failed");
310 }
311 }
312 _ = rx_ref.changed() => {
313 if *rx_ref.borrow() {
314 info!("Background decay stopped");
315 return;
316 }
317 }
318 }
319 }
320 })
321 .await;
322
323 if *rx.borrow() {
325 return;
326 }
327
328 match result {
329 Ok(()) => return, Err(e) => {
331 tracing::error!(
332 error = %e,
333 backoff_secs,
334 "Background decay task panicked, restarting"
335 );
336 metrics::counter!("cerememory_decay_panics_total").increment(1);
337 tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await;
338 backoff_secs = (backoff_secs * 2).min(MAX_BACKOFF_SECS);
339 }
340 }
341 }
342 });
343
344 *guard = Some(BackgroundDecayState {
345 shutdown_tx: tx,
346 handle,
347 });
348 }
349
350 pub async fn stop_background_decay(&self) {
352 let state = {
353 let mut guard = self.decay_state.lock().await;
354 guard.take()
355 };
356 if let Some(state) = state {
357 let _ = state.shutdown_tx.send(true);
358 let _ = state.handle.await;
359 }
360 }
361
362 pub async fn is_background_decay_enabled(&self) -> bool {
364 self.decay_state.lock().await.is_some()
365 }
366
367 pub fn start_background_dream(self: &Arc<Self>) {
370 let Some(interval_secs) = self.background_dream_interval_secs else {
371 return;
372 };
373
374 let mut guard = match self.dream_state.try_lock() {
375 Ok(g) => g,
376 Err(_) => return,
377 };
378 if guard.is_some() {
379 return;
380 }
381
382 let (tx, rx) = tokio::sync::watch::channel(false);
383 let engine = Arc::clone(self);
384
385 let handle = tokio::spawn(async move {
386 let mut backoff_secs = 1u64;
387 const MAX_BACKOFF_SECS: u64 = 300;
388
389 loop {
390 let engine_ref = Arc::clone(&engine);
391 let mut rx_ref = rx.clone();
392
393 let result = tokio::spawn(async move {
394 let mut interval =
395 tokio::time::interval(std::time::Duration::from_secs(interval_secs));
396 interval.tick().await;
397
398 loop {
399 tokio::select! {
400 _ = interval.tick() => {
401 let req = DreamTickRequest {
402 header: None,
403 session_id: None,
404 dry_run: false,
405 max_groups: 50,
406 include_private_scratch: false,
407 include_sealed: false,
408 promote_semantic: true,
409 secrecy_levels: None,
410 };
411 if let Err(e) = engine_ref.lifecycle_dream_tick(req).await {
412 warn!(error = %e, "Background dream tick failed");
413 }
414 }
415 _ = rx_ref.changed() => {
416 if *rx_ref.borrow() {
417 info!("Background dream processing stopped");
418 return;
419 }
420 }
421 }
422 }
423 })
424 .await;
425
426 if *rx.borrow() {
427 return;
428 }
429
430 match result {
431 Ok(()) => return,
432 Err(e) => {
433 tracing::error!(
434 error = %e,
435 backoff_secs,
436 "Background dream task panicked, restarting"
437 );
438 tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await;
439 backoff_secs = (backoff_secs * 2).min(MAX_BACKOFF_SECS);
440 }
441 }
442 }
443 });
444
445 *guard = Some(BackgroundDreamState {
446 shutdown_tx: tx,
447 handle,
448 });
449 }
450
451 pub async fn stop_background_dream(&self) {
453 let state = {
454 let mut guard = self.dream_state.lock().await;
455 guard.take()
456 };
457
458 if let Some(state) = state {
459 let _ = state.shutdown_tx.send(true);
460 if let Err(e) = state.handle.await {
461 tracing::error!(error = %e, "Background dream task join failed");
462 }
463 }
464 }
465
466 pub async fn is_background_dream_enabled(&self) -> bool {
468 self.dream_state.lock().await.is_some()
469 }
470
471 pub async fn rebuild_coordinator(&self) -> Result<(), CerememoryError> {
474 self.vector_index.clear()?;
476
477 let mut entries = Vec::new();
478 let mut text_records = Vec::new();
479
480 for store_type in [
481 StoreType::Episodic,
482 StoreType::Semantic,
483 StoreType::Procedural,
484 StoreType::Emotional,
485 ] {
486 let records = dispatch_store!(self, store_type, get_all())?;
487 for record in records {
488 entries.push((record.id, store_type, record.associations.clone()));
489
490 let text = Self::build_searchable_text(&record).unwrap_or_default();
492 if Self::has_indexable_content(&text, &record) {
493 text_records.push((
494 record.id,
495 store_type,
496 text,
497 record.content.summary.clone(),
498 ));
499 }
500
501 if let Some(embedding) = Self::primary_embedding(&record) {
502 let _ = self.vector_index.upsert(record.id, embedding);
503 }
504 }
505 }
506
507 self.coordinator.rebuild(entries).await;
508 self.text_index.rebuild(&text_records)?;
509 self.vector_index.rebuild_hnsw()?;
510
511 info!(
512 records = self.coordinator.total_records().await,
513 hnsw_active = self.vector_index.is_hnsw_active(),
514 "Coordinator and indexes rebuilt from persistent stores"
515 );
516 Ok(())
517 }
518
519 fn has_indexable_content(searchable_text: &str, record: &MemoryRecord) -> bool {
524 !searchable_text.is_empty()
525 || record
526 .content
527 .summary
528 .as_deref()
529 .map(str::trim)
530 .is_some_and(|summary| !summary.is_empty())
531 }
532
533 fn build_searchable_text(record: &MemoryRecord) -> Option<String> {
535 let mut chunks = Vec::new();
536
537 for block in &record.content.blocks {
538 match block.modality {
539 Modality::Text => match std::str::from_utf8(&block.data) {
540 Ok(text) => {
541 let trimmed = text.trim();
542 if !trimmed.is_empty() {
543 chunks.push(trimmed.to_string());
544 }
545 }
546 Err(error) => {
547 warn!(
548 error = %error,
549 record_id = %record.id,
550 "Skipping invalid UTF-8 text block during indexing"
551 );
552 }
553 },
554 Modality::Structured => {
555 match cerememory_index::structured_index::flatten_json_to_text(&block.data) {
556 Ok(flat) if !flat.is_empty() => chunks.push(flat),
557 Ok(_) => {}
558 Err(error) => {
559 warn!(
560 error = %error,
561 record_id = %record.id,
562 "Skipping invalid structured block during indexing"
563 );
564 }
565 }
566 }
567 _ => {}
568 }
569 }
570
571 if chunks.is_empty() {
572 None
573 } else {
574 Some(chunks.join("\n"))
575 }
576 }
577
578 fn primary_embedding(record: &MemoryRecord) -> Option<&[f32]> {
579 record
580 .content
581 .blocks
582 .iter()
583 .find_map(|block| block.embedding.as_deref())
584 }
585
586 fn detect_image_format(data: &[u8]) -> Option<&'static str> {
587 if data.len() >= 8 && data.starts_with(b"\x89PNG\r\n\x1a\n") {
588 return Some("png");
589 }
590 if data.len() >= 3 && data.starts_with(&[0xFF, 0xD8, 0xFF]) {
591 return Some("jpeg");
592 }
593 if data.len() >= 6 && (data.starts_with(b"GIF87a") || data.starts_with(b"GIF89a")) {
594 return Some("gif");
595 }
596 if data.len() >= 12 && data.starts_with(b"RIFF") && &data[8..12] == b"WEBP" {
597 return Some("webp");
598 }
599 None
600 }
601
602 fn detect_audio_format(data: &[u8]) -> Option<&'static str> {
603 if data.len() >= 12 && data.starts_with(b"RIFF") && &data[8..12] == b"WAVE" {
604 return Some("wav");
605 }
606 if data.len() >= 4 && data.starts_with(b"fLaC") {
607 return Some("flac");
608 }
609 if data.len() >= 4 && data.starts_with(b"OggS") {
610 return Some("ogg");
611 }
612 if data.len() >= 3 && data.starts_with(b"ID3") {
613 return Some("mp3");
614 }
615 if data.len() >= 2 && data[0] == 0xFF && (data[1] & 0xE0) == 0xE0 {
616 return Some("mp3");
617 }
618 if data.len() >= 12 && &data[4..8] == b"ftyp" {
619 return Some("mp4");
620 }
621 if data.len() >= 4 && data.starts_with(&[0x1A, 0x45, 0xDF, 0xA3]) {
622 return Some("webm");
623 }
624 None
625 }
626
627 async fn resolve_recall_cues(
628 &self,
629 cue: &RecallCue,
630 ) -> Result<(Option<String>, Option<Vec<f32>>), CerememoryError> {
631 let mut text = cue
632 .text
633 .as_deref()
634 .map(str::trim)
635 .filter(|value| !value.is_empty())
636 .map(str::to_owned);
637 let mut embedding = cue.embedding.clone();
638
639 if let Some(image) = cue.image.as_ref() {
640 if image.is_empty() {
641 return Err(CerememoryError::Validation(
642 "Recall image cue must not be empty".to_string(),
643 ));
644 }
645 if image.len() > MAX_IMAGE_SIZE {
646 return Err(CerememoryError::ContentTooLarge {
647 size: image.len(),
648 limit: MAX_IMAGE_SIZE,
649 });
650 }
651
652 if embedding.is_none() {
653 let provider = self.llm_provider.as_ref().ok_or_else(|| {
654 CerememoryError::ModalityUnsupported(
655 "Image recall requires an LLM provider with image embedding support"
656 .to_string(),
657 )
658 })?;
659 let caps = provider.capabilities();
660 if !caps.image_embedding {
661 return Err(CerememoryError::ModalityUnsupported(
662 "Configured LLM provider does not support image recall".to_string(),
663 ));
664 }
665
666 let format = Self::detect_image_format(image).ok_or_else(|| {
667 CerememoryError::Validation("Unsupported image recall cue format".to_string())
668 })?;
669 let generated = provider.embed_image(image, format).await?;
670 if generated.is_empty() {
671 return Err(CerememoryError::Internal(
672 "Image recall provider returned an empty embedding".to_string(),
673 ));
674 }
675 embedding = Some(generated);
676 }
677 }
678
679 if let Some(audio) = cue.audio.as_ref() {
680 if audio.is_empty() {
681 return Err(CerememoryError::Validation(
682 "Recall audio cue must not be empty".to_string(),
683 ));
684 }
685 if audio.len() > MAX_AUDIO_SIZE {
686 return Err(CerememoryError::ContentTooLarge {
687 size: audio.len(),
688 limit: MAX_AUDIO_SIZE,
689 });
690 }
691
692 let provider = self.llm_provider.as_ref().ok_or_else(|| {
693 CerememoryError::ModalityUnsupported(
694 "Audio recall requires an LLM provider with transcription support".to_string(),
695 )
696 })?;
697 let caps = provider.capabilities();
698 if !caps.audio_transcription {
699 return Err(CerememoryError::ModalityUnsupported(
700 "Configured LLM provider does not support audio recall".to_string(),
701 ));
702 }
703
704 let format = Self::detect_audio_format(audio).ok_or_else(|| {
705 CerememoryError::Validation("Unsupported audio recall cue format".to_string())
706 })?;
707 let transcript = provider.transcribe_audio(audio, format).await?;
708 let transcript = transcript.trim();
709 if transcript.is_empty() {
710 return Err(CerememoryError::Validation(
711 "Audio recall cue produced an empty transcript".to_string(),
712 ));
713 }
714 match &mut text {
715 Some(existing) => {
716 existing.push('\n');
717 existing.push_str(transcript);
718 }
719 None => text = Some(transcript.to_string()),
720 }
721 if embedding.is_none() && caps.text_embedding {
722 let generated = provider.embed(transcript).await?;
723 if !generated.is_empty() {
724 embedding = Some(generated);
725 }
726 }
727 }
728
729 if embedding.is_none() {
730 if let (Some(provider), Some(query_text)) =
731 (self.llm_provider.as_ref(), text.as_deref())
732 {
733 if provider.capabilities().text_embedding {
734 match provider.embed(query_text).await {
735 Ok(generated) if !generated.is_empty() => embedding = Some(generated),
736 Ok(_) => warn!("Text recall cue embedding returned an empty vector"),
737 Err(error) => warn!(error = %error, "Failed to embed text recall cue"),
738 }
739 }
740 }
741 }
742
743 Ok((text, embedding))
744 }
745
746 fn index_record(&self, record: &MemoryRecord) -> Result<(), CerememoryError> {
749 let text = Self::build_searchable_text(record).unwrap_or_default();
750 if Self::has_indexable_content(&text, record) {
751 self.text_index.add(
752 record.id,
753 record.store,
754 &text,
755 record.content.summary.as_deref(),
756 )?;
757 }
758
759 if let Some(embedding) = Self::primary_embedding(record) {
761 self.vector_index.upsert(record.id, embedding)?;
762 }
763
764 Ok(())
765 }
766
767 fn unindex_record(&self, id: Uuid) {
769 let _ = self.text_index.remove(id);
770 let _ = self.vector_index.remove(id);
771 }
772
773 fn route_store(&self, content: &MemoryContent) -> StoreType {
776 if content.summary.is_some() {
777 StoreType::Semantic
778 } else {
779 StoreType::Episodic
780 }
781 }
782
783 async fn get_store_record(
784 &self,
785 id: &Uuid,
786 ) -> Result<Option<(MemoryRecord, StoreType)>, CerememoryError> {
787 if let Some(st) = self.coordinator.get_record_store_type(id).await? {
789 let record = dispatch_store!(self, st, get(id))?;
790 return Ok(record.map(|r| (r, st)));
791 }
792
793 for st in [
795 StoreType::Working,
796 StoreType::Episodic,
797 StoreType::Semantic,
798 StoreType::Procedural,
799 StoreType::Emotional,
800 ] {
801 if let Some(r) = dispatch_store!(self, st, get(id))? {
802 return Ok(Some((r, st)));
803 }
804 }
805 Ok(None)
806 }
807
808 fn build_record_metadata(
809 context: Option<EncodeContext>,
810 metadata: Option<serde_json::Value>,
811 ) -> serde_json::Value {
812 let context_value = context.and_then(|ctx| serde_json::to_value(ctx).ok());
813 match (metadata, context_value) {
814 (Some(serde_json::Value::Object(mut map)), Some(context)) => {
815 map.insert("_context".to_string(), context);
816 serde_json::Value::Object(map)
817 }
818 (Some(other), Some(context)) => serde_json::json!({
819 "_metadata": other,
820 "_context": context,
821 }),
822 (Some(metadata), None) => metadata,
823 (None, Some(context)) => serde_json::json!({ "_context": context }),
824 (None, None) => serde_json::Value::Object(serde_json::Map::new()),
825 }
826 }
827
828 async fn persist_associations_for_record(
829 &self,
830 record_id: &Uuid,
831 store_type: StoreType,
832 associations: Vec<Association>,
833 ) -> Result<(), CerememoryError> {
834 dispatch_store!(
835 self,
836 store_type,
837 replace_associations(record_id, associations.clone())
838 )?;
839 self.coordinator
840 .update_associations(record_id, associations)
841 .await?;
842 Ok(())
843 }
844
845 async fn add_persisted_association(
846 &self,
847 record_id: &Uuid,
848 association: Association,
849 ) -> Result<(), CerememoryError> {
850 let Some((record, store_type)) = self.get_store_record(record_id).await? else {
851 return Err(CerememoryError::RecordNotFound(record_id.to_string()));
852 };
853
854 let mut associations = self.coordinator.get_associations(record_id).await?;
855 for persisted in &record.associations {
856 if !associations.iter().any(|existing| {
857 existing.target_id == persisted.target_id
858 && existing.association_type == persisted.association_type
859 }) {
860 associations.push(persisted.clone());
861 }
862 }
863
864 if associations.iter().any(|existing| {
865 existing.target_id == association.target_id
866 && existing.association_type == association.association_type
867 }) {
868 return Ok(());
869 }
870
871 associations.push(association);
872 self.persist_associations_for_record(record_id, store_type, associations)
873 .await
874 }
875
876 async fn remove_deleted_targets_from_records(
877 &self,
878 deleted_ids: &HashSet<Uuid>,
879 ) -> Result<(), CerememoryError> {
880 if deleted_ids.is_empty() {
881 return Ok(());
882 }
883
884 for store_type in ALL_STORES {
885 let records = dispatch_store!(self, store_type, get_all())?;
886 for record in records {
887 let filtered: Vec<_> = record
888 .associations
889 .iter()
890 .filter(|association| !deleted_ids.contains(&association.target_id))
891 .cloned()
892 .collect();
893 if filtered.len() != record.associations.len() {
894 self.persist_associations_for_record(&record.id, store_type, filtered)
895 .await?;
896 }
897 }
898 }
899
900 Ok(())
901 }
902
903 async fn cleanup_deleted_records(
904 &self,
905 deleted_records: &[(Uuid, StoreType)],
906 ) -> Result<(), CerememoryError> {
907 if deleted_records.is_empty() {
908 return Ok(());
909 }
910
911 let mut deleted_ids = HashSet::with_capacity(deleted_records.len());
912 for (record_id, _) in deleted_records {
913 deleted_ids.insert(*record_id);
914 self.coordinator.unregister(record_id).await;
915 self.unindex_record(*record_id);
916 }
917 self.remove_deleted_targets_from_records(&deleted_ids).await
918 }
919
920 fn normalize_export_format(format: &str) -> Result<&'static str, CerememoryError> {
921 let format = format.trim();
922 if format.eq_ignore_ascii_case("cma") || format.eq_ignore_ascii_case("jsonl") {
923 Ok("cma")
924 } else {
925 Err(CerememoryError::Validation(format!(
926 "Unsupported export format '{format}'. Valid options: cma, jsonl"
927 )))
928 }
929 }
930
931 async fn delete_records(
932 &self,
933 delete_targets: Vec<(Uuid, StoreType)>,
934 ) -> Result<u32, CerememoryError> {
935 let mut deleted_records = Vec::new();
936 for (id, store_type) in delete_targets {
937 if dispatch_store!(self, store_type, delete(&id))? {
938 deleted_records.push((id, store_type));
939 }
940 }
941 let deleted = deleted_records.len() as u32;
942 self.cleanup_deleted_records(&deleted_records).await?;
943 Ok(deleted)
944 }
945
946 async fn clear_all_records(&self) -> Result<u32, CerememoryError> {
947 let mut delete_targets = Vec::new();
948 for store_type in ALL_STORES {
949 for id in dispatch_store!(self, store_type, list_ids())? {
950 delete_targets.push((id, store_type));
951 }
952 }
953 self.delete_records(delete_targets).await
954 }
955
956 async fn restore_records(&self, records: &[MemoryRecord]) -> Result<(), CerememoryError> {
957 self.clear_all_records().await?;
958 for record in records {
959 let store_type = record.store;
960 dispatch_store!(self, store_type, store(record.clone()))?;
961 self.coordinator
962 .register(record.id, store_type, record.associations.clone())
963 .await;
964 let _ = self.index_record(record);
965 }
966 Ok(())
967 }
968
969 async fn import_records_with_conflict_resolution(
970 &self,
971 records: Vec<MemoryRecord>,
972 conflict_resolution: ConflictResolution,
973 ) -> Result<u32, CerememoryError> {
974 let mut imported = 0u32;
975
976 for record in records {
977 let store_type = record.store;
978 let mut replaced_cross_store: Option<StoreType> = None;
979
980 if let Some((existing, existing_store)) = self.get_store_record(&record.id).await? {
981 match conflict_resolution {
982 ConflictResolution::KeepExisting => continue,
983 ConflictResolution::KeepImported => {
984 if existing_store != store_type {
985 replaced_cross_store = Some(existing_store);
986 }
987 }
988 ConflictResolution::KeepNewer => {
989 if record.updated_at <= existing.updated_at {
990 continue;
991 }
992 if existing_store != store_type {
993 replaced_cross_store = Some(existing_store);
994 }
995 }
996 }
997 }
998
999 dispatch_store!(self, store_type, store(record.clone()))?;
1000 if let Some(existing_store) = replaced_cross_store {
1001 if !dispatch_store!(self, existing_store, delete(&record.id))? {
1002 if let Err(e) = dispatch_store!(self, store_type, delete(&record.id)) {
1003 warn!(
1004 record_id = %record.id,
1005 store = %store_type,
1006 error = %e,
1007 "Failed to clean up newly stored record during import conflict rollback"
1008 );
1009 }
1010 return Err(CerememoryError::ImportConflict(format!(
1011 "Failed to replace cross-store record {} from {} to {}",
1012 record.id, existing_store, store_type
1013 )));
1014 }
1015 }
1016 self.coordinator
1017 .register(record.id, store_type, record.associations.clone())
1018 .await;
1019 if let Err(e) = self.index_record(&record) {
1020 warn!(error = %e, record_id = %record.id, "Failed to index imported record");
1021 }
1022 imported += 1;
1023 }
1024
1025 Ok(imported)
1026 }
1027
1028 async fn collect_all_raw_journal_records(
1029 &self,
1030 ) -> Result<Vec<RawJournalRecord>, CerememoryError> {
1031 self.raw_journal.get_all().await
1032 }
1033
1034 async fn clear_raw_journal(&self) -> Result<u32, CerememoryError> {
1035 let records = self.raw_journal.get_all().await?;
1036 let mut deleted = 0u32;
1037 for record in records {
1038 if self.raw_journal.delete(&record.id).await? {
1039 deleted += 1;
1040 }
1041 }
1042 Ok(deleted)
1043 }
1044
1045 async fn restore_raw_journal(
1046 &self,
1047 raw_records: &[RawJournalRecord],
1048 ) -> Result<(), CerememoryError> {
1049 self.clear_raw_journal().await?;
1050 for record in raw_records {
1051 self.raw_journal.append(record.clone()).await?;
1052 }
1053 Ok(())
1054 }
1055
1056 async fn import_raw_records_with_conflict_resolution(
1057 &self,
1058 raw_records: Vec<RawJournalRecord>,
1059 conflict_resolution: ConflictResolution,
1060 ) -> Result<u32, CerememoryError> {
1061 let mut imported = 0u32;
1062 for record in raw_records {
1063 if let Some(existing) = self.raw_journal.get(&record.id).await? {
1064 match conflict_resolution {
1065 ConflictResolution::KeepExisting => continue,
1066 ConflictResolution::KeepImported => {
1067 self.raw_journal.update(record.clone()).await?;
1068 imported += 1;
1069 }
1070 ConflictResolution::KeepNewer => {
1071 if record.updated_at > existing.updated_at {
1072 self.raw_journal.update(record.clone()).await?;
1073 imported += 1;
1074 }
1075 }
1076 }
1077 } else {
1078 self.raw_journal.append(record).await?;
1079 imported += 1;
1080 }
1081 }
1082 Ok(imported)
1083 }
1084
1085 pub async fn append_raw_journal(
1087 &self,
1088 record: RawJournalRecord,
1089 ) -> Result<Uuid, CerememoryError> {
1090 self.raw_journal.append(record).await
1091 }
1092
1093 pub async fn get_raw_journal_record(
1095 &self,
1096 id: &Uuid,
1097 ) -> Result<Option<RawJournalRecord>, CerememoryError> {
1098 self.raw_journal.get(id).await
1099 }
1100
1101 pub async fn query_raw_journal_by_session(
1103 &self,
1104 session_id: &str,
1105 ) -> Result<Vec<RawJournalRecord>, CerememoryError> {
1106 self.raw_journal.query_session(session_id).await
1107 }
1108
1109 pub async fn query_raw_journal_session_range(
1111 &self,
1112 session_id: &str,
1113 start: chrono::DateTime<Utc>,
1114 end: chrono::DateTime<Utc>,
1115 ) -> Result<Vec<RawJournalRecord>, CerememoryError> {
1116 self.raw_journal
1117 .query_session_range(session_id, start, end)
1118 .await
1119 }
1120
1121 pub async fn raw_journal_count(&self) -> Result<usize, CerememoryError> {
1123 self.raw_journal.count().await
1124 }
1125
1126 fn raw_query_allowed_visibility(
1127 record: &RawJournalRecord,
1128 include_private_scratch: bool,
1129 include_sealed: bool,
1130 ) -> bool {
1131 match record.visibility {
1132 RawVisibility::Normal => true,
1133 RawVisibility::PrivateScratch => include_private_scratch,
1134 RawVisibility::Sealed => include_sealed,
1135 }
1136 }
1137
1138 fn raw_query_allowed_secrecy(
1139 record: &RawJournalRecord,
1140 secrecy_levels: Option<&[SecrecyLevel]>,
1141 ) -> bool {
1142 match secrecy_levels {
1143 Some(levels) => levels.contains(&record.secrecy_level),
1144 None => matches!(
1145 record.secrecy_level,
1146 SecrecyLevel::Public | SecrecyLevel::Sensitive
1147 ),
1148 }
1149 }
1150
1151 fn raw_record_processed(record: &RawJournalRecord) -> bool {
1152 record
1153 .metadata
1154 .get("_dream")
1155 .and_then(|value| value.get("processed_at"))
1156 .and_then(|value| value.as_str())
1157 .is_some()
1158 }
1159
1160 fn mark_raw_record_dream_processed(
1161 record: &mut RawJournalRecord,
1162 summary_id: Uuid,
1163 semantic_id: Option<Uuid>,
1164 dreamed_at: chrono::DateTime<Utc>,
1165 ) {
1166 record.updated_at = dreamed_at;
1167 if !record.derived_memory_ids.contains(&summary_id) {
1168 record.derived_memory_ids.push(summary_id);
1169 }
1170 if let Some(semantic_id) = semantic_id {
1171 if !record.derived_memory_ids.contains(&semantic_id) {
1172 record.derived_memory_ids.push(semantic_id);
1173 }
1174 }
1175
1176 let metadata = if let serde_json::Value::Object(map) = &mut record.metadata {
1177 map
1178 } else {
1179 record.metadata = serde_json::json!({});
1180 match &mut record.metadata {
1181 serde_json::Value::Object(map) => map,
1182 _ => unreachable!("metadata initialized as object"),
1183 }
1184 };
1185
1186 let dream = metadata
1187 .entry("_dream".to_string())
1188 .or_insert_with(|| serde_json::json!({}));
1189 if let serde_json::Value::Object(map) = dream {
1190 map.insert(
1191 "processed_at".to_string(),
1192 serde_json::Value::String(dreamed_at.to_rfc3339()),
1193 );
1194 map.insert(
1195 "last_summary_id".to_string(),
1196 serde_json::Value::String(summary_id.to_string()),
1197 );
1198 }
1199
1200 let derived = metadata
1201 .entry("_derived".to_string())
1202 .or_insert_with(|| serde_json::json!({}));
1203 if let serde_json::Value::Object(map) = derived {
1204 let summary_entry = map
1205 .entry("episodic_summary_ids".to_string())
1206 .or_insert_with(|| serde_json::json!([]));
1207 if let serde_json::Value::Array(ids) = summary_entry {
1208 let summary_value = serde_json::Value::String(summary_id.to_string());
1209 if !ids.iter().any(|existing| existing == &summary_value) {
1210 ids.push(summary_value);
1211 }
1212 }
1213 if let Some(semantic_id) = semantic_id {
1214 let semantic_entry = map
1215 .entry("semantic_ids".to_string())
1216 .or_insert_with(|| serde_json::json!([]));
1217 if let serde_json::Value::Array(ids) = semantic_entry {
1218 let semantic_value = serde_json::Value::String(semantic_id.to_string());
1219 if !ids.iter().any(|existing| existing == &semantic_value) {
1220 ids.push(semantic_value);
1221 }
1222 }
1223 }
1224 }
1225 }
1226
1227 fn dream_stat_u64(summary_stats: &serde_json::Value, key: &str) -> u64 {
1228 summary_stats
1229 .get(key)
1230 .and_then(|value| value.as_u64())
1231 .unwrap_or(0)
1232 }
1233
1234 fn should_promote_dream_group(
1235 raw_topic_id: Option<&str>,
1236 topic_hint: Option<&str>,
1237 summary_stats: &serde_json::Value,
1238 ) -> bool {
1239 let normal_records = Self::dream_stat_u64(summary_stats, "normal_records");
1240 let has_topic_signal = raw_topic_id.filter(|topic| !topic.is_empty()).is_some()
1241 || topic_hint.filter(|topic| !topic.is_empty()).is_some();
1242 normal_records >= 2 && has_topic_signal
1243 }
1244
1245 fn attach_summary_semantic_link(summary_record: &mut MemoryRecord, semantic_id: Uuid) {
1246 if let serde_json::Value::Object(map) = &mut summary_record.metadata {
1247 let derived = map
1248 .entry("_derived".to_string())
1249 .or_insert_with(|| serde_json::json!({}));
1250 if let serde_json::Value::Object(derived_map) = derived {
1251 let semantic_ids = derived_map
1252 .entry("semantic_ids".to_string())
1253 .or_insert_with(|| serde_json::json!([]));
1254 if let serde_json::Value::Array(ids) = semantic_ids {
1255 let semantic_value = serde_json::Value::String(semantic_id.to_string());
1256 if !ids.iter().any(|existing| existing == &semantic_value) {
1257 ids.push(semantic_value);
1258 }
1259 }
1260 }
1261 }
1262 }
1263
1264 fn prepare_dream_summary_inputs(
1265 raw_records: &[RawJournalRecord],
1266 ) -> (Vec<String>, serde_json::Value) {
1267 let mut texts = Vec::new();
1268 let mut normal_count = 0u32;
1269 let mut private_scratch_redacted = 0u32;
1270 let mut sealed_redacted = 0u32;
1271 let mut secret_redacted = 0u32;
1272
1273 for record in raw_records {
1274 match (record.visibility, record.secrecy_level) {
1275 (_, SecrecyLevel::Secret) => {
1276 secret_redacted += 1;
1277 }
1278 (RawVisibility::Sealed, _) => {
1279 sealed_redacted += 1;
1280 }
1281 (RawVisibility::PrivateScratch, _) => {
1282 private_scratch_redacted += 1;
1283 }
1284 (RawVisibility::Normal, _) => {
1285 if let Some(text) = record.text_content() {
1286 texts.push(text.to_string());
1287 }
1288 normal_count += 1;
1289 }
1290 }
1291 }
1292
1293 let stats = serde_json::json!({
1294 "normal_records": normal_count,
1295 "private_scratch_redacted": private_scratch_redacted,
1296 "sealed_redacted": sealed_redacted,
1297 "secret_redacted": secret_redacted,
1298 "redacted_total": private_scratch_redacted + sealed_redacted + secret_redacted,
1299 });
1300
1301 (texts, stats)
1302 }
1303
1304 fn tokenize_topic_text(text: &str) -> HashSet<String> {
1305 const STOPWORDS: &[&str] = &[
1306 "the", "and", "for", "with", "that", "this", "from", "into", "only", "then", "than",
1307 "have", "has", "had", "were", "was", "are", "but", "not", "you", "your", "our", "raw",
1308 "note", "notes", "session", "topic", "summary", "record", "records",
1309 ];
1310
1311 text.chars()
1312 .map(|ch| if ch.is_alphanumeric() { ch } else { ' ' })
1313 .collect::<String>()
1314 .split_whitespace()
1315 .map(|token| token.to_lowercase())
1316 .filter(|token| token.len() >= 3)
1317 .filter(|token| !STOPWORDS.iter().any(|stopword| stopword == token))
1318 .collect()
1319 }
1320
1321 fn topic_token_overlap(left: &HashSet<String>, right: &HashSet<String>) -> f64 {
1322 if left.is_empty() || right.is_empty() {
1323 return 1.0;
1324 }
1325
1326 let intersection = left.intersection(right).count() as f64;
1327 let union = left.union(right).count() as f64;
1328 if union == 0.0 {
1329 1.0
1330 } else {
1331 intersection / union
1332 }
1333 }
1334
1335 fn infer_topic_hint(raw_records: &[RawJournalRecord]) -> Option<String> {
1336 let mut counts: HashMap<String, u32> = HashMap::new();
1337 for record in raw_records {
1338 if matches!(record.visibility, RawVisibility::Normal)
1339 && !matches!(record.secrecy_level, SecrecyLevel::Secret)
1340 {
1341 if let Some(text) = record.text_content() {
1342 for token in Self::tokenize_topic_text(text) {
1343 *counts.entry(token).or_insert(0) += 1;
1344 }
1345 }
1346 }
1347 }
1348
1349 let mut ranked: Vec<(String, u32)> = counts.into_iter().collect();
1350 ranked.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
1351 let top_terms: Vec<String> = ranked.into_iter().take(3).map(|(token, _)| token).collect();
1352 if top_terms.is_empty() {
1353 None
1354 } else {
1355 Some(top_terms.join("+"))
1356 }
1357 }
1358
1359 fn infer_dream_groups(records: Vec<RawJournalRecord>) -> Vec<DreamGroup> {
1360 let mut explicit_groups: BTreeMap<
1361 (chrono::NaiveDate, String, String),
1362 Vec<RawJournalRecord>,
1363 > = BTreeMap::new();
1364 let mut inferred_sources: BTreeMap<(chrono::NaiveDate, String), Vec<RawJournalRecord>> =
1365 BTreeMap::new();
1366
1367 for record in records {
1368 let date = record.created_at.date_naive();
1369 if let Some(topic_id) = record
1370 .topic_id
1371 .as_ref()
1372 .map(|topic| topic.trim())
1373 .filter(|topic| !topic.is_empty())
1374 {
1375 explicit_groups
1376 .entry((date, record.session_id.clone(), topic_id.to_string()))
1377 .or_default()
1378 .push(record);
1379 } else {
1380 inferred_sources
1381 .entry((date, record.session_id.clone()))
1382 .or_default()
1383 .push(record);
1384 }
1385 }
1386
1387 let mut groups = Vec::new();
1388 for ((_, session_id, topic_id), mut records) in explicit_groups {
1389 records.sort_by(|a, b| {
1390 a.created_at
1391 .cmp(&b.created_at)
1392 .then_with(|| a.id.cmp(&b.id))
1393 });
1394 groups.push(DreamGroup {
1395 session_id,
1396 raw_topic_id: Some(topic_id.clone()),
1397 topic_hint: Some(topic_id),
1398 inferred_topic: false,
1399 records,
1400 });
1401 }
1402
1403 for ((date, session_id), mut records) in inferred_sources {
1404 records.sort_by(|a, b| {
1405 a.created_at
1406 .cmp(&b.created_at)
1407 .then_with(|| a.id.cmp(&b.id))
1408 });
1409
1410 let mut current: Vec<RawJournalRecord> = Vec::new();
1411 let mut current_tokens: HashSet<String> = HashSet::new();
1412 let mut segment_index = 0u32;
1413
1414 for record in records {
1415 let record_tokens = record
1416 .text_content()
1417 .map(Self::tokenize_topic_text)
1418 .unwrap_or_default();
1419 let should_split = current.last().is_some_and(|previous: &RawJournalRecord| {
1420 let gap = record.created_at.signed_duration_since(previous.created_at);
1421 let overlap = Self::topic_token_overlap(¤t_tokens, &record_tokens);
1422 gap > chrono::Duration::minutes(45)
1423 || (gap > chrono::Duration::minutes(10) && overlap < 0.08)
1424 });
1425
1426 if should_split && !current.is_empty() {
1427 let topic_hint = Self::infer_topic_hint(¤t);
1428 groups.push(DreamGroup {
1429 session_id: session_id.clone(),
1430 raw_topic_id: None,
1431 topic_hint,
1432 inferred_topic: true,
1433 records: std::mem::take(&mut current),
1434 });
1435 current_tokens.clear();
1436 segment_index += 1;
1437 }
1438
1439 current_tokens.extend(record_tokens);
1440 current.push(record);
1441 }
1442
1443 if !current.is_empty() {
1444 let topic_hint = Self::infer_topic_hint(¤t)
1445 .or_else(|| Some(format!("auto-{}-{segment_index}", date.format("%Y%m%d"))));
1446 groups.push(DreamGroup {
1447 session_id: session_id.clone(),
1448 raw_topic_id: None,
1449 topic_hint,
1450 inferred_topic: true,
1451 records: current,
1452 });
1453 }
1454 }
1455
1456 groups.sort_by(|left, right| {
1457 let left_time = left.records.first().map(|record| record.created_at);
1458 let right_time = right.records.first().map(|record| record.created_at);
1459 left_time
1460 .cmp(&right_time)
1461 .then_with(|| left.session_id.cmp(&right.session_id))
1462 .then_with(|| left.topic_hint.cmp(&right.topic_hint))
1463 });
1464 groups
1465 }
1466
1467 async fn summarize_dream_group(
1468 &self,
1469 session_id: &str,
1470 topic_hint: Option<&str>,
1471 texts: &[String],
1472 summary_stats: &serde_json::Value,
1473 ) -> String {
1474 if let Some(provider) = &self.llm_provider {
1475 match provider.summarize(texts, 300).await {
1476 Ok(summary) if !summary.trim().is_empty() => {
1477 let redacted = summary_stats["redacted_total"].as_u64().unwrap_or(0);
1478 if redacted > 0 {
1479 return format!(
1480 "{summary}\n\n[{} raw record(s) redacted from summary]",
1481 redacted
1482 );
1483 }
1484 return summary;
1485 }
1486 Ok(_) | Err(_) => {}
1487 }
1488 }
1489
1490 let heading = match topic_hint.filter(|topic| !topic.is_empty()) {
1491 Some(topic) => format!("Dream summary for session {session_id} / topic {topic}:"),
1492 None => format!("Dream summary for session {session_id}:"),
1493 };
1494 let snippets: Vec<String> = texts
1495 .iter()
1496 .take(8)
1497 .map(|text| {
1498 if text.len() > 160 {
1499 format!("{}...", truncate_str(text, 160))
1500 } else {
1501 text.clone()
1502 }
1503 })
1504 .collect();
1505 let body = snippets.join(" | ");
1506 let redacted_total = summary_stats["redacted_total"].as_u64().unwrap_or(0);
1507 let suffix = if redacted_total > 0 {
1508 format!(" [Redacted raw records: {redacted_total}]")
1509 } else {
1510 String::new()
1511 };
1512 let combined = format!("{heading} {body}{suffix}");
1513 if combined.len() > 1200 {
1514 format!("{}...", truncate_str(&combined, 1200))
1515 } else {
1516 combined
1517 }
1518 }
1519
1520 fn build_dream_summary_metadata(
1521 session_id: &str,
1522 raw_topic_id: Option<&str>,
1523 topic_hint: Option<&str>,
1524 inferred_topic: bool,
1525 raw_records: &[RawJournalRecord],
1526 dreamed_at: chrono::DateTime<Utc>,
1527 summary_stats: serde_json::Value,
1528 ) -> serde_json::Value {
1529 let raw_ids: Vec<String> = raw_records
1530 .iter()
1531 .map(|record| record.id.to_string())
1532 .collect();
1533 let start = raw_records
1534 .first()
1535 .map(|record| record.created_at.to_rfc3339());
1536 let end = raw_records
1537 .last()
1538 .map(|record| record.created_at.to_rfc3339());
1539
1540 serde_json::json!({
1541 "_origin": {
1542 "raw_session_id": session_id,
1543 "raw_topic_id": raw_topic_id,
1544 "raw_topic_hint": topic_hint,
1545 "raw_topic_inferred": inferred_topic,
1546 "raw_record_ids": raw_ids,
1547 "dream_tick_at": dreamed_at.to_rfc3339(),
1548 "raw_record_count": raw_records.len(),
1549 "range_start": start,
1550 "range_end": end
1551 },
1552 "_dream": {
1553 "kind": "episodic_summary",
1554 "source": "raw_journal",
1555 "summary_stats": summary_stats
1556 }
1557 })
1558 }
1559
1560 #[allow(clippy::too_many_arguments)]
1561 fn build_dream_semantic_metadata(
1562 session_id: &str,
1563 raw_topic_id: Option<&str>,
1564 topic_hint: Option<&str>,
1565 inferred_topic: bool,
1566 raw_records: &[RawJournalRecord],
1567 summary_record_id: Uuid,
1568 dreamed_at: chrono::DateTime<Utc>,
1569 summary_stats: serde_json::Value,
1570 ) -> serde_json::Value {
1571 let raw_ids: Vec<String> = raw_records
1572 .iter()
1573 .map(|record| record.id.to_string())
1574 .collect();
1575
1576 serde_json::json!({
1577 "_origin": {
1578 "raw_session_id": session_id,
1579 "raw_topic_id": raw_topic_id,
1580 "raw_topic_hint": topic_hint,
1581 "raw_topic_inferred": inferred_topic,
1582 "raw_record_ids": raw_ids,
1583 "dream_tick_at": dreamed_at.to_rfc3339(),
1584 "raw_record_count": raw_records.len(),
1585 "raw_summary_record_id": summary_record_id.to_string()
1586 },
1587 "_dream": {
1588 "kind": "semantic_summary",
1589 "source": "raw_journal",
1590 "summary_stats": summary_stats
1591 }
1592 })
1593 }
1594
1595 pub async fn encode_store_raw(
1597 &self,
1598 req: EncodeStoreRawRequest,
1599 ) -> Result<EncodeStoreRawResponse, CerememoryError> {
1600 let metadata = req.metadata.unwrap_or_else(|| serde_json::json!({}));
1601 let record = RawJournalRecord {
1602 id: Uuid::now_v7(),
1603 session_id: req.session_id,
1604 turn_id: req.turn_id,
1605 topic_id: req.topic_id,
1606 source: req.source,
1607 speaker: req.speaker,
1608 visibility: req.visibility,
1609 secrecy_level: req.secrecy_level,
1610 created_at: Utc::now(),
1611 updated_at: Utc::now(),
1612 content: req.content,
1613 metadata,
1614 derived_memory_ids: Vec::new(),
1615 suppressed: false,
1616 };
1617 record.validate()?;
1618 let record_id = self.raw_journal.append(record.clone()).await?;
1619
1620 Ok(EncodeStoreRawResponse {
1621 record_id,
1622 session_id: record.session_id,
1623 visibility: record.visibility,
1624 secrecy_level: record.secrecy_level,
1625 })
1626 }
1627
1628 pub async fn encode_batch_store_raw(
1630 &self,
1631 req: EncodeBatchStoreRawRequest,
1632 ) -> Result<EncodeBatchStoreRawResponse, CerememoryError> {
1633 let mut results = Vec::with_capacity(req.records.len());
1634 for record in req.records {
1635 results.push(self.encode_store_raw(record).await?);
1636 }
1637 Ok(EncodeBatchStoreRawResponse { results })
1638 }
1639
1640 pub async fn recall_raw_query(
1642 &self,
1643 req: RecallRawQueryRequest,
1644 ) -> Result<RecallRawQueryResponse, CerememoryError> {
1645 let secrecy_levels = req.secrecy_levels.as_deref();
1646 let query_lower = req.query.as_ref().map(|query| query.trim().to_lowercase());
1647 let session_filter = req
1648 .session_id
1649 .as_ref()
1650 .map(|session_id| session_id.trim())
1651 .filter(|session_id| !session_id.is_empty())
1652 .map(str::to_string);
1653
1654 let mut records = match (req.query.as_deref(), &session_filter, &req.temporal) {
1655 (Some(query), Some(session_id), _) if !query.trim().is_empty() => {
1656 self.raw_journal
1657 .search_text(
1658 query,
1659 Some(session_id),
1660 (req.limit as usize).saturating_mul(5),
1661 )
1662 .await?
1663 }
1664 (Some(query), None, _) if !query.trim().is_empty() => {
1665 self.raw_journal
1666 .search_text(query, None, (req.limit as usize).saturating_mul(5))
1667 .await?
1668 }
1669 (None, Some(session_id), Some(temporal)) => {
1670 self.raw_journal
1671 .query_session_range(session_id, temporal.start, temporal.end)
1672 .await?
1673 }
1674 (None, Some(session_id), None) => self.raw_journal.query_session(session_id).await?,
1675 _ => self.raw_journal.get_all().await?,
1676 };
1677 records.retain(|record| {
1678 if record.suppressed {
1679 return false;
1680 }
1681 if !Self::raw_query_allowed_visibility(
1682 record,
1683 req.include_private_scratch,
1684 req.include_sealed,
1685 ) {
1686 return false;
1687 }
1688 if !Self::raw_query_allowed_secrecy(record, secrecy_levels) {
1689 return false;
1690 }
1691 if session_filter.is_none() {
1692 if let Some(ref temporal) = req.temporal {
1693 if record.created_at < temporal.start || record.created_at > temporal.end {
1694 return false;
1695 }
1696 }
1697 }
1698 if let Some(ref query_lower) = query_lower {
1699 if !record.matches_text(query_lower) {
1700 return false;
1701 }
1702 }
1703 true
1704 });
1705 records.sort_by(|a, b| {
1706 a.created_at
1707 .cmp(&b.created_at)
1708 .then_with(|| a.id.cmp(&b.id))
1709 });
1710 let total_candidates = records.len() as u32;
1711 records.truncate(req.limit as usize);
1712
1713 Ok(RecallRawQueryResponse {
1714 records,
1715 total_candidates,
1716 })
1717 }
1718
    /// Encodes a new memory record, enriches it via the LLM provider when one
    /// is configured, persists it, and registers it with the coordinator and
    /// the indexes.
    ///
    /// Steps, in order:
    /// 1. Pick the target store: the requested one, or `route_store(&content)`.
    /// 2. Build the record and attach any manually supplied associations.
    /// 3. Optional enrichment (failures are logged and skipped): text
    ///    embedding, image embeddings, audio transcription (each transcript is
    ///    appended as a new text block, itself embedded when supported).
    /// 4. Validate, then persist to the chosen store.
    /// 5. Register with the coordinator and index the record.
    ///
    /// # Errors
    ///
    /// Returns validation errors, store errors, and errors from cleaning up a
    /// record evicted from the working store. Enrichment and indexing failures
    /// are only logged.
    pub async fn encode_store(
        &self,
        req: EncodeStoreRequest,
    ) -> Result<EncodeStoreResponse, CerememoryError> {
        // Records the elapsed time into the named histogram when dropped.
        let _timer = TimerGuard::new("cerememory_encode_duration_seconds");
        let EncodeStoreRequest {
            content,
            store,
            emotion,
            context,
            metadata,
            associations,
            ..
        } = req;
        // An explicit store in the request wins; otherwise route by content.
        let store_type = store.unwrap_or_else(|| self.route_store(&content));

        let mut record = MemoryRecord {
            id: Uuid::now_v7(),
            store: store_type,
            created_at: Utc::now(),
            updated_at: Utc::now(),
            last_accessed_at: Utc::now(),
            access_count: 0,
            content,
            fidelity: FidelityState::default(),
            emotion: emotion.unwrap_or_default(),
            associations: Vec::new(),
            metadata: Self::build_record_metadata(context, metadata),
            version: 1,
        };

        // Counts only the manually supplied associations; reported in the response.
        let mut assoc_count = 0u32;
        if let Some(manual) = associations {
            for ma in manual {
                record.associations.push(Association {
                    target_id: ma.target_id,
                    association_type: ma.association_type,
                    weight: ma.weight,
                    created_at: Utc::now(),
                    last_co_activation: Utc::now(),
                });
                assoc_count += 1;
            }
        }

        // LLM enrichment is best-effort: every failure below is logged and skipped.
        if let Some(ref provider) = self.llm_provider {
            let caps = provider.capabilities();

            // Text: embed only when no text block already carries an embedding.
            let has_text_embedding = record
                .content
                .blocks
                .iter()
                .any(|b| b.modality == Modality::Text && b.embedding.is_some());
            if !has_text_embedding && caps.text_embedding {
                if let Some(text) = record.text_content().map(|s| s.to_string()) {
                    match provider.embed(&text).await {
                        Ok(embedding) if !embedding.is_empty() => {
                            // The embedding is attached to the first text block.
                            if let Some(block) = record
                                .content
                                .blocks
                                .iter_mut()
                                .find(|b| b.modality == Modality::Text)
                            {
                                block.embedding = Some(embedding);
                            }
                        }
                        Ok(_) => {}
                        Err(e) => {
                            warn!(error = %e, "LLM text auto-embed failed, continuing without embedding");
                        }
                    }
                }
            }

            // Images: collect (block index, bytes, format) for blocks lacking an embedding.
            let image_tasks: Vec<(usize, Vec<u8>, String)> = if caps.image_embedding {
                record
                    .content
                    .blocks
                    .iter()
                    .enumerate()
                    .filter(|(_, b)| b.modality == Modality::Image && b.embedding.is_none())
                    .map(|(i, b)| (i, b.data.clone(), b.format.clone()))
                    .collect()
            } else {
                Vec::new()
            };

            // Audio: collect (bytes, format) for every audio block.
            let audio_tasks: Vec<(Vec<u8>, String)> = if caps.audio_transcription {
                record
                    .content
                    .blocks
                    .iter()
                    .filter(|b| b.modality == Modality::Audio)
                    .map(|b| (b.data.clone(), b.format.clone()))
                    .collect()
            } else {
                Vec::new()
            };

            if !image_tasks.is_empty() {
                // All image embedding requests are awaited together.
                let image_results: Vec<(usize, Result<Vec<f32>, _>)> =
                    futures::future::join_all(image_tasks.iter().map(|(idx, data, fmt)| {
                        let idx = *idx;
                        async move { (idx, provider.embed_image(data, fmt).await) }
                    }))
                    .await;

                for (idx, result) in image_results {
                    match result {
                        Ok(embedding) if !embedding.is_empty() => {
                            // `idx` was captured from this same block list and no
                            // block has been added or removed since.
                            record.content.blocks[idx].embedding = Some(embedding);
                        }
                        Ok(_) => {}
                        Err(e) => {
                            warn!(error = %e, "LLM image auto-embed failed, continuing without embedding");
                        }
                    }
                }
            }

            if !audio_tasks.is_empty() {
                // All transcription requests are awaited together.
                let audio_results: Vec<Result<String, _>> =
                    futures::future::join_all(audio_tasks.iter().map(|(data, fmt)| async move {
                        provider.transcribe_audio(data, fmt).await
                    }))
                    .await;

                // Each non-empty transcript becomes an additional text block.
                let mut new_text_blocks = Vec::new();
                for result in audio_results {
                    match result {
                        Ok(transcript) if !transcript.is_empty() => {
                            let mut text_block = ContentBlock {
                                modality: Modality::Text,
                                format: "text/plain".to_string(),
                                data: transcript.as_bytes().to_vec(),
                                embedding: None,
                            };
                            if caps.text_embedding {
                                match provider.embed(&transcript).await {
                                    Ok(emb) if !emb.is_empty() => {
                                        text_block.embedding = Some(emb);
                                    }
                                    Err(error) => {
                                        warn!(error = %error, "LLM transcript auto-embed failed, continuing without embedding");
                                    }
                                    Ok(_) => {}
                                }
                            }
                            new_text_blocks.push(text_block);
                        }
                        Ok(_) => {}
                        Err(e) => {
                            warn!(error = %e, "LLM audio transcription failed, continuing without transcript");
                        }
                    }
                }
                record.content.blocks.extend(new_text_blocks);
            }
        }

        // Validation runs after enrichment, so it also covers the added blocks.
        record.validate()?;
        let id = record.id;
        let fidelity = record.fidelity.score;

        if store_type == StoreType::Working {
            // Storing into the working store may evict a record; the evicted id
            // is passed to `cleanup_deleted_records`.
            let (_, evicted) = self.working.store_with_eviction(record.clone()).await?;
            if let Some(evicted_id) = evicted {
                self.cleanup_deleted_records(&[(evicted_id, StoreType::Working)])
                    .await?;
            }
        } else {
            dispatch_store!(self, store_type, store(record.clone()))?;
        }

        self.coordinator
            .register(id, store_type, record.associations.clone())
            .await;

        // Indexing failure does not fail the encode; it is only logged.
        if let Err(e) = self.index_record(&record) {
            warn!(error = %e, record_id = %id, "Failed to index record, will be indexed on rebuild");
        }

        info!(record_id = %id, store = %store_type, "Encoded memory record");

        metrics::counter!("cerememory_encode_total", "store" => store_type.to_string())
            .increment(1);

        Ok(EncodeStoreResponse {
            record_id: id,
            store: store_type,
            initial_fidelity: fidelity,
            associations_created: assoc_count,
        })
    }
1927
1928 pub async fn encode_batch(
1930 &self,
1931 req: EncodeBatchRequest,
1932 ) -> Result<EncodeBatchResponse, CerememoryError> {
1933 const MAX_BATCH_SIZE: usize = 1000;
1934 if req.records.len() > MAX_BATCH_SIZE {
1935 return Err(CerememoryError::Validation(format!(
1936 "Batch size {} exceeds maximum of {MAX_BATCH_SIZE}",
1937 req.records.len()
1938 )));
1939 }
1940 let mut results = Vec::with_capacity(req.records.len());
1941 let mut total_inferred = 0u32;
1942 let mut prev_id: Option<Uuid> = None;
1943
1944 for store_req in req.records {
1945 let resp = self.encode_store(store_req).await?;
1946
1947 if req.infer_associations {
1949 if let Some(prev) = prev_id {
1950 let assoc_fwd = Association {
1951 target_id: resp.record_id,
1952 association_type: AssociationType::Sequential,
1953 weight: DEFAULT_BATCH_SEQUENTIAL_WEIGHT,
1954 created_at: Utc::now(),
1955 last_co_activation: Utc::now(),
1956 };
1957 let assoc_bwd = Association {
1958 target_id: prev,
1959 association_type: AssociationType::Sequential,
1960 weight: DEFAULT_BATCH_SEQUENTIAL_WEIGHT,
1961 created_at: Utc::now(),
1962 last_co_activation: Utc::now(),
1963 };
1964 self.add_persisted_association(&prev, assoc_fwd).await?;
1965 self.add_persisted_association(&resp.record_id, assoc_bwd)
1966 .await?;
1967 total_inferred += 2;
1968 }
1969 }
1970
1971 prev_id = Some(resp.record_id);
1972 results.push(resp);
1973 }
1974
1975 Ok(EncodeBatchResponse {
1976 results,
1977 associations_inferred: total_inferred,
1978 })
1979 }
1980
1981 pub async fn encode_update(&self, req: EncodeUpdateRequest) -> Result<(), CerememoryError> {
1983 let (mut record, store_type) = self
1984 .get_store_record(&req.record_id)
1985 .await?
1986 .ok_or_else(|| CerememoryError::RecordNotFound(req.record_id.to_string()))?;
1987
1988 record.apply_updates(
1990 req.content.clone(),
1991 req.emotion.clone(),
1992 req.metadata.clone(),
1993 );
1994 record.validate()?;
1995
1996 dispatch_store!(
1998 self,
1999 store_type,
2000 update_record(
2001 &req.record_id,
2002 req.content.clone(),
2003 req.emotion,
2004 req.metadata
2005 )
2006 )?;
2007
2008 if req.content.is_some() {
2010 let text = Self::build_searchable_text(&record).unwrap_or_default();
2011 if Self::has_indexable_content(&text, &record) {
2012 if let Err(e) = self.text_index.update(
2013 req.record_id,
2014 store_type,
2015 &text,
2016 record.content.summary.as_deref(),
2017 ) {
2018 warn!(error = %e, record_id = %req.record_id, "Failed to update text index");
2019 }
2020 } else if let Err(e) = self.text_index.remove(req.record_id) {
2021 warn!(error = %e, record_id = %req.record_id, "Failed to clear text index");
2022 }
2023
2024 let _ = self.vector_index.remove(req.record_id);
2026 if let Some(embedding) = Self::primary_embedding(&record) {
2027 if let Err(e) = self.vector_index.upsert(req.record_id, embedding) {
2028 warn!(error = %e, record_id = %req.record_id, "Failed to update vector index");
2029 }
2030 }
2031 }
2032
2033 Ok(())
2034 }
2035
2036 pub async fn recall_query(
2048 &self,
2049 req: RecallQueryRequest,
2050 ) -> Result<RecallQueryResponse, CerememoryError> {
2051 let _timer = TimerGuard::new("cerememory_recall_duration_seconds");
2052 let mode = *self.recall_mode.read().await;
2053 let recall_mode = req.recall_mode;
2054 let effective_mode = if mode == RecallMode::Perfect {
2055 RecallMode::Perfect
2056 } else {
2057 recall_mode
2058 };
2059 let (cue_text, cue_embedding) = self.resolve_recall_cues(&req.cue).await?;
2060
2061 let stores = req.stores.clone().unwrap_or_else(|| {
2062 vec![
2063 StoreType::Episodic,
2064 StoreType::Semantic,
2065 StoreType::Procedural,
2066 StoreType::Emotional,
2067 StoreType::Working,
2068 ]
2069 });
2070
2071 let mut candidates: Vec<(MemoryRecord, f64)> = Vec::new();
2072 let mut seen_ids: HashSet<Uuid> = HashSet::new();
2073
2074 let mut text_scores: HashMap<Uuid, f64> = HashMap::new();
2076 let mut vec_scores: HashMap<Uuid, f64> = HashMap::new();
2077
2078 if let Some(ref text) = cue_text {
2080 let search_limit = req.limit as usize * 3;
2081 match self.text_index.search(text, Some(&stores), search_limit) {
2082 Ok(hits) => {
2083 for hit in hits {
2084 text_scores.insert(hit.record_id, hit.score as f64);
2085 }
2086 }
2087 Err(e) => {
2088 warn!(error = %e, "Text index search failed, falling back to store query");
2090 for store_type in &stores {
2091 let results = dispatch_store!(
2092 self,
2093 *store_type,
2094 query_text(text, req.limit as usize * 2)
2095 )?;
2096 for record in results {
2097 text_scores.insert(record.id, record.fidelity.score);
2098 }
2099 }
2100 }
2101 }
2102 }
2103
2104 if let Some(ref embedding) = cue_embedding {
2106 if let Ok(hits) = self.vector_index.search(embedding, req.limit as usize * 3) {
2107 for hit in hits {
2108 if hit.similarity > 0.0 {
2109 vec_scores.insert(hit.record_id, hit.similarity as f64);
2110 }
2111 }
2112 }
2113 }
2114
2115 let all_ids: HashSet<Uuid> = text_scores
2117 .keys()
2118 .chain(vec_scores.keys())
2119 .copied()
2120 .collect();
2121 let mut scanned_ids = all_ids.clone();
2122 let mut total_records_scanned = scanned_ids.len() as u32;
2123 let mut fidelity_filtered: u32 = 0;
2124
2125 for id in all_ids {
2126 if !seen_ids.insert(id) {
2127 continue;
2128 }
2129 if let Some((record, _)) = self.get_store_record(&id).await? {
2130 if !stores.contains(&record.store) {
2132 continue;
2133 }
2134 if let Some(min_f) = req.min_fidelity {
2135 if record.fidelity.score < min_f && !req.include_decayed {
2136 fidelity_filtered += 1;
2137 continue;
2138 }
2139 }
2140
2141 let ts = text_scores.get(&id);
2143 let vs = vec_scores.get(&id);
2144 let score = match (ts, vs) {
2145 (Some(&t), Some(&v)) => t * 0.6 + v * 0.4,
2146 (Some(&t), None) => t,
2147 (None, Some(&v)) => v,
2148 (None, None) => 0.0,
2149 };
2150
2151 candidates.push((record, score));
2152 }
2153 }
2154
2155 if cue_text.is_none() && cue_embedding.is_none() {
2158 }
2160
2161 if let Some(ref temporal) = req.cue.temporal {
2163 for store_type in &stores {
2164 let records = dispatch_store!(self, *store_type, get_all())?;
2165 for record in records {
2166 if record.created_at < temporal.start || record.created_at > temporal.end {
2167 continue;
2168 }
2169 if scanned_ids.insert(record.id) {
2170 total_records_scanned += 1;
2171 }
2172 if !seen_ids.insert(record.id) {
2173 continue;
2174 }
2175 if let Some(min_f) = req.min_fidelity {
2176 if record.fidelity.score < min_f && !req.include_decayed {
2177 fidelity_filtered += 1;
2178 continue;
2179 }
2180 }
2181 let score = record.fidelity.score;
2182 candidates.push((record, score));
2183 }
2184 }
2185 }
2186
2187 let mut activated_ids: HashMap<Uuid, f64> = HashMap::new();
2189 if req.activation_depth > 0 && !candidates.is_empty() {
2190 let top_id = candidates
2191 .iter()
2192 .max_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal))
2193 .map(|(r, _)| r.id);
2194
2195 if let Some(source_id) = top_id {
2196 if let Ok(activated) = self
2197 .activation
2198 .activate(&source_id, req.activation_depth, 0.05)
2199 .await
2200 {
2201 for act in &activated {
2202 activated_ids.insert(act.record_id, act.activation_level);
2203 if seen_ids.insert(act.record_id) {
2204 if let Some((record, _store)) =
2205 self.get_store_record(&act.record_id).await?
2206 {
2207 if !stores.contains(&record.store) {
2208 continue;
2209 }
2210 if let Some(ref temporal) = req.cue.temporal {
2211 if record.created_at < temporal.start
2212 || record.created_at > temporal.end
2213 {
2214 continue;
2215 }
2216 }
2217 if let Some(min_f) = req.min_fidelity {
2218 if record.fidelity.score < min_f && !req.include_decayed {
2219 fidelity_filtered += 1;
2220 continue;
2221 }
2222 }
2223 candidates.push((record, act.activation_level * 0.5));
2224 }
2225 }
2226 }
2227 }
2228 }
2229 }
2230
2231 for (record, relevance) in &mut candidates {
2233 if let Some(activation) = activated_ids.get(&record.id) {
2234 *relevance += activation * 0.3;
2235 }
2236 }
2237
2238 if let Some(ref temporal) = req.cue.temporal {
2241 candidates.retain(|(record, _)| {
2242 record.created_at >= temporal.start && record.created_at <= temporal.end
2243 });
2244 }
2245 candidates.retain(|(record, _)| stores.contains(&record.store));
2246 if let Some(min_fidelity) = req.min_fidelity {
2247 let before = candidates.len();
2248 candidates
2249 .retain(|(record, _)| req.include_decayed || record.fidelity.score >= min_fidelity);
2250 fidelity_filtered += (before - candidates.len()) as u32;
2251 }
2252
2253 candidates.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
2255 let total_candidates = candidates.len() as u32;
2256 candidates.truncate(req.limit as usize);
2257
2258 let mut memories = Vec::with_capacity(candidates.len());
2260 for (mut record, relevance) in candidates {
2261 if req.reconsolidate {
2263 record.access_count += 1;
2264 record.last_accessed_at = Utc::now();
2265
2266 let new_stability = self.decay.boost_stability(record.fidelity.stability);
2268 record.fidelity.stability = new_stability;
2269 record.fidelity.reinforcement_count += 1;
2270
2271 if let Some(store_type) = self.coordinator.get_record_store_type(&record.id).await?
2272 {
2273 if let Err(e) = dispatch_store!(
2274 self,
2275 store_type,
2276 update_fidelity(&record.id, record.fidelity.clone())
2277 ) {
2278 warn!(record_id = %record.id, error = %e, "Failed to update fidelity during reconsolidation");
2279 }
2280 if let Err(e) = dispatch_store!(
2281 self,
2282 store_type,
2283 update_access(&record.id, record.access_count, record.last_accessed_at)
2284 ) {
2285 warn!(record_id = %record.id, error = %e, "Failed to update access during reconsolidation");
2286 }
2287 }
2288 }
2289
2290 let rendered_content = match effective_mode {
2292 RecallMode::Perfect => record.content.clone(),
2293 RecallMode::Human => apply_human_noise(&record.content, record.fidelity.score),
2294 };
2295
2296 memories.push(RecalledMemory {
2297 activation_path: activated_ids.get(&record.id).map(|_| vec![record.id]),
2298 relevance_score: relevance,
2299 rendered_content,
2300 record,
2301 });
2302 }
2303
2304 if !memories.is_empty() {
2306 let hit_count = memories.iter().filter(|m| m.relevance_score > 0.0).count();
2307 let hit_rate = hit_count as f64 / memories.len() as f64;
2308 let store = memories[0].record.store;
2310 self.evolution.observe_recall(store, hit_rate);
2311 }
2312
2313 metrics::counter!("cerememory_recall_total").increment(1);
2314
2315 Ok(RecallQueryResponse {
2316 memories,
2317 activation_trace: None,
2318 total_candidates,
2319 query_metadata: Some(QueryMetadata {
2320 total_records_scanned,
2321 stores_searched: stores,
2322 fidelity_filtered,
2323 }),
2324 })
2325 }
2326
2327 pub async fn recall_associate(
2329 &self,
2330 req: RecallAssociateRequest,
2331 ) -> Result<RecallAssociateResponse, CerememoryError> {
2332 self.get_store_record(&req.record_id)
2334 .await?
2335 .ok_or_else(|| CerememoryError::RecordNotFound(req.record_id.to_string()))?;
2336
2337 let activated = self
2338 .activation
2339 .activate(&req.record_id, req.depth, req.min_weight)
2340 .await?;
2341
2342 let source_assocs = if req.association_types.is_some() {
2343 Some(self.coordinator.get_associations(&req.record_id).await?)
2344 } else {
2345 None
2346 };
2347
2348 let mut memories = Vec::new();
2349 for act in activated.iter().take(req.limit as usize) {
2350 if let Some(types) = &req.association_types {
2351 let assocs = source_assocs.as_ref().expect("pre-fetched above");
2352 let matches = assocs
2353 .iter()
2354 .any(|a| a.target_id == act.record_id && types.contains(&a.association_type));
2355 if !matches && act.path.len() <= 2 {
2356 continue;
2357 }
2358 }
2359
2360 if let Some((record, _)) = self.get_store_record(&act.record_id).await? {
2361 memories.push(RecalledMemory {
2362 rendered_content: record.content.clone(),
2363 relevance_score: act.activation_level,
2364 activation_path: Some(act.path.clone()),
2365 record,
2366 });
2367 }
2368 }
2369
2370 Ok(RecallAssociateResponse {
2371 total_candidates: memories.len() as u32,
2372 memories,
2373 })
2374 }
2375
2376 pub async fn recall_timeline(
2378 &self,
2379 req: RecallTimelineRequest,
2380 ) -> Result<RecallTimelineResponse, CerememoryError> {
2381 let records = self
2382 .episodic
2383 .query_temporal_range(req.range.start, req.range.end)
2384 .await?;
2385
2386 let mut bucket_map: std::collections::BTreeMap<i64, Vec<MemoryRecord>> =
2388 std::collections::BTreeMap::new();
2389
2390 for record in records {
2391 if let Some(min_f) = req.min_fidelity {
2393 if record.fidelity.score < min_f {
2394 continue;
2395 }
2396 }
2397
2398 if let Some(ref filter) = req.emotion_filter {
2400 if !Self::emotion_matches(&record.emotion, filter) {
2401 continue;
2402 }
2403 }
2404
2405 let bucket_key = Self::bucket_key(record.created_at, req.granularity);
2406 bucket_map.entry(bucket_key).or_default().push(record);
2407 }
2408
2409 for store_type in [
2411 StoreType::Semantic,
2412 StoreType::Procedural,
2413 StoreType::Emotional,
2414 ] {
2415 let records = dispatch_store!(self, store_type, get_all())?;
2416 for record in records {
2417 if record.created_at >= req.range.start && record.created_at <= req.range.end {
2418 if let Some(min_f) = req.min_fidelity {
2419 if record.fidelity.score < min_f {
2420 continue;
2421 }
2422 }
2423 if let Some(ref filter) = req.emotion_filter {
2424 if !Self::emotion_matches(&record.emotion, filter) {
2425 continue;
2426 }
2427 }
2428 let bucket_key = Self::bucket_key(record.created_at, req.granularity);
2429 bucket_map.entry(bucket_key).or_default().push(record);
2430 }
2431 }
2432 }
2433
2434 let buckets: Vec<TimelineBucket> = bucket_map
2436 .into_iter()
2437 .map(|(key, records)| {
2438 let (start, end) = Self::bucket_range(key, req.granularity);
2439 let count = records.len() as u32;
2440 let memories = records
2441 .into_iter()
2442 .map(|record| RecalledMemory {
2443 relevance_score: record.fidelity.score,
2444 rendered_content: record.content.clone(),
2445 activation_path: None,
2446 record,
2447 })
2448 .collect();
2449 TimelineBucket {
2450 start,
2451 end,
2452 memories,
2453 count,
2454 }
2455 })
2456 .collect();
2457
2458 Ok(RecallTimelineResponse { buckets })
2459 }
2460
2461 fn emotion_matches(record_emotion: &EmotionVector, filter: &EmotionVector) -> bool {
2465 let r = [
2466 record_emotion.joy,
2467 record_emotion.trust,
2468 record_emotion.fear,
2469 record_emotion.surprise,
2470 record_emotion.sadness,
2471 record_emotion.disgust,
2472 record_emotion.anger,
2473 record_emotion.anticipation,
2474 ];
2475 let f = [
2476 filter.joy,
2477 filter.trust,
2478 filter.fear,
2479 filter.surprise,
2480 filter.sadness,
2481 filter.disgust,
2482 filter.anger,
2483 filter.anticipation,
2484 ];
2485
2486 let dot: f64 = r.iter().zip(f.iter()).map(|(a, b)| a * b).sum();
2487 let norm_r: f64 = r.iter().map(|x| x * x).sum::<f64>().sqrt();
2488 let norm_f: f64 = f.iter().map(|x| x * x).sum::<f64>().sqrt();
2489
2490 if norm_r < f64::EPSILON || norm_f < f64::EPSILON {
2491 return false; }
2493
2494 let similarity = dot / (norm_r * norm_f);
2495 similarity > 0.5
2496 }
2497
2498 fn bucket_key(ts: chrono::DateTime<Utc>, granularity: TimeGranularity) -> i64 {
2499 use chrono::Datelike;
2500 let secs = ts.timestamp();
2501 match granularity {
2502 TimeGranularity::Minute => secs / 60,
2503 TimeGranularity::Hour => secs / 3600,
2504 TimeGranularity::Day => secs / 86400,
2505 TimeGranularity::Week => secs / 604800,
2506 TimeGranularity::Month => ts.year() as i64 * 12 + (ts.month() as i64 - 1),
2507 }
2508 }
2509
2510 fn bucket_range(
2511 key: i64,
2512 granularity: TimeGranularity,
2513 ) -> (chrono::DateTime<Utc>, chrono::DateTime<Utc>) {
2514 use chrono::TimeZone;
2515
2516 let epoch_secs = |g: TimeGranularity| -> Option<i64> {
2518 match g {
2519 TimeGranularity::Minute => Some(60),
2520 TimeGranularity::Hour => Some(3600),
2521 TimeGranularity::Day => Some(86400),
2522 TimeGranularity::Week => Some(604800),
2523 TimeGranularity::Month => None,
2524 }
2525 };
2526
2527 if let Some(secs) = epoch_secs(granularity) {
2528 let start = Utc
2529 .timestamp_opt(key * secs, 0)
2530 .single()
2531 .unwrap_or_default();
2532 let end = Utc
2533 .timestamp_opt((key + 1) * secs, 0)
2534 .single()
2535 .unwrap_or_default();
2536 return (start, end);
2537 }
2538
2539 let year = key / 12;
2541 let month = (key % 12) + 1;
2542 let start = Utc
2543 .with_ymd_and_hms(year as i32, month as u32, 1, 0, 0, 0)
2544 .single()
2545 .unwrap_or_default();
2546 let next_month = if month == 12 { 1 } else { month + 1 };
2547 let next_year = if month == 12 { year + 1 } else { year };
2548 let end = Utc
2549 .with_ymd_and_hms(next_year as i32, next_month as u32, 1, 0, 0, 0)
2550 .single()
2551 .unwrap_or_default();
2552 (start, end)
2553 }
2554
2555 pub async fn recall_graph(
2557 &self,
2558 req: RecallGraphRequest,
2559 ) -> Result<RecallGraphResponse, CerememoryError> {
2560 let mut nodes: Vec<GraphNode> = Vec::new();
2561 let mut edges: Vec<GraphEdge> = Vec::new();
2562 let mut visited: HashSet<Uuid> = HashSet::new();
2563 let mut queue: std::collections::VecDeque<(Uuid, u32)> = std::collections::VecDeque::new();
2564 let limit = req.limit_nodes as usize;
2565
2566 if let Some(center) = req.center_id {
2568 queue.push_back((center, 0));
2569 } else {
2570 let reg = self.coordinator.records_by_store().await;
2572 let all_ids: Vec<Uuid> = {
2573 let mut ids = Vec::new();
2574 for st in [
2575 StoreType::Episodic,
2576 StoreType::Semantic,
2577 StoreType::Procedural,
2578 StoreType::Emotional,
2579 ] {
2580 if reg.contains_key(&st) {
2581 let store_ids = dispatch_store!(self, st, list_ids())?;
2582 ids.extend(store_ids.into_iter().take(limit));
2583 }
2584 if ids.len() >= limit {
2585 break;
2586 }
2587 }
2588 ids
2589 };
2590 for id in all_ids {
2591 queue.push_back((id, 0));
2592 }
2593 }
2594
2595 while let Some((id, depth)) = queue.pop_front() {
2597 if !visited.insert(id) || nodes.len() >= limit {
2598 continue;
2599 }
2600
2601 if let Some((record, _store)) = self.get_store_record(&id).await? {
2602 nodes.push(GraphNode {
2603 id: record.id,
2604 store: record.store,
2605 summary: record.content.summary.clone().or_else(|| {
2606 record.text_content().map(|t| {
2607 if t.len() > 80 {
2608 format!("{}...", truncate_str(t, 80))
2609 } else {
2610 t.to_string()
2611 }
2612 })
2613 }),
2614 fidelity: record.fidelity.score,
2615 });
2616
2617 if depth < req.depth {
2618 let assocs = self.coordinator.get_associations(&id).await?;
2619 for assoc in assocs {
2620 if let Some(ref types) = req.edge_types {
2622 let type_str = serde_json::to_value(assoc.association_type)
2623 .ok()
2624 .and_then(|v| v.as_str().map(|s| s.to_string()))
2625 .unwrap_or_default();
2626 if !types.iter().any(|t| t.to_lowercase() == type_str) {
2627 continue;
2628 }
2629 }
2630
2631 if edges.len() < limit * 10 {
2633 edges.push(GraphEdge {
2634 source: id,
2635 target: assoc.target_id,
2636 edge_type: assoc.association_type,
2637 weight: assoc.weight,
2638 });
2639 }
2640
2641 if !visited.contains(&assoc.target_id) {
2642 queue.push_back((assoc.target_id, depth + 1));
2643 }
2644 }
2645 }
2646 }
2647 }
2648
2649 let total_nodes = nodes.len() as u32;
2650 Ok(RecallGraphResponse {
2651 nodes,
2652 edges,
2653 total_nodes,
2654 })
2655 }
2656
2657 pub async fn lifecycle_dream_tick(
2661 &self,
2662 req: DreamTickRequest,
2663 ) -> Result<DreamTickResponse, CerememoryError> {
2664 let secrecy_levels = req.secrecy_levels.as_deref();
2665 let session_filter = req
2666 .session_id
2667 .as_ref()
2668 .map(|session_id| session_id.trim())
2669 .filter(|session_id| !session_id.is_empty())
2670 .map(str::to_string);
2671
2672 let raw_records = self.raw_journal.get_all().await?;
2673 let mut candidate_records = Vec::new();
2674
2675 for record in raw_records {
2676 if record.suppressed || Self::raw_record_processed(&record) {
2677 continue;
2678 }
2679 if !Self::raw_query_allowed_visibility(
2680 &record,
2681 req.include_private_scratch,
2682 req.include_sealed,
2683 ) {
2684 continue;
2685 }
2686 if !Self::raw_query_allowed_secrecy(&record, secrecy_levels) {
2687 continue;
2688 }
2689 if let Some(ref session_id) = session_filter {
2690 if &record.session_id != session_id {
2691 continue;
2692 }
2693 }
2694 candidate_records.push(record);
2695 }
2696
2697 let mut groups_processed = 0u32;
2698 let mut raw_records_processed = 0u32;
2699 let mut episodic_summaries_created = 0u32;
2700 let mut semantic_nodes_created = 0u32;
2701 let max_groups = req.max_groups as usize;
2702
2703 for group in Self::infer_dream_groups(candidate_records)
2704 .into_iter()
2705 .take(max_groups)
2706 {
2707 if group.records.is_empty() {
2708 continue;
2709 }
2710
2711 groups_processed += 1;
2712 raw_records_processed += group.records.len() as u32;
2713
2714 if req.dry_run {
2715 episodic_summaries_created += 1;
2716 continue;
2717 }
2718
2719 let session_id = group.session_id.clone();
2720 let (texts, summary_stats) = Self::prepare_dream_summary_inputs(&group.records);
2721
2722 let summary_text = if texts.is_empty() {
2723 let redacted_total = summary_stats["redacted_total"].as_u64().unwrap_or(0);
2724 format!(
2725 "Dream summary for session {}: {} raw record(s) preserved. {} redacted from summary.",
2726 session_id,
2727 group.records.len(),
2728 redacted_total
2729 )
2730 } else {
2731 self.summarize_dream_group(
2732 &session_id,
2733 group.topic_hint.as_deref(),
2734 &texts,
2735 &summary_stats,
2736 )
2737 .await
2738 };
2739
2740 let dreamed_at = Utc::now();
2741 let mut summary_record =
2742 MemoryRecord::new_text(StoreType::Episodic, summary_text.clone());
2743 summary_record.content.summary = Some(if summary_text.len() > 160 {
2744 format!("{}...", truncate_str(&summary_text, 160))
2745 } else {
2746 summary_text.clone()
2747 });
2748 summary_record.metadata = Self::build_dream_summary_metadata(
2749 &session_id,
2750 group.raw_topic_id.as_deref(),
2751 group.topic_hint.as_deref(),
2752 group.inferred_topic,
2753 &group.records,
2754 dreamed_at,
2755 summary_stats.clone(),
2756 );
2757
2758 let promote_semantic = req.promote_semantic
2759 && Self::should_promote_dream_group(
2760 group.raw_topic_id.as_deref(),
2761 group.topic_hint.as_deref(),
2762 &summary_stats,
2763 );
2764 let semantic_record = if promote_semantic {
2765 let mut semantic_record =
2766 MemoryRecord::new_text(StoreType::Semantic, summary_text.clone());
2767 semantic_record.content.summary = summary_record.content.summary.clone();
2768 semantic_record.metadata = Self::build_dream_semantic_metadata(
2769 &session_id,
2770 group.raw_topic_id.as_deref(),
2771 group.topic_hint.as_deref(),
2772 group.inferred_topic,
2773 &group.records,
2774 summary_record.id,
2775 dreamed_at,
2776 summary_stats.clone(),
2777 );
2778 Some(semantic_record)
2779 } else {
2780 None
2781 };
2782
2783 if let Some(semantic_record) = &semantic_record {
2784 let assoc = Association {
2785 target_id: semantic_record.id,
2786 association_type: AssociationType::Semantic,
2787 weight: 1.0,
2788 created_at: dreamed_at,
2789 last_co_activation: dreamed_at,
2790 };
2791 summary_record.associations.push(assoc);
2792 Self::attach_summary_semantic_link(&mut summary_record, semantic_record.id);
2793 }
2794
2795 let summary_id = summary_record.id;
2796 dispatch_store!(self, StoreType::Episodic, store(summary_record.clone()))?;
2797 self.coordinator
2798 .register(
2799 summary_record.id,
2800 StoreType::Episodic,
2801 summary_record.associations.clone(),
2802 )
2803 .await;
2804 if let Err(e) = self.index_record(&summary_record) {
2805 warn!(error = %e, record_id = %summary_record.id, "Failed to index dream summary");
2806 }
2807
2808 let semantic_id = if let Some(mut semantic_record) = semantic_record {
2809 semantic_record.associations.push(Association {
2810 target_id: summary_id,
2811 association_type: AssociationType::Semantic,
2812 weight: 1.0,
2813 created_at: dreamed_at,
2814 last_co_activation: dreamed_at,
2815 });
2816 let semantic_id = semantic_record.id;
2817 if let Err(err) =
2818 dispatch_store!(self, StoreType::Semantic, store(semantic_record.clone()))
2819 {
2820 if dispatch_store!(self, StoreType::Episodic, delete(&summary_id))? {
2821 self.cleanup_deleted_records(&[(summary_id, StoreType::Episodic)])
2822 .await?;
2823 }
2824 return Err(err);
2825 }
2826 self.coordinator
2827 .register(
2828 semantic_record.id,
2829 StoreType::Semantic,
2830 semantic_record.associations.clone(),
2831 )
2832 .await;
2833 if let Err(e) = self.index_record(&semantic_record) {
2834 warn!(error = %e, record_id = %semantic_record.id, "Failed to index dream semantic summary");
2835 }
2836 semantic_nodes_created += 1;
2837 Some(semantic_id)
2838 } else {
2839 None
2840 };
2841
2842 let original_group = group.records.clone();
2843 let mut update_failed: Option<CerememoryError> = None;
2844 let mut updated_group = group.records.clone();
2845 for raw_record in &mut updated_group {
2846 Self::mark_raw_record_dream_processed(
2847 raw_record,
2848 summary_id,
2849 semantic_id,
2850 dreamed_at,
2851 );
2852 if let Err(err) = self.raw_journal.update(raw_record.clone()).await {
2853 update_failed = Some(err);
2854 break;
2855 }
2856 }
2857
2858 if let Some(err) = update_failed {
2859 for original in &original_group {
2860 let _ = self.raw_journal.update(original.clone()).await;
2861 }
2862 let mut cleanup_targets = Vec::new();
2863 if dispatch_store!(self, StoreType::Episodic, delete(&summary_id))? {
2864 cleanup_targets.push((summary_id, StoreType::Episodic));
2865 }
2866 if let Some(semantic_id) = semantic_id {
2867 if dispatch_store!(self, StoreType::Semantic, delete(&semantic_id))? {
2868 cleanup_targets.push((semantic_id, StoreType::Semantic));
2869 }
2870 }
2871 if !cleanup_targets.is_empty() {
2872 self.cleanup_deleted_records(&cleanup_targets).await?;
2873 }
2874 return Err(CerememoryError::Internal(format!(
2875 "Dream tick failed while updating raw journal state: {err}"
2876 )));
2877 }
2878
2879 episodic_summaries_created += 1;
2880 }
2881
2882 Ok(DreamTickResponse {
2883 groups_processed,
2884 raw_records_processed,
2885 episodic_summaries_created,
2886 semantic_nodes_created,
2887 })
2888 }
2889
2890 pub async fn lifecycle_consolidate(
2899 &self,
2900 req: ConsolidateRequest,
2901 ) -> Result<ConsolidateResponse, CerememoryError> {
2902 let ids = self.episodic.list_ids().await?;
2903 let mut processed = 0u32;
2904 let mut migrated = 0u32;
2905 let mut semantic_created = 0u32;
2906 let mut compressed = 0u32;
2907 let mut pruned = 0u32;
2908
2909 let mut duplicate_groups: Vec<(Uuid, Uuid)> = Vec::new();
2911 if !req.dry_run {
2912 let mut checked: HashSet<Uuid> = HashSet::new();
2913 for &id in &ids {
2914 if checked.contains(&id) {
2915 continue;
2916 }
2917 if let Some(record) = self.episodic.get(&id).await? {
2919 if let Some(emb) = Self::primary_embedding(&record) {
2920 if let Ok(hits) = self.vector_index.search(emb, 5) {
2921 for hit in hits {
2922 if hit.record_id != id
2923 && hit.similarity > 0.92
2924 && !checked.contains(&hit.record_id)
2925 {
2926 let Some((_, hit_store)) =
2927 self.get_store_record(&hit.record_id).await?
2928 else {
2929 continue;
2930 };
2931 if hit_store != StoreType::Episodic {
2932 continue;
2933 }
2934 duplicate_groups.push((id, hit.record_id));
2935 checked.insert(hit.record_id);
2936 }
2937 }
2938 }
2939 }
2940 }
2941 checked.insert(id);
2942 }
2943
2944 for (keep_id, remove_id) in &duplicate_groups {
2946 if let (Some(keep_rec), Some(remove_rec)) = (
2947 self.episodic.get(keep_id).await?,
2948 self.get_store_record(remove_id).await?,
2949 ) {
2950 let (remove_record, remove_store) = remove_rec;
2951 let (actual_keep, actual_remove, actual_remove_store) =
2953 if keep_rec.fidelity.score >= remove_record.fidelity.score {
2954 (*keep_id, *remove_id, remove_store)
2955 } else {
2956 (*remove_id, *keep_id, StoreType::Episodic)
2957 };
2958
2959 let removed_assocs = self.coordinator.get_associations(&actual_remove).await?;
2961 for assoc in removed_assocs {
2962 self.add_persisted_association(&actual_keep, assoc).await?;
2963 }
2964
2965 if dispatch_store!(self, actual_remove_store, delete(&actual_remove))? {
2967 self.cleanup_deleted_records(&[(actual_remove, actual_remove_store)])
2968 .await?;
2969 compressed += 1;
2970 }
2971 }
2972 }
2973 }
2974
2975 let remaining_records = self.episodic.get_all().await?;
2977
2978 for record in remaining_records {
2979 processed += 1;
2980 let age_hours = (Utc::now() - record.created_at).num_hours() as u32;
2981 if age_hours < req.min_age_hours {
2982 continue;
2983 }
2984 if record.access_count < req.min_access_count {
2985 continue;
2986 }
2987
2988 if req.dry_run {
2989 migrated += 1;
2990 continue;
2991 }
2992
2993 let mut semantic_record = record.clone();
2995 semantic_record.store = StoreType::Semantic;
2996 semantic_record.id = Uuid::now_v7();
2997
2998 if semantic_record.content.summary.is_none() {
3000 if let Some(ref provider) = self.llm_provider {
3001 if let Some(text) = record.text_content() {
3002 match provider.summarize(&[text.to_string()], 200).await {
3003 Ok(summary) if !summary.is_empty() => {
3004 semantic_record.content.summary = Some(summary);
3005 }
3006 _ => {
3007 semantic_record.content.summary = Some(
3009 record
3010 .text_content()
3011 .map(|t| {
3012 if t.len() > 100 {
3013 format!("{}...", truncate_str(t, 100))
3014 } else {
3015 t.to_string()
3016 }
3017 })
3018 .unwrap_or_default(),
3019 );
3020 }
3021 }
3022 }
3023 } else {
3024 semantic_record.content.summary = semantic_record.text_content().map(|t| {
3025 if t.len() > 100 {
3026 format!("{}...", truncate_str(t, 100))
3027 } else {
3028 t.to_string()
3029 }
3030 });
3031 }
3032 }
3033
3034 if let Some(ref provider) = self.llm_provider {
3036 if let Some(text) = record.text_content() {
3037 if let Ok(relations) = provider.extract_relations(text).await {
3038 for rel in relations {
3039 if let serde_json::Value::Object(ref mut map) = semantic_record.metadata
3041 {
3042 let relations_arr = map
3043 .entry("extracted_relations".to_string())
3044 .or_insert_with(|| serde_json::json!([]));
3045 if let serde_json::Value::Array(ref mut arr) = relations_arr {
3046 arr.push(serde_json::json!({
3047 "subject": rel.subject,
3048 "predicate": rel.predicate,
3049 "object": rel.object,
3050 "confidence": rel.confidence,
3051 }));
3052 }
3053 }
3054 }
3055 }
3056 }
3057 }
3058
3059 dispatch_store!(self, StoreType::Semantic, store(semantic_record.clone()))?;
3060 semantic_created += 1;
3061 self.coordinator
3062 .register(
3063 semantic_record.id,
3064 StoreType::Semantic,
3065 semantic_record.associations.clone(),
3066 )
3067 .await;
3068
3069 if let Err(e) = self.index_record(&semantic_record) {
3070 warn!(error = %e, "Failed to index consolidated record");
3071 }
3072
3073 let assoc = Association {
3074 target_id: semantic_record.id,
3075 association_type: AssociationType::Semantic,
3076 weight: 1.0,
3077 created_at: Utc::now(),
3078 last_co_activation: Utc::now(),
3079 };
3080 self.add_persisted_association(&record.id, assoc).await?;
3081
3082 migrated += 1;
3083
3084 if record.fidelity.score < 0.1 && self.episodic.delete(&record.id).await? {
3085 self.cleanup_deleted_records(&[(record.id, StoreType::Episodic)])
3086 .await?;
3087 pruned += 1;
3088 }
3089 }
3090
3091 info!(
3092 processed,
3093 migrated, semantic_created, compressed, pruned, "Smart consolidation completed"
3094 );
3095
3096 Ok(ConsolidateResponse {
3097 records_processed: processed,
3098 records_migrated: migrated,
3099 records_compressed: compressed,
3100 records_pruned: pruned,
3101 semantic_nodes_created: semantic_created,
3102 })
3103 }
3104
3105 pub async fn lifecycle_decay_tick(
3107 &self,
3108 req: DecayTickRequest,
3109 ) -> Result<DecayTickResponse, CerememoryError> {
3110 let tick_secs = req.tick_duration_seconds.unwrap_or(3600) as f64;
3111
3112 let mut all_inputs = Vec::new();
3113 let mut record_stores: HashMap<Uuid, StoreType> = HashMap::new();
3114
3115 for store_type in [
3116 StoreType::Episodic,
3117 StoreType::Semantic,
3118 StoreType::Procedural,
3119 StoreType::Emotional,
3120 ] {
3121 let records = dispatch_store!(self, store_type, get_all())?;
3122 for record in records {
3123 all_inputs.push(DecayInput {
3124 id: record.id,
3125 fidelity: record.fidelity.clone(),
3126 emotion: record.emotion.clone(),
3127 last_accessed_at: record.last_accessed_at,
3128 access_count: record.access_count,
3129 });
3130 record_stores.insert(record.id, store_type);
3131 }
3132 }
3133
3134 let decay = self.decay.clone();
3135 let result =
3136 tokio::task::spawn_blocking(move || decay.compute_tick(&all_inputs, tick_secs))
3137 .await
3138 .map_err(|e| CerememoryError::Internal(format!("Decay task failed: {e}")))?;
3139
3140 for output in &result.updates {
3141 if let Some(&store_type) = record_stores.get(&output.id) {
3142 if output.should_prune {
3143 if dispatch_store!(self, store_type, delete(&output.id))? {
3144 self.cleanup_deleted_records(&[(output.id, store_type)])
3145 .await?;
3146 }
3147 } else {
3148 dispatch_store!(
3149 self,
3150 store_type,
3151 update_fidelity(&output.id, output.new_fidelity.clone())
3152 )?;
3153 }
3154 }
3155 }
3156
3157 let mut fidelity_by_store: HashMap<StoreType, Vec<f64>> = HashMap::new();
3159 for output in &result.updates {
3160 if let Some(&store_type) = record_stores.get(&output.id) {
3161 fidelity_by_store
3162 .entry(store_type)
3163 .or_default()
3164 .push(output.new_fidelity.score);
3165 }
3166 }
3167 for (store_type, scores) in &fidelity_by_store {
3168 self.evolution.observe_decay_tick(*store_type, scores);
3169 }
3170
3171 info!(
3172 updated = result.records_updated,
3173 below_threshold = result.records_below_threshold,
3174 pruned = result.records_pruned,
3175 "Decay tick completed"
3176 );
3177
3178 Ok(DecayTickResponse {
3179 records_updated: result.records_updated,
3180 records_below_threshold: result.records_below_threshold,
3181 records_pruned: result.records_pruned,
3182 })
3183 }
3184
3185 pub async fn lifecycle_set_mode(&self, req: SetModeRequest) -> Result<(), CerememoryError> {
3187 *self.recall_mode.write().await = req.mode;
3188 info!(mode = ?req.mode, "Recall mode changed");
3189 Ok(())
3190 }
3191
3192 pub async fn lifecycle_forget(&self, req: ForgetRequest) -> Result<u32, CerememoryError> {
3194 if !req.confirm {
3195 return Err(CerememoryError::ForgetUnconfirmed);
3196 }
3197
3198 let ForgetRequest {
3199 record_ids,
3200 store,
3201 temporal_range,
3202 cascade,
3203 ..
3204 } = req;
3205 let mut deleted = 0u32;
3206 let mut delete_targets: HashMap<Uuid, StoreType> = HashMap::new();
3207
3208 if let Some(ids) = record_ids {
3209 for id in ids {
3210 if let Some((record, store_type)) = self.get_store_record(&id).await? {
3211 delete_targets.insert(id, store_type);
3212 if cascade {
3213 for assoc in &record.associations {
3214 if let Some((_, st)) = self.get_store_record(&assoc.target_id).await? {
3215 delete_targets.insert(assoc.target_id, st);
3216 }
3217 }
3218 }
3219 }
3220 }
3221 }
3222
3223 if let Some(store_type) = store {
3224 let ids = dispatch_store!(self, store_type, list_ids())?;
3225 for id in ids {
3226 delete_targets.insert(id, store_type);
3227 }
3228 }
3229
3230 if let Some(range) = temporal_range {
3231 for store_type in ALL_STORES {
3232 let records = dispatch_store!(self, store_type, get_all())?;
3233 for record in records {
3234 if record.created_at >= range.start && record.created_at <= range.end {
3235 delete_targets.insert(record.id, store_type);
3236 }
3237 }
3238 }
3239 }
3240
3241 let mut deleted_records = Vec::new();
3242 for (id, store_type) in delete_targets {
3243 if dispatch_store!(self, store_type, delete(&id))? {
3244 deleted_records.push((id, store_type));
3245 deleted += 1;
3246 }
3247 }
3248 self.cleanup_deleted_records(&deleted_records).await?;
3249
3250 warn!(deleted, "Forget operation completed");
3251 Ok(deleted)
3252 }
3253
3254 pub async fn lifecycle_export(
3257 &self,
3258 req: ExportRequest,
3259 ) -> Result<(Vec<u8>, ExportResponse), CerememoryError> {
3260 match Self::normalize_export_format(&req.format)? {
3261 "cma" => {}
3262 _ => unreachable!("normalize_export_format only returns supported formats"),
3263 }
3264
3265 let records = self
3266 .collect_records_for_stores(req.stores.as_deref())
3267 .await?;
3268 let raw_records = if req.include_raw_journal {
3269 self.collect_all_raw_journal_records().await?
3270 } else {
3271 Vec::new()
3272 };
3273
3274 let encryption_key = if req.encrypt {
3275 let key_str = req.encryption_key.as_deref().ok_or_else(|| {
3276 CerememoryError::Validation(
3277 "encryption_key is required when encrypt=true".to_string(),
3278 )
3279 })?;
3280 Some(cerememory_archive::crypto::derive_key(key_str))
3281 } else {
3282 None
3283 };
3284
3285 if req.include_raw_journal {
3286 cerememory_archive::export_bundle_filtered(
3287 &records,
3288 &raw_records,
3289 req.stores.as_deref(),
3290 encryption_key.as_ref(),
3291 )
3292 } else {
3293 cerememory_archive::export_filtered(
3294 &records,
3295 req.stores.as_deref(),
3296 encryption_key.as_ref(),
3297 )
3298 }
3299 }
3300
3301 pub async fn lifecycle_import(&self, req: ImportRequest) -> Result<u32, CerememoryError> {
3304 let data = req.archive_data.ok_or_else(|| {
3305 CerememoryError::Validation("Import requires archive_data".to_string())
3306 })?;
3307
3308 let decryption_key = req
3309 .decryption_key
3310 .as_deref()
3311 .map(cerememory_archive::crypto::derive_key);
3312 let bundle = cerememory_archive::import_bundle_with_key(&data, decryption_key.as_ref())?;
3313 let imported = match req.strategy {
3314 ImportStrategy::Merge => {
3315 let imported_curated = self
3316 .import_records_with_conflict_resolution(
3317 bundle.records,
3318 req.conflict_resolution,
3319 )
3320 .await?;
3321 let imported_raw = self
3322 .import_raw_records_with_conflict_resolution(
3323 bundle.raw_records,
3324 req.conflict_resolution,
3325 )
3326 .await?;
3327 imported_curated + imported_raw
3328 }
3329 ImportStrategy::Replace => {
3330 let snapshot = self.collect_records_for_stores(None).await?;
3331 let raw_snapshot = self.collect_all_raw_journal_records().await?;
3332
3333 if let Err(err) = self.clear_all_records().await {
3334 if let Err(restore_err) = self.restore_records(&snapshot).await {
3335 return Err(CerememoryError::ImportConflict(format!(
3336 "Replace import failed while clearing existing records: {err}. Rollback failed: {restore_err}"
3337 )));
3338 }
3339 return Err(err);
3340 }
3341 if let Err(err) = self.clear_raw_journal().await {
3342 let _ = self.restore_records(&snapshot).await;
3343 if let Err(restore_err) = self.restore_raw_journal(&raw_snapshot).await {
3344 return Err(CerememoryError::ImportConflict(format!(
3345 "Replace import failed while clearing raw journal: {err}. Rollback failed: {restore_err}"
3346 )));
3347 }
3348 return Err(CerememoryError::ImportConflict(format!(
3349 "Replace import failed while clearing raw journal: {err}"
3350 )));
3351 }
3352
3353 match async {
3354 let imported_curated = self
3355 .import_records_with_conflict_resolution(
3356 bundle.records,
3357 req.conflict_resolution,
3358 )
3359 .await?;
3360 let imported_raw = self
3361 .import_raw_records_with_conflict_resolution(
3362 bundle.raw_records,
3363 req.conflict_resolution,
3364 )
3365 .await?;
3366 Ok::<u32, CerememoryError>(imported_curated + imported_raw)
3367 }
3368 .await
3369 {
3370 Ok(imported) => imported,
3371 Err(err) => {
3372 if let Err(restore_err) = self.restore_records(&snapshot).await {
3373 return Err(CerememoryError::ImportConflict(format!(
3374 "Replace import failed: {err}. Rollback failed: {restore_err}"
3375 )));
3376 }
3377 if let Err(restore_err) = self.restore_raw_journal(&raw_snapshot).await {
3378 return Err(CerememoryError::ImportConflict(format!(
3379 "Replace import failed: {err}. Raw rollback failed: {restore_err}"
3380 )));
3381 }
3382 return Err(err);
3383 }
3384 }
3385 }
3386 };
3387
3388 info!(imported, strategy = ?req.strategy, "Import completed");
3389 Ok(imported)
3390 }
3391
3392 pub async fn import_records(&self, data: &[u8]) -> Result<u32, CerememoryError> {
3396 let records = cerememory_archive::import_records(data)?;
3397 let mut imported = 0u32;
3398
3399 for record in records {
3400 let store_type = record.store;
3401
3402 if self.get_store_record(&record.id).await?.is_some() {
3404 continue;
3405 }
3406
3407 dispatch_store!(self, store_type, store(record.clone()))?;
3409
3410 self.coordinator
3412 .register(record.id, store_type, record.associations.clone())
3413 .await;
3414 if let Err(e) = self.index_record(&record) {
3415 warn!(error = %e, record_id = %record.id, "Failed to index imported record");
3416 }
3417 imported += 1;
3418 }
3419
3420 info!(imported, "Import completed");
3421 Ok(imported)
3422 }
3423
3424 pub async fn collect_all_records(&self) -> Result<Vec<MemoryRecord>, CerememoryError> {
3426 self.collect_records_for_stores(None).await
3427 }
3428
3429 pub async fn collect_records_for_stores(
3431 &self,
3432 stores: Option<&[StoreType]>,
3433 ) -> Result<Vec<MemoryRecord>, CerememoryError> {
3434 let target_stores = stores.unwrap_or(&ALL_STORES);
3435
3436 let mut records = Vec::new();
3437 let mut seen_stores = HashSet::with_capacity(target_stores.len());
3438 for &store_type in target_stores {
3439 if !seen_stores.insert(store_type) {
3440 continue;
3441 }
3442
3443 let store_records = dispatch_store!(self, store_type, get_all())?;
3444 records.extend(store_records);
3445 }
3446 Ok(records)
3447 }
3448
3449 pub async fn introspect_stats(&self) -> Result<StatsResponse, CerememoryError> {
3453 let mut records_by_store = HashMap::new();
3454 let mut avg_fidelity_by_store = HashMap::new();
3455 let mut total_records = 0u32;
3456 let mut total_fidelity = 0.0f64;
3457 let mut dream_episodic_summaries = 0u32;
3458 let mut dream_semantic_nodes = 0u32;
3459 let mut last_dream_tick_at: Option<chrono::DateTime<Utc>> = None;
3460
3461 for store_type in ALL_STORES {
3462 let count = dispatch_store!(self, store_type, count())? as u32;
3463 records_by_store.insert(store_type, count);
3464 total_records += count;
3465
3466 if count > 0 {
3467 let records = dispatch_store!(self, store_type, get_all())?;
3468 let mut store_fidelity = 0.0f64;
3469 for record in &records {
3470 store_fidelity += record.fidelity.score;
3471 total_fidelity += record.fidelity.score;
3472
3473 if let Some(kind) = record
3474 .metadata
3475 .get("_dream")
3476 .and_then(|value| value.get("kind"))
3477 .and_then(|value| value.as_str())
3478 {
3479 match kind {
3480 "episodic_summary" if store_type == StoreType::Episodic => {
3481 dream_episodic_summaries += 1;
3482 }
3483 "semantic_summary" if store_type == StoreType::Semantic => {
3484 dream_semantic_nodes += 1;
3485 }
3486 _ => {}
3487 }
3488 }
3489 if let Some(timestamp) = record
3490 .metadata
3491 .get("_origin")
3492 .and_then(|value| value.get("dream_tick_at"))
3493 .and_then(|value| value.as_str())
3494 .and_then(|value| chrono::DateTime::parse_from_rfc3339(value).ok())
3495 .map(|value| value.with_timezone(&Utc))
3496 {
3497 if last_dream_tick_at.is_none_or(|current| timestamp > current) {
3498 last_dream_tick_at = Some(timestamp);
3499 }
3500 }
3501 }
3502 avg_fidelity_by_store.insert(store_type, store_fidelity / count as f64);
3503 }
3504 }
3505
3506 let raw_journal_all = self.raw_journal.get_all().await?;
3507 let raw_journal_records = raw_journal_all.len() as u32;
3508 let raw_journal_pending_dream = raw_journal_all
3509 .iter()
3510 .filter(|record| !Self::raw_record_processed(record))
3511 .count() as u32;
3512
3513 let avg_fidelity = if total_records > 0 {
3514 total_fidelity / total_records as f64
3515 } else {
3516 0.0
3517 };
3518
3519 Ok(StatsResponse {
3520 total_records,
3521 records_by_store,
3522 total_associations: self.coordinator.total_associations().await,
3523 avg_fidelity,
3524 avg_fidelity_by_store,
3525 oldest_record: None,
3526 newest_record: None,
3527 total_recall_count: 0,
3528 raw_journal_records,
3529 raw_journal_pending_dream,
3530 dream_episodic_summaries,
3531 dream_semantic_nodes,
3532 last_dream_tick_at,
3533 evolution_metrics: Some(self.evolution.get_metrics()),
3534 background_decay_enabled: self.is_background_decay_enabled().await,
3535 background_dream_enabled: self.is_background_dream_enabled().await,
3536 })
3537 }
3538
3539 pub async fn introspect_record(
3541 &self,
3542 req: RecordIntrospectRequest,
3543 ) -> Result<MemoryRecord, CerememoryError> {
3544 let (record, _) = self
3545 .get_store_record(&req.record_id)
3546 .await?
3547 .ok_or_else(|| CerememoryError::RecordNotFound(req.record_id.to_string()))?;
3548
3549 Ok(record)
3550 }
3551
3552 pub async fn introspect_decay_forecast(
3554 &self,
3555 req: DecayForecastRequest,
3556 ) -> Result<DecayForecastResponse, CerememoryError> {
3557 let mut forecasts = Vec::with_capacity(req.record_ids.len());
3558
3559 for record_id in &req.record_ids {
3560 let (record, _) = self
3561 .get_store_record(record_id)
3562 .await?
3563 .ok_or_else(|| CerememoryError::RecordNotFound(record_id.to_string()))?;
3564
3565 let current_fidelity = record.fidelity.score;
3566 let last_access_secs = record.last_accessed_at.timestamp() as f64;
3568 let last_tick_secs = record.fidelity.last_decay_tick.timestamp() as f64;
3569 let baseline_secs = last_access_secs.max(last_tick_secs);
3570 let forecast_secs = req.forecast_at.timestamp() as f64;
3571 let elapsed_secs = (forecast_secs - baseline_secs).max(0.0);
3572
3573 let decay_exponent = record.fidelity.decay_rate;
3575 let emotion_mod = cerememory_decay::math::compute_emotion_mod(record.emotion.intensity);
3576 let forecasted_fidelity = cerememory_decay::math::compute_fidelity(
3577 current_fidelity,
3578 elapsed_secs,
3579 record.fidelity.stability,
3580 decay_exponent,
3581 emotion_mod,
3582 );
3583
3584 let params = self.decay.params();
3586 let estimated_threshold_date = if current_fidelity > params.prune_threshold {
3587 Self::estimate_threshold_date(
3588 &record,
3589 decay_exponent,
3590 params.prune_threshold,
3591 emotion_mod,
3592 )
3593 } else {
3594 None
3596 };
3597
3598 forecasts.push(DecayForecast {
3599 record_id: *record_id,
3600 current_fidelity,
3601 forecasted_fidelity,
3602 estimated_threshold_date,
3603 });
3604 }
3605
3606 Ok(DecayForecastResponse { forecasts })
3607 }
3608
3609 fn estimate_threshold_date(
3611 record: &MemoryRecord,
3612 decay_exponent: f64,
3613 prune_threshold: f64,
3614 emotion_mod: f64,
3615 ) -> Option<chrono::DateTime<Utc>> {
3616 let base_time = record.last_accessed_at.max(record.fidelity.last_decay_tick);
3618 let f0 = record.fidelity.score;
3619 let stability = record.fidelity.stability;
3620
3621 let mut lo: f64 = 0.0;
3623 let mut hi: f64 = 315_360_000.0;
3624
3625 let f_hi = cerememory_decay::math::compute_fidelity(
3627 f0,
3628 hi,
3629 stability,
3630 decay_exponent,
3631 emotion_mod,
3632 );
3633 if f_hi >= prune_threshold {
3634 return None; }
3636
3637 for _ in 0..30 {
3639 let mid = (lo + hi) / 2.0;
3640 let f_mid = cerememory_decay::math::compute_fidelity(
3641 f0,
3642 mid,
3643 stability,
3644 decay_exponent,
3645 emotion_mod,
3646 );
3647 if f_mid > prune_threshold {
3648 lo = mid;
3649 } else {
3650 hi = mid;
3651 }
3652 }
3653
3654 let threshold_secs = ((lo + hi) / 2.0) as i64;
3655 Some(base_time + chrono::Duration::seconds(threshold_secs))
3656 }
3657
3658 pub async fn introspect_evolution(&self) -> Result<EvolutionMetrics, CerememoryError> {
3660 Ok(self.evolution.get_metrics())
3661 }
3662}
3663
3664use cerememory_core::truncate_str;
3666
3667fn apply_human_noise(content: &MemoryContent, fidelity: f64) -> MemoryContent {
3670 if fidelity >= 0.95 {
3671 return content.clone();
3672 }
3673
3674 let mut noised = content.clone();
3675 for block in &mut noised.blocks {
3676 if block.modality == Modality::Text {
3677 if let Ok(text) = std::str::from_utf8(&block.data) {
3678 let degraded = degrade_text(text, fidelity);
3679 block.data = degraded.into_bytes();
3680 }
3681 }
3682 }
3683 noised
3684}
3685
/// Replace a regular subset of the words in `text` with `"..."`.
///
/// Text with fidelity of at least `0.9`, or containing no words at all, is
/// returned unchanged. Otherwise the text is split on whitespace, every
/// `stride`-th word (starting with the first) becomes `"..."`, and the words
/// are rejoined with single spaces, so the original whitespace is not kept.
///
/// The stride is `1 / min(1 - fidelity, 0.8)`, never less than 2, truncated to
/// an integer; at most every other word is replaced.
fn degrade_text(text: &str, fidelity: f64) -> String {
    if fidelity >= 0.9 {
        return text.to_string();
    }

    let mut tokens = text.split_whitespace().peekable();
    if tokens.peek().is_none() {
        return text.to_string();
    }

    let loss = (1.0 - fidelity).min(0.8);
    let stride = (1.0 / loss).max(2.0) as usize;

    tokens
        .enumerate()
        .map(|(position, token)| if position % stride == 0 { "..." } else { token })
        .collect::<Vec<&str>>()
        .join(" ")
}
3711
3712#[cfg(test)]
3713mod tests {
3714 use super::*;
3715
3716 async fn make_engine() -> CerememoryEngine {
3717 CerememoryEngine::in_memory().unwrap()
3718 }
3719
3720 fn text_store_req(text: &str, store: Option<StoreType>) -> EncodeStoreRequest {
3721 EncodeStoreRequest {
3722 header: None,
3723 content: MemoryContent {
3724 blocks: vec![ContentBlock {
3725 modality: Modality::Text,
3726 format: "text/plain".to_string(),
3727 data: text.as_bytes().to_vec(),
3728 embedding: None,
3729 }],
3730 summary: None,
3731 },
3732 store,
3733 emotion: None,
3734 context: None,
3735 metadata: None,
3736 associations: None,
3737 }
3738 }
3739
3740 fn structured_store_req(json: &str, store: Option<StoreType>) -> EncodeStoreRequest {
3741 EncodeStoreRequest {
3742 header: None,
3743 content: MemoryContent {
3744 blocks: vec![ContentBlock {
3745 modality: Modality::Structured,
3746 format: "application/json".to_string(),
3747 data: json.as_bytes().to_vec(),
3748 embedding: None,
3749 }],
3750 summary: None,
3751 },
3752 store,
3753 emotion: None,
3754 context: None,
3755 metadata: None,
3756 associations: None,
3757 }
3758 }
3759
3760 fn raw_text_record(session_id: &str, text: &str) -> RawJournalRecord {
3761 RawJournalRecord::new_text(
3762 session_id,
3763 RawSource::Conversation,
3764 RawSpeaker::User,
3765 RawVisibility::Normal,
3766 SecrecyLevel::Public,
3767 text,
3768 )
3769 }
3770
3771 fn raw_text_store_req(
3772 session_id: &str,
3773 text: &str,
3774 visibility: RawVisibility,
3775 secrecy_level: SecrecyLevel,
3776 ) -> EncodeStoreRawRequest {
3777 EncodeStoreRawRequest {
3778 header: None,
3779 session_id: session_id.to_string(),
3780 turn_id: None,
3781 topic_id: None,
3782 source: RawSource::Conversation,
3783 speaker: RawSpeaker::User,
3784 visibility,
3785 secrecy_level,
3786 content: MemoryContent {
3787 blocks: vec![ContentBlock {
3788 modality: Modality::Text,
3789 format: "text/plain".to_string(),
3790 data: text.as_bytes().to_vec(),
3791 embedding: None,
3792 }],
3793 summary: None,
3794 },
3795 metadata: None,
3796 }
3797 }
3798
3799 #[tokio::test]
3800 async fn raw_journal_append_and_get_roundtrip() {
3801 let engine = make_engine().await;
3802 let record = raw_text_record("sess-raw-1", "hello from raw journal");
3803 let id = record.id;
3804
3805 let stored_id = engine.append_raw_journal(record).await.unwrap();
3806 assert_eq!(stored_id, id);
3807
3808 let restored = engine.get_raw_journal_record(&id).await.unwrap().unwrap();
3809 assert_eq!(restored.session_id, "sess-raw-1");
3810 assert_eq!(restored.text_content(), Some("hello from raw journal"));
3811 }
3812
3813 #[tokio::test]
3814 async fn raw_journal_query_session_filters_records() {
3815 let engine = make_engine().await;
3816 engine
3817 .append_raw_journal(raw_text_record("sess-a", "first"))
3818 .await
3819 .unwrap();
3820 engine
3821 .append_raw_journal(raw_text_record("sess-b", "second"))
3822 .await
3823 .unwrap();
3824 engine
3825 .append_raw_journal(raw_text_record("sess-a", "third"))
3826 .await
3827 .unwrap();
3828
3829 let records = engine.query_raw_journal_by_session("sess-a").await.unwrap();
3830 assert_eq!(records.len(), 2);
3831 assert!(records.iter().all(|record| record.session_id == "sess-a"));
3832 }
3833
3834 #[tokio::test]
3835 async fn raw_journal_count_tracks_records() {
3836 let engine = make_engine().await;
3837 assert_eq!(engine.raw_journal_count().await.unwrap(), 0);
3838
3839 engine
3840 .append_raw_journal(raw_text_record("sess-a", "first"))
3841 .await
3842 .unwrap();
3843 engine
3844 .append_raw_journal(raw_text_record("sess-a", "second"))
3845 .await
3846 .unwrap();
3847
3848 assert_eq!(engine.raw_journal_count().await.unwrap(), 2);
3849 }
3850
3851 #[tokio::test]
3852 async fn encode_store_raw_and_recall_raw_roundtrip() {
3853 let engine = make_engine().await;
3854 let response = engine
3855 .encode_store_raw(raw_text_store_req(
3856 "sess-raw",
3857 "forensic memory",
3858 RawVisibility::Normal,
3859 SecrecyLevel::Public,
3860 ))
3861 .await
3862 .unwrap();
3863
3864 let recalled = engine
3865 .recall_raw_query(RecallRawQueryRequest {
3866 header: None,
3867 session_id: Some("sess-raw".to_string()),
3868 query: Some("forensic".to_string()),
3869 temporal: None,
3870 limit: 10,
3871 include_private_scratch: false,
3872 include_sealed: false,
3873 secrecy_levels: None,
3874 })
3875 .await
3876 .unwrap();
3877
3878 assert_eq!(recalled.total_candidates, 1);
3879 assert_eq!(recalled.records.len(), 1);
3880 assert_eq!(recalled.records[0].id, response.record_id);
3881 assert_eq!(recalled.records[0].text_content(), Some("forensic memory"));
3882 }
3883
3884 #[tokio::test]
3885 async fn recall_raw_defaults_exclude_private_scratch_and_secret() {
3886 let engine = make_engine().await;
3887 engine
3888 .encode_batch_store_raw(EncodeBatchStoreRawRequest {
3889 header: None,
3890 records: vec![
3891 raw_text_store_req(
3892 "sess-secure",
3893 "public normal",
3894 RawVisibility::Normal,
3895 SecrecyLevel::Public,
3896 ),
3897 raw_text_store_req(
3898 "sess-secure",
3899 "private scratch",
3900 RawVisibility::PrivateScratch,
3901 SecrecyLevel::Sensitive,
3902 ),
3903 raw_text_store_req(
3904 "sess-secure",
3905 "sealed secret",
3906 RawVisibility::Sealed,
3907 SecrecyLevel::Secret,
3908 ),
3909 ],
3910 })
3911 .await
3912 .unwrap();
3913
3914 let recalled = engine
3915 .recall_raw_query(RecallRawQueryRequest {
3916 header: None,
3917 session_id: Some("sess-secure".to_string()),
3918 query: None,
3919 temporal: None,
3920 limit: 10,
3921 include_private_scratch: false,
3922 include_sealed: false,
3923 secrecy_levels: None,
3924 })
3925 .await
3926 .unwrap();
3927
3928 assert_eq!(recalled.total_candidates, 1);
3929 assert_eq!(recalled.records[0].text_content(), Some("public normal"));
3930 }
3931
3932 #[tokio::test]
3933 async fn raw_journal_does_not_leak_into_normal_recall() {
3934 let engine = make_engine().await;
3935 engine
3936 .encode_store_raw(raw_text_store_req(
3937 "sess-leak",
3938 "hidden raw only",
3939 RawVisibility::Normal,
3940 SecrecyLevel::Public,
3941 ))
3942 .await
3943 .unwrap();
3944
3945 let response = engine
3946 .recall_query(RecallQueryRequest {
3947 header: None,
3948 cue: RecallCue {
3949 text: Some("hidden raw".to_string()),
3950 ..Default::default()
3951 },
3952 stores: None,
3953 limit: 10,
3954 min_fidelity: None,
3955 include_decayed: false,
3956 reconsolidate: false,
3957 activation_depth: 0,
3958 recall_mode: RecallMode::Perfect,
3959 })
3960 .await
3961 .unwrap();
3962
3963 assert_eq!(response.total_candidates, 0);
3964 assert!(response.memories.is_empty());
3965 }
3966
3967 #[tokio::test]
3968 async fn dream_tick_creates_episodic_summary_and_marks_raw_processed() {
3969 let engine = make_engine().await;
3970 let first = engine
3971 .encode_store_raw(raw_text_store_req(
3972 "sess-dream",
3973 "Discussed API timeout policy",
3974 RawVisibility::Normal,
3975 SecrecyLevel::Public,
3976 ))
3977 .await
3978 .unwrap();
3979 let second = engine
3980 .encode_store_raw(raw_text_store_req(
3981 "sess-dream",
3982 "Decided to keep retries idempotent-only",
3983 RawVisibility::Normal,
3984 SecrecyLevel::Public,
3985 ))
3986 .await
3987 .unwrap();
3988
3989 let resp = engine
3990 .lifecycle_dream_tick(DreamTickRequest {
3991 header: None,
3992 session_id: Some("sess-dream".to_string()),
3993 dry_run: false,
3994 max_groups: 10,
3995 include_private_scratch: false,
3996 include_sealed: false,
3997 promote_semantic: true,
3998 secrecy_levels: None,
3999 })
4000 .await
4001 .unwrap();
4002
4003 assert_eq!(resp.groups_processed, 1);
4004 assert_eq!(resp.raw_records_processed, 2);
4005 assert_eq!(resp.episodic_summaries_created, 1);
4006 assert_eq!(resp.semantic_nodes_created, 1);
4007
4008 let summaries = engine.episodic.get_all().await.unwrap();
4009 assert_eq!(summaries.len(), 1);
4010 let summary = &summaries[0];
4011 let semantic_records = engine.semantic.get_all().await.unwrap();
4012 assert_eq!(semantic_records.len(), 1);
4013 let semantic = &semantic_records[0];
4014 assert_eq!(summary.store, StoreType::Episodic);
4015 assert_eq!(summary.metadata["_origin"]["raw_session_id"], "sess-dream");
4016 assert_eq!(summary.metadata["_origin"]["raw_record_count"], 2);
4017 assert_eq!(
4018 summary.metadata["_origin"]["raw_record_ids"][0],
4019 first.record_id.to_string()
4020 );
4021 assert_eq!(
4022 summary.metadata["_origin"]["raw_record_ids"][1],
4023 second.record_id.to_string()
4024 );
4025
4026 let raw_one = engine
4027 .get_raw_journal_record(&first.record_id)
4028 .await
4029 .unwrap()
4030 .unwrap();
4031 let raw_two = engine
4032 .get_raw_journal_record(&second.record_id)
4033 .await
4034 .unwrap()
4035 .unwrap();
4036 assert_eq!(raw_one.derived_memory_ids, vec![summary.id, semantic.id]);
4037 assert_eq!(raw_two.derived_memory_ids, vec![summary.id, semantic.id]);
4038 assert_eq!(
4039 raw_one.metadata["_dream"]["last_summary_id"],
4040 serde_json::Value::String(summary.id.to_string())
4041 );
4042 assert!(raw_one.metadata["_dream"]["processed_at"].is_string());
4043 }
4044
4045 #[tokio::test]
4046 async fn dream_tick_is_idempotent_for_processed_raw_records() {
4047 let engine = make_engine().await;
4048 engine
4049 .encode_store_raw(raw_text_store_req(
4050 "sess-dream-repeat",
4051 "First raw note",
4052 RawVisibility::Normal,
4053 SecrecyLevel::Public,
4054 ))
4055 .await
4056 .unwrap();
4057
4058 let first = engine
4059 .lifecycle_dream_tick(DreamTickRequest {
4060 header: None,
4061 session_id: Some("sess-dream-repeat".to_string()),
4062 dry_run: false,
4063 max_groups: 10,
4064 include_private_scratch: false,
4065 include_sealed: false,
4066 promote_semantic: true,
4067 secrecy_levels: None,
4068 })
4069 .await
4070 .unwrap();
4071 let second = engine
4072 .lifecycle_dream_tick(DreamTickRequest {
4073 header: None,
4074 session_id: Some("sess-dream-repeat".to_string()),
4075 dry_run: false,
4076 max_groups: 10,
4077 include_private_scratch: false,
4078 include_sealed: false,
4079 promote_semantic: true,
4080 secrecy_levels: None,
4081 })
4082 .await
4083 .unwrap();
4084
4085 assert_eq!(first.episodic_summaries_created, 1);
4086 assert_eq!(first.semantic_nodes_created, 0);
4087 assert_eq!(second.episodic_summaries_created, 0);
4088 assert_eq!(second.semantic_nodes_created, 0);
4089 assert_eq!(engine.episodic.count().await.unwrap(), 1);
4090 }
4091
4092 #[tokio::test]
4093 async fn dream_tick_redacts_sealed_secret_and_private_scratch_content() {
4094 let engine = make_engine().await;
4095 engine
4096 .encode_batch_store_raw(EncodeBatchStoreRawRequest {
4097 header: None,
4098 records: vec![
4099 raw_text_store_req(
4100 "sess-dream-redact",
4101 "Visible public note",
4102 RawVisibility::Normal,
4103 SecrecyLevel::Public,
4104 ),
4105 raw_text_store_req(
4106 "sess-dream-redact",
4107 "Private scratch hypothesis",
4108 RawVisibility::PrivateScratch,
4109 SecrecyLevel::Sensitive,
4110 ),
4111 raw_text_store_req(
4112 "sess-dream-redact",
4113 "Sealed customer secret",
4114 RawVisibility::Sealed,
4115 SecrecyLevel::Secret,
4116 ),
4117 ],
4118 })
4119 .await
4120 .unwrap();
4121
4122 let resp = engine
4123 .lifecycle_dream_tick(DreamTickRequest {
4124 header: None,
4125 session_id: Some("sess-dream-redact".to_string()),
4126 dry_run: false,
4127 max_groups: 10,
4128 include_private_scratch: true,
4129 include_sealed: true,
4130 promote_semantic: true,
4131 secrecy_levels: Some(vec![
4132 SecrecyLevel::Public,
4133 SecrecyLevel::Sensitive,
4134 SecrecyLevel::Secret,
4135 ]),
4136 })
4137 .await
4138 .unwrap();
4139
4140 assert_eq!(resp.groups_processed, 1);
4141 assert_eq!(resp.semantic_nodes_created, 0);
4142 let summaries = engine.episodic.get_all().await.unwrap();
4143 assert_eq!(summaries.len(), 1);
4144 let summary = &summaries[0];
4145
4146 let summary_text = summary.text_content().unwrap_or("");
4147 assert!(summary_text.contains("Visible public note"));
4148 assert!(!summary_text.contains("Private scratch hypothesis"));
4149 assert!(!summary_text.contains("Sealed customer secret"));
4150 assert!(summary_text.contains("Redacted raw records: 2"));
4151
4152 assert_eq!(
4153 summary.metadata["_dream"]["summary_stats"]["normal_records"],
4154 1
4155 );
4156 assert_eq!(
4157 summary.metadata["_dream"]["summary_stats"]["private_scratch_redacted"],
4158 1
4159 );
4160 assert_eq!(
4161 summary.metadata["_dream"]["summary_stats"]["secret_redacted"],
4162 1
4163 );
4164 }
4165
4166 #[tokio::test]
4167 async fn dream_tick_infers_multiple_topics_with_time_gap_and_lexical_shift() {
4168 let engine = make_engine().await;
4169 let base = Utc::now() - chrono::Duration::hours(2);
4170
4171 let mut first = raw_text_record("sess-topic-infer", "API timeout retries idempotent only");
4172 first.created_at = base;
4173 first.updated_at = base;
4174
4175 let mut second = raw_text_record("sess-topic-infer", "Backoff budget and timeout policy");
4176 second.created_at = base + chrono::Duration::minutes(5);
4177 second.updated_at = second.created_at;
4178
4179 let mut third = raw_text_record("sess-topic-infer", "Landing page hero typography palette");
4180 third.created_at = base + chrono::Duration::minutes(25);
4181 third.updated_at = third.created_at;
4182
4183 engine.append_raw_journal(first).await.unwrap();
4184 engine.append_raw_journal(second).await.unwrap();
4185 engine.append_raw_journal(third).await.unwrap();
4186
4187 let resp = engine
4188 .lifecycle_dream_tick(DreamTickRequest {
4189 header: None,
4190 session_id: Some("sess-topic-infer".to_string()),
4191 dry_run: false,
4192 max_groups: 10,
4193 include_private_scratch: false,
4194 include_sealed: false,
4195 promote_semantic: true,
4196 secrecy_levels: None,
4197 })
4198 .await
4199 .unwrap();
4200
4201 assert_eq!(resp.groups_processed, 2);
4202 assert_eq!(resp.episodic_summaries_created, 2);
4203 assert_eq!(resp.semantic_nodes_created, 1);
4204
4205 let summaries = engine.episodic.get_all().await.unwrap();
4206 assert_eq!(summaries.len(), 2);
4207 assert!(summaries.iter().all(|summary| {
4208 summary.metadata["_origin"]["raw_topic_inferred"] == serde_json::Value::Bool(true)
4209 }));
4210 assert!(summaries
4211 .iter()
4212 .all(|summary| { summary.metadata["_origin"]["raw_topic_hint"].is_string() }));
4213 }
4214
4215 #[tokio::test]
4216 async fn encode_recall_roundtrip() {
4217 let engine = make_engine().await;
4218
4219 let resp = engine
4220 .encode_store(text_store_req(
4221 "The quick brown fox",
4222 Some(StoreType::Episodic),
4223 ))
4224 .await
4225 .unwrap();
4226 assert_eq!(resp.store, StoreType::Episodic);
4227 assert_eq!(resp.initial_fidelity, 1.0);
4228
4229 let query = RecallQueryRequest {
4230 header: None,
4231 cue: RecallCue {
4232 text: Some("quick brown".to_string()),
4233 ..Default::default()
4234 },
4235 stores: None,
4236 limit: 10,
4237 min_fidelity: None,
4238 include_decayed: false,
4239 reconsolidate: true,
4240 activation_depth: 0,
4241 recall_mode: RecallMode::Perfect,
4242 };
4243
4244 let recall_resp = engine.recall_query(query).await.unwrap();
4245 assert!(!recall_resp.memories.is_empty());
4246 assert_eq!(
4247 recall_resp.memories[0].record.text_content(),
4248 Some("The quick brown fox")
4249 );
4250 }
4251
4252 #[tokio::test]
4253 async fn encode_store_persists_metadata_and_context() {
4254 let engine = make_engine().await;
4255
4256 let resp = engine
4257 .encode_store(EncodeStoreRequest {
4258 header: None,
4259 content: MemoryContent {
4260 blocks: vec![ContentBlock {
4261 modality: Modality::Text,
4262 format: "text/plain".to_string(),
4263 data: b"metadata roundtrip".to_vec(),
4264 embedding: None,
4265 }],
4266 summary: None,
4267 },
4268 store: Some(StoreType::Episodic),
4269 emotion: None,
4270 context: Some(EncodeContext {
4271 source: Some("chat".to_string()),
4272 session_id: Some("sess-1".to_string()),
4273 spatial: None,
4274 temporal: None,
4275 }),
4276 metadata: Some(serde_json::json!({"tag": "important"})),
4277 associations: None,
4278 })
4279 .await
4280 .unwrap();
4281
4282 let record = engine
4283 .introspect_record(RecordIntrospectRequest {
4284 header: None,
4285 record_id: resp.record_id,
4286 include_history: false,
4287 include_associations: false,
4288 include_versions: false,
4289 })
4290 .await
4291 .unwrap();
4292 assert_eq!(record.metadata["tag"], "important");
4293 assert_eq!(record.metadata["_context"]["source"], "chat");
4294 assert_eq!(record.metadata["_context"]["session_id"], "sess-1");
4295 }
4296
4297 #[tokio::test]
4298 async fn tantivy_tokenized_search() {
4299 let engine = make_engine().await;
4300
4301 engine
4302 .encode_store(text_store_req(
4303 "The quick brown fox jumps over the lazy dog",
4304 Some(StoreType::Episodic),
4305 ))
4306 .await
4307 .unwrap();
4308
4309 let query = RecallQueryRequest {
4311 header: None,
4312 cue: RecallCue {
4313 text: Some("quick".to_string()),
4314 ..Default::default()
4315 },
4316 stores: None,
4317 limit: 10,
4318 min_fidelity: None,
4319 include_decayed: false,
4320 reconsolidate: false,
4321 activation_depth: 0,
4322 recall_mode: RecallMode::Perfect,
4323 };
4324
4325 let resp = engine.recall_query(query).await.unwrap();
4326 assert!(!resp.memories.is_empty());
4327 assert_eq!(
4328 resp.memories[0].record.text_content(),
4329 Some("The quick brown fox jumps over the lazy dog")
4330 );
4331 }
4332
4333 #[tokio::test]
4334 async fn vector_search_recall() {
4335 let engine = make_engine().await;
4336
4337 let req = EncodeStoreRequest {
4339 header: None,
4340 content: MemoryContent {
4341 blocks: vec![ContentBlock {
4342 modality: Modality::Text,
4343 format: "text/plain".to_string(),
4344 data: b"Cats are fluffy animals".to_vec(),
4345 embedding: Some(vec![1.0, 0.0, 0.0]),
4346 }],
4347 summary: None,
4348 },
4349 store: Some(StoreType::Episodic),
4350 emotion: None,
4351 context: None,
4352 metadata: None,
4353 associations: None,
4354 };
4355 engine.encode_store(req).await.unwrap();
4356
4357 let query = RecallQueryRequest {
4359 header: None,
4360 cue: RecallCue {
4361 embedding: Some(vec![1.0, 0.1, 0.0]),
4362 ..Default::default()
4363 },
4364 stores: None,
4365 limit: 10,
4366 min_fidelity: None,
4367 include_decayed: false,
4368 reconsolidate: false,
4369 activation_depth: 0,
4370 recall_mode: RecallMode::Perfect,
4371 };
4372
4373 let resp = engine.recall_query(query).await.unwrap();
4374 assert!(!resp.memories.is_empty());
4375 assert_eq!(
4376 resp.memories[0].record.text_content(),
4377 Some("Cats are fluffy animals")
4378 );
4379 }
4380
4381 #[tokio::test]
4382 async fn hybrid_text_vector_search() {
4383 let engine = make_engine().await;
4384
4385 let req1 = EncodeStoreRequest {
4387 header: None,
4388 content: MemoryContent {
4389 blocks: vec![ContentBlock {
4390 modality: Modality::Text,
4391 format: "text/plain".to_string(),
4392 data: b"Machine learning is fascinating".to_vec(),
4393 embedding: Some(vec![1.0, 0.0, 0.0]),
4394 }],
4395 summary: None,
4396 },
4397 store: Some(StoreType::Episodic),
4398 emotion: None,
4399 context: None,
4400 metadata: None,
4401 associations: None,
4402 };
4403 engine.encode_store(req1).await.unwrap();
4404
4405 let query = RecallQueryRequest {
4407 header: None,
4408 cue: RecallCue {
4409 text: Some("machine learning".to_string()),
4410 embedding: Some(vec![1.0, 0.0, 0.0]),
4411 ..Default::default()
4412 },
4413 stores: None,
4414 limit: 10,
4415 min_fidelity: None,
4416 include_decayed: false,
4417 reconsolidate: false,
4418 activation_depth: 0,
4419 recall_mode: RecallMode::Perfect,
4420 };
4421
4422 let resp = engine.recall_query(query).await.unwrap();
4423 assert!(!resp.memories.is_empty());
4424 }
4425
4426 #[tokio::test]
4427 async fn recall_query_metadata_counts_temporal_only_searches() {
4428 let engine = make_engine().await;
4429
4430 engine
4431 .encode_store(text_store_req(
4432 "Temporal-only query metadata should count scanned records",
4433 Some(StoreType::Episodic),
4434 ))
4435 .await
4436 .unwrap();
4437
4438 let now = Utc::now();
4439 let resp = engine
4440 .recall_query(RecallQueryRequest {
4441 header: None,
4442 cue: RecallCue {
4443 temporal: Some(TemporalRange {
4444 start: now - chrono::Duration::minutes(1),
4445 end: now + chrono::Duration::minutes(1),
4446 }),
4447 ..Default::default()
4448 },
4449 stores: None,
4450 limit: 10,
4451 min_fidelity: None,
4452 include_decayed: false,
4453 reconsolidate: false,
4454 activation_depth: 0,
4455 recall_mode: RecallMode::Perfect,
4456 })
4457 .await
4458 .unwrap();
4459
4460 let metadata = resp
4461 .query_metadata
4462 .expect("query metadata should be present");
4463 assert_eq!(metadata.total_records_scanned, 1);
4464 assert_eq!(resp.memories.len(), 1);
4465 }
4466
4467 #[tokio::test]
4468 async fn temporal_query_filters_activation_results_across_stores() {
4469 let engine = make_engine().await;
4470
4471 let mut old_semantic = MemoryRecord::new_text(StoreType::Semantic, "alpha out of range");
4472 old_semantic.created_at = Utc::now() - chrono::Duration::days(7);
4473 old_semantic.updated_at = old_semantic.created_at;
4474 old_semantic.last_accessed_at = old_semantic.created_at;
4475 engine.semantic.store(old_semantic.clone()).await.unwrap();
4476 engine
4477 .coordinator
4478 .register(
4479 old_semantic.id,
4480 StoreType::Semantic,
4481 old_semantic.associations.clone(),
4482 )
4483 .await;
4484 engine.index_record(&old_semantic).unwrap();
4485
4486 let current = MemoryRecord {
4487 associations: vec![Association {
4488 target_id: old_semantic.id,
4489 association_type: AssociationType::Semantic,
4490 weight: 0.9,
4491 created_at: Utc::now(),
4492 last_co_activation: Utc::now(),
4493 }],
4494 ..MemoryRecord::new_text(StoreType::Episodic, "alpha in range")
4495 };
4496 engine.episodic.store(current.clone()).await.unwrap();
4497 engine
4498 .coordinator
4499 .register(
4500 current.id,
4501 StoreType::Episodic,
4502 current.associations.clone(),
4503 )
4504 .await;
4505 engine.index_record(¤t).unwrap();
4506
4507 let now = Utc::now();
4508 let resp = engine
4509 .recall_query(RecallQueryRequest {
4510 header: None,
4511 cue: RecallCue {
4512 text: Some("alpha".to_string()),
4513 temporal: Some(TemporalRange {
4514 start: now - chrono::Duration::minutes(1),
4515 end: now + chrono::Duration::minutes(1),
4516 }),
4517 ..Default::default()
4518 },
4519 stores: None,
4520 limit: 10,
4521 min_fidelity: None,
4522 include_decayed: false,
4523 reconsolidate: false,
4524 activation_depth: 1,
4525 recall_mode: RecallMode::Perfect,
4526 })
4527 .await
4528 .unwrap();
4529
4530 assert_eq!(resp.memories.len(), 1);
4531 assert_eq!(resp.memories[0].record.id, current.id);
4532 }
4533
4534 #[tokio::test]
4535 async fn encode_batch_with_associations() {
4536 let engine = make_engine().await;
4537
4538 let batch = EncodeBatchRequest {
4539 header: None,
4540 records: vec![
4541 text_store_req("First memory", Some(StoreType::Episodic)),
4542 text_store_req("Second memory", Some(StoreType::Episodic)),
4543 text_store_req("Third memory", Some(StoreType::Episodic)),
4544 ],
4545 infer_associations: true,
4546 };
4547
4548 let resp = engine.encode_batch(batch).await.unwrap();
4549 assert_eq!(resp.results.len(), 3);
4550 assert_eq!(resp.associations_inferred, 4);
4551 }
4552
4553 #[tokio::test]
4554 async fn encode_batch_associations_survive_rebuild() {
4555 let engine = make_engine().await;
4556
4557 let resp = engine
4558 .encode_batch(EncodeBatchRequest {
4559 header: None,
4560 records: vec![
4561 text_store_req("Persisted first", Some(StoreType::Episodic)),
4562 text_store_req("Persisted second", Some(StoreType::Episodic)),
4563 ],
4564 infer_associations: true,
4565 })
4566 .await
4567 .unwrap();
4568
4569 engine.rebuild_coordinator().await.unwrap();
4570
4571 let first = engine
4572 .introspect_record(RecordIntrospectRequest {
4573 header: None,
4574 record_id: resp.results[0].record_id,
4575 include_history: false,
4576 include_associations: true,
4577 include_versions: false,
4578 })
4579 .await
4580 .unwrap();
4581 assert_eq!(first.associations.len(), 1);
4582
4583 let assoc_resp = engine
4584 .recall_associate(RecallAssociateRequest {
4585 header: None,
4586 record_id: resp.results[0].record_id,
4587 association_types: Some(vec![AssociationType::Sequential]),
4588 depth: 1,
4589 min_weight: 0.1,
4590 limit: 10,
4591 })
4592 .await
4593 .unwrap();
4594 assert_eq!(assoc_resp.memories.len(), 1);
4595 assert_eq!(assoc_resp.memories[0].record.id, resp.results[1].record_id);
4596 }
4597
4598 #[tokio::test]
4599 async fn decay_tick_integration() {
4600 let engine = make_engine().await;
4601
4602 for i in 0..5 {
4603 engine
4604 .encode_store(text_store_req(
4605 &format!("Memory {i}"),
4606 Some(StoreType::Episodic),
4607 ))
4608 .await
4609 .unwrap();
4610 }
4611
4612 let resp = engine
4613 .lifecycle_decay_tick(DecayTickRequest {
4614 header: None,
4615 tick_duration_seconds: Some(86400),
4616 })
4617 .await
4618 .unwrap();
4619
4620 assert_eq!(resp.records_updated, 5);
4621 }
4622
4623 #[tokio::test]
4624 async fn forget_requires_confirmation() {
4625 let engine = make_engine().await;
4626
4627 let result = engine
4628 .lifecycle_forget(ForgetRequest {
4629 header: None,
4630 record_ids: None,
4631 store: None,
4632 temporal_range: None,
4633 cascade: false,
4634 confirm: false,
4635 })
4636 .await;
4637
4638 assert!(matches!(result, Err(CerememoryError::ForgetUnconfirmed)));
4639 }
4640
4641 #[tokio::test]
4642 async fn forget_deletes_records() {
4643 let engine = make_engine().await;
4644
4645 let resp = engine
4646 .encode_store(text_store_req("To forget", Some(StoreType::Episodic)))
4647 .await
4648 .unwrap();
4649
4650 let deleted = engine
4651 .lifecycle_forget(ForgetRequest {
4652 header: None,
4653 record_ids: Some(vec![resp.record_id]),
4654 store: None,
4655 temporal_range: None,
4656 cascade: false,
4657 confirm: true,
4658 })
4659 .await
4660 .unwrap();
4661
4662 assert_eq!(deleted, 1);
4663 let record = engine.get_store_record(&resp.record_id).await.unwrap();
4664 assert!(record.is_none());
4665
4666 let hits = engine.text_index.search("forget", None, 10).unwrap();
4668 assert!(hits.is_empty());
4669 }
4670
4671 #[tokio::test]
4672 async fn forget_temporal_range_deletes_matching_records() {
4673 let engine = make_engine().await;
4674
4675 let mut old_record = MemoryRecord::new_text(StoreType::Episodic, "old");
4676 old_record.created_at = Utc::now() - chrono::Duration::days(2);
4677 old_record.updated_at = old_record.created_at;
4678 old_record.last_accessed_at = old_record.created_at;
4679 engine.episodic.store(old_record.clone()).await.unwrap();
4680 engine
4681 .coordinator
4682 .register(
4683 old_record.id,
4684 StoreType::Episodic,
4685 old_record.associations.clone(),
4686 )
4687 .await;
4688 engine.index_record(&old_record).unwrap();
4689
4690 let current = MemoryRecord::new_text(StoreType::Episodic, "current");
4691 engine.episodic.store(current.clone()).await.unwrap();
4692 engine
4693 .coordinator
4694 .register(
4695 current.id,
4696 StoreType::Episodic,
4697 current.associations.clone(),
4698 )
4699 .await;
4700 engine.index_record(¤t).unwrap();
4701
4702 let deleted = engine
4703 .lifecycle_forget(ForgetRequest {
4704 header: None,
4705 record_ids: None,
4706 store: None,
4707 temporal_range: Some(TemporalRange {
4708 start: Utc::now() - chrono::Duration::minutes(1),
4709 end: Utc::now() + chrono::Duration::minutes(1),
4710 }),
4711 cascade: false,
4712 confirm: true,
4713 })
4714 .await
4715 .unwrap();
4716
4717 assert_eq!(deleted, 1);
4718 assert!(engine
4719 .get_store_record(¤t.id)
4720 .await
4721 .unwrap()
4722 .is_none());
4723 assert!(engine
4724 .get_store_record(&old_record.id)
4725 .await
4726 .unwrap()
4727 .is_some());
4728 }
4729
4730 #[tokio::test]
4731 async fn mode_switch() {
4732 let engine = make_engine().await;
4733
4734 engine
4735 .lifecycle_set_mode(SetModeRequest {
4736 header: None,
4737 mode: RecallMode::Perfect,
4738 scope: None,
4739 })
4740 .await
4741 .unwrap();
4742
4743 assert_eq!(*engine.recall_mode.read().await, RecallMode::Perfect);
4744 }
4745
4746 #[tokio::test]
4747 async fn introspect_stats() {
4748 let engine = make_engine().await;
4749
4750 engine
4751 .encode_store(text_store_req("Stats test", Some(StoreType::Episodic)))
4752 .await
4753 .unwrap();
4754 engine
4755 .encode_store_raw(raw_text_store_req(
4756 "sess-stats",
4757 "Raw stats note",
4758 RawVisibility::Normal,
4759 SecrecyLevel::Public,
4760 ))
4761 .await
4762 .unwrap();
4763
4764 let stats = engine.introspect_stats().await.unwrap();
4765 assert_eq!(stats.total_records, 1);
4766 assert_eq!(stats.records_by_store[&StoreType::Episodic], 1);
4767 assert!((stats.avg_fidelity - 1.0).abs() < f64::EPSILON);
4768 assert!(!stats.background_decay_enabled);
4769 assert!(!stats.background_dream_enabled);
4770 assert_eq!(stats.raw_journal_records, 1);
4771 assert_eq!(stats.raw_journal_pending_dream, 1);
4772 }
4773
4774 #[tokio::test]
4775 async fn auto_store_routing() {
4776 let engine = make_engine().await;
4777
4778 let resp1 = engine
4779 .encode_store(text_store_req("An event", None))
4780 .await
4781 .unwrap();
4782 assert_eq!(resp1.store, StoreType::Episodic);
4783
4784 let req = EncodeStoreRequest {
4785 header: None,
4786 content: MemoryContent {
4787 blocks: vec![ContentBlock {
4788 modality: Modality::Text,
4789 format: "text/plain".to_string(),
4790 data: b"Rust is a systems language".to_vec(),
4791 embedding: None,
4792 }],
4793 summary: Some("Rust programming facts".to_string()),
4794 },
4795 store: None,
4796 emotion: None,
4797 context: None,
4798 metadata: None,
4799 associations: None,
4800 };
4801 let resp2 = engine.encode_store(req).await.unwrap();
4802 assert_eq!(resp2.store, StoreType::Semantic);
4803 }
4804
4805 #[tokio::test]
4806 async fn human_mode_degrades_content() {
4807 let content = MemoryContent {
4808 blocks: vec![ContentBlock {
4809 modality: Modality::Text,
4810 format: "text/plain".to_string(),
4811 data: b"The quick brown fox jumps over the lazy dog".to_vec(),
4812 embedding: None,
4813 }],
4814 summary: None,
4815 };
4816
4817 let result = apply_human_noise(&content, 0.95);
4818 assert_eq!(result.blocks[0].data, content.blocks[0].data);
4819
4820 let result = apply_human_noise(&content, 0.3);
4821 let text = std::str::from_utf8(&result.blocks[0].data).unwrap();
4822 assert!(text.contains("..."));
4823 }
4824
4825 #[tokio::test]
4826 async fn encode_update() {
4827 let engine = make_engine().await;
4828
4829 let resp = engine
4830 .encode_store(text_store_req("Original", Some(StoreType::Episodic)))
4831 .await
4832 .unwrap();
4833
4834 engine
4835 .encode_update(EncodeUpdateRequest {
4836 header: None,
4837 record_id: resp.record_id,
4838 content: Some(MemoryContent {
4839 blocks: vec![ContentBlock {
4840 modality: Modality::Text,
4841 format: "text/plain".to_string(),
4842 data: b"Updated content".to_vec(),
4843 embedding: None,
4844 }],
4845 summary: None,
4846 }),
4847 emotion: None,
4848 metadata: None,
4849 })
4850 .await
4851 .unwrap();
4852
4853 let record = engine
4854 .introspect_record(RecordIntrospectRequest {
4855 header: None,
4856 record_id: resp.record_id,
4857 include_history: false,
4858 include_associations: false,
4859 include_versions: false,
4860 })
4861 .await
4862 .unwrap();
4863
4864 assert_eq!(record.text_content(), Some("Updated content"));
4865
4866 let hits = engine.text_index.search("Updated", None, 10).unwrap();
4868 assert_eq!(hits.len(), 1);
4869 let hits = engine.text_index.search("Original", None, 10).unwrap();
4870 assert!(hits.is_empty());
4871 }
4872
4873 #[tokio::test]
4874 async fn encode_update_refreshes_structured_index() {
4875 let engine = make_engine().await;
4876
4877 let resp = engine
4878 .encode_store(EncodeStoreRequest {
4879 header: None,
4880 content: MemoryContent {
4881 blocks: vec![ContentBlock {
4882 modality: Modality::Structured,
4883 format: "application/json".to_string(),
4884 data: br#"{"user":{"name":"Alice"}}"#.to_vec(),
4885 embedding: None,
4886 }],
4887 summary: None,
4888 },
4889 store: Some(StoreType::Episodic),
4890 emotion: None,
4891 context: None,
4892 metadata: None,
4893 associations: None,
4894 })
4895 .await
4896 .unwrap();
4897
4898 assert_eq!(
4899 engine.text_index.search("Alice", None, 10).unwrap().len(),
4900 1
4901 );
4902
4903 engine
4904 .encode_update(EncodeUpdateRequest {
4905 header: None,
4906 record_id: resp.record_id,
4907 content: Some(MemoryContent {
4908 blocks: vec![ContentBlock {
4909 modality: Modality::Structured,
4910 format: "application/json".to_string(),
4911 data: br#"{"user":{"name":"Bob"}}"#.to_vec(),
4912 embedding: None,
4913 }],
4914 summary: None,
4915 }),
4916 emotion: None,
4917 metadata: None,
4918 })
4919 .await
4920 .unwrap();
4921
4922 assert!(engine
4923 .text_index
4924 .search("Alice", None, 10)
4925 .unwrap()
4926 .is_empty());
4927 assert_eq!(engine.text_index.search("Bob", None, 10).unwrap().len(), 1);
4928 }
4929
4930 #[tokio::test]
4931 async fn rebuild_coordinator_reindexes_structured_records() {
4932 let engine = make_engine().await;
4933
4934 let resp = engine
4935 .encode_store(EncodeStoreRequest {
4936 header: None,
4937 content: MemoryContent {
4938 blocks: vec![ContentBlock {
4939 modality: Modality::Structured,
4940 format: "application/json".to_string(),
4941 data: br#"{"project":{"name":"Cerememory"}}"#.to_vec(),
4942 embedding: None,
4943 }],
4944 summary: None,
4945 },
4946 store: Some(StoreType::Episodic),
4947 emotion: None,
4948 context: None,
4949 metadata: None,
4950 associations: None,
4951 })
4952 .await
4953 .unwrap();
4954
4955 engine.text_index.remove(resp.record_id).unwrap();
4956 assert!(engine
4957 .text_index
4958 .search("Cerememory", None, 10)
4959 .unwrap()
4960 .is_empty());
4961
4962 engine.rebuild_coordinator().await.unwrap();
4963
4964 assert_eq!(
4965 engine
4966 .text_index
4967 .search("Cerememory", None, 10)
4968 .unwrap()
4969 .len(),
4970 1
4971 );
4972 }
4973
4974 #[tokio::test]
4975 async fn background_decay_runs() {
4976 let config = EngineConfig {
4977 background_decay_interval_secs: Some(1), ..EngineConfig::default()
4979 };
4980 let engine = Arc::new(CerememoryEngine::new(config).unwrap());
4981
4982 engine
4984 .encode_store(text_store_req("Decay me", Some(StoreType::Episodic)))
4985 .await
4986 .unwrap();
4987
4988 engine.start_background_decay();
4989 assert!(engine.is_background_decay_enabled().await);
4990
4991 tokio::time::sleep(std::time::Duration::from_millis(2500)).await;
4993
4994 engine.stop_background_decay().await;
4995 assert!(!engine.is_background_decay_enabled().await);
4996 }
4997
4998 #[tokio::test]
4999 async fn background_decay_disabled_by_default() {
5000 let engine = Arc::new(make_engine().await);
5001 engine.start_background_decay(); assert!(!engine.is_background_decay_enabled().await);
5003 }
5004
5005 #[tokio::test]
5006 async fn background_dream_runs() {
5007 let config = EngineConfig {
5008 background_dream_interval_secs: Some(1),
5009 ..EngineConfig::default()
5010 };
5011 let engine = Arc::new(CerememoryEngine::new(config).unwrap());
5012 engine
5013 .encode_store_raw(raw_text_store_req(
5014 "sess-bg-dream",
5015 "Background dream me",
5016 RawVisibility::Normal,
5017 SecrecyLevel::Public,
5018 ))
5019 .await
5020 .unwrap();
5021
5022 engine.start_background_dream();
5023 assert!(engine.is_background_dream_enabled().await);
5024
5025 tokio::time::sleep(std::time::Duration::from_millis(2500)).await;
5026
5027 engine.stop_background_dream().await;
5028 assert!(!engine.is_background_dream_enabled().await);
5029 assert_eq!(engine.episodic.count().await.unwrap(), 1);
5030 }
5031
5032 #[tokio::test]
5033 async fn background_dream_disabled_by_default() {
5034 let engine = Arc::new(make_engine().await);
5035 engine.start_background_dream();
5036 assert!(!engine.is_background_dream_enabled().await);
5037 }
5038
5039 #[tokio::test]
5040 async fn multimodal_store_and_retrieve() {
5041 let engine = make_engine().await;
5042
5043 let req = EncodeStoreRequest {
5045 header: None,
5046 content: MemoryContent {
5047 blocks: vec![ContentBlock {
5048 modality: Modality::Image,
5049 format: "image/png".to_string(),
5050 data: vec![0x89, 0x50, 0x4E, 0x47], embedding: Some(vec![0.5, 0.3, 0.8]),
5052 }],
5053 summary: Some("A photo of a sunset".to_string()),
5054 },
5055 store: Some(StoreType::Episodic),
5056 emotion: None,
5057 context: None,
5058 metadata: None,
5059 associations: None,
5060 };
5061
5062 let resp = engine.encode_store(req).await.unwrap();
5063
5064 let record = engine
5066 .introspect_record(RecordIntrospectRequest {
5067 header: None,
5068 record_id: resp.record_id,
5069 include_history: false,
5070 include_associations: false,
5071 include_versions: false,
5072 })
5073 .await
5074 .unwrap();
5075
5076 assert_eq!(record.content.blocks[0].modality, Modality::Image);
5077 assert_eq!(record.content.blocks[0].data, vec![0x89, 0x50, 0x4E, 0x47]);
5078
5079 let query = RecallQueryRequest {
5081 header: None,
5082 cue: RecallCue {
5083 embedding: Some(vec![0.5, 0.3, 0.8]),
5084 ..Default::default()
5085 },
5086 stores: None,
5087 limit: 10,
5088 min_fidelity: None,
5089 include_decayed: false,
5090 reconsolidate: false,
5091 activation_depth: 0,
5092 recall_mode: RecallMode::Perfect,
5093 };
5094
5095 let recall_resp = engine.recall_query(query).await.unwrap();
5096 assert!(!recall_resp.memories.is_empty());
5097 }
5098
5099 #[tokio::test]
5100 async fn summary_only_records_are_text_searchable() {
5101 let engine = make_engine().await;
5102
5103 let resp = engine
5104 .encode_store(EncodeStoreRequest {
5105 header: None,
5106 content: MemoryContent {
5107 blocks: vec![ContentBlock {
5108 modality: Modality::Image,
5109 format: "image/png".to_string(),
5110 data: vec![0x89, 0x50, 0x4E, 0x47],
5111 embedding: None,
5112 }],
5113 summary: Some("sunset skyline".to_string()),
5114 },
5115 store: Some(StoreType::Semantic),
5116 emotion: None,
5117 context: None,
5118 metadata: None,
5119 associations: None,
5120 })
5121 .await
5122 .unwrap();
5123
5124 let recalled = engine
5125 .recall_query(RecallQueryRequest {
5126 header: None,
5127 cue: RecallCue {
5128 text: Some("skyline".to_string()),
5129 ..Default::default()
5130 },
5131 stores: Some(vec![StoreType::Semantic]),
5132 limit: 10,
5133 min_fidelity: None,
5134 include_decayed: false,
5135 reconsolidate: false,
5136 activation_depth: 0,
5137 recall_mode: RecallMode::Perfect,
5138 })
5139 .await
5140 .unwrap();
5141
5142 assert_eq!(recalled.memories.len(), 1);
5143 assert_eq!(recalled.memories[0].record.id, resp.record_id);
5144 }
5145
5146 #[tokio::test]
5147 async fn multiple_text_blocks_are_all_indexed() {
5148 let engine = make_engine().await;
5149
5150 engine
5151 .encode_store(EncodeStoreRequest {
5152 header: None,
5153 content: MemoryContent {
5154 blocks: vec![
5155 ContentBlock {
5156 modality: Modality::Text,
5157 format: "text/plain".to_string(),
5158 data: b"primary block".to_vec(),
5159 embedding: None,
5160 },
5161 ContentBlock {
5162 modality: Modality::Text,
5163 format: "text/plain".to_string(),
5164 data: b"secondary block".to_vec(),
5165 embedding: None,
5166 },
5167 ],
5168 summary: None,
5169 },
5170 store: Some(StoreType::Episodic),
5171 emotion: None,
5172 context: None,
5173 metadata: None,
5174 associations: None,
5175 })
5176 .await
5177 .unwrap();
5178
5179 let recalled = engine
5180 .recall_query(RecallQueryRequest {
5181 header: None,
5182 cue: RecallCue {
5183 text: Some("secondary".to_string()),
5184 ..Default::default()
5185 },
5186 stores: Some(vec![StoreType::Episodic]),
5187 limit: 10,
5188 min_fidelity: None,
5189 include_decayed: false,
5190 reconsolidate: false,
5191 activation_depth: 0,
5192 recall_mode: RecallMode::Perfect,
5193 })
5194 .await
5195 .unwrap();
5196
5197 assert_eq!(recalled.memories.len(), 1);
5198 }
5199
5200 #[tokio::test]
5201 async fn structured_recall_survives_rebuild() {
5202 let engine = make_engine().await;
5203 engine
5204 .encode_store(structured_store_req(
5205 r#"{"profile":{"city":"Tokyo","skills":["rust","python"]}}"#,
5206 Some(StoreType::Semantic),
5207 ))
5208 .await
5209 .unwrap();
5210
5211 let query = RecallQueryRequest {
5212 header: None,
5213 cue: RecallCue {
5214 text: Some("Tokyo".to_string()),
5215 ..Default::default()
5216 },
5217 stores: Some(vec![StoreType::Semantic]),
5218 limit: 10,
5219 min_fidelity: None,
5220 include_decayed: false,
5221 reconsolidate: false,
5222 activation_depth: 0,
5223 recall_mode: RecallMode::Perfect,
5224 };
5225
5226 let before = engine.recall_query(query.clone()).await.unwrap();
5227 assert_eq!(before.memories.len(), 1);
5228
5229 engine.rebuild_coordinator().await.unwrap();
5230
5231 let after = engine.recall_query(query).await.unwrap();
5232 assert_eq!(after.memories.len(), 1);
5233 }
5234
5235 #[tokio::test]
5236 async fn structured_update_rebuilds_text_index() {
5237 let engine = make_engine().await;
5238 let resp = engine
5239 .encode_store(structured_store_req(
5240 r#"{"profile":{"city":"Tokyo"}}"#,
5241 Some(StoreType::Semantic),
5242 ))
5243 .await
5244 .unwrap();
5245
5246 let tokyo_hits = engine
5247 .recall_query(RecallQueryRequest {
5248 header: None,
5249 cue: RecallCue {
5250 text: Some("Tokyo".to_string()),
5251 ..Default::default()
5252 },
5253 stores: Some(vec![StoreType::Semantic]),
5254 limit: 10,
5255 min_fidelity: None,
5256 include_decayed: false,
5257 reconsolidate: false,
5258 activation_depth: 0,
5259 recall_mode: RecallMode::Perfect,
5260 })
5261 .await
5262 .unwrap();
5263 assert_eq!(tokyo_hits.memories.len(), 1);
5264
5265 engine
5266 .encode_update(EncodeUpdateRequest {
5267 header: None,
5268 record_id: resp.record_id,
5269 content: Some(MemoryContent {
5270 blocks: vec![ContentBlock {
5271 modality: Modality::Structured,
5272 format: "application/json".to_string(),
5273 data: br#"{"profile":{"city":"Osaka"}}"#.to_vec(),
5274 embedding: None,
5275 }],
5276 summary: None,
5277 }),
5278 emotion: None,
5279 metadata: None,
5280 })
5281 .await
5282 .unwrap();
5283
5284 let tokyo_hits = engine
5285 .recall_query(RecallQueryRequest {
5286 header: None,
5287 cue: RecallCue {
5288 text: Some("Tokyo".to_string()),
5289 ..Default::default()
5290 },
5291 stores: Some(vec![StoreType::Semantic]),
5292 limit: 10,
5293 min_fidelity: None,
5294 include_decayed: false,
5295 reconsolidate: false,
5296 activation_depth: 0,
5297 recall_mode: RecallMode::Perfect,
5298 })
5299 .await
5300 .unwrap();
5301 assert!(tokyo_hits.memories.is_empty());
5302
5303 let osaka_hits = engine
5304 .recall_query(RecallQueryRequest {
5305 header: None,
5306 cue: RecallCue {
5307 text: Some("Osaka".to_string()),
5308 ..Default::default()
5309 },
5310 stores: Some(vec![StoreType::Semantic]),
5311 limit: 10,
5312 min_fidelity: None,
5313 include_decayed: false,
5314 reconsolidate: false,
5315 activation_depth: 0,
5316 recall_mode: RecallMode::Perfect,
5317 })
5318 .await
5319 .unwrap();
5320 assert_eq!(osaka_hits.memories.len(), 1);
5321 }
5322
5323 #[tokio::test]
5324 async fn multimodal_image_recall_uses_provider_embedding() {
5325 let provider = Arc::new(MockLLMProvider::new(4));
5326 let engine = CerememoryEngine::new(EngineConfig {
5327 llm_provider: Some(provider),
5328 ..Default::default()
5329 })
5330 .unwrap();
5331
5332 let image = vec![0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A];
5333 engine
5334 .encode_store(EncodeStoreRequest {
5335 header: None,
5336 content: MemoryContent {
5337 blocks: vec![ContentBlock {
5338 modality: Modality::Image,
5339 format: "image/png".to_string(),
5340 data: image.clone(),
5341 embedding: None,
5342 }],
5343 summary: Some("indexed image".to_string()),
5344 },
5345 store: Some(StoreType::Episodic),
5346 emotion: None,
5347 context: None,
5348 metadata: None,
5349 associations: None,
5350 })
5351 .await
5352 .unwrap();
5353
5354 let resp = engine
5355 .recall_query(RecallQueryRequest {
5356 header: None,
5357 cue: RecallCue {
5358 image: Some(image),
5359 ..Default::default()
5360 },
5361 stores: None,
5362 limit: 10,
5363 min_fidelity: None,
5364 include_decayed: false,
5365 reconsolidate: false,
5366 activation_depth: 0,
5367 recall_mode: RecallMode::Perfect,
5368 })
5369 .await
5370 .unwrap();
5371
5372 assert_eq!(resp.memories.len(), 1);
5373 assert_eq!(
5374 resp.memories[0].record.content.blocks[0].modality,
5375 Modality::Image
5376 );
5377 }
5378
5379 #[tokio::test]
5380 async fn multimodal_audio_recall_uses_provider_transcript() {
5381 let provider = Arc::new(MockLLMProvider::new(4));
5382 let engine = CerememoryEngine::new(EngineConfig {
5383 llm_provider: Some(provider),
5384 ..Default::default()
5385 })
5386 .unwrap();
5387
5388 let wav_bytes = b"RIFFabcdWAVE".to_vec();
5389 engine
5390 .encode_store(text_store_req("audio-12", Some(StoreType::Episodic)))
5391 .await
5392 .unwrap();
5393
5394 let resp = engine
5395 .recall_query(RecallQueryRequest {
5396 header: None,
5397 cue: RecallCue {
5398 audio: Some(wav_bytes),
5399 ..Default::default()
5400 },
5401 stores: None,
5402 limit: 10,
5403 min_fidelity: None,
5404 include_decayed: false,
5405 reconsolidate: false,
5406 activation_depth: 0,
5407 recall_mode: RecallMode::Perfect,
5408 })
5409 .await
5410 .unwrap();
5411
5412 assert!(!resp.memories.is_empty());
5413 assert_eq!(resp.memories[0].record.text_content(), Some("audio-12"));
5414 }
5415
5416 #[tokio::test]
5417 async fn encode_store_processes_all_multimodal_blocks() {
5418 let provider = Arc::new(MockLLMProvider::new(4));
5419 let engine = CerememoryEngine::new(EngineConfig {
5420 llm_provider: Some(provider),
5421 ..Default::default()
5422 })
5423 .unwrap();
5424
5425 let image_one = vec![0x89, b'P', b'N', b'G', 1, 2, 3, 4];
5426 let image_two = vec![0x89, b'P', b'N', b'G', 5, 6, 7, 8, 9];
5427 let audio_one = b"RIFFabcdWAVEone".to_vec();
5428 let audio_two = b"RIFFabcdWAVEtwoo".to_vec();
5429
5430 let resp = engine
5431 .encode_store(EncodeStoreRequest {
5432 header: None,
5433 content: MemoryContent {
5434 blocks: vec![
5435 ContentBlock {
5436 modality: Modality::Image,
5437 format: "image/png".to_string(),
5438 data: image_one.clone(),
5439 embedding: None,
5440 },
5441 ContentBlock {
5442 modality: Modality::Audio,
5443 format: "audio/wav".to_string(),
5444 data: audio_one.clone(),
5445 embedding: None,
5446 },
5447 ContentBlock {
5448 modality: Modality::Image,
5449 format: "image/png".to_string(),
5450 data: image_two.clone(),
5451 embedding: None,
5452 },
5453 ContentBlock {
5454 modality: Modality::Audio,
5455 format: "audio/wav".to_string(),
5456 data: audio_two.clone(),
5457 embedding: None,
5458 },
5459 ],
5460 summary: Some("multimodal batch".to_string()),
5461 },
5462 store: Some(StoreType::Episodic),
5463 emotion: None,
5464 context: None,
5465 metadata: None,
5466 associations: None,
5467 })
5468 .await
5469 .unwrap();
5470
5471 let record = engine
5472 .introspect_record(RecordIntrospectRequest {
5473 header: None,
5474 record_id: resp.record_id,
5475 include_history: false,
5476 include_associations: false,
5477 include_versions: false,
5478 })
5479 .await
5480 .unwrap();
5481
5482 assert_eq!(record.content.blocks.len(), 6);
5483 assert_eq!(record.content.blocks[0].modality, Modality::Image);
5484 assert_eq!(record.content.blocks[2].modality, Modality::Image);
5485 assert_eq!(record.content.blocks[4].modality, Modality::Text);
5486 assert_eq!(record.content.blocks[5].modality, Modality::Text);
5487
5488 let image_one_embedding = record.content.blocks[0].embedding.as_ref().unwrap();
5489 let image_two_embedding = record.content.blocks[2].embedding.as_ref().unwrap();
5490 assert_eq!(image_one_embedding[0], image_one.len() as f32);
5491 assert_eq!(image_two_embedding[0], image_two.len() as f32);
5492
5493 assert_eq!(
5494 std::str::from_utf8(&record.content.blocks[4].data).unwrap(),
5495 format!("audio-{}", audio_one.len())
5496 );
5497 assert_eq!(
5498 std::str::from_utf8(&record.content.blocks[5].data).unwrap(),
5499 format!("audio-{}", audio_two.len())
5500 );
5501 assert!(record.content.blocks[4].embedding.is_some());
5502 assert!(record.content.blocks[5].embedding.is_some());
5503 }
5504
5505 #[tokio::test]
5506 async fn size_limit_validation() {
5507 let engine = make_engine().await;
5508
5509 let big_text = vec![b'A'; 1_048_577]; let req = EncodeStoreRequest {
5512 header: None,
5513 content: MemoryContent {
5514 blocks: vec![ContentBlock {
5515 modality: Modality::Text,
5516 format: "text/plain".to_string(),
5517 data: big_text,
5518 embedding: None,
5519 }],
5520 summary: None,
5521 },
5522 store: Some(StoreType::Episodic),
5523 emotion: None,
5524 context: None,
5525 metadata: None,
5526 associations: None,
5527 };
5528
5529 let result = engine.encode_store(req).await;
5530 assert!(matches!(
5531 result,
5532 Err(CerememoryError::ContentTooLarge { .. })
5533 ));
5534 }
5535
5536 #[tokio::test]
5539 async fn lifecycle_export_returns_bytes_and_response() {
5540 let engine = make_engine().await;
5541
5542 engine
5543 .encode_store(text_store_req("Export me", Some(StoreType::Episodic)))
5544 .await
5545 .unwrap();
5546
5547 let (bytes, resp) = engine
5548 .lifecycle_export(ExportRequest {
5549 header: None,
5550 format: "cma".to_string(),
5551 stores: None,
5552 include_raw_journal: false,
5553 encrypt: false,
5554 encryption_key: None,
5555 })
5556 .await
5557 .unwrap();
5558
5559 assert_eq!(resp.record_count, 1);
5560 assert!(!bytes.is_empty());
5561 }
5562
5563 #[tokio::test]
5564 async fn lifecycle_export_accepts_jsonl_alias() {
5565 let engine = make_engine().await;
5566
5567 engine
5568 .encode_store(text_store_req("Export me too", Some(StoreType::Episodic)))
5569 .await
5570 .unwrap();
5571
5572 let (bytes, resp) = engine
5573 .lifecycle_export(ExportRequest {
5574 header: None,
5575 format: "jsonl".to_string(),
5576 stores: None,
5577 include_raw_journal: false,
5578 encrypt: false,
5579 encryption_key: None,
5580 })
5581 .await
5582 .unwrap();
5583
5584 assert_eq!(resp.record_count, 1);
5585 let records = cerememory_archive::import_records(&bytes).unwrap();
5586 assert_eq!(records.len(), 1);
5587 }
5588
5589 #[tokio::test]
5590 async fn lifecycle_export_rejects_unsupported_format() {
5591 let engine = make_engine().await;
5592
5593 let result = engine
5594 .lifecycle_export(ExportRequest {
5595 header: None,
5596 format: "zip".to_string(),
5597 stores: None,
5598 include_raw_journal: false,
5599 encrypt: false,
5600 encryption_key: None,
5601 })
5602 .await;
5603
5604 assert!(matches!(
5605 result,
5606 Err(CerememoryError::Validation(msg)) if msg.contains("Valid options: cma, jsonl")
5607 ));
5608 }
5609
5610 #[tokio::test]
5611 async fn lifecycle_export_store_filter() {
5612 let engine = make_engine().await;
5613
5614 engine
5615 .encode_store(text_store_req("Episodic", Some(StoreType::Episodic)))
5616 .await
5617 .unwrap();
5618 engine
5619 .encode_store(text_store_req("Semantic", Some(StoreType::Semantic)))
5620 .await
5621 .unwrap();
5622
5623 let (_, resp) = engine
5624 .lifecycle_export(ExportRequest {
5625 header: None,
5626 format: "cma".to_string(),
5627 stores: Some(vec![StoreType::Episodic]),
5628 include_raw_journal: false,
5629 encrypt: false,
5630 encryption_key: None,
5631 })
5632 .await
5633 .unwrap();
5634
5635 assert_eq!(resp.record_count, 1);
5636 }
5637
5638 #[tokio::test]
5639 async fn lifecycle_export_store_filter_deduplicates_requested_stores() {
5640 let engine = make_engine().await;
5641
5642 engine
5643 .encode_store(text_store_req("Episodic", Some(StoreType::Episodic)))
5644 .await
5645 .unwrap();
5646
5647 let (bytes, resp) = engine
5648 .lifecycle_export(ExportRequest {
5649 header: None,
5650 format: "cma".to_string(),
5651 stores: Some(vec![StoreType::Episodic, StoreType::Episodic]),
5652 include_raw_journal: false,
5653 encrypt: false,
5654 encryption_key: None,
5655 })
5656 .await
5657 .unwrap();
5658
5659 assert_eq!(resp.record_count, 1);
5660
5661 let records = cerememory_archive::import_records(&bytes).unwrap();
5662 assert_eq!(records.len(), 1);
5663 assert!(records.iter().all(|r| r.store == StoreType::Episodic));
5664 }
5665
5666 #[tokio::test]
5667 async fn lifecycle_export_encrypted_roundtrip() {
5668 let engine = make_engine().await;
5669
5670 engine
5671 .encode_store(text_store_req("Secret data", Some(StoreType::Episodic)))
5672 .await
5673 .unwrap();
5674
5675 let (encrypted_bytes, resp) = engine
5676 .lifecycle_export(ExportRequest {
5677 header: None,
5678 format: "cma".to_string(),
5679 stores: None,
5680 include_raw_journal: false,
5681 encrypt: true,
5682 encryption_key: Some("my-passphrase".to_string()),
5683 })
5684 .await
5685 .unwrap();
5686
5687 assert_eq!(resp.record_count, 1);
5688
5689 let result = engine.import_records(&encrypted_bytes).await;
5691 assert!(result.is_err());
5692 }
5693
5694 #[tokio::test]
5695 async fn lifecycle_export_import_with_raw_journal_roundtrip() {
5696 let engine = make_engine().await;
5697 engine
5698 .encode_store(text_store_req("Curated memory", Some(StoreType::Episodic)))
5699 .await
5700 .unwrap();
5701 engine
5702 .encode_store_raw(raw_text_store_req(
5703 "sess-export-raw",
5704 "Raw transcript note",
5705 RawVisibility::Normal,
5706 SecrecyLevel::Public,
5707 ))
5708 .await
5709 .unwrap();
5710
5711 let (bytes, resp) = engine
5712 .lifecycle_export(ExportRequest {
5713 header: None,
5714 format: "cma".to_string(),
5715 stores: None,
5716 include_raw_journal: true,
5717 encrypt: false,
5718 encryption_key: None,
5719 })
5720 .await
5721 .unwrap();
5722 assert_eq!(resp.record_count, 2);
5723
5724 let target = make_engine().await;
5725 let imported = target
5726 .lifecycle_import(ImportRequest {
5727 header: None,
5728 archive_id: "bundle-roundtrip".to_string(),
5729 strategy: ImportStrategy::Merge,
5730 conflict_resolution: ConflictResolution::KeepExisting,
5731 decryption_key: None,
5732 archive_data: Some(bytes),
5733 })
5734 .await
5735 .unwrap();
5736
5737 assert_eq!(imported, 2);
5738 assert_eq!(target.episodic.count().await.unwrap(), 1);
5739 assert_eq!(target.raw_journal_count().await.unwrap(), 1);
5740 }
5741
5742 #[tokio::test]
5743 async fn lifecycle_import_with_archive_data() {
5744 let engine = make_engine().await;
5745
5746 engine
5748 .encode_store(text_store_req("Import me", Some(StoreType::Episodic)))
5749 .await
5750 .unwrap();
5751
5752 let (bytes, _) = engine
5753 .lifecycle_export(ExportRequest {
5754 header: None,
5755 format: "cma".to_string(),
5756 stores: None,
5757 include_raw_journal: false,
5758 encrypt: false,
5759 encryption_key: None,
5760 })
5761 .await
5762 .unwrap();
5763
5764 let engine2 = make_engine().await;
5766 let imported = engine2
5767 .lifecycle_import(ImportRequest {
5768 header: None,
5769 archive_id: "test".to_string(),
5770 strategy: ImportStrategy::Merge,
5771 conflict_resolution: ConflictResolution::KeepExisting,
5772 decryption_key: None,
5773 archive_data: Some(bytes),
5774 })
5775 .await
5776 .unwrap();
5777
5778 assert_eq!(imported, 1);
5779 let stats = engine2.introspect_stats().await.unwrap();
5780 assert_eq!(stats.total_records, 1);
5781 }
5782
5783 #[tokio::test]
5784 async fn import_conflict_keep_existing() {
5785 let engine = make_engine().await;
5786
5787 let resp = engine
5789 .encode_store(text_store_req("Original", Some(StoreType::Episodic)))
5790 .await
5791 .unwrap();
5792
5793 let (bytes, _) = engine
5795 .lifecycle_export(ExportRequest {
5796 header: None,
5797 format: "cma".to_string(),
5798 stores: None,
5799 include_raw_journal: false,
5800 encrypt: false,
5801 encryption_key: None,
5802 })
5803 .await
5804 .unwrap();
5805
5806 let imported = engine
5808 .lifecycle_import(ImportRequest {
5809 header: None,
5810 archive_id: "test".to_string(),
5811 strategy: ImportStrategy::Merge,
5812 conflict_resolution: ConflictResolution::KeepExisting,
5813 decryption_key: None,
5814 archive_data: Some(bytes),
5815 })
5816 .await
5817 .unwrap();
5818
5819 assert_eq!(imported, 0);
5820 let record = engine
5821 .introspect_record(RecordIntrospectRequest {
5822 header: None,
5823 record_id: resp.record_id,
5824 include_history: false,
5825 include_associations: false,
5826 include_versions: false,
5827 })
5828 .await
5829 .unwrap();
5830 assert_eq!(record.text_content(), Some("Original"));
5831 }
5832
5833 #[tokio::test]
5834 async fn import_conflict_keep_imported() {
5835 let engine = make_engine().await;
5836
5837 let resp = engine
5839 .encode_store(text_store_req("Original text", Some(StoreType::Episodic)))
5840 .await
5841 .unwrap();
5842
5843 let (bytes, _) = engine
5845 .lifecycle_export(ExportRequest {
5846 header: None,
5847 format: "cma".to_string(),
5848 stores: None,
5849 include_raw_journal: false,
5850 encrypt: false,
5851 encryption_key: None,
5852 })
5853 .await
5854 .unwrap();
5855
5856 let imported = engine
5858 .lifecycle_import(ImportRequest {
5859 header: None,
5860 archive_id: "test".to_string(),
5861 strategy: ImportStrategy::Merge,
5862 conflict_resolution: ConflictResolution::KeepImported,
5863 decryption_key: None,
5864 archive_data: Some(bytes),
5865 })
5866 .await
5867 .unwrap();
5868
5869 assert_eq!(imported, 1);
5870 let record = engine.get_store_record(&resp.record_id).await.unwrap();
5872 assert!(record.is_some());
5873 }
5874
5875 #[tokio::test]
5876 async fn import_conflict_keep_newer() {
5877 let engine = make_engine().await;
5878
5879 let resp = engine
5881 .encode_store(text_store_req("First version", Some(StoreType::Episodic)))
5882 .await
5883 .unwrap();
5884
5885 let (bytes, _) = engine
5887 .lifecycle_export(ExportRequest {
5888 header: None,
5889 format: "cma".to_string(),
5890 stores: None,
5891 include_raw_journal: false,
5892 encrypt: false,
5893 encryption_key: None,
5894 })
5895 .await
5896 .unwrap();
5897
5898 engine
5900 .encode_update(EncodeUpdateRequest {
5901 header: None,
5902 record_id: resp.record_id,
5903 content: Some(MemoryContent {
5904 blocks: vec![ContentBlock {
5905 modality: Modality::Text,
5906 format: "text/plain".to_string(),
5907 data: b"Updated version".to_vec(),
5908 embedding: None,
5909 }],
5910 summary: None,
5911 }),
5912 emotion: None,
5913 metadata: None,
5914 })
5915 .await
5916 .unwrap();
5917
5918 let imported = engine
5920 .lifecycle_import(ImportRequest {
5921 header: None,
5922 archive_id: "test".to_string(),
5923 strategy: ImportStrategy::Merge,
5924 conflict_resolution: ConflictResolution::KeepNewer,
5925 decryption_key: None,
5926 archive_data: Some(bytes),
5927 })
5928 .await
5929 .unwrap();
5930
5931 assert_eq!(imported, 0);
5932 let record = engine
5933 .introspect_record(RecordIntrospectRequest {
5934 header: None,
5935 record_id: resp.record_id,
5936 include_history: false,
5937 include_associations: false,
5938 include_versions: false,
5939 })
5940 .await
5941 .unwrap();
5942 assert_eq!(record.text_content(), Some("Updated version"));
5943 }
5944
5945 #[tokio::test]
5946 async fn import_encrypted_archive() {
5947 let engine = make_engine().await;
5948
5949 engine
5950 .encode_store(text_store_req(
5951 "Encrypted import",
5952 Some(StoreType::Episodic),
5953 ))
5954 .await
5955 .unwrap();
5956
5957 let (encrypted, _) = engine
5958 .lifecycle_export(ExportRequest {
5959 header: None,
5960 format: "cma".to_string(),
5961 stores: None,
5962 include_raw_journal: false,
5963 encrypt: true,
5964 encryption_key: Some("pass123".to_string()),
5965 })
5966 .await
5967 .unwrap();
5968
5969 let engine2 = make_engine().await;
5971 let imported = engine2
5972 .lifecycle_import(ImportRequest {
5973 header: None,
5974 archive_id: "test".to_string(),
5975 strategy: ImportStrategy::Merge,
5976 conflict_resolution: ConflictResolution::KeepExisting,
5977 decryption_key: Some("pass123".to_string()),
5978 archive_data: Some(encrypted),
5979 })
5980 .await
5981 .unwrap();
5982
5983 assert_eq!(imported, 1);
5984 }
5985
5986 #[tokio::test]
5987 async fn import_missing_archive_data() {
5988 let engine = make_engine().await;
5989
5990 let result = engine
5991 .lifecycle_import(ImportRequest {
5992 header: None,
5993 archive_id: "test".to_string(),
5994 strategy: ImportStrategy::Merge,
5995 conflict_resolution: ConflictResolution::KeepExisting,
5996 decryption_key: None,
5997 archive_data: None,
5998 })
5999 .await;
6000
6001 assert!(result.is_err());
6002 let err = format!("{:?}", result.unwrap_err());
6003 assert!(err.contains("archive_data"));
6004 }
6005
6006 #[tokio::test]
6007 async fn lifecycle_export_encrypt_without_key_fails() {
6008 let engine = make_engine().await;
6009
6010 engine
6011 .encode_store(text_store_req("Some data", Some(StoreType::Episodic)))
6012 .await
6013 .unwrap();
6014
6015 let result = engine
6016 .lifecycle_export(ExportRequest {
6017 header: None,
6018 format: "cma".to_string(),
6019 stores: None,
6020 include_raw_journal: false,
6021 encrypt: true,
6022 encryption_key: None,
6023 })
6024 .await;
6025
6026 assert!(result.is_err());
6027 let err = format!("{:?}", result.unwrap_err());
6028 assert!(err.contains("encryption_key is required"));
6029 }
6030
6031 #[tokio::test]
6032 async fn import_conflict_cross_store_keep_imported_count_stays_one() {
6033 let engine = make_engine().await;
6034
6035 let resp = engine
6037 .encode_store(text_store_req(
6038 "Original in episodic",
6039 Some(StoreType::Episodic),
6040 ))
6041 .await
6042 .unwrap();
6043 let record_id = resp.record_id;
6044
6045 let (bytes, _) = engine
6047 .lifecycle_export(ExportRequest {
6048 header: None,
6049 format: "cma".to_string(),
6050 stores: None,
6051 include_raw_journal: false,
6052 encrypt: false,
6053 encryption_key: None,
6054 })
6055 .await
6056 .unwrap();
6057
6058 let imported = engine
6063 .lifecycle_import(ImportRequest {
6064 header: None,
6065 archive_id: "test".to_string(),
6066 strategy: ImportStrategy::Merge,
6067 conflict_resolution: ConflictResolution::KeepImported,
6068 decryption_key: None,
6069 archive_data: Some(bytes),
6070 })
6071 .await
6072 .unwrap();
6073
6074 assert_eq!(imported, 1);
6075
6076 let stats = engine.introspect_stats().await.unwrap();
6078 assert_eq!(stats.total_records, 1);
6079
6080 let record = engine.get_store_record(&record_id).await.unwrap();
6082 assert!(record.is_some());
6083 }
6084
6085 #[tokio::test]
6086 async fn import_strategy_replace_replaces_existing_dataset() {
6087 let source = make_engine().await;
6088
6089 let imported_episode = source
6090 .encode_store(text_store_req(
6091 "Imported episodic",
6092 Some(StoreType::Episodic),
6093 ))
6094 .await;
6095 let imported_episode_id = imported_episode.unwrap().record_id;
6096 let imported_semantic = source
6097 .encode_store(text_store_req(
6098 "Imported semantic",
6099 Some(StoreType::Semantic),
6100 ))
6101 .await
6102 .unwrap()
6103 .record_id;
6104
6105 let (archive_data, _) = source
6106 .lifecycle_export(ExportRequest {
6107 header: None,
6108 format: "cma".to_string(),
6109 stores: None,
6110 include_raw_journal: false,
6111 encrypt: false,
6112 encryption_key: None,
6113 })
6114 .await
6115 .unwrap();
6116
6117 let target = make_engine().await;
6118 let old_id = target
6119 .encode_store(text_store_req("Old episodic", Some(StoreType::Episodic)))
6120 .await
6121 .unwrap()
6122 .record_id;
6123 target
6124 .encode_store(text_store_req("Old working", Some(StoreType::Working)))
6125 .await
6126 .unwrap();
6127
6128 let imported = target
6129 .lifecycle_import(ImportRequest {
6130 header: None,
6131 archive_id: "replace-test".to_string(),
6132 strategy: ImportStrategy::Replace,
6133 conflict_resolution: ConflictResolution::KeepNewer,
6134 decryption_key: None,
6135 archive_data: Some(archive_data),
6136 })
6137 .await
6138 .unwrap();
6139
6140 assert_eq!(imported, 2);
6141
6142 let stats = target.introspect_stats().await.unwrap();
6143 assert_eq!(stats.total_records, 2);
6144 assert_eq!(stats.records_by_store[&StoreType::Episodic], 1);
6145 assert_eq!(stats.records_by_store[&StoreType::Semantic], 1);
6146 assert_eq!(
6147 *stats
6148 .records_by_store
6149 .get(&StoreType::Working)
6150 .unwrap_or(&0),
6151 0
6152 );
6153
6154 assert!(target.get_store_record(&old_id).await.unwrap().is_none());
6155 assert!(target
6156 .get_store_record(&imported_episode_id)
6157 .await
6158 .unwrap()
6159 .is_some());
6160 assert!(target
6161 .get_store_record(&imported_semantic)
6162 .await
6163 .unwrap()
6164 .is_some());
6165 }
6166
6167 #[tokio::test]
6168 async fn import_strategy_replace_preserves_existing_data_on_invalid_archive() {
6169 let engine = make_engine().await;
6170 let old_id = engine
6171 .encode_store(text_store_req("Keep me", Some(StoreType::Episodic)))
6172 .await
6173 .unwrap()
6174 .record_id;
6175
6176 let result = engine
6177 .lifecycle_import(ImportRequest {
6178 header: None,
6179 archive_id: "invalid-replace".to_string(),
6180 strategy: ImportStrategy::Replace,
6181 conflict_resolution: ConflictResolution::KeepNewer,
6182 decryption_key: None,
6183 archive_data: Some(b"not-a-valid-cma-archive".to_vec()),
6184 })
6185 .await;
6186
6187 assert!(result.is_err());
6188 let stats = engine.introspect_stats().await.unwrap();
6189 assert_eq!(stats.total_records, 1);
6190 assert!(engine.get_store_record(&old_id).await.unwrap().is_some());
6191 }
6192
6193 #[tokio::test]
6196 async fn timeline_hour_granularity() {
6197 let engine = make_engine().await;
6198 engine
6199 .encode_store(text_store_req("Morning event", Some(StoreType::Episodic)))
6200 .await
6201 .unwrap();
6202
6203 let now = Utc::now();
6204 let resp = engine
6205 .recall_timeline(RecallTimelineRequest {
6206 header: None,
6207 range: TemporalRange {
6208 start: now - chrono::Duration::hours(1),
6209 end: now + chrono::Duration::hours(1),
6210 },
6211 granularity: TimeGranularity::Hour,
6212 min_fidelity: None,
6213 emotion_filter: None,
6214 })
6215 .await
6216 .unwrap();
6217
6218 assert!(!resp.buckets.is_empty());
6219 let total: u32 = resp.buckets.iter().map(|b| b.count).sum();
6220 assert!(total >= 1);
6221 }
6222
6223 #[tokio::test]
6224 async fn timeline_day_granularity() {
6225 let engine = make_engine().await;
6226 for i in 0..3 {
6227 engine
6228 .encode_store(text_store_req(
6229 &format!("Day event {i}"),
6230 Some(StoreType::Episodic),
6231 ))
6232 .await
6233 .unwrap();
6234 }
6235
6236 let now = Utc::now();
6237 let resp = engine
6238 .recall_timeline(RecallTimelineRequest {
6239 header: None,
6240 range: TemporalRange {
6241 start: now - chrono::Duration::days(1),
6242 end: now + chrono::Duration::days(1),
6243 },
6244 granularity: TimeGranularity::Day,
6245 min_fidelity: None,
6246 emotion_filter: None,
6247 })
6248 .await
6249 .unwrap();
6250
6251 let total: u32 = resp.buckets.iter().map(|b| b.count).sum();
6252 assert_eq!(total, 3);
6253 }
6254
6255 #[tokio::test]
6256 async fn timeline_empty_range() {
6257 let engine = make_engine().await;
6258 engine
6259 .encode_store(text_store_req("An event", Some(StoreType::Episodic)))
6260 .await
6261 .unwrap();
6262
6263 let resp = engine
6265 .recall_timeline(RecallTimelineRequest {
6266 header: None,
6267 range: TemporalRange {
6268 start: Utc::now() - chrono::Duration::days(365 * 10),
6269 end: Utc::now() - chrono::Duration::days(365 * 9),
6270 },
6271 granularity: TimeGranularity::Day,
6272 min_fidelity: None,
6273 emotion_filter: None,
6274 })
6275 .await
6276 .unwrap();
6277
6278 assert!(resp.buckets.is_empty());
6279 }
6280
6281 #[tokio::test]
6282 async fn timeline_min_fidelity_filter() {
6283 let engine = make_engine().await;
6284 engine
6285 .encode_store(text_store_req(
6286 "High fidelity event",
6287 Some(StoreType::Episodic),
6288 ))
6289 .await
6290 .unwrap();
6291
6292 let now = Utc::now();
6293 let resp = engine
6294 .recall_timeline(RecallTimelineRequest {
6295 header: None,
6296 range: TemporalRange {
6297 start: now - chrono::Duration::hours(1),
6298 end: now + chrono::Duration::hours(1),
6299 },
6300 granularity: TimeGranularity::Hour,
6301 min_fidelity: Some(0.5),
6302 emotion_filter: None,
6303 })
6304 .await
6305 .unwrap();
6306
6307 let total: u32 = resp.buckets.iter().map(|b| b.count).sum();
6309 assert!(total >= 1);
6310 }
6311
6312 #[tokio::test]
6313 async fn timeline_emotion_filter() {
6314 let engine = make_engine().await;
6315
6316 let req = EncodeStoreRequest {
6318 header: None,
6319 content: MemoryContent {
6320 blocks: vec![ContentBlock {
6321 modality: Modality::Text,
6322 format: "text/plain".to_string(),
6323 data: b"Happy event".to_vec(),
6324 embedding: None,
6325 }],
6326 summary: None,
6327 },
6328 store: Some(StoreType::Episodic),
6329 emotion: Some(EmotionVector {
6330 joy: 0.9,
6331 ..Default::default()
6332 }),
6333 context: None,
6334 metadata: None,
6335 associations: None,
6336 };
6337 engine.encode_store(req).await.unwrap();
6338
6339 let req2 = EncodeStoreRequest {
6341 header: None,
6342 content: MemoryContent {
6343 blocks: vec![ContentBlock {
6344 modality: Modality::Text,
6345 format: "text/plain".to_string(),
6346 data: b"Sad event".to_vec(),
6347 embedding: None,
6348 }],
6349 summary: None,
6350 },
6351 store: Some(StoreType::Episodic),
6352 emotion: Some(EmotionVector {
6353 sadness: 0.9,
6354 ..Default::default()
6355 }),
6356 context: None,
6357 metadata: None,
6358 associations: None,
6359 };
6360 engine.encode_store(req2).await.unwrap();
6361
6362 let now = Utc::now();
6363
6364 let resp = engine
6366 .recall_timeline(RecallTimelineRequest {
6367 header: None,
6368 range: TemporalRange {
6369 start: now - chrono::Duration::hours(1),
6370 end: now + chrono::Duration::hours(1),
6371 },
6372 granularity: TimeGranularity::Hour,
6373 min_fidelity: None,
6374 emotion_filter: Some(EmotionVector {
6375 joy: 1.0,
6376 ..Default::default()
6377 }),
6378 })
6379 .await
6380 .unwrap();
6381
6382 let total: u32 = resp.buckets.iter().map(|b| b.count).sum();
6383 assert_eq!(total, 1, "Only the joyful event should match");
6384 }
6385
6386 #[tokio::test]
6387 async fn timeline_multi_store() {
6388 let engine = make_engine().await;
6389 engine
6390 .encode_store(text_store_req("Episodic event", Some(StoreType::Episodic)))
6391 .await
6392 .unwrap();
6393 engine
6394 .encode_store(text_store_req(
6395 "Procedural event",
6396 Some(StoreType::Procedural),
6397 ))
6398 .await
6399 .unwrap();
6400
6401 let now = Utc::now();
6402 let resp = engine
6403 .recall_timeline(RecallTimelineRequest {
6404 header: None,
6405 range: TemporalRange {
6406 start: now - chrono::Duration::hours(1),
6407 end: now + chrono::Duration::hours(1),
6408 },
6409 granularity: TimeGranularity::Hour,
6410 min_fidelity: None,
6411 emotion_filter: None,
6412 })
6413 .await
6414 .unwrap();
6415
6416 let total: u32 = resp.buckets.iter().map(|b| b.count).sum();
6417 assert!(total >= 2);
6418 }
6419
6420 #[tokio::test]
6423 async fn graph_centered_traversal() {
6424 let engine = make_engine().await;
6425
6426 let r1 = engine
6428 .encode_store(text_store_req("Node A", Some(StoreType::Episodic)))
6429 .await
6430 .unwrap();
6431 let r2 = engine
6432 .encode_store(text_store_req("Node B", Some(StoreType::Episodic)))
6433 .await
6434 .unwrap();
6435
6436 let assoc = Association {
6438 target_id: r2.record_id,
6439 association_type: AssociationType::Semantic,
6440 weight: 0.9,
6441 created_at: Utc::now(),
6442 last_co_activation: Utc::now(),
6443 };
6444 engine
6445 .coordinator
6446 .add_association(&r1.record_id, assoc)
6447 .await
6448 .unwrap();
6449
6450 let resp = engine
6451 .recall_graph(RecallGraphRequest {
6452 header: None,
6453 center_id: Some(r1.record_id),
6454 depth: 2,
6455 edge_types: None,
6456 limit_nodes: 10,
6457 })
6458 .await
6459 .unwrap();
6460
6461 assert!(resp.nodes.len() >= 2);
6462 assert!(!resp.edges.is_empty());
6463 }
6464
6465 #[tokio::test]
6466 async fn graph_empty() {
6467 let engine = make_engine().await;
6468 let resp = engine
6469 .recall_graph(RecallGraphRequest {
6470 header: None,
6471 center_id: None,
6472 depth: 1,
6473 edge_types: None,
6474 limit_nodes: 10,
6475 })
6476 .await
6477 .unwrap();
6478
6479 assert!(resp.nodes.is_empty());
6480 }
6481
6482 #[tokio::test]
6483 async fn graph_depth_limiting() {
6484 let engine = make_engine().await;
6485 let r1 = engine
6486 .encode_store(text_store_req("Node 1", Some(StoreType::Episodic)))
6487 .await
6488 .unwrap();
6489
6490 let resp = engine
6491 .recall_graph(RecallGraphRequest {
6492 header: None,
6493 center_id: Some(r1.record_id),
6494 depth: 0,
6495 edge_types: None,
6496 limit_nodes: 10,
6497 })
6498 .await
6499 .unwrap();
6500
6501 assert_eq!(resp.nodes.len(), 1);
6503 assert!(resp.edges.is_empty());
6504 }
6505
6506 #[tokio::test]
6509 async fn decay_forecast_future() {
6510 let engine = make_engine().await;
6511 let resp = engine
6512 .encode_store(text_store_req("Forecast me", Some(StoreType::Episodic)))
6513 .await
6514 .unwrap();
6515
6516 let forecast = engine
6517 .introspect_decay_forecast(DecayForecastRequest {
6518 header: None,
6519 record_ids: vec![resp.record_id],
6520 forecast_at: Utc::now() + chrono::Duration::days(30),
6521 })
6522 .await
6523 .unwrap();
6524
6525 assert_eq!(forecast.forecasts.len(), 1);
6526 assert!(forecast.forecasts[0].forecasted_fidelity < forecast.forecasts[0].current_fidelity);
6527 assert!(forecast.forecasts[0].forecasted_fidelity > 0.0);
6528 }
6529
6530 #[tokio::test]
6531 async fn decay_forecast_threshold_date() {
6532 let engine = make_engine().await;
6533 let resp = engine
6534 .encode_store(text_store_req("Will decay", Some(StoreType::Episodic)))
6535 .await
6536 .unwrap();
6537
6538 let forecast = engine
6539 .introspect_decay_forecast(DecayForecastRequest {
6540 header: None,
6541 record_ids: vec![resp.record_id],
6542 forecast_at: Utc::now() + chrono::Duration::days(365),
6543 })
6544 .await
6545 .unwrap();
6546
6547 assert!(forecast.forecasts[0].estimated_threshold_date.is_some());
6549 assert!(forecast.forecasts[0].estimated_threshold_date.unwrap() > Utc::now());
6550 }
6551
6552 #[tokio::test]
6553 async fn decay_forecast_multiple_records() {
6554 let engine = make_engine().await;
6555 let r1 = engine
6556 .encode_store(text_store_req("Record 1", Some(StoreType::Episodic)))
6557 .await
6558 .unwrap();
6559 let r2 = engine
6560 .encode_store(text_store_req("Record 2", Some(StoreType::Semantic)))
6561 .await
6562 .unwrap();
6563
6564 let forecast = engine
6565 .introspect_decay_forecast(DecayForecastRequest {
6566 header: None,
6567 record_ids: vec![r1.record_id, r2.record_id],
6568 forecast_at: Utc::now() + chrono::Duration::days(7),
6569 })
6570 .await
6571 .unwrap();
6572
6573 assert_eq!(forecast.forecasts.len(), 2);
6574 }
6575
6576 #[tokio::test]
6577 async fn decay_forecast_uses_per_record_decay_rate() {
6578 let engine = make_engine().await;
6579 let resp = engine
6580 .encode_store(text_store_req("Test record", Some(StoreType::Episodic)))
6581 .await
6582 .unwrap();
6583
6584 engine
6586 .lifecycle_decay_tick(DecayTickRequest {
6587 header: None,
6588 tick_duration_seconds: Some(3600),
6589 })
6590 .await
6591 .unwrap();
6592
6593 let forecast = engine
6595 .introspect_decay_forecast(DecayForecastRequest {
6596 header: None,
6597 record_ids: vec![resp.record_id],
6598 forecast_at: Utc::now() + chrono::Duration::days(30),
6599 })
6600 .await
6601 .unwrap();
6602
6603 assert_eq!(forecast.forecasts.len(), 1);
6604 assert!(forecast.forecasts[0].forecasted_fidelity > 0.0);
6607 assert!(forecast.forecasts[0].forecasted_fidelity < 1.0);
6608 }
6609
6610 #[tokio::test]
6613 async fn evolution_returns_metrics() {
6614 let engine = make_engine().await;
6615 let metrics = engine.introspect_evolution().await.unwrap();
6616 assert!(metrics.parameter_adjustments.is_empty());
6618 }
6619
6620 #[tokio::test]
6621 async fn evolution_after_decay() {
6622 let engine = make_engine().await;
6623 for i in 0..5 {
6624 engine
6625 .encode_store(text_store_req(
6626 &format!("Record {i}"),
6627 Some(StoreType::Episodic),
6628 ))
6629 .await
6630 .unwrap();
6631 }
6632
6633 engine
6635 .lifecycle_decay_tick(DecayTickRequest {
6636 header: None,
6637 tick_duration_seconds: Some(86400),
6638 })
6639 .await
6640 .unwrap();
6641
6642 let metrics = engine.introspect_evolution().await.unwrap();
6643 assert!(metrics.detected_patterns.is_empty() || !metrics.detected_patterns.is_empty());
6645 }
6646
/// Test double for `LLMProvider` that answers every call with deterministic,
/// locally computed data.
struct MockLLMProvider {
    /// Length of the vectors produced by the embedding methods.
    embed_dim: usize,
}

impl MockLLMProvider {
    /// Creates a mock whose embeddings have `embed_dim` components.
    fn new(embed_dim: usize) -> Self {
        MockLLMProvider { embed_dim }
    }
}
6659
6660 impl LLMProvider for MockLLMProvider {
6661 fn embed(
6662 &self,
6663 text: &str,
6664 ) -> std::pin::Pin<
6665 Box<dyn std::future::Future<Output = Result<Vec<f32>, CerememoryError>> + Send + '_>,
6666 > {
6667 let dim = self.embed_dim;
6668 let hash = text.len() as f32;
6669 Box::pin(async move {
6670 let mut v = vec![0.0f32; dim];
6671 v[0] = hash;
6672 v[1] = 1.0;
6673 Ok(v)
6674 })
6675 }
6676
6677 fn summarize(
6678 &self,
6679 texts: &[String],
6680 max_tokens: usize,
6681 ) -> std::pin::Pin<
6682 Box<dyn std::future::Future<Output = Result<String, CerememoryError>> + Send + '_>,
6683 > {
6684 let joined = texts.join("; ");
6685 let truncated = if joined.len() > max_tokens {
6686 format!("{}...", truncate_str(&joined, max_tokens))
6687 } else {
6688 joined
6689 };
6690 Box::pin(async move { Ok(truncated) })
6691 }
6692
6693 fn extract_relations(
6694 &self,
6695 text: &str,
6696 ) -> std::pin::Pin<
6697 Box<
6698 dyn std::future::Future<Output = Result<Vec<ExtractedRelation>, CerememoryError>>
6699 + Send
6700 + '_,
6701 >,
6702 > {
6703 let has_content = !text.is_empty();
6704 Box::pin(async move {
6705 if has_content {
6706 Ok(vec![ExtractedRelation {
6707 subject: "test".to_string(),
6708 predicate: "is_a".to_string(),
6709 object: "mock".to_string(),
6710 confidence: 0.9,
6711 }])
6712 } else {
6713 Ok(Vec::new())
6714 }
6715 })
6716 }
6717
6718 fn embed_image(
6719 &self,
6720 data: &[u8],
6721 _format: &str,
6722 ) -> std::pin::Pin<
6723 Box<dyn std::future::Future<Output = Result<Vec<f32>, CerememoryError>> + Send + '_>,
6724 > {
6725 let dim = self.embed_dim;
6726 let hash = data.len() as f32;
6727 Box::pin(async move {
6728 let mut v = vec![0.0f32; dim];
6729 v[0] = hash;
6730 v[1] = 2.0;
6731 Ok(v)
6732 })
6733 }
6734
6735 fn transcribe_audio(
6736 &self,
6737 data: &[u8],
6738 _format: &str,
6739 ) -> std::pin::Pin<
6740 Box<dyn std::future::Future<Output = Result<String, CerememoryError>> + Send + '_>,
6741 > {
6742 let transcript = format!("audio-{}", data.len());
6743 Box::pin(async move { Ok(transcript) })
6744 }
6745
6746 fn capabilities(&self) -> ProviderCapabilities {
6747 ProviderCapabilities {
6748 text_embedding: true,
6749 image_embedding: true,
6750 audio_transcription: true,
6751 }
6752 }
6753 }
6754
6755 #[tokio::test]
6756 async fn engine_auto_embed_with_provider() {
6757 let provider = Arc::new(MockLLMProvider::new(4));
6758 let engine = CerememoryEngine::new(EngineConfig {
6759 llm_provider: Some(provider),
6760 ..Default::default()
6761 })
6762 .unwrap();
6763
6764 let resp = engine
6766 .encode_store(text_store_req("Auto embed me", Some(StoreType::Episodic)))
6767 .await
6768 .unwrap();
6769
6770 let record = engine
6772 .introspect_record(RecordIntrospectRequest {
6773 header: None,
6774 record_id: resp.record_id,
6775 include_history: false,
6776 include_associations: false,
6777 include_versions: false,
6778 })
6779 .await
6780 .unwrap();
6781
6782 assert!(record.content.blocks[0].embedding.is_some());
6783 let emb = record.content.blocks[0].embedding.as_ref().unwrap();
6784 assert_eq!(emb.len(), 4);
6785 }
6786
6787 #[tokio::test]
6788 async fn engine_auto_embeds_all_image_blocks() {
6789 let provider = Arc::new(MockLLMProvider::new(4));
6790 let engine = CerememoryEngine::new(EngineConfig {
6791 llm_provider: Some(provider),
6792 ..Default::default()
6793 })
6794 .unwrap();
6795
6796 let resp = engine
6797 .encode_store(EncodeStoreRequest {
6798 header: None,
6799 content: MemoryContent {
6800 blocks: vec![
6801 ContentBlock {
6802 modality: Modality::Image,
6803 format: "image/png".to_string(),
6804 data: vec![1; 8],
6805 embedding: None,
6806 },
6807 ContentBlock {
6808 modality: Modality::Image,
6809 format: "image/png".to_string(),
6810 data: vec![2; 13],
6811 embedding: None,
6812 },
6813 ],
6814 summary: None,
6815 },
6816 store: Some(StoreType::Episodic),
6817 emotion: None,
6818 context: None,
6819 metadata: None,
6820 associations: None,
6821 })
6822 .await
6823 .unwrap();
6824
6825 let record = engine
6826 .introspect_record(RecordIntrospectRequest {
6827 header: None,
6828 record_id: resp.record_id,
6829 include_history: false,
6830 include_associations: false,
6831 include_versions: false,
6832 })
6833 .await
6834 .unwrap();
6835
6836 assert_eq!(record.content.blocks.len(), 2);
6837 assert_eq!(record.content.blocks[0].modality, Modality::Image);
6838 assert_eq!(record.content.blocks[1].modality, Modality::Image);
6839 assert_eq!(record.content.blocks[0].embedding.as_ref().unwrap()[0], 8.0);
6840 assert_eq!(
6841 record.content.blocks[1].embedding.as_ref().unwrap()[0],
6842 13.0
6843 );
6844 assert_eq!(record.content.blocks[0].embedding.as_ref().unwrap()[1], 2.0);
6845 assert_eq!(record.content.blocks[1].embedding.as_ref().unwrap()[1], 2.0);
6846 }
6847
6848 #[tokio::test]
6849 async fn engine_transcribes_all_audio_blocks_in_order() {
6850 let provider = Arc::new(MockLLMProvider::new(4));
6851 let engine = CerememoryEngine::new(EngineConfig {
6852 llm_provider: Some(provider),
6853 ..Default::default()
6854 })
6855 .unwrap();
6856
6857 let resp = engine
6858 .encode_store(EncodeStoreRequest {
6859 header: None,
6860 content: MemoryContent {
6861 blocks: vec![
6862 ContentBlock {
6863 modality: Modality::Audio,
6864 format: "audio/wav".to_string(),
6865 data: vec![0; 12],
6866 embedding: None,
6867 },
6868 ContentBlock {
6869 modality: Modality::Audio,
6870 format: "audio/wav".to_string(),
6871 data: vec![1; 123],
6872 embedding: None,
6873 },
6874 ],
6875 summary: None,
6876 },
6877 store: Some(StoreType::Episodic),
6878 emotion: None,
6879 context: None,
6880 metadata: None,
6881 associations: None,
6882 })
6883 .await
6884 .unwrap();
6885
6886 let record = engine
6887 .introspect_record(RecordIntrospectRequest {
6888 header: None,
6889 record_id: resp.record_id,
6890 include_history: false,
6891 include_associations: false,
6892 include_versions: false,
6893 })
6894 .await
6895 .unwrap();
6896
6897 assert_eq!(record.content.blocks.len(), 4);
6898 assert_eq!(record.content.blocks[0].modality, Modality::Audio);
6899 assert_eq!(record.content.blocks[1].modality, Modality::Audio);
6900 assert_eq!(record.content.blocks[2].modality, Modality::Text);
6901 assert_eq!(record.content.blocks[3].modality, Modality::Text);
6902 assert_eq!(
6903 std::str::from_utf8(&record.content.blocks[2].data).unwrap(),
6904 "audio-12"
6905 );
6906 assert_eq!(
6907 std::str::from_utf8(&record.content.blocks[3].data).unwrap(),
6908 "audio-123"
6909 );
6910 assert_eq!(
6911 record.content.blocks[2].embedding.as_ref().unwrap()[0],
6912 "audio-12".len() as f32
6913 );
6914 assert_eq!(
6915 record.content.blocks[3].embedding.as_ref().unwrap()[0],
6916 "audio-123".len() as f32
6917 );
6918 }
6919
6920 #[tokio::test]
6921 async fn engine_no_provider_passthrough() {
6922 let engine = make_engine().await;
6924 let resp = engine
6925 .encode_store(text_store_req("No provider", Some(StoreType::Episodic)))
6926 .await
6927 .unwrap();
6928
6929 let record = engine
6930 .introspect_record(RecordIntrospectRequest {
6931 header: None,
6932 record_id: resp.record_id,
6933 include_history: false,
6934 include_associations: false,
6935 include_versions: false,
6936 })
6937 .await
6938 .unwrap();
6939
6940 assert!(record.content.blocks[0].embedding.is_none());
6941 }
6942
6943 #[tokio::test]
6944 async fn engine_existing_embedding_not_overwritten() {
6945 let provider = Arc::new(MockLLMProvider::new(4));
6946 let engine = CerememoryEngine::new(EngineConfig {
6947 llm_provider: Some(provider),
6948 ..Default::default()
6949 })
6950 .unwrap();
6951
6952 let req = EncodeStoreRequest {
6954 header: None,
6955 content: MemoryContent {
6956 blocks: vec![ContentBlock {
6957 modality: Modality::Text,
6958 format: "text/plain".to_string(),
6959 data: b"Has embedding".to_vec(),
6960 embedding: Some(vec![9.0, 9.0, 9.0]),
6961 }],
6962 summary: None,
6963 },
6964 store: Some(StoreType::Episodic),
6965 emotion: None,
6966 context: None,
6967 metadata: None,
6968 associations: None,
6969 };
6970
6971 let resp = engine.encode_store(req).await.unwrap();
6972
6973 let record = engine
6974 .introspect_record(RecordIntrospectRequest {
6975 header: None,
6976 record_id: resp.record_id,
6977 include_history: false,
6978 include_associations: false,
6979 include_versions: false,
6980 })
6981 .await
6982 .unwrap();
6983
6984 let emb = record.content.blocks[0].embedding.as_ref().unwrap();
6986 assert_eq!(emb, &vec![9.0, 9.0, 9.0]);
6987 }
6988
6989 #[tokio::test]
6990 async fn noop_provider_embed_returns_empty() {
6991 let provider = NoOpProvider;
6992 let result = provider.embed("test").await;
6993 assert!(result.unwrap().is_empty());
6994 }
6995
6996 #[tokio::test]
6997 async fn noop_provider_summarize_concatenates() {
6998 let provider = NoOpProvider;
6999 let texts = vec!["hello".to_string(), "world".to_string()];
7000 let result = provider.summarize(&texts, 100).await.unwrap();
7001 assert_eq!(result, "hello world");
7002 }
7003
7004 #[tokio::test]
7005 async fn noop_provider_extract_relations_empty() {
7006 let provider = NoOpProvider;
7007 let result = provider.extract_relations("test").await.unwrap();
7008 assert!(result.is_empty());
7009 }
7010
7011 #[tokio::test]
7012 async fn noop_provider_capabilities_are_disabled() {
7013 let provider = NoOpProvider;
7014 let caps = provider.capabilities();
7015 assert!(!caps.text_embedding);
7016 assert!(!caps.image_embedding);
7017 assert!(!caps.audio_transcription);
7018 }
7019
7020 #[tokio::test]
7021 async fn mock_provider_embed_roundtrip() {
7022 let provider = MockLLMProvider::new(8);
7023 let result = provider.embed("hello").await.unwrap();
7024 assert_eq!(result.len(), 8);
7025 assert!(result[0] > 0.0); }
7027
7028 #[tokio::test]
7029 async fn mock_provider_summarize() {
7030 let provider = MockLLMProvider::new(4);
7031 let texts = vec!["one".to_string(), "two".to_string()];
7032 let result = provider.summarize(&texts, 100).await.unwrap();
7033 assert!(result.contains("one"));
7034 assert!(result.contains("two"));
7035 }
7036
7037 #[tokio::test]
7038 async fn mock_provider_extract_relations() {
7039 let provider = MockLLMProvider::new(4);
7040 let result = provider.extract_relations("some text").await.unwrap();
7041 assert_eq!(result.len(), 1);
7042 assert_eq!(result[0].predicate, "is_a");
7043 }
7044
7045 #[tokio::test]
7046 async fn auto_embed_enables_vector_search() {
7047 let provider = Arc::new(MockLLMProvider::new(4));
7048 let engine = CerememoryEngine::new(EngineConfig {
7049 llm_provider: Some(provider),
7050 ..Default::default()
7051 })
7052 .unwrap();
7053
7054 engine
7056 .encode_store(text_store_req("Searchable text", Some(StoreType::Episodic)))
7057 .await
7058 .unwrap();
7059
7060 let query = RecallQueryRequest {
7062 header: None,
7063 cue: RecallCue {
7064 embedding: Some(vec!["Searchable text".len() as f32, 1.0, 0.0, 0.0]),
7065 ..Default::default()
7066 },
7067 stores: None,
7068 limit: 10,
7069 min_fidelity: None,
7070 include_decayed: false,
7071 reconsolidate: false,
7072 activation_depth: 0,
7073 recall_mode: RecallMode::Perfect,
7074 };
7075
7076 let resp = engine.recall_query(query).await.unwrap();
7077 assert!(!resp.memories.is_empty());
7078 }
7079
7080 #[tokio::test]
7081 async fn image_recall_cue_uses_provider_embedding() {
7082 let provider = Arc::new(MockLLMProvider::new(4));
7083 let engine = CerememoryEngine::new(EngineConfig {
7084 llm_provider: Some(provider),
7085 ..Default::default()
7086 })
7087 .unwrap();
7088
7089 let image_bytes = vec![0x89, b'P', b'N', b'G', 0x0D, 0x0A, 0x1A, 0x0A, 1, 2, 3, 4];
7090 let resp = engine
7091 .encode_store(EncodeStoreRequest {
7092 header: None,
7093 content: MemoryContent {
7094 blocks: vec![ContentBlock {
7095 modality: Modality::Image,
7096 format: "image/png".to_string(),
7097 data: image_bytes.clone(),
7098 embedding: None,
7099 }],
7100 summary: None,
7101 },
7102 store: Some(StoreType::Episodic),
7103 emotion: None,
7104 context: None,
7105 metadata: None,
7106 associations: None,
7107 })
7108 .await
7109 .unwrap();
7110
7111 let recalled = engine
7112 .recall_query(RecallQueryRequest {
7113 header: None,
7114 cue: RecallCue {
7115 image: Some(image_bytes),
7116 ..Default::default()
7117 },
7118 stores: None,
7119 limit: 10,
7120 min_fidelity: None,
7121 include_decayed: false,
7122 reconsolidate: false,
7123 activation_depth: 0,
7124 recall_mode: RecallMode::Perfect,
7125 })
7126 .await
7127 .unwrap();
7128
7129 assert_eq!(recalled.memories.len(), 1);
7130 assert_eq!(recalled.memories[0].record.id, resp.record_id);
7131 }
7132
7133 #[tokio::test]
7134 async fn audio_recall_cue_uses_transcription() {
7135 let provider = Arc::new(MockLLMProvider::new(4));
7136 let engine = CerememoryEngine::new(EngineConfig {
7137 llm_provider: Some(provider),
7138 ..Default::default()
7139 })
7140 .unwrap();
7141
7142 let audio_bytes = b"RIFFabcdWAVErest".to_vec();
7143 let resp = engine
7144 .encode_store(EncodeStoreRequest {
7145 header: None,
7146 content: MemoryContent {
7147 blocks: vec![ContentBlock {
7148 modality: Modality::Audio,
7149 format: "audio/wav".to_string(),
7150 data: audio_bytes.clone(),
7151 embedding: None,
7152 }],
7153 summary: None,
7154 },
7155 store: Some(StoreType::Episodic),
7156 emotion: None,
7157 context: None,
7158 metadata: None,
7159 associations: None,
7160 })
7161 .await
7162 .unwrap();
7163
7164 let recalled = engine
7165 .recall_query(RecallQueryRequest {
7166 header: None,
7167 cue: RecallCue {
7168 audio: Some(audio_bytes),
7169 ..Default::default()
7170 },
7171 stores: None,
7172 limit: 10,
7173 min_fidelity: None,
7174 include_decayed: false,
7175 reconsolidate: false,
7176 activation_depth: 0,
7177 recall_mode: RecallMode::Perfect,
7178 })
7179 .await
7180 .unwrap();
7181
7182 assert_eq!(recalled.memories.len(), 1);
7183 assert_eq!(recalled.memories[0].record.id, resp.record_id);
7184 }
7185
7186 #[tokio::test]
7189 async fn consolidation_basic_migration() {
7190 let engine = make_engine().await;
7191 for i in 0..3 {
7192 engine
7193 .encode_store(text_store_req(
7194 &format!("Record {i}"),
7195 Some(StoreType::Episodic),
7196 ))
7197 .await
7198 .unwrap();
7199 }
7200
7201 let resp = engine
7202 .lifecycle_consolidate(ConsolidateRequest {
7203 header: None,
7204 strategy: ConsolidationStrategy::Incremental,
7205 min_age_hours: 0,
7206 min_access_count: 0,
7207 dry_run: false,
7208 })
7209 .await
7210 .unwrap();
7211
7212 assert_eq!(resp.records_processed, 3);
7213 assert_eq!(resp.records_migrated, 3);
7214 assert_eq!(resp.semantic_nodes_created, 3);
7215 }
7216
7217 #[tokio::test]
7218 async fn consolidation_dry_run() {
7219 let engine = make_engine().await;
7220 engine
7221 .encode_store(text_store_req("Dry run test", Some(StoreType::Episodic)))
7222 .await
7223 .unwrap();
7224
7225 let resp = engine
7226 .lifecycle_consolidate(ConsolidateRequest {
7227 header: None,
7228 strategy: ConsolidationStrategy::Incremental,
7229 min_age_hours: 0,
7230 min_access_count: 0,
7231 dry_run: true,
7232 })
7233 .await
7234 .unwrap();
7235
7236 assert_eq!(resp.records_migrated, 1);
7237 assert_eq!(engine.semantic.count().await.unwrap(), 0);
7239 }
7240
7241 #[tokio::test]
7242 async fn consolidation_with_llm_summarization() {
7243 let provider = Arc::new(MockLLMProvider::new(4));
7244 let engine = CerememoryEngine::new(EngineConfig {
7245 llm_provider: Some(provider),
7246 ..Default::default()
7247 })
7248 .unwrap();
7249
7250 engine
7251 .encode_store(text_store_req(
7252 "A very long piece of text that needs summarization for consolidation",
7253 Some(StoreType::Episodic),
7254 ))
7255 .await
7256 .unwrap();
7257
7258 let resp = engine
7259 .lifecycle_consolidate(ConsolidateRequest {
7260 header: None,
7261 strategy: ConsolidationStrategy::Incremental,
7262 min_age_hours: 0,
7263 min_access_count: 0,
7264 dry_run: false,
7265 })
7266 .await
7267 .unwrap();
7268
7269 assert_eq!(resp.records_migrated, 1);
7270
7271 let sem_ids = engine.semantic.list_ids().await.unwrap();
7273 assert_eq!(sem_ids.len(), 1);
7274 let sem_record = engine.semantic.get(&sem_ids[0]).await.unwrap().unwrap();
7275 assert!(sem_record.content.summary.is_some());
7276 }
7277
7278 #[tokio::test]
7279 async fn consolidation_with_relation_extraction() {
7280 let provider = Arc::new(MockLLMProvider::new(4));
7281 let engine = CerememoryEngine::new(EngineConfig {
7282 llm_provider: Some(provider),
7283 ..Default::default()
7284 })
7285 .unwrap();
7286
7287 engine
7288 .encode_store(text_store_req(
7289 "Cats are mammals",
7290 Some(StoreType::Episodic),
7291 ))
7292 .await
7293 .unwrap();
7294
7295 engine
7296 .lifecycle_consolidate(ConsolidateRequest {
7297 header: None,
7298 strategy: ConsolidationStrategy::Incremental,
7299 min_age_hours: 0,
7300 min_access_count: 0,
7301 dry_run: false,
7302 })
7303 .await
7304 .unwrap();
7305
7306 let sem_ids = engine.semantic.list_ids().await.unwrap();
7308 let sem_record = engine.semantic.get(&sem_ids[0]).await.unwrap().unwrap();
7309 if let serde_json::Value::Object(ref map) = sem_record.metadata {
7310 let relations = map.get("extracted_relations");
7311 assert!(relations.is_some());
7312 if let Some(serde_json::Value::Array(arr)) = relations {
7313 assert!(!arr.is_empty());
7314 }
7315 }
7316 }
7317
7318 #[tokio::test]
7319 async fn consolidation_no_llm_fallback() {
7320 let engine = make_engine().await;
7322 engine
7323 .encode_store(text_store_req(
7324 "Short text without LLM",
7325 Some(StoreType::Episodic),
7326 ))
7327 .await
7328 .unwrap();
7329
7330 let resp = engine
7331 .lifecycle_consolidate(ConsolidateRequest {
7332 header: None,
7333 strategy: ConsolidationStrategy::Incremental,
7334 min_age_hours: 0,
7335 min_access_count: 0,
7336 dry_run: false,
7337 })
7338 .await
7339 .unwrap();
7340
7341 assert_eq!(resp.records_migrated, 1);
7342 let sem_ids = engine.semantic.list_ids().await.unwrap();
7343 let sem_record = engine.semantic.get(&sem_ids[0]).await.unwrap().unwrap();
7344 assert!(sem_record.content.summary.is_some());
7345 assert_eq!(
7346 sem_record.content.summary.as_deref(),
7347 Some("Short text without LLM")
7348 );
7349 }
7350
7351 #[tokio::test]
7352 async fn consolidation_compression_metrics() {
7353 let engine = make_engine().await;
7354 for i in 0..5 {
7355 engine
7356 .encode_store(text_store_req(
7357 &format!("Test {i}"),
7358 Some(StoreType::Episodic),
7359 ))
7360 .await
7361 .unwrap();
7362 }
7363
7364 let resp = engine
7365 .lifecycle_consolidate(ConsolidateRequest {
7366 header: None,
7367 strategy: ConsolidationStrategy::Full,
7368 min_age_hours: 0,
7369 min_access_count: 0,
7370 dry_run: false,
7371 })
7372 .await
7373 .unwrap();
7374
7375 assert_eq!(resp.records_processed, 5);
7376 assert!(resp.records_migrated > 0);
7377 assert!(resp.records_processed > 0);
7379 }
7380
7381 #[tokio::test]
7382 async fn consolidation_min_age_filter() {
7383 let engine = make_engine().await;
7384 engine
7385 .encode_store(text_store_req("Fresh record", Some(StoreType::Episodic)))
7386 .await
7387 .unwrap();
7388
7389 let resp = engine
7390 .lifecycle_consolidate(ConsolidateRequest {
7391 header: None,
7392 strategy: ConsolidationStrategy::Incremental,
7393 min_age_hours: 24, min_access_count: 0,
7395 dry_run: false,
7396 })
7397 .await
7398 .unwrap();
7399
7400 assert_eq!(resp.records_migrated, 0);
7401 }
7402
7403 #[tokio::test]
7404 async fn consolidation_preserves_associations() {
7405 let engine = make_engine().await;
7406 let r1 = engine
7407 .encode_store(text_store_req("Memory A", Some(StoreType::Episodic)))
7408 .await
7409 .unwrap();
7410 let r2 = engine
7411 .encode_store(text_store_req("Memory B", Some(StoreType::Episodic)))
7412 .await
7413 .unwrap();
7414
7415 let assoc = Association {
7417 target_id: r2.record_id,
7418 association_type: AssociationType::Semantic,
7419 weight: 0.8,
7420 created_at: Utc::now(),
7421 last_co_activation: Utc::now(),
7422 };
7423 engine
7424 .coordinator
7425 .add_association(&r1.record_id, assoc)
7426 .await
7427 .unwrap();
7428
7429 engine
7430 .lifecycle_consolidate(ConsolidateRequest {
7431 header: None,
7432 strategy: ConsolidationStrategy::Incremental,
7433 min_age_hours: 0,
7434 min_access_count: 0,
7435 dry_run: false,
7436 })
7437 .await
7438 .unwrap();
7439
7440 let assocs = engine
7442 .coordinator
7443 .get_associations(&r1.record_id)
7444 .await
7445 .unwrap();
7446 assert!(assocs.len() >= 2); }
7448
7449 #[tokio::test]
7450 async fn duplicate_detection_with_embeddings() {
7451 let provider = Arc::new(MockLLMProvider::new(4));
7452 let engine = CerememoryEngine::new(EngineConfig {
7453 llm_provider: Some(provider),
7454 ..Default::default()
7455 })
7456 .unwrap();
7457
7458 engine
7460 .encode_store(text_store_req("Hello world", Some(StoreType::Episodic)))
7461 .await
7462 .unwrap();
7463 engine
7464 .encode_store(text_store_req("Hello worlds", Some(StoreType::Episodic)))
7465 .await
7466 .unwrap();
7467
7468 let initial_count = engine.episodic.count().await.unwrap();
7469
7470 let resp = engine
7471 .lifecycle_consolidate(ConsolidateRequest {
7472 header: None,
7473 strategy: ConsolidationStrategy::Full,
7474 min_age_hours: 0,
7475 min_access_count: 0,
7476 dry_run: false,
7477 })
7478 .await
7479 .unwrap();
7480
7481 assert!(resp.records_processed > 0 || initial_count > 0);
7484 }
7485}