Skip to main content

cerememory_engine/
lib.rs

1//! Cerememory Engine — the orchestrator.
2//!
3//! Assembles all stores, the decay engine, association engine,
4//! evolution engine, and the hippocampal coordinator into a
5//! unified system that implements the full CMP protocol.
6//!
7//! Phase 2 additions:
8//! - Tantivy full-text search integration (TextIndex)
9//! - Vector embedding similarity search (VectorIndex)
10//! - Background decay via tokio::spawn
11//! - Export/Import via cerememory-archive
12
13use std::collections::{BTreeMap, HashMap, HashSet};
14use std::sync::Arc;
15
/// Default weight for automatically inferred sequential associations in batch encoding.
///
/// NOTE(review): the code that consumes this constant is not visible in this
/// part of the file — confirm the expected value range with the batch encoder.
const DEFAULT_BATCH_SEQUENTIAL_WEIGHT: f64 = 0.7;

/// All five memory store types.
///
/// Listed in the order Episodic, Semantic, Procedural, Emotional, Working.
const ALL_STORES: [StoreType; 5] = [
    StoreType::Episodic,
    StoreType::Semantic,
    StoreType::Procedural,
    StoreType::Emotional,
    StoreType::Working,
];
27
/// Scope guard that records a histogram metric on drop (success or error path).
///
/// The guard captures the creation instant; its `Drop` implementation reports
/// the elapsed time in seconds to the histogram called `name`.
struct TimerGuard {
    /// Name of the histogram the elapsed time is recorded under.
    name: &'static str,
    /// Instant at which the guard was created.
    start: std::time::Instant,
}
33
/// A set of raw journal records bundled together with session and topic information.
///
/// NOTE(review): the code that builds and consumes these groups is outside the
/// visible part of the file (presumably dream processing — confirm), so the
/// field notes below are assumptions derived from names and types only.
struct DreamGroup {
    /// Session identifier for the group (presumably shared by all `records` — confirm).
    session_id: String,
    /// Optional raw topic identifier, when one is known.
    raw_topic_id: Option<String>,
    /// Optional free-form topic hint.
    topic_hint: Option<String>,
    /// Presumably `true` when the topic was inferred rather than supplied — TODO confirm.
    inferred_topic: bool,
    /// The raw journal records that make up this group.
    records: Vec<RawJournalRecord>,
}
41
42impl TimerGuard {
43    fn new(name: &'static str) -> Self {
44        Self {
45            name,
46            start: std::time::Instant::now(),
47        }
48    }
49}
50
51impl Drop for TimerGuard {
52    fn drop(&mut self) {
53        metrics::histogram!(self.name).record(self.start.elapsed().as_secs_f64());
54    }
55}
56
/// Dispatch a method call to the appropriate store based on StoreType.
///
/// Usage: `dispatch_store!(engine, store_type, method(arg1, arg2))`.
///
/// Expands to a `match` over the five `StoreType` variants; each arm calls
/// `$method` with the given arguments on the matching field of `$self`
/// (`episodic`, `semantic`, `procedural`, `emotional`, `working`) and
/// `.await`s the result, so the macro can only be used in an async context.
/// The argument expressions are repeated textually in every arm, but only the
/// selected arm is evaluated. A trailing comma after the last argument is not
/// accepted by the pattern.
macro_rules! dispatch_store {
    ($self:expr, $store_type:expr, $method:ident ( $($arg:expr),* )) => {
        match $store_type {
            StoreType::Episodic => $self.episodic.$method($($arg),*).await,
            StoreType::Semantic => $self.semantic.$method($($arg),*).await,
            StoreType::Procedural => $self.procedural.$method($($arg),*).await,
            StoreType::Emotional => $self.emotional.$method($($arg),*).await,
            StoreType::Working => $self.working.$method($($arg),*).await,
        }
    };
}
69
70use chrono::Utc;
71use tracing::{info, warn};
72use uuid::Uuid;
73
74use cerememory_association::SpreadingActivationEngine;
75use cerememory_core::error::CerememoryError;
76use cerememory_core::protocol::*;
77use cerememory_core::traits::*;
78use cerememory_core::types::*;
79use cerememory_decay::{DecayParams, PowerLawDecayEngine};
80use cerememory_evolution::EvolutionEngine;
81use cerememory_index::text_index::TextIndex;
82use cerememory_index::vector_index::VectorIndex;
83use cerememory_index::HippocampalCoordinator;
84use cerememory_store_emotional::EmotionalStore;
85use cerememory_store_episodic::EpisodicStore;
86use cerememory_store_procedural::ProceduralStore;
87use cerememory_store_raw::RawJournalStore;
88use cerememory_store_semantic::SemanticStore;
89use cerememory_store_working::WorkingMemoryStore;
90
/// Configuration for engine construction.
///
/// Every `*_path` field is optional: when it is `None`, `CerememoryEngine::new`
/// opens the corresponding store or index in memory instead of on disk.
pub struct EngineConfig {
    /// Path for the raw journal store. None = in-memory.
    pub raw_journal_path: Option<String>,
    /// Path for the episodic store. None = in-memory.
    pub episodic_path: Option<String>,
    /// Path for the semantic store. None = in-memory.
    pub semantic_path: Option<String>,
    /// Path for the procedural store. None = in-memory.
    pub procedural_path: Option<String>,
    /// Path for the emotional store. None = in-memory.
    pub emotional_path: Option<String>,
    /// Capacity of the working memory store. `CerememoryEngine::new` raises
    /// values below 1 to 1.
    pub working_capacity: usize,
    /// Parameters handed to the power-law decay engine.
    pub decay_params: DecayParams,
    /// Recall mode the engine starts in.
    pub recall_mode: RecallMode,
    /// Path for the Tantivy full-text search index directory.
    pub index_path: Option<String>,
    /// Path for the redb-backed vector index file.
    pub vector_index_path: Option<String>,
    /// If set, enables background decay at this interval (in seconds). None = disabled.
    pub background_decay_interval_secs: Option<u64>,
    /// If set, enables background dream processing at this interval (in seconds). None = disabled.
    pub background_dream_interval_secs: Option<u64>,
    /// Number of vectors at which to switch from brute-force to HNSW search.
    /// Default: 1000. Set to `usize::MAX` to always use brute-force.
    pub hnsw_threshold: usize,
    /// Optional LLM provider for auto-embedding, summarization, and relation extraction.
    /// When None, the engine operates without LLM capabilities (manual embeddings only).
    pub llm_provider: Option<Arc<dyn LLMProvider>>,
}
116
impl Default for EngineConfig {
    /// Build the default configuration: every store and index in memory (all
    /// paths `None`), a working-memory capacity of 7, default decay
    /// parameters, `RecallMode::Human`, both background tasks disabled, the
    /// vector index's default HNSW threshold, and no LLM provider.
    fn default() -> Self {
        Self {
            raw_journal_path: None,
            episodic_path: None,
            semantic_path: None,
            procedural_path: None,
            emotional_path: None,
            working_capacity: 7,
            decay_params: DecayParams::default(),
            recall_mode: RecallMode::Human,
            index_path: None,
            vector_index_path: None,
            background_decay_interval_secs: None,
            background_dream_interval_secs: None,
            hnsw_threshold: cerememory_index::vector_index::DEFAULT_HNSW_THRESHOLD,
            llm_provider: None,
        }
    }
}
137
/// The main Cerememory engine — orchestrates all CMP operations.
///
/// Owns the raw journal and the five memory stores, the decay / activation /
/// evolution engines, the hippocampal coordinator, the text and vector
/// indexes, and the bookkeeping for the optional background tasks.
pub struct CerememoryEngine {
    // Stores
    raw_journal: RawJournalStore,
    episodic: EpisodicStore,
    semantic: SemanticStore,
    procedural: ProceduralStore,
    emotional: EmotionalStore,
    working: WorkingMemoryStore,

    // Engines
    decay: PowerLawDecayEngine,
    activation: SpreadingActivationEngine<HippocampalCoordinator>,
    evolution: EvolutionEngine,

    // Coordinator (shared with the activation engine through the same Arc)
    coordinator: Arc<HippocampalCoordinator>,

    // Indexes (Phase 2)
    text_index: TextIndex,
    vector_index: VectorIndex,

    // State
    recall_mode: tokio::sync::RwLock<RecallMode>,

    // LLM provider (optional)
    llm_provider: Option<Arc<dyn LLMProvider>>,

    // Background decay: configured interval plus the handle of the running
    // task (None while no task is running).
    background_decay_interval_secs: Option<u64>,
    decay_state: tokio::sync::Mutex<Option<BackgroundDecayState>>,
    // Background dream processing: same layout as background decay.
    background_dream_interval_secs: Option<u64>,
    dream_state: tokio::sync::Mutex<Option<BackgroundDreamState>>,
}
172
/// Tracks a running background decay task for clean shutdown.
struct BackgroundDecayState {
    /// Sending `true` on this channel asks the task to stop.
    shutdown_tx: tokio::sync::watch::Sender<bool>,
    /// Join handle of the spawned task, awaited when stopping.
    handle: tokio::task::JoinHandle<()>,
}
178
/// Tracks a running background dream task for clean shutdown.
struct BackgroundDreamState {
    /// Sending `true` on this channel asks the task to stop.
    shutdown_tx: tokio::sync::watch::Sender<bool>,
    /// Join handle of the spawned task, awaited when stopping.
    handle: tokio::task::JoinHandle<()>,
}
184
185impl CerememoryEngine {
186    /// Create a new engine. Uses in-memory stores when paths are None.
187    pub fn new(config: EngineConfig) -> Result<Self, CerememoryError> {
188        let raw_journal = match &config.raw_journal_path {
189            Some(p) => RawJournalStore::open(p)?,
190            None => RawJournalStore::open_in_memory()?,
191        };
192
193        let episodic = match &config.episodic_path {
194            Some(p) => EpisodicStore::open(p)?,
195            None => EpisodicStore::open_in_memory()?,
196        };
197
198        let semantic = match &config.semantic_path {
199            Some(p) => SemanticStore::open(p)?,
200            None => SemanticStore::open_in_memory()?,
201        };
202
203        let text_index = match &config.index_path {
204            Some(p) => match TextIndex::open(p) {
205                Ok(idx) => idx,
206                Err(e) => {
207                    warn!(
208                        error = %e,
209                        path = %p,
210                        "Corrupted text index detected, recreating (data will be repopulated via rebuild_coordinator)"
211                    );
212                    // Attempt to remove corrupted index directory and recreate
213                    let _ = std::fs::remove_dir_all(p);
214                    TextIndex::open(p).map_err(|e2| {
215                        CerememoryError::Storage(format!(
216                            "Failed to recreate text index after corruption: {e2}"
217                        ))
218                    })?
219                }
220            },
221            None => TextIndex::open_in_memory()?,
222        };
223
224        let vector_index = match &config.vector_index_path {
225            Some(p) => VectorIndex::open_with_threshold(p, config.hnsw_threshold)?,
226            None => VectorIndex::open_in_memory_with_threshold(config.hnsw_threshold)?,
227        };
228
229        let procedural = match &config.procedural_path {
230            Some(p) => ProceduralStore::open(p)?,
231            None => ProceduralStore::open_in_memory()?,
232        };
233
234        let emotional = match &config.emotional_path {
235            Some(p) => EmotionalStore::open(p)?,
236            None => EmotionalStore::open_in_memory()?,
237        };
238
239        let coordinator = Arc::new(HippocampalCoordinator::new());
240        let activation = SpreadingActivationEngine::new(Arc::clone(&coordinator));
241
242        Ok(Self {
243            raw_journal,
244            episodic,
245            semantic,
246            procedural,
247            emotional,
248            working: WorkingMemoryStore::with_capacity(config.working_capacity.max(1)),
249            decay: PowerLawDecayEngine::new(config.decay_params),
250            activation,
251            evolution: EvolutionEngine::new(),
252            coordinator,
253            text_index,
254            vector_index,
255            recall_mode: tokio::sync::RwLock::new(config.recall_mode),
256            llm_provider: config.llm_provider,
257            background_decay_interval_secs: config.background_decay_interval_secs,
258            decay_state: tokio::sync::Mutex::new(None),
259            background_dream_interval_secs: config.background_dream_interval_secs,
260            dream_state: tokio::sync::Mutex::new(None),
261        })
262    }
263
264    /// Create an engine with all in-memory stores (for testing).
265    pub fn in_memory() -> Result<Self, CerememoryError> {
266        Self::new(EngineConfig::default())
267    }
268
269    /// Start the background decay task. Requires the engine to be wrapped in Arc.
270    /// No-op if already running or if `background_decay_interval_secs` is None.
271    pub fn start_background_decay(self: &Arc<Self>) {
272        let Some(interval_secs) = self.background_decay_interval_secs else {
273            return;
274        };
275
276        // Prevent double start
277        let mut guard = match self.decay_state.try_lock() {
278            Ok(g) => g,
279            Err(_) => return,
280        };
281        if guard.is_some() {
282            return; // Already running
283        }
284
285        let (tx, rx) = tokio::sync::watch::channel(false);
286        let engine = Arc::clone(self);
287
288        let handle = tokio::spawn(async move {
289            let mut backoff_secs = 1u64;
290            const MAX_BACKOFF_SECS: u64 = 60;
291
292            loop {
293                let engine_ref = Arc::clone(&engine);
294                let mut rx_ref = rx.clone();
295
296                let result = tokio::spawn(async move {
297                    let mut interval =
298                        tokio::time::interval(std::time::Duration::from_secs(interval_secs));
299                    interval.tick().await; // skip first immediate tick
300
301                    loop {
302                        tokio::select! {
303                            _ = interval.tick() => {
304                                let req = DecayTickRequest {
305                                    header: None,
306                                    tick_duration_seconds: Some(interval_secs.min(u32::MAX as u64) as u32),
307                                };
308                                if let Err(e) = engine_ref.lifecycle_decay_tick(req).await {
309                                    warn!(error = %e, "Background decay tick failed");
310                                }
311                            }
312                            _ = rx_ref.changed() => {
313                                if *rx_ref.borrow() {
314                                    info!("Background decay stopped");
315                                    return;
316                                }
317                            }
318                        }
319                    }
320                })
321                .await;
322
323                // Check if we got a shutdown signal
324                if *rx.borrow() {
325                    return;
326                }
327
328                match result {
329                    Ok(()) => return, // Clean exit
330                    Err(e) => {
331                        tracing::error!(
332                            error = %e,
333                            backoff_secs,
334                            "Background decay task panicked, restarting"
335                        );
336                        metrics::counter!("cerememory_decay_panics_total").increment(1);
337                        tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await;
338                        backoff_secs = (backoff_secs * 2).min(MAX_BACKOFF_SECS);
339                    }
340                }
341            }
342        });
343
344        *guard = Some(BackgroundDecayState {
345            shutdown_tx: tx,
346            handle,
347        });
348    }
349
350    /// Stop the background decay task and wait for it to finish.
351    pub async fn stop_background_decay(&self) {
352        let state = {
353            let mut guard = self.decay_state.lock().await;
354            guard.take()
355        };
356        if let Some(state) = state {
357            let _ = state.shutdown_tx.send(true);
358            let _ = state.handle.await;
359        }
360    }
361
362    /// Check if background decay is running.
363    pub async fn is_background_decay_enabled(&self) -> bool {
364        self.decay_state.lock().await.is_some()
365    }
366
367    /// Start the background dream task. Requires the engine to be wrapped in Arc.
368    /// No-op if already running or if `background_dream_interval_secs` is None.
369    pub fn start_background_dream(self: &Arc<Self>) {
370        let Some(interval_secs) = self.background_dream_interval_secs else {
371            return;
372        };
373
374        let mut guard = match self.dream_state.try_lock() {
375            Ok(g) => g,
376            Err(_) => return,
377        };
378        if guard.is_some() {
379            return;
380        }
381
382        let (tx, rx) = tokio::sync::watch::channel(false);
383        let engine = Arc::clone(self);
384
385        let handle = tokio::spawn(async move {
386            let mut backoff_secs = 1u64;
387            const MAX_BACKOFF_SECS: u64 = 300;
388
389            loop {
390                let engine_ref = Arc::clone(&engine);
391                let mut rx_ref = rx.clone();
392
393                let result = tokio::spawn(async move {
394                    let mut interval =
395                        tokio::time::interval(std::time::Duration::from_secs(interval_secs));
396                    interval.tick().await;
397
398                    loop {
399                        tokio::select! {
400                            _ = interval.tick() => {
401                                let req = DreamTickRequest {
402                                    header: None,
403                                    session_id: None,
404                                    dry_run: false,
405                                    max_groups: 50,
406                                    include_private_scratch: false,
407                                    include_sealed: false,
408                                    promote_semantic: true,
409                                    secrecy_levels: None,
410                                };
411                                if let Err(e) = engine_ref.lifecycle_dream_tick(req).await {
412                                    warn!(error = %e, "Background dream tick failed");
413                                }
414                            }
415                            _ = rx_ref.changed() => {
416                                if *rx_ref.borrow() {
417                                    info!("Background dream processing stopped");
418                                    return;
419                                }
420                            }
421                        }
422                    }
423                })
424                .await;
425
426                if *rx.borrow() {
427                    return;
428                }
429
430                match result {
431                    Ok(()) => return,
432                    Err(e) => {
433                        tracing::error!(
434                            error = %e,
435                            backoff_secs,
436                            "Background dream task panicked, restarting"
437                        );
438                        tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await;
439                        backoff_secs = (backoff_secs * 2).min(MAX_BACKOFF_SECS);
440                    }
441                }
442            }
443        });
444
445        *guard = Some(BackgroundDreamState {
446            shutdown_tx: tx,
447            handle,
448        });
449    }
450
451    /// Stop the background dream task and wait for it to finish.
452    pub async fn stop_background_dream(&self) {
453        let state = {
454            let mut guard = self.dream_state.lock().await;
455            guard.take()
456        };
457
458        if let Some(state) = state {
459            let _ = state.shutdown_tx.send(true);
460            if let Err(e) = state.handle.await {
461                tracing::error!(error = %e, "Background dream task join failed");
462            }
463        }
464    }
465
466    /// Check if background dream processing is running.
467    pub async fn is_background_dream_enabled(&self) -> bool {
468        self.dream_state.lock().await.is_some()
469    }
470
471    /// Rebuild the hippocampal coordinator and all indexes from persistent stores.
472    /// Must be called after construction when opening existing stores.
473    pub async fn rebuild_coordinator(&self) -> Result<(), CerememoryError> {
474        // Clear vector index to avoid stale entries from previous runs
475        self.vector_index.clear()?;
476
477        let mut entries = Vec::new();
478        let mut text_records = Vec::new();
479
480        for store_type in [
481            StoreType::Episodic,
482            StoreType::Semantic,
483            StoreType::Procedural,
484            StoreType::Emotional,
485        ] {
486            let records = dispatch_store!(self, store_type, get_all())?;
487            for record in records {
488                entries.push((record.id, store_type, record.associations.clone()));
489
490                // Collect searchable text for full-text index rebuild.
491                let text = Self::build_searchable_text(&record).unwrap_or_default();
492                if Self::has_indexable_content(&text, &record) {
493                    text_records.push((
494                        record.id,
495                        store_type,
496                        text,
497                        record.content.summary.clone(),
498                    ));
499                }
500
501                if let Some(embedding) = Self::primary_embedding(&record) {
502                    let _ = self.vector_index.upsert(record.id, embedding);
503                }
504            }
505        }
506
507        self.coordinator.rebuild(entries).await;
508        self.text_index.rebuild(&text_records)?;
509        self.vector_index.rebuild_hnsw()?;
510
511        info!(
512            records = self.coordinator.total_records().await,
513            hnsw_active = self.vector_index.is_hnsw_active(),
514            "Coordinator and indexes rebuilt from persistent stores"
515        );
516        Ok(())
517    }
518
519    // ─── Index helpers ─────────────────────────────────────────────
520
521    /// Returns `true` if the record has non-empty searchable text or a non-empty summary,
522    /// meaning it should be indexed in the full-text index.
523    fn has_indexable_content(searchable_text: &str, record: &MemoryRecord) -> bool {
524        !searchable_text.is_empty()
525            || record
526                .content
527                .summary
528                .as_deref()
529                .map(str::trim)
530                .is_some_and(|summary| !summary.is_empty())
531    }
532
533    /// Build a single searchable text body from textual and structured blocks.
534    fn build_searchable_text(record: &MemoryRecord) -> Option<String> {
535        let mut chunks = Vec::new();
536
537        for block in &record.content.blocks {
538            match block.modality {
539                Modality::Text => match std::str::from_utf8(&block.data) {
540                    Ok(text) => {
541                        let trimmed = text.trim();
542                        if !trimmed.is_empty() {
543                            chunks.push(trimmed.to_string());
544                        }
545                    }
546                    Err(error) => {
547                        warn!(
548                            error = %error,
549                            record_id = %record.id,
550                            "Skipping invalid UTF-8 text block during indexing"
551                        );
552                    }
553                },
554                Modality::Structured => {
555                    match cerememory_index::structured_index::flatten_json_to_text(&block.data) {
556                        Ok(flat) if !flat.is_empty() => chunks.push(flat),
557                        Ok(_) => {}
558                        Err(error) => {
559                            warn!(
560                                error = %error,
561                                record_id = %record.id,
562                                "Skipping invalid structured block during indexing"
563                            );
564                        }
565                    }
566                }
567                _ => {}
568            }
569        }
570
571        if chunks.is_empty() {
572            None
573        } else {
574            Some(chunks.join("\n"))
575        }
576    }
577
578    fn primary_embedding(record: &MemoryRecord) -> Option<&[f32]> {
579        record
580            .content
581            .blocks
582            .iter()
583            .find_map(|block| block.embedding.as_deref())
584    }
585
586    fn detect_image_format(data: &[u8]) -> Option<&'static str> {
587        if data.len() >= 8 && data.starts_with(b"\x89PNG\r\n\x1a\n") {
588            return Some("png");
589        }
590        if data.len() >= 3 && data.starts_with(&[0xFF, 0xD8, 0xFF]) {
591            return Some("jpeg");
592        }
593        if data.len() >= 6 && (data.starts_with(b"GIF87a") || data.starts_with(b"GIF89a")) {
594            return Some("gif");
595        }
596        if data.len() >= 12 && data.starts_with(b"RIFF") && &data[8..12] == b"WEBP" {
597            return Some("webp");
598        }
599        None
600    }
601
602    fn detect_audio_format(data: &[u8]) -> Option<&'static str> {
603        if data.len() >= 12 && data.starts_with(b"RIFF") && &data[8..12] == b"WAVE" {
604            return Some("wav");
605        }
606        if data.len() >= 4 && data.starts_with(b"fLaC") {
607            return Some("flac");
608        }
609        if data.len() >= 4 && data.starts_with(b"OggS") {
610            return Some("ogg");
611        }
612        if data.len() >= 3 && data.starts_with(b"ID3") {
613            return Some("mp3");
614        }
615        if data.len() >= 2 && data[0] == 0xFF && (data[1] & 0xE0) == 0xE0 {
616            return Some("mp3");
617        }
618        if data.len() >= 12 && &data[4..8] == b"ftyp" {
619            return Some("mp4");
620        }
621        if data.len() >= 4 && data.starts_with(&[0x1A, 0x45, 0xDF, 0xA3]) {
622            return Some("webm");
623        }
624        None
625    }
626
    /// Normalize a recall cue into an optional query text and an optional
    /// query embedding.
    ///
    /// Resolution steps, in order:
    /// 1. The text cue is trimmed; a blank text cue is treated as absent.
    /// 2. An embedding supplied in the cue is used as-is and never replaced.
    /// 3. An image cue is validated (non-empty, at most `MAX_IMAGE_SIZE`
    ///    bytes). If no embedding is available yet, the LLM provider embeds
    ///    the image.
    /// 4. An audio cue is validated (non-empty, at most `MAX_AUDIO_SIZE`
    ///    bytes) and always transcribed by the LLM provider. The transcript is
    ///    appended to the text on a new line. If there is still no embedding
    ///    and the provider supports text embedding, the transcript alone is
    ///    embedded.
    /// 5. If there is still no embedding but there is text and a provider with
    ///    text embedding, the text is embedded; a failure here is only logged.
    ///
    /// # Errors
    ///
    /// - `Validation` for an empty image/audio cue, an unrecognized image or
    ///   audio format, or an empty transcript.
    /// - `ContentTooLarge` when an image or audio cue exceeds its size limit.
    /// - `ModalityUnsupported` when the required provider or capability is missing.
    /// - `Internal` when the provider returns an empty image embedding.
    /// - Any error returned by the provider while embedding the image,
    ///   transcribing the audio, or embedding the transcript.
    async fn resolve_recall_cues(
        &self,
        cue: &RecallCue,
    ) -> Result<(Option<String>, Option<Vec<f32>>), CerememoryError> {
        // Blank or whitespace-only text counts as "no text cue".
        let mut text = cue
            .text
            .as_deref()
            .map(str::trim)
            .filter(|value| !value.is_empty())
            .map(str::to_owned);
        let mut embedding = cue.embedding.clone();

        if let Some(image) = cue.image.as_ref() {
            // The image is validated even when a caller-supplied embedding
            // makes the provider call unnecessary.
            if image.is_empty() {
                return Err(CerememoryError::Validation(
                    "Recall image cue must not be empty".to_string(),
                ));
            }
            if image.len() > MAX_IMAGE_SIZE {
                return Err(CerememoryError::ContentTooLarge {
                    size: image.len(),
                    limit: MAX_IMAGE_SIZE,
                });
            }

            if embedding.is_none() {
                let provider = self.llm_provider.as_ref().ok_or_else(|| {
                    CerememoryError::ModalityUnsupported(
                        "Image recall requires an LLM provider with image embedding support"
                            .to_string(),
                    )
                })?;
                let caps = provider.capabilities();
                if !caps.image_embedding {
                    return Err(CerememoryError::ModalityUnsupported(
                        "Configured LLM provider does not support image recall".to_string(),
                    ));
                }

                let format = Self::detect_image_format(image).ok_or_else(|| {
                    CerememoryError::Validation("Unsupported image recall cue format".to_string())
                })?;
                let generated = provider.embed_image(image, format).await?;
                if generated.is_empty() {
                    return Err(CerememoryError::Internal(
                        "Image recall provider returned an empty embedding".to_string(),
                    ));
                }
                embedding = Some(generated);
            }
        }

        if let Some(audio) = cue.audio.as_ref() {
            if audio.is_empty() {
                return Err(CerememoryError::Validation(
                    "Recall audio cue must not be empty".to_string(),
                ));
            }
            if audio.len() > MAX_AUDIO_SIZE {
                return Err(CerememoryError::ContentTooLarge {
                    size: audio.len(),
                    limit: MAX_AUDIO_SIZE,
                });
            }

            // Unlike the image path, transcription is required even when an
            // embedding already exists, because the transcript feeds the text query.
            let provider = self.llm_provider.as_ref().ok_or_else(|| {
                CerememoryError::ModalityUnsupported(
                    "Audio recall requires an LLM provider with transcription support".to_string(),
                )
            })?;
            let caps = provider.capabilities();
            if !caps.audio_transcription {
                return Err(CerememoryError::ModalityUnsupported(
                    "Configured LLM provider does not support audio recall".to_string(),
                ));
            }

            let format = Self::detect_audio_format(audio).ok_or_else(|| {
                CerememoryError::Validation("Unsupported audio recall cue format".to_string())
            })?;
            let transcript = provider.transcribe_audio(audio, format).await?;
            let transcript = transcript.trim();
            if transcript.is_empty() {
                return Err(CerememoryError::Validation(
                    "Audio recall cue produced an empty transcript".to_string(),
                ));
            }
            // Merge the transcript into the text query (newline-separated).
            match &mut text {
                Some(existing) => {
                    existing.push('\n');
                    existing.push_str(transcript);
                }
                None => text = Some(transcript.to_string()),
            }
            // Only the transcript is embedded here, not the merged text; an
            // empty result leaves the embedding unset for the fallback below.
            if embedding.is_none() && caps.text_embedding {
                let generated = provider.embed(transcript).await?;
                if !generated.is_empty() {
                    embedding = Some(generated);
                }
            }
        }

        // Last resort: embed the query text. Failures are logged, not
        // returned, so recall can still proceed with text only.
        if embedding.is_none() {
            if let (Some(provider), Some(query_text)) =
                (self.llm_provider.as_ref(), text.as_deref())
            {
                if provider.capabilities().text_embedding {
                    match provider.embed(query_text).await {
                        Ok(generated) if !generated.is_empty() => embedding = Some(generated),
                        Ok(_) => warn!("Text recall cue embedding returned an empty vector"),
                        Err(error) => warn!(error = %error, "Failed to embed text recall cue"),
                    }
                }
            }
        }

        Ok((text, embedding))
    }
745
746    /// Index a record's text content, structured data, and embeddings.
747    /// Uses only the first embedding found (one vector per record_id).
748    fn index_record(&self, record: &MemoryRecord) -> Result<(), CerememoryError> {
749        let text = Self::build_searchable_text(record).unwrap_or_default();
750        if Self::has_indexable_content(&text, record) {
751            self.text_index.add(
752                record.id,
753                record.store,
754                &text,
755                record.content.summary.as_deref(),
756            )?;
757        }
758
759        // Vector index — use first embedding found (one vector per record_id)
760        if let Some(embedding) = Self::primary_embedding(record) {
761            self.vector_index.upsert(record.id, embedding)?;
762        }
763
764        Ok(())
765    }
766
767    /// Remove a record from all indexes.
768    fn unindex_record(&self, id: Uuid) {
769        let _ = self.text_index.remove(id);
770        let _ = self.vector_index.remove(id);
771    }
772
773    // ─── Store routing ───────────────────────────────────────────────
774
775    fn route_store(&self, content: &MemoryContent) -> StoreType {
776        if content.summary.is_some() {
777            StoreType::Semantic
778        } else {
779            StoreType::Episodic
780        }
781    }
782
783    async fn get_store_record(
784        &self,
785        id: &Uuid,
786    ) -> Result<Option<(MemoryRecord, StoreType)>, CerememoryError> {
787        // Check coordinator first for store type hint
788        if let Some(st) = self.coordinator.get_record_store_type(id).await? {
789            let record = dispatch_store!(self, st, get(id))?;
790            return Ok(record.map(|r| (r, st)));
791        }
792
793        // Fallback: search all stores
794        for st in [
795            StoreType::Working,
796            StoreType::Episodic,
797            StoreType::Semantic,
798            StoreType::Procedural,
799            StoreType::Emotional,
800        ] {
801            if let Some(r) = dispatch_store!(self, st, get(id))? {
802                return Ok(Some((r, st)));
803            }
804        }
805        Ok(None)
806    }
807
808    fn build_record_metadata(
809        context: Option<EncodeContext>,
810        metadata: Option<serde_json::Value>,
811    ) -> serde_json::Value {
812        let context_value = context.and_then(|ctx| serde_json::to_value(ctx).ok());
813        match (metadata, context_value) {
814            (Some(serde_json::Value::Object(mut map)), Some(context)) => {
815                map.insert("_context".to_string(), context);
816                serde_json::Value::Object(map)
817            }
818            (Some(other), Some(context)) => serde_json::json!({
819                "_metadata": other,
820                "_context": context,
821            }),
822            (Some(metadata), None) => metadata,
823            (None, Some(context)) => serde_json::json!({ "_context": context }),
824            (None, None) => serde_json::Value::Object(serde_json::Map::new()),
825        }
826    }
827
828    async fn persist_associations_for_record(
829        &self,
830        record_id: &Uuid,
831        store_type: StoreType,
832        associations: Vec<Association>,
833    ) -> Result<(), CerememoryError> {
834        dispatch_store!(
835            self,
836            store_type,
837            replace_associations(record_id, associations.clone())
838        )?;
839        self.coordinator
840            .update_associations(record_id, associations)
841            .await?;
842        Ok(())
843    }
844
845    async fn add_persisted_association(
846        &self,
847        record_id: &Uuid,
848        association: Association,
849    ) -> Result<(), CerememoryError> {
850        let Some((record, store_type)) = self.get_store_record(record_id).await? else {
851            return Err(CerememoryError::RecordNotFound(record_id.to_string()));
852        };
853
854        let mut associations = self.coordinator.get_associations(record_id).await?;
855        for persisted in &record.associations {
856            if !associations.iter().any(|existing| {
857                existing.target_id == persisted.target_id
858                    && existing.association_type == persisted.association_type
859            }) {
860                associations.push(persisted.clone());
861            }
862        }
863
864        if associations.iter().any(|existing| {
865            existing.target_id == association.target_id
866                && existing.association_type == association.association_type
867        }) {
868            return Ok(());
869        }
870
871        associations.push(association);
872        self.persist_associations_for_record(record_id, store_type, associations)
873            .await
874    }
875
876    async fn remove_deleted_targets_from_records(
877        &self,
878        deleted_ids: &HashSet<Uuid>,
879    ) -> Result<(), CerememoryError> {
880        if deleted_ids.is_empty() {
881            return Ok(());
882        }
883
884        for store_type in ALL_STORES {
885            let records = dispatch_store!(self, store_type, get_all())?;
886            for record in records {
887                let filtered: Vec<_> = record
888                    .associations
889                    .iter()
890                    .filter(|association| !deleted_ids.contains(&association.target_id))
891                    .cloned()
892                    .collect();
893                if filtered.len() != record.associations.len() {
894                    self.persist_associations_for_record(&record.id, store_type, filtered)
895                        .await?;
896                }
897            }
898        }
899
900        Ok(())
901    }
902
903    async fn cleanup_deleted_records(
904        &self,
905        deleted_records: &[(Uuid, StoreType)],
906    ) -> Result<(), CerememoryError> {
907        if deleted_records.is_empty() {
908            return Ok(());
909        }
910
911        let mut deleted_ids = HashSet::with_capacity(deleted_records.len());
912        for (record_id, _) in deleted_records {
913            deleted_ids.insert(*record_id);
914            self.coordinator.unregister(record_id).await;
915            self.unindex_record(*record_id);
916        }
917        self.remove_deleted_targets_from_records(&deleted_ids).await
918    }
919
920    fn normalize_export_format(format: &str) -> Result<&'static str, CerememoryError> {
921        let format = format.trim();
922        if format.eq_ignore_ascii_case("cma") || format.eq_ignore_ascii_case("jsonl") {
923            Ok("cma")
924        } else {
925            Err(CerememoryError::Validation(format!(
926                "Unsupported export format '{format}'. Valid options: cma, jsonl"
927            )))
928        }
929    }
930
931    async fn delete_records(
932        &self,
933        delete_targets: Vec<(Uuid, StoreType)>,
934    ) -> Result<u32, CerememoryError> {
935        let mut deleted_records = Vec::new();
936        for (id, store_type) in delete_targets {
937            if dispatch_store!(self, store_type, delete(&id))? {
938                deleted_records.push((id, store_type));
939            }
940        }
941        let deleted = deleted_records.len() as u32;
942        self.cleanup_deleted_records(&deleted_records).await?;
943        Ok(deleted)
944    }
945
946    async fn clear_all_records(&self) -> Result<u32, CerememoryError> {
947        let mut delete_targets = Vec::new();
948        for store_type in ALL_STORES {
949            for id in dispatch_store!(self, store_type, list_ids())? {
950                delete_targets.push((id, store_type));
951            }
952        }
953        self.delete_records(delete_targets).await
954    }
955
956    async fn restore_records(&self, records: &[MemoryRecord]) -> Result<(), CerememoryError> {
957        self.clear_all_records().await?;
958        for record in records {
959            let store_type = record.store;
960            dispatch_store!(self, store_type, store(record.clone()))?;
961            self.coordinator
962                .register(record.id, store_type, record.associations.clone())
963                .await;
964            let _ = self.index_record(record);
965        }
966        Ok(())
967    }
968
969    async fn import_records_with_conflict_resolution(
970        &self,
971        records: Vec<MemoryRecord>,
972        conflict_resolution: ConflictResolution,
973    ) -> Result<u32, CerememoryError> {
974        let mut imported = 0u32;
975
976        for record in records {
977            let store_type = record.store;
978            let mut replaced_cross_store: Option<StoreType> = None;
979
980            if let Some((existing, existing_store)) = self.get_store_record(&record.id).await? {
981                match conflict_resolution {
982                    ConflictResolution::KeepExisting => continue,
983                    ConflictResolution::KeepImported => {
984                        if existing_store != store_type {
985                            replaced_cross_store = Some(existing_store);
986                        }
987                    }
988                    ConflictResolution::KeepNewer => {
989                        if record.updated_at <= existing.updated_at {
990                            continue;
991                        }
992                        if existing_store != store_type {
993                            replaced_cross_store = Some(existing_store);
994                        }
995                    }
996                }
997            }
998
999            dispatch_store!(self, store_type, store(record.clone()))?;
1000            if let Some(existing_store) = replaced_cross_store {
1001                if !dispatch_store!(self, existing_store, delete(&record.id))? {
1002                    if let Err(e) = dispatch_store!(self, store_type, delete(&record.id)) {
1003                        warn!(
1004                            record_id = %record.id,
1005                            store = %store_type,
1006                            error = %e,
1007                            "Failed to clean up newly stored record during import conflict rollback"
1008                        );
1009                    }
1010                    return Err(CerememoryError::ImportConflict(format!(
1011                        "Failed to replace cross-store record {} from {} to {}",
1012                        record.id, existing_store, store_type
1013                    )));
1014                }
1015            }
1016            self.coordinator
1017                .register(record.id, store_type, record.associations.clone())
1018                .await;
1019            if let Err(e) = self.index_record(&record) {
1020                warn!(error = %e, record_id = %record.id, "Failed to index imported record");
1021            }
1022            imported += 1;
1023        }
1024
1025        Ok(imported)
1026    }
1027
1028    async fn collect_all_raw_journal_records(
1029        &self,
1030    ) -> Result<Vec<RawJournalRecord>, CerememoryError> {
1031        self.raw_journal.get_all().await
1032    }
1033
1034    async fn clear_raw_journal(&self) -> Result<u32, CerememoryError> {
1035        let records = self.raw_journal.get_all().await?;
1036        let mut deleted = 0u32;
1037        for record in records {
1038            if self.raw_journal.delete(&record.id).await? {
1039                deleted += 1;
1040            }
1041        }
1042        Ok(deleted)
1043    }
1044
1045    async fn restore_raw_journal(
1046        &self,
1047        raw_records: &[RawJournalRecord],
1048    ) -> Result<(), CerememoryError> {
1049        self.clear_raw_journal().await?;
1050        for record in raw_records {
1051            self.raw_journal.append(record.clone()).await?;
1052        }
1053        Ok(())
1054    }
1055
1056    async fn import_raw_records_with_conflict_resolution(
1057        &self,
1058        raw_records: Vec<RawJournalRecord>,
1059        conflict_resolution: ConflictResolution,
1060    ) -> Result<u32, CerememoryError> {
1061        let mut imported = 0u32;
1062        for record in raw_records {
1063            if let Some(existing) = self.raw_journal.get(&record.id).await? {
1064                match conflict_resolution {
1065                    ConflictResolution::KeepExisting => continue,
1066                    ConflictResolution::KeepImported => {
1067                        self.raw_journal.update(record.clone()).await?;
1068                        imported += 1;
1069                    }
1070                    ConflictResolution::KeepNewer => {
1071                        if record.updated_at > existing.updated_at {
1072                            self.raw_journal.update(record.clone()).await?;
1073                            imported += 1;
1074                        }
1075                    }
1076                }
1077            } else {
1078                self.raw_journal.append(record).await?;
1079                imported += 1;
1080            }
1081        }
1082        Ok(imported)
1083    }
1084
1085    /// Append a verbatim preserved record to the raw journal.
1086    pub async fn append_raw_journal(
1087        &self,
1088        record: RawJournalRecord,
1089    ) -> Result<Uuid, CerememoryError> {
1090        self.raw_journal.append(record).await
1091    }
1092
1093    /// Retrieve a raw journal record by id.
1094    pub async fn get_raw_journal_record(
1095        &self,
1096        id: &Uuid,
1097    ) -> Result<Option<RawJournalRecord>, CerememoryError> {
1098        self.raw_journal.get(id).await
1099    }
1100
1101    /// Retrieve all raw journal records for a session in chronological order.
1102    pub async fn query_raw_journal_by_session(
1103        &self,
1104        session_id: &str,
1105    ) -> Result<Vec<RawJournalRecord>, CerememoryError> {
1106        self.raw_journal.query_session(session_id).await
1107    }
1108
1109    /// Retrieve raw journal records for a session within a time range.
1110    pub async fn query_raw_journal_session_range(
1111        &self,
1112        session_id: &str,
1113        start: chrono::DateTime<Utc>,
1114        end: chrono::DateTime<Utc>,
1115    ) -> Result<Vec<RawJournalRecord>, CerememoryError> {
1116        self.raw_journal
1117            .query_session_range(session_id, start, end)
1118            .await
1119    }
1120
1121    /// Return the number of raw journal records.
1122    pub async fn raw_journal_count(&self) -> Result<usize, CerememoryError> {
1123        self.raw_journal.count().await
1124    }
1125
1126    fn raw_query_allowed_visibility(
1127        record: &RawJournalRecord,
1128        include_private_scratch: bool,
1129        include_sealed: bool,
1130    ) -> bool {
1131        match record.visibility {
1132            RawVisibility::Normal => true,
1133            RawVisibility::PrivateScratch => include_private_scratch,
1134            RawVisibility::Sealed => include_sealed,
1135        }
1136    }
1137
1138    fn raw_query_allowed_secrecy(
1139        record: &RawJournalRecord,
1140        secrecy_levels: Option<&[SecrecyLevel]>,
1141    ) -> bool {
1142        match secrecy_levels {
1143            Some(levels) => levels.contains(&record.secrecy_level),
1144            None => matches!(
1145                record.secrecy_level,
1146                SecrecyLevel::Public | SecrecyLevel::Sensitive
1147            ),
1148        }
1149    }
1150
1151    fn raw_record_processed(record: &RawJournalRecord) -> bool {
1152        record
1153            .metadata
1154            .get("_dream")
1155            .and_then(|value| value.get("processed_at"))
1156            .and_then(|value| value.as_str())
1157            .is_some()
1158    }
1159
1160    fn mark_raw_record_dream_processed(
1161        record: &mut RawJournalRecord,
1162        summary_id: Uuid,
1163        semantic_id: Option<Uuid>,
1164        dreamed_at: chrono::DateTime<Utc>,
1165    ) {
1166        record.updated_at = dreamed_at;
1167        if !record.derived_memory_ids.contains(&summary_id) {
1168            record.derived_memory_ids.push(summary_id);
1169        }
1170        if let Some(semantic_id) = semantic_id {
1171            if !record.derived_memory_ids.contains(&semantic_id) {
1172                record.derived_memory_ids.push(semantic_id);
1173            }
1174        }
1175
1176        let metadata = if let serde_json::Value::Object(map) = &mut record.metadata {
1177            map
1178        } else {
1179            record.metadata = serde_json::json!({});
1180            match &mut record.metadata {
1181                serde_json::Value::Object(map) => map,
1182                _ => unreachable!("metadata initialized as object"),
1183            }
1184        };
1185
1186        let dream = metadata
1187            .entry("_dream".to_string())
1188            .or_insert_with(|| serde_json::json!({}));
1189        if let serde_json::Value::Object(map) = dream {
1190            map.insert(
1191                "processed_at".to_string(),
1192                serde_json::Value::String(dreamed_at.to_rfc3339()),
1193            );
1194            map.insert(
1195                "last_summary_id".to_string(),
1196                serde_json::Value::String(summary_id.to_string()),
1197            );
1198        }
1199
1200        let derived = metadata
1201            .entry("_derived".to_string())
1202            .or_insert_with(|| serde_json::json!({}));
1203        if let serde_json::Value::Object(map) = derived {
1204            let summary_entry = map
1205                .entry("episodic_summary_ids".to_string())
1206                .or_insert_with(|| serde_json::json!([]));
1207            if let serde_json::Value::Array(ids) = summary_entry {
1208                let summary_value = serde_json::Value::String(summary_id.to_string());
1209                if !ids.iter().any(|existing| existing == &summary_value) {
1210                    ids.push(summary_value);
1211                }
1212            }
1213            if let Some(semantic_id) = semantic_id {
1214                let semantic_entry = map
1215                    .entry("semantic_ids".to_string())
1216                    .or_insert_with(|| serde_json::json!([]));
1217                if let serde_json::Value::Array(ids) = semantic_entry {
1218                    let semantic_value = serde_json::Value::String(semantic_id.to_string());
1219                    if !ids.iter().any(|existing| existing == &semantic_value) {
1220                        ids.push(semantic_value);
1221                    }
1222                }
1223            }
1224        }
1225    }
1226
1227    fn dream_stat_u64(summary_stats: &serde_json::Value, key: &str) -> u64 {
1228        summary_stats
1229            .get(key)
1230            .and_then(|value| value.as_u64())
1231            .unwrap_or(0)
1232    }
1233
1234    fn should_promote_dream_group(
1235        raw_topic_id: Option<&str>,
1236        topic_hint: Option<&str>,
1237        summary_stats: &serde_json::Value,
1238    ) -> bool {
1239        let normal_records = Self::dream_stat_u64(summary_stats, "normal_records");
1240        let has_topic_signal = raw_topic_id.filter(|topic| !topic.is_empty()).is_some()
1241            || topic_hint.filter(|topic| !topic.is_empty()).is_some();
1242        normal_records >= 2 && has_topic_signal
1243    }
1244
1245    fn attach_summary_semantic_link(summary_record: &mut MemoryRecord, semantic_id: Uuid) {
1246        if let serde_json::Value::Object(map) = &mut summary_record.metadata {
1247            let derived = map
1248                .entry("_derived".to_string())
1249                .or_insert_with(|| serde_json::json!({}));
1250            if let serde_json::Value::Object(derived_map) = derived {
1251                let semantic_ids = derived_map
1252                    .entry("semantic_ids".to_string())
1253                    .or_insert_with(|| serde_json::json!([]));
1254                if let serde_json::Value::Array(ids) = semantic_ids {
1255                    let semantic_value = serde_json::Value::String(semantic_id.to_string());
1256                    if !ids.iter().any(|existing| existing == &semantic_value) {
1257                        ids.push(semantic_value);
1258                    }
1259                }
1260            }
1261        }
1262    }
1263
1264    fn prepare_dream_summary_inputs(
1265        raw_records: &[RawJournalRecord],
1266    ) -> (Vec<String>, serde_json::Value) {
1267        let mut texts = Vec::new();
1268        let mut normal_count = 0u32;
1269        let mut private_scratch_redacted = 0u32;
1270        let mut sealed_redacted = 0u32;
1271        let mut secret_redacted = 0u32;
1272
1273        for record in raw_records {
1274            match (record.visibility, record.secrecy_level) {
1275                (_, SecrecyLevel::Secret) => {
1276                    secret_redacted += 1;
1277                }
1278                (RawVisibility::Sealed, _) => {
1279                    sealed_redacted += 1;
1280                }
1281                (RawVisibility::PrivateScratch, _) => {
1282                    private_scratch_redacted += 1;
1283                }
1284                (RawVisibility::Normal, _) => {
1285                    if let Some(text) = record.text_content() {
1286                        texts.push(text.to_string());
1287                    }
1288                    normal_count += 1;
1289                }
1290            }
1291        }
1292
1293        let stats = serde_json::json!({
1294            "normal_records": normal_count,
1295            "private_scratch_redacted": private_scratch_redacted,
1296            "sealed_redacted": sealed_redacted,
1297            "secret_redacted": secret_redacted,
1298            "redacted_total": private_scratch_redacted + sealed_redacted + secret_redacted,
1299        });
1300
1301        (texts, stats)
1302    }
1303
1304    fn tokenize_topic_text(text: &str) -> HashSet<String> {
1305        const STOPWORDS: &[&str] = &[
1306            "the", "and", "for", "with", "that", "this", "from", "into", "only", "then", "than",
1307            "have", "has", "had", "were", "was", "are", "but", "not", "you", "your", "our", "raw",
1308            "note", "notes", "session", "topic", "summary", "record", "records",
1309        ];
1310
1311        text.chars()
1312            .map(|ch| if ch.is_alphanumeric() { ch } else { ' ' })
1313            .collect::<String>()
1314            .split_whitespace()
1315            .map(|token| token.to_lowercase())
1316            .filter(|token| token.len() >= 3)
1317            .filter(|token| !STOPWORDS.iter().any(|stopword| stopword == token))
1318            .collect()
1319    }
1320
1321    fn topic_token_overlap(left: &HashSet<String>, right: &HashSet<String>) -> f64 {
1322        if left.is_empty() || right.is_empty() {
1323            return 1.0;
1324        }
1325
1326        let intersection = left.intersection(right).count() as f64;
1327        let union = left.union(right).count() as f64;
1328        if union == 0.0 {
1329            1.0
1330        } else {
1331            intersection / union
1332        }
1333    }
1334
1335    fn infer_topic_hint(raw_records: &[RawJournalRecord]) -> Option<String> {
1336        let mut counts: HashMap<String, u32> = HashMap::new();
1337        for record in raw_records {
1338            if matches!(record.visibility, RawVisibility::Normal)
1339                && !matches!(record.secrecy_level, SecrecyLevel::Secret)
1340            {
1341                if let Some(text) = record.text_content() {
1342                    for token in Self::tokenize_topic_text(text) {
1343                        *counts.entry(token).or_insert(0) += 1;
1344                    }
1345                }
1346            }
1347        }
1348
1349        let mut ranked: Vec<(String, u32)> = counts.into_iter().collect();
1350        ranked.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
1351        let top_terms: Vec<String> = ranked.into_iter().take(3).map(|(token, _)| token).collect();
1352        if top_terms.is_empty() {
1353            None
1354        } else {
1355            Some(top_terms.join("+"))
1356        }
1357    }
1358
1359    fn infer_dream_groups(records: Vec<RawJournalRecord>) -> Vec<DreamGroup> {
1360        let mut explicit_groups: BTreeMap<
1361            (chrono::NaiveDate, String, String),
1362            Vec<RawJournalRecord>,
1363        > = BTreeMap::new();
1364        let mut inferred_sources: BTreeMap<(chrono::NaiveDate, String), Vec<RawJournalRecord>> =
1365            BTreeMap::new();
1366
1367        for record in records {
1368            let date = record.created_at.date_naive();
1369            if let Some(topic_id) = record
1370                .topic_id
1371                .as_ref()
1372                .map(|topic| topic.trim())
1373                .filter(|topic| !topic.is_empty())
1374            {
1375                explicit_groups
1376                    .entry((date, record.session_id.clone(), topic_id.to_string()))
1377                    .or_default()
1378                    .push(record);
1379            } else {
1380                inferred_sources
1381                    .entry((date, record.session_id.clone()))
1382                    .or_default()
1383                    .push(record);
1384            }
1385        }
1386
1387        let mut groups = Vec::new();
1388        for ((_, session_id, topic_id), mut records) in explicit_groups {
1389            records.sort_by(|a, b| {
1390                a.created_at
1391                    .cmp(&b.created_at)
1392                    .then_with(|| a.id.cmp(&b.id))
1393            });
1394            groups.push(DreamGroup {
1395                session_id,
1396                raw_topic_id: Some(topic_id.clone()),
1397                topic_hint: Some(topic_id),
1398                inferred_topic: false,
1399                records,
1400            });
1401        }
1402
1403        for ((date, session_id), mut records) in inferred_sources {
1404            records.sort_by(|a, b| {
1405                a.created_at
1406                    .cmp(&b.created_at)
1407                    .then_with(|| a.id.cmp(&b.id))
1408            });
1409
1410            let mut current: Vec<RawJournalRecord> = Vec::new();
1411            let mut current_tokens: HashSet<String> = HashSet::new();
1412            let mut segment_index = 0u32;
1413
1414            for record in records {
1415                let record_tokens = record
1416                    .text_content()
1417                    .map(Self::tokenize_topic_text)
1418                    .unwrap_or_default();
1419                let should_split = current.last().is_some_and(|previous: &RawJournalRecord| {
1420                    let gap = record.created_at.signed_duration_since(previous.created_at);
1421                    let overlap = Self::topic_token_overlap(&current_tokens, &record_tokens);
1422                    gap > chrono::Duration::minutes(45)
1423                        || (gap > chrono::Duration::minutes(10) && overlap < 0.08)
1424                });
1425
1426                if should_split && !current.is_empty() {
1427                    let topic_hint = Self::infer_topic_hint(&current);
1428                    groups.push(DreamGroup {
1429                        session_id: session_id.clone(),
1430                        raw_topic_id: None,
1431                        topic_hint,
1432                        inferred_topic: true,
1433                        records: std::mem::take(&mut current),
1434                    });
1435                    current_tokens.clear();
1436                    segment_index += 1;
1437                }
1438
1439                current_tokens.extend(record_tokens);
1440                current.push(record);
1441            }
1442
1443            if !current.is_empty() {
1444                let topic_hint = Self::infer_topic_hint(&current)
1445                    .or_else(|| Some(format!("auto-{}-{segment_index}", date.format("%Y%m%d"))));
1446                groups.push(DreamGroup {
1447                    session_id: session_id.clone(),
1448                    raw_topic_id: None,
1449                    topic_hint,
1450                    inferred_topic: true,
1451                    records: current,
1452                });
1453            }
1454        }
1455
1456        groups.sort_by(|left, right| {
1457            let left_time = left.records.first().map(|record| record.created_at);
1458            let right_time = right.records.first().map(|record| record.created_at);
1459            left_time
1460                .cmp(&right_time)
1461                .then_with(|| left.session_id.cmp(&right.session_id))
1462                .then_with(|| left.topic_hint.cmp(&right.topic_hint))
1463        });
1464        groups
1465    }
1466
1467    async fn summarize_dream_group(
1468        &self,
1469        session_id: &str,
1470        topic_hint: Option<&str>,
1471        texts: &[String],
1472        summary_stats: &serde_json::Value,
1473    ) -> String {
1474        if let Some(provider) = &self.llm_provider {
1475            match provider.summarize(texts, 300).await {
1476                Ok(summary) if !summary.trim().is_empty() => {
1477                    let redacted = summary_stats["redacted_total"].as_u64().unwrap_or(0);
1478                    if redacted > 0 {
1479                        return format!(
1480                            "{summary}\n\n[{} raw record(s) redacted from summary]",
1481                            redacted
1482                        );
1483                    }
1484                    return summary;
1485                }
1486                Ok(_) | Err(_) => {}
1487            }
1488        }
1489
1490        let heading = match topic_hint.filter(|topic| !topic.is_empty()) {
1491            Some(topic) => format!("Dream summary for session {session_id} / topic {topic}:"),
1492            None => format!("Dream summary for session {session_id}:"),
1493        };
1494        let snippets: Vec<String> = texts
1495            .iter()
1496            .take(8)
1497            .map(|text| {
1498                if text.len() > 160 {
1499                    format!("{}...", truncate_str(text, 160))
1500                } else {
1501                    text.clone()
1502                }
1503            })
1504            .collect();
1505        let body = snippets.join(" | ");
1506        let redacted_total = summary_stats["redacted_total"].as_u64().unwrap_or(0);
1507        let suffix = if redacted_total > 0 {
1508            format!(" [Redacted raw records: {redacted_total}]")
1509        } else {
1510            String::new()
1511        };
1512        let combined = format!("{heading} {body}{suffix}");
1513        if combined.len() > 1200 {
1514            format!("{}...", truncate_str(&combined, 1200))
1515        } else {
1516            combined
1517        }
1518    }
1519
1520    fn build_dream_summary_metadata(
1521        session_id: &str,
1522        raw_topic_id: Option<&str>,
1523        topic_hint: Option<&str>,
1524        inferred_topic: bool,
1525        raw_records: &[RawJournalRecord],
1526        dreamed_at: chrono::DateTime<Utc>,
1527        summary_stats: serde_json::Value,
1528    ) -> serde_json::Value {
1529        let raw_ids: Vec<String> = raw_records
1530            .iter()
1531            .map(|record| record.id.to_string())
1532            .collect();
1533        let start = raw_records
1534            .first()
1535            .map(|record| record.created_at.to_rfc3339());
1536        let end = raw_records
1537            .last()
1538            .map(|record| record.created_at.to_rfc3339());
1539
1540        serde_json::json!({
1541            "_origin": {
1542                "raw_session_id": session_id,
1543                "raw_topic_id": raw_topic_id,
1544                "raw_topic_hint": topic_hint,
1545                "raw_topic_inferred": inferred_topic,
1546                "raw_record_ids": raw_ids,
1547                "dream_tick_at": dreamed_at.to_rfc3339(),
1548                "raw_record_count": raw_records.len(),
1549                "range_start": start,
1550                "range_end": end
1551            },
1552            "_dream": {
1553                "kind": "episodic_summary",
1554                "source": "raw_journal",
1555                "summary_stats": summary_stats
1556            }
1557        })
1558    }
1559
1560    #[allow(clippy::too_many_arguments)]
1561    fn build_dream_semantic_metadata(
1562        session_id: &str,
1563        raw_topic_id: Option<&str>,
1564        topic_hint: Option<&str>,
1565        inferred_topic: bool,
1566        raw_records: &[RawJournalRecord],
1567        summary_record_id: Uuid,
1568        dreamed_at: chrono::DateTime<Utc>,
1569        summary_stats: serde_json::Value,
1570    ) -> serde_json::Value {
1571        let raw_ids: Vec<String> = raw_records
1572            .iter()
1573            .map(|record| record.id.to_string())
1574            .collect();
1575
1576        serde_json::json!({
1577            "_origin": {
1578                "raw_session_id": session_id,
1579                "raw_topic_id": raw_topic_id,
1580                "raw_topic_hint": topic_hint,
1581                "raw_topic_inferred": inferred_topic,
1582                "raw_record_ids": raw_ids,
1583                "dream_tick_at": dreamed_at.to_rfc3339(),
1584                "raw_record_count": raw_records.len(),
1585                "raw_summary_record_id": summary_record_id.to_string()
1586            },
1587            "_dream": {
1588                "kind": "semantic_summary",
1589                "source": "raw_journal",
1590                "summary_stats": summary_stats
1591            }
1592        })
1593    }
1594
1595    /// encode.store_raw — append a raw journal record to the preservation plane.
1596    pub async fn encode_store_raw(
1597        &self,
1598        req: EncodeStoreRawRequest,
1599    ) -> Result<EncodeStoreRawResponse, CerememoryError> {
1600        let metadata = req.metadata.unwrap_or_else(|| serde_json::json!({}));
1601        let record = RawJournalRecord {
1602            id: Uuid::now_v7(),
1603            session_id: req.session_id,
1604            turn_id: req.turn_id,
1605            topic_id: req.topic_id,
1606            source: req.source,
1607            speaker: req.speaker,
1608            visibility: req.visibility,
1609            secrecy_level: req.secrecy_level,
1610            created_at: Utc::now(),
1611            updated_at: Utc::now(),
1612            content: req.content,
1613            metadata,
1614            derived_memory_ids: Vec::new(),
1615            suppressed: false,
1616        };
1617        record.validate()?;
1618        let record_id = self.raw_journal.append(record.clone()).await?;
1619
1620        Ok(EncodeStoreRawResponse {
1621            record_id,
1622            session_id: record.session_id,
1623            visibility: record.visibility,
1624            secrecy_level: record.secrecy_level,
1625        })
1626    }
1627
1628    /// encode.batch_raw — append multiple raw journal records.
1629    pub async fn encode_batch_store_raw(
1630        &self,
1631        req: EncodeBatchStoreRawRequest,
1632    ) -> Result<EncodeBatchStoreRawResponse, CerememoryError> {
1633        let mut results = Vec::with_capacity(req.records.len());
1634        for record in req.records {
1635            results.push(self.encode_store_raw(record).await?);
1636        }
1637        Ok(EncodeBatchStoreRawResponse { results })
1638    }
1639
1640    /// recall.raw_query — explicit retrieval from the raw journal.
1641    pub async fn recall_raw_query(
1642        &self,
1643        req: RecallRawQueryRequest,
1644    ) -> Result<RecallRawQueryResponse, CerememoryError> {
1645        let secrecy_levels = req.secrecy_levels.as_deref();
1646        let query_lower = req.query.as_ref().map(|query| query.trim().to_lowercase());
1647        let session_filter = req
1648            .session_id
1649            .as_ref()
1650            .map(|session_id| session_id.trim())
1651            .filter(|session_id| !session_id.is_empty())
1652            .map(str::to_string);
1653
1654        let mut records = match (req.query.as_deref(), &session_filter, &req.temporal) {
1655            (Some(query), Some(session_id), _) if !query.trim().is_empty() => {
1656                self.raw_journal
1657                    .search_text(
1658                        query,
1659                        Some(session_id),
1660                        (req.limit as usize).saturating_mul(5),
1661                    )
1662                    .await?
1663            }
1664            (Some(query), None, _) if !query.trim().is_empty() => {
1665                self.raw_journal
1666                    .search_text(query, None, (req.limit as usize).saturating_mul(5))
1667                    .await?
1668            }
1669            (None, Some(session_id), Some(temporal)) => {
1670                self.raw_journal
1671                    .query_session_range(session_id, temporal.start, temporal.end)
1672                    .await?
1673            }
1674            (None, Some(session_id), None) => self.raw_journal.query_session(session_id).await?,
1675            _ => self.raw_journal.get_all().await?,
1676        };
1677        records.retain(|record| {
1678            if record.suppressed {
1679                return false;
1680            }
1681            if !Self::raw_query_allowed_visibility(
1682                record,
1683                req.include_private_scratch,
1684                req.include_sealed,
1685            ) {
1686                return false;
1687            }
1688            if !Self::raw_query_allowed_secrecy(record, secrecy_levels) {
1689                return false;
1690            }
1691            if session_filter.is_none() {
1692                if let Some(ref temporal) = req.temporal {
1693                    if record.created_at < temporal.start || record.created_at > temporal.end {
1694                        return false;
1695                    }
1696                }
1697            }
1698            if let Some(ref query_lower) = query_lower {
1699                if !record.matches_text(query_lower) {
1700                    return false;
1701                }
1702            }
1703            true
1704        });
1705        records.sort_by(|a, b| {
1706            a.created_at
1707                .cmp(&b.created_at)
1708                .then_with(|| a.id.cmp(&b.id))
1709        });
1710        let total_candidates = records.len() as u32;
1711        records.truncate(req.limit as usize);
1712
1713        Ok(RecallRawQueryResponse {
1714            records,
1715            total_candidates,
1716        })
1717    }
1718
1719    // ─── CMP Encode Operations ───────────────────────────────────────
1720
1721    /// encode.store — Store a new memory record (CMP Spec §3.1).
1722    pub async fn encode_store(
1723        &self,
1724        req: EncodeStoreRequest,
1725    ) -> Result<EncodeStoreResponse, CerememoryError> {
1726        let _timer = TimerGuard::new("cerememory_encode_duration_seconds");
1727        let EncodeStoreRequest {
1728            content,
1729            store,
1730            emotion,
1731            context,
1732            metadata,
1733            associations,
1734            ..
1735        } = req;
1736        let store_type = store.unwrap_or_else(|| self.route_store(&content));
1737
1738        let mut record = MemoryRecord {
1739            id: Uuid::now_v7(),
1740            store: store_type,
1741            created_at: Utc::now(),
1742            updated_at: Utc::now(),
1743            last_accessed_at: Utc::now(),
1744            access_count: 0,
1745            content,
1746            fidelity: FidelityState::default(),
1747            emotion: emotion.unwrap_or_default(),
1748            associations: Vec::new(),
1749            metadata: Self::build_record_metadata(context, metadata),
1750            version: 1,
1751        };
1752
1753        // Add manual associations
1754        let mut assoc_count = 0u32;
1755        if let Some(manual) = associations {
1756            for ma in manual {
1757                record.associations.push(Association {
1758                    target_id: ma.target_id,
1759                    association_type: ma.association_type,
1760                    weight: ma.weight,
1761                    created_at: Utc::now(),
1762                    last_co_activation: Utc::now(),
1763                });
1764                assoc_count += 1;
1765            }
1766        }
1767
1768        // Auto-generate embeddings and transcriptions if provider is available
1769        if let Some(ref provider) = self.llm_provider {
1770            let caps = provider.capabilities();
1771
1772            // Auto-embed text blocks that lack embeddings
1773            let has_text_embedding = record
1774                .content
1775                .blocks
1776                .iter()
1777                .any(|b| b.modality == Modality::Text && b.embedding.is_some());
1778            if !has_text_embedding && caps.text_embedding {
1779                if let Some(text) = record.text_content().map(|s| s.to_string()) {
1780                    match provider.embed(&text).await {
1781                        Ok(embedding) if !embedding.is_empty() => {
1782                            if let Some(block) = record
1783                                .content
1784                                .blocks
1785                                .iter_mut()
1786                                .find(|b| b.modality == Modality::Text)
1787                            {
1788                                block.embedding = Some(embedding);
1789                            }
1790                        }
1791                        Ok(_) => {}
1792                        Err(e) => {
1793                            warn!(error = %e, "LLM text auto-embed failed, continuing without embedding");
1794                        }
1795                    }
1796                }
1797            }
1798
1799            // Collect image block data (cloned) for concurrent processing.
1800            // We clone data+format out of the record so the borrow is released
1801            // before we mutate the blocks with the results.
1802            let image_tasks: Vec<(usize, Vec<u8>, String)> = if caps.image_embedding {
1803                record
1804                    .content
1805                    .blocks
1806                    .iter()
1807                    .enumerate()
1808                    .filter(|(_, b)| b.modality == Modality::Image && b.embedding.is_none())
1809                    .map(|(i, b)| (i, b.data.clone(), b.format.clone()))
1810                    .collect()
1811            } else {
1812                Vec::new()
1813            };
1814
1815            let audio_tasks: Vec<(Vec<u8>, String)> = if caps.audio_transcription {
1816                record
1817                    .content
1818                    .blocks
1819                    .iter()
1820                    .filter(|b| b.modality == Modality::Audio)
1821                    .map(|b| (b.data.clone(), b.format.clone()))
1822                    .collect()
1823            } else {
1824                Vec::new()
1825            };
1826
1827            // Process image embeddings concurrently
1828            if !image_tasks.is_empty() {
1829                let image_results: Vec<(usize, Result<Vec<f32>, _>)> =
1830                    futures::future::join_all(image_tasks.iter().map(|(idx, data, fmt)| {
1831                        let idx = *idx;
1832                        async move { (idx, provider.embed_image(data, fmt).await) }
1833                    }))
1834                    .await;
1835
1836                for (idx, result) in image_results {
1837                    match result {
1838                        Ok(embedding) if !embedding.is_empty() => {
1839                            record.content.blocks[idx].embedding = Some(embedding);
1840                        }
1841                        Ok(_) => {}
1842                        Err(e) => {
1843                            warn!(error = %e, "LLM image auto-embed failed, continuing without embedding");
1844                        }
1845                    }
1846                }
1847            }
1848
1849            // Process audio transcriptions concurrently
1850            if !audio_tasks.is_empty() {
1851                let audio_results: Vec<Result<String, _>> =
1852                    futures::future::join_all(audio_tasks.iter().map(|(data, fmt)| async move {
1853                        provider.transcribe_audio(data, fmt).await
1854                    }))
1855                    .await;
1856
1857                let mut new_text_blocks = Vec::new();
1858                for result in audio_results {
1859                    match result {
1860                        Ok(transcript) if !transcript.is_empty() => {
1861                            let mut text_block = ContentBlock {
1862                                modality: Modality::Text,
1863                                format: "text/plain".to_string(),
1864                                data: transcript.as_bytes().to_vec(),
1865                                embedding: None,
1866                            };
1867                            if caps.text_embedding {
1868                                match provider.embed(&transcript).await {
1869                                    Ok(emb) if !emb.is_empty() => {
1870                                        text_block.embedding = Some(emb);
1871                                    }
1872                                    Err(error) => {
1873                                        warn!(error = %error, "LLM transcript auto-embed failed, continuing without embedding");
1874                                    }
1875                                    Ok(_) => {}
1876                                }
1877                            }
1878                            new_text_blocks.push(text_block);
1879                        }
1880                        Ok(_) => {}
1881                        Err(e) => {
1882                            warn!(error = %e, "LLM audio transcription failed, continuing without transcript");
1883                        }
1884                    }
1885                }
1886                record.content.blocks.extend(new_text_blocks);
1887            }
1888        }
1889
1890        record.validate()?;
1891        let id = record.id;
1892        let fidelity = record.fidelity.score;
1893
1894        // 1. Store first (source of truth)
1895        if store_type == StoreType::Working {
1896            let (_, evicted) = self.working.store_with_eviction(record.clone()).await?;
1897            if let Some(evicted_id) = evicted {
1898                self.cleanup_deleted_records(&[(evicted_id, StoreType::Working)])
1899                    .await?;
1900            }
1901        } else {
1902            dispatch_store!(self, store_type, store(record.clone()))?;
1903        }
1904
1905        // 2. Register in coordinator (in-memory, rebuildable)
1906        self.coordinator
1907            .register(id, store_type, record.associations.clone())
1908            .await;
1909
1910        // 3. Index (rebuildable, log on failure)
1911        if let Err(e) = self.index_record(&record) {
1912            warn!(error = %e, record_id = %id, "Failed to index record, will be indexed on rebuild");
1913        }
1914
1915        info!(record_id = %id, store = %store_type, "Encoded memory record");
1916
1917        metrics::counter!("cerememory_encode_total", "store" => store_type.to_string())
1918            .increment(1);
1919
1920        Ok(EncodeStoreResponse {
1921            record_id: id,
1922            store: store_type,
1923            initial_fidelity: fidelity,
1924            associations_created: assoc_count,
1925        })
1926    }
1927
1928    /// encode.batch — Store multiple records (CMP Spec §3.2).
1929    pub async fn encode_batch(
1930        &self,
1931        req: EncodeBatchRequest,
1932    ) -> Result<EncodeBatchResponse, CerememoryError> {
1933        const MAX_BATCH_SIZE: usize = 1000;
1934        if req.records.len() > MAX_BATCH_SIZE {
1935            return Err(CerememoryError::Validation(format!(
1936                "Batch size {} exceeds maximum of {MAX_BATCH_SIZE}",
1937                req.records.len()
1938            )));
1939        }
1940        let mut results = Vec::with_capacity(req.records.len());
1941        let mut total_inferred = 0u32;
1942        let mut prev_id: Option<Uuid> = None;
1943
1944        for store_req in req.records {
1945            let resp = self.encode_store(store_req).await?;
1946
1947            // Infer sequential associations between batch items
1948            if req.infer_associations {
1949                if let Some(prev) = prev_id {
1950                    let assoc_fwd = Association {
1951                        target_id: resp.record_id,
1952                        association_type: AssociationType::Sequential,
1953                        weight: DEFAULT_BATCH_SEQUENTIAL_WEIGHT,
1954                        created_at: Utc::now(),
1955                        last_co_activation: Utc::now(),
1956                    };
1957                    let assoc_bwd = Association {
1958                        target_id: prev,
1959                        association_type: AssociationType::Sequential,
1960                        weight: DEFAULT_BATCH_SEQUENTIAL_WEIGHT,
1961                        created_at: Utc::now(),
1962                        last_co_activation: Utc::now(),
1963                    };
1964                    self.add_persisted_association(&prev, assoc_fwd).await?;
1965                    self.add_persisted_association(&resp.record_id, assoc_bwd)
1966                        .await?;
1967                    total_inferred += 2;
1968                }
1969            }
1970
1971            prev_id = Some(resp.record_id);
1972            results.push(resp);
1973        }
1974
1975        Ok(EncodeBatchResponse {
1976            results,
1977            associations_inferred: total_inferred,
1978        })
1979    }
1980
1981    /// encode.update — Update an existing record (CMP Spec §3.3).
1982    pub async fn encode_update(&self, req: EncodeUpdateRequest) -> Result<(), CerememoryError> {
1983        let (mut record, store_type) = self
1984            .get_store_record(&req.record_id)
1985            .await?
1986            .ok_or_else(|| CerememoryError::RecordNotFound(req.record_id.to_string()))?;
1987
1988        // Apply updates to a clone and validate before persisting
1989        record.apply_updates(
1990            req.content.clone(),
1991            req.emotion.clone(),
1992            req.metadata.clone(),
1993        );
1994        record.validate()?;
1995
1996        // Persist the update
1997        dispatch_store!(
1998            self,
1999            store_type,
2000            update_record(
2001                &req.record_id,
2002                req.content.clone(),
2003                req.emotion,
2004                req.metadata
2005            )
2006        )?;
2007
2008        // Update indexes if content changed
2009        if req.content.is_some() {
2010            let text = Self::build_searchable_text(&record).unwrap_or_default();
2011            if Self::has_indexable_content(&text, &record) {
2012                if let Err(e) = self.text_index.update(
2013                    req.record_id,
2014                    store_type,
2015                    &text,
2016                    record.content.summary.as_deref(),
2017                ) {
2018                    warn!(error = %e, record_id = %req.record_id, "Failed to update text index");
2019                }
2020            } else if let Err(e) = self.text_index.remove(req.record_id) {
2021                warn!(error = %e, record_id = %req.record_id, "Failed to clear text index");
2022            }
2023
2024            // Remove old vector, then insert first new embedding
2025            let _ = self.vector_index.remove(req.record_id);
2026            if let Some(embedding) = Self::primary_embedding(&record) {
2027                if let Err(e) = self.vector_index.upsert(req.record_id, embedding) {
2028                    warn!(error = %e, record_id = %req.record_id, "Failed to update vector index");
2029                }
2030            }
2031        }
2032
2033        Ok(())
2034    }
2035
2036    // ─── CMP Recall Operations ───────────────────────────────────────
2037
2038    /// recall.query — Retrieve memories (CMP Spec §4.1).
2039    ///
2040    /// Phase 2 recall pipeline:
2041    /// 1. Text search via Tantivy (if cue.text present)
2042    /// 2. Vector similarity search (if cue.embedding present)
2043    /// 3. Hybrid score merging
2044    /// 4. Temporal range filter
2045    /// 5. Spreading activation
2046    /// 6. Reconsolidation + human noise rendering
2047    pub async fn recall_query(
2048        &self,
2049        req: RecallQueryRequest,
2050    ) -> Result<RecallQueryResponse, CerememoryError> {
2051        let _timer = TimerGuard::new("cerememory_recall_duration_seconds");
2052        let mode = *self.recall_mode.read().await;
2053        let recall_mode = req.recall_mode;
2054        let effective_mode = if mode == RecallMode::Perfect {
2055            RecallMode::Perfect
2056        } else {
2057            recall_mode
2058        };
2059        let (cue_text, cue_embedding) = self.resolve_recall_cues(&req.cue).await?;
2060
2061        let stores = req.stores.clone().unwrap_or_else(|| {
2062            vec![
2063                StoreType::Episodic,
2064                StoreType::Semantic,
2065                StoreType::Procedural,
2066                StoreType::Emotional,
2067                StoreType::Working,
2068            ]
2069        });
2070
2071        let mut candidates: Vec<(MemoryRecord, f64)> = Vec::new();
2072        let mut seen_ids: HashSet<Uuid> = HashSet::new();
2073
2074        // Score maps for hybrid merging
2075        let mut text_scores: HashMap<Uuid, f64> = HashMap::new();
2076        let mut vec_scores: HashMap<Uuid, f64> = HashMap::new();
2077
2078        // 1. Tantivy full-text search
2079        if let Some(ref text) = cue_text {
2080            let search_limit = req.limit as usize * 3;
2081            match self.text_index.search(text, Some(&stores), search_limit) {
2082                Ok(hits) => {
2083                    for hit in hits {
2084                        text_scores.insert(hit.record_id, hit.score as f64);
2085                    }
2086                }
2087                Err(e) => {
2088                    // Fallback to store-level text search if Tantivy fails
2089                    warn!(error = %e, "Text index search failed, falling back to store query");
2090                    for store_type in &stores {
2091                        let results = dispatch_store!(
2092                            self,
2093                            *store_type,
2094                            query_text(text, req.limit as usize * 2)
2095                        )?;
2096                        for record in results {
2097                            text_scores.insert(record.id, record.fidelity.score);
2098                        }
2099                    }
2100                }
2101            }
2102        }
2103
2104        // 2. Vector similarity search
2105        if let Some(ref embedding) = cue_embedding {
2106            if let Ok(hits) = self.vector_index.search(embedding, req.limit as usize * 3) {
2107                for hit in hits {
2108                    if hit.similarity > 0.0 {
2109                        vec_scores.insert(hit.record_id, hit.similarity as f64);
2110                    }
2111                }
2112            }
2113        }
2114
2115        // 3. Merge scores: collect all candidate IDs
2116        let all_ids: HashSet<Uuid> = text_scores
2117            .keys()
2118            .chain(vec_scores.keys())
2119            .copied()
2120            .collect();
2121        let mut scanned_ids = all_ids.clone();
2122        let mut total_records_scanned = scanned_ids.len() as u32;
2123        let mut fidelity_filtered: u32 = 0;
2124
2125        for id in all_ids {
2126            if !seen_ids.insert(id) {
2127                continue;
2128            }
2129            if let Some((record, _)) = self.get_store_record(&id).await? {
2130                // Filter by requested stores
2131                if !stores.contains(&record.store) {
2132                    continue;
2133                }
2134                if let Some(min_f) = req.min_fidelity {
2135                    if record.fidelity.score < min_f && !req.include_decayed {
2136                        fidelity_filtered += 1;
2137                        continue;
2138                    }
2139                }
2140
2141                // Hybrid scoring
2142                let ts = text_scores.get(&id);
2143                let vs = vec_scores.get(&id);
2144                let score = match (ts, vs) {
2145                    (Some(&t), Some(&v)) => t * 0.6 + v * 0.4,
2146                    (Some(&t), None) => t,
2147                    (None, Some(&v)) => v,
2148                    (None, None) => 0.0,
2149                };
2150
2151                candidates.push((record, score));
2152            }
2153        }
2154
2155        // If no index search was performed (no text or embedding cue),
2156        // fall back to store-level text search for backward compatibility
2157        if cue_text.is_none() && cue_embedding.is_none() {
2158            // No search cue — candidates will come from temporal or activation only
2159        }
2160
2161        // 4. Temporal range enrichment across all requested stores
2162        if let Some(ref temporal) = req.cue.temporal {
2163            for store_type in &stores {
2164                let records = dispatch_store!(self, *store_type, get_all())?;
2165                for record in records {
2166                    if record.created_at < temporal.start || record.created_at > temporal.end {
2167                        continue;
2168                    }
2169                    if scanned_ids.insert(record.id) {
2170                        total_records_scanned += 1;
2171                    }
2172                    if !seen_ids.insert(record.id) {
2173                        continue;
2174                    }
2175                    if let Some(min_f) = req.min_fidelity {
2176                        if record.fidelity.score < min_f && !req.include_decayed {
2177                            fidelity_filtered += 1;
2178                            continue;
2179                        }
2180                    }
2181                    let score = record.fidelity.score;
2182                    candidates.push((record, score));
2183                }
2184            }
2185        }
2186
2187        // 5. Spreading activation for top candidates
2188        let mut activated_ids: HashMap<Uuid, f64> = HashMap::new();
2189        if req.activation_depth > 0 && !candidates.is_empty() {
2190            let top_id = candidates
2191                .iter()
2192                .max_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal))
2193                .map(|(r, _)| r.id);
2194
2195            if let Some(source_id) = top_id {
2196                if let Ok(activated) = self
2197                    .activation
2198                    .activate(&source_id, req.activation_depth, 0.05)
2199                    .await
2200                {
2201                    for act in &activated {
2202                        activated_ids.insert(act.record_id, act.activation_level);
2203                        if seen_ids.insert(act.record_id) {
2204                            if let Some((record, _store)) =
2205                                self.get_store_record(&act.record_id).await?
2206                            {
2207                                if !stores.contains(&record.store) {
2208                                    continue;
2209                                }
2210                                if let Some(ref temporal) = req.cue.temporal {
2211                                    if record.created_at < temporal.start
2212                                        || record.created_at > temporal.end
2213                                    {
2214                                        continue;
2215                                    }
2216                                }
2217                                if let Some(min_f) = req.min_fidelity {
2218                                    if record.fidelity.score < min_f && !req.include_decayed {
2219                                        fidelity_filtered += 1;
2220                                        continue;
2221                                    }
2222                                }
2223                                candidates.push((record, act.activation_level * 0.5));
2224                            }
2225                        }
2226                    }
2227                }
2228            }
2229        }
2230
2231        // Boost relevance with activation scores
2232        for (record, relevance) in &mut candidates {
2233            if let Some(activation) = activated_ids.get(&record.id) {
2234                *relevance += activation * 0.3;
2235            }
2236        }
2237
2238        // Final filter pass: catches index-search results outside the temporal
2239        // window and any edge cases from spreading activation.
2240        if let Some(ref temporal) = req.cue.temporal {
2241            candidates.retain(|(record, _)| {
2242                record.created_at >= temporal.start && record.created_at <= temporal.end
2243            });
2244        }
2245        candidates.retain(|(record, _)| stores.contains(&record.store));
2246        if let Some(min_fidelity) = req.min_fidelity {
2247            let before = candidates.len();
2248            candidates
2249                .retain(|(record, _)| req.include_decayed || record.fidelity.score >= min_fidelity);
2250            fidelity_filtered += (before - candidates.len()) as u32;
2251        }
2252
2253        // Sort by relevance descending, track pre-truncation count
2254        candidates.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
2255        let total_candidates = candidates.len() as u32;
2256        candidates.truncate(req.limit as usize);
2257
2258        // 6. Build response with reconsolidation
2259        let mut memories = Vec::with_capacity(candidates.len());
2260        for (mut record, relevance) in candidates {
2261            // Reconsolidate: update access metadata + fidelity
2262            if req.reconsolidate {
2263                record.access_count += 1;
2264                record.last_accessed_at = Utc::now();
2265
2266                // Stability boost
2267                let new_stability = self.decay.boost_stability(record.fidelity.stability);
2268                record.fidelity.stability = new_stability;
2269                record.fidelity.reinforcement_count += 1;
2270
2271                if let Some(store_type) = self.coordinator.get_record_store_type(&record.id).await?
2272                {
2273                    if let Err(e) = dispatch_store!(
2274                        self,
2275                        store_type,
2276                        update_fidelity(&record.id, record.fidelity.clone())
2277                    ) {
2278                        warn!(record_id = %record.id, error = %e, "Failed to update fidelity during reconsolidation");
2279                    }
2280                    if let Err(e) = dispatch_store!(
2281                        self,
2282                        store_type,
2283                        update_access(&record.id, record.access_count, record.last_accessed_at)
2284                    ) {
2285                        warn!(record_id = %record.id, error = %e, "Failed to update access during reconsolidation");
2286                    }
2287                }
2288            }
2289
2290            // Render content with or without noise
2291            let rendered_content = match effective_mode {
2292                RecallMode::Perfect => record.content.clone(),
2293                RecallMode::Human => apply_human_noise(&record.content, record.fidelity.score),
2294            };
2295
2296            memories.push(RecalledMemory {
2297                activation_path: activated_ids.get(&record.id).map(|_| vec![record.id]),
2298                relevance_score: relevance,
2299                rendered_content,
2300                record,
2301            });
2302        }
2303
2304        // Notify evolution engine about recall performance
2305        if !memories.is_empty() {
2306            let hit_count = memories.iter().filter(|m| m.relevance_score > 0.0).count();
2307            let hit_rate = hit_count as f64 / memories.len() as f64;
2308            // Use the first memory's store type as representative
2309            let store = memories[0].record.store;
2310            self.evolution.observe_recall(store, hit_rate);
2311        }
2312
2313        metrics::counter!("cerememory_recall_total").increment(1);
2314
2315        Ok(RecallQueryResponse {
2316            memories,
2317            activation_trace: None,
2318            total_candidates,
2319            query_metadata: Some(QueryMetadata {
2320                total_records_scanned,
2321                stores_searched: stores,
2322                fidelity_filtered,
2323            }),
2324        })
2325    }
2326
2327    /// recall.associate — Get associated memories (CMP Spec §4.2).
2328    pub async fn recall_associate(
2329        &self,
2330        req: RecallAssociateRequest,
2331    ) -> Result<RecallAssociateResponse, CerememoryError> {
2332        // Verify record exists
2333        self.get_store_record(&req.record_id)
2334            .await?
2335            .ok_or_else(|| CerememoryError::RecordNotFound(req.record_id.to_string()))?;
2336
2337        let activated = self
2338            .activation
2339            .activate(&req.record_id, req.depth, req.min_weight)
2340            .await?;
2341
2342        let source_assocs = if req.association_types.is_some() {
2343            Some(self.coordinator.get_associations(&req.record_id).await?)
2344        } else {
2345            None
2346        };
2347
2348        let mut memories = Vec::new();
2349        for act in activated.iter().take(req.limit as usize) {
2350            if let Some(types) = &req.association_types {
2351                let assocs = source_assocs.as_ref().expect("pre-fetched above");
2352                let matches = assocs
2353                    .iter()
2354                    .any(|a| a.target_id == act.record_id && types.contains(&a.association_type));
2355                if !matches && act.path.len() <= 2 {
2356                    continue;
2357                }
2358            }
2359
2360            if let Some((record, _)) = self.get_store_record(&act.record_id).await? {
2361                memories.push(RecalledMemory {
2362                    rendered_content: record.content.clone(),
2363                    relevance_score: act.activation_level,
2364                    activation_path: Some(act.path.clone()),
2365                    record,
2366                });
2367            }
2368        }
2369
2370        Ok(RecallAssociateResponse {
2371            total_candidates: memories.len() as u32,
2372            memories,
2373        })
2374    }
2375
2376    /// recall.timeline — Temporal bucketed recall (CMP Spec §4.3, OPTIONAL).
2377    pub async fn recall_timeline(
2378        &self,
2379        req: RecallTimelineRequest,
2380    ) -> Result<RecallTimelineResponse, CerememoryError> {
2381        let records = self
2382            .episodic
2383            .query_temporal_range(req.range.start, req.range.end)
2384            .await?;
2385
2386        // Group records into time buckets based on granularity
2387        let mut bucket_map: std::collections::BTreeMap<i64, Vec<MemoryRecord>> =
2388            std::collections::BTreeMap::new();
2389
2390        for record in records {
2391            // Apply min_fidelity filter
2392            if let Some(min_f) = req.min_fidelity {
2393                if record.fidelity.score < min_f {
2394                    continue;
2395                }
2396            }
2397
2398            // Apply emotion filter: cosine similarity between emotion vectors
2399            if let Some(ref filter) = req.emotion_filter {
2400                if !Self::emotion_matches(&record.emotion, filter) {
2401                    continue;
2402                }
2403            }
2404
2405            let bucket_key = Self::bucket_key(record.created_at, req.granularity);
2406            bucket_map.entry(bucket_key).or_default().push(record);
2407        }
2408
2409        // Also scan other stores for records created in the range
2410        for store_type in [
2411            StoreType::Semantic,
2412            StoreType::Procedural,
2413            StoreType::Emotional,
2414        ] {
2415            let records = dispatch_store!(self, store_type, get_all())?;
2416            for record in records {
2417                if record.created_at >= req.range.start && record.created_at <= req.range.end {
2418                    if let Some(min_f) = req.min_fidelity {
2419                        if record.fidelity.score < min_f {
2420                            continue;
2421                        }
2422                    }
2423                    if let Some(ref filter) = req.emotion_filter {
2424                        if !Self::emotion_matches(&record.emotion, filter) {
2425                            continue;
2426                        }
2427                    }
2428                    let bucket_key = Self::bucket_key(record.created_at, req.granularity);
2429                    bucket_map.entry(bucket_key).or_default().push(record);
2430                }
2431            }
2432        }
2433
2434        // Convert to response buckets
2435        let buckets: Vec<TimelineBucket> = bucket_map
2436            .into_iter()
2437            .map(|(key, records)| {
2438                let (start, end) = Self::bucket_range(key, req.granularity);
2439                let count = records.len() as u32;
2440                let memories = records
2441                    .into_iter()
2442                    .map(|record| RecalledMemory {
2443                        relevance_score: record.fidelity.score,
2444                        rendered_content: record.content.clone(),
2445                        activation_path: None,
2446                        record,
2447                    })
2448                    .collect();
2449                TimelineBucket {
2450                    start,
2451                    end,
2452                    memories,
2453                    count,
2454                }
2455            })
2456            .collect();
2457
2458        Ok(RecallTimelineResponse { buckets })
2459    }
2460
2461    /// Check if a record's emotion matches the filter.
2462    /// Uses cosine similarity on the 8-dimensional emotion vector (Plutchik axes).
2463    /// A match requires similarity > 0.5 (moderate alignment).
2464    fn emotion_matches(record_emotion: &EmotionVector, filter: &EmotionVector) -> bool {
2465        let r = [
2466            record_emotion.joy,
2467            record_emotion.trust,
2468            record_emotion.fear,
2469            record_emotion.surprise,
2470            record_emotion.sadness,
2471            record_emotion.disgust,
2472            record_emotion.anger,
2473            record_emotion.anticipation,
2474        ];
2475        let f = [
2476            filter.joy,
2477            filter.trust,
2478            filter.fear,
2479            filter.surprise,
2480            filter.sadness,
2481            filter.disgust,
2482            filter.anger,
2483            filter.anticipation,
2484        ];
2485
2486        let dot: f64 = r.iter().zip(f.iter()).map(|(a, b)| a * b).sum();
2487        let norm_r: f64 = r.iter().map(|x| x * x).sum::<f64>().sqrt();
2488        let norm_f: f64 = f.iter().map(|x| x * x).sum::<f64>().sqrt();
2489
2490        if norm_r < f64::EPSILON || norm_f < f64::EPSILON {
2491            return false; // Neutral emotion — no match
2492        }
2493
2494        let similarity = dot / (norm_r * norm_f);
2495        similarity > 0.5
2496    }
2497
2498    fn bucket_key(ts: chrono::DateTime<Utc>, granularity: TimeGranularity) -> i64 {
2499        use chrono::Datelike;
2500        let secs = ts.timestamp();
2501        match granularity {
2502            TimeGranularity::Minute => secs / 60,
2503            TimeGranularity::Hour => secs / 3600,
2504            TimeGranularity::Day => secs / 86400,
2505            TimeGranularity::Week => secs / 604800,
2506            TimeGranularity::Month => ts.year() as i64 * 12 + (ts.month() as i64 - 1),
2507        }
2508    }
2509
2510    fn bucket_range(
2511        key: i64,
2512        granularity: TimeGranularity,
2513    ) -> (chrono::DateTime<Utc>, chrono::DateTime<Utc>) {
2514        use chrono::TimeZone;
2515
2516        // Fixed-duration granularities share one formula.
2517        let epoch_secs = |g: TimeGranularity| -> Option<i64> {
2518            match g {
2519                TimeGranularity::Minute => Some(60),
2520                TimeGranularity::Hour => Some(3600),
2521                TimeGranularity::Day => Some(86400),
2522                TimeGranularity::Week => Some(604800),
2523                TimeGranularity::Month => None,
2524            }
2525        };
2526
2527        if let Some(secs) = epoch_secs(granularity) {
2528            let start = Utc
2529                .timestamp_opt(key * secs, 0)
2530                .single()
2531                .unwrap_or_default();
2532            let end = Utc
2533                .timestamp_opt((key + 1) * secs, 0)
2534                .single()
2535                .unwrap_or_default();
2536            return (start, end);
2537        }
2538
2539        // Month: calendar-aware boundaries
2540        let year = key / 12;
2541        let month = (key % 12) + 1;
2542        let start = Utc
2543            .with_ymd_and_hms(year as i32, month as u32, 1, 0, 0, 0)
2544            .single()
2545            .unwrap_or_default();
2546        let next_month = if month == 12 { 1 } else { month + 1 };
2547        let next_year = if month == 12 { year + 1 } else { year };
2548        let end = Utc
2549            .with_ymd_and_hms(next_year as i32, next_month as u32, 1, 0, 0, 0)
2550            .single()
2551            .unwrap_or_default();
2552        (start, end)
2553    }
2554
2555    /// recall.graph — Local graph extraction (CMP Spec §4.4, OPTIONAL).
2556    pub async fn recall_graph(
2557        &self,
2558        req: RecallGraphRequest,
2559    ) -> Result<RecallGraphResponse, CerememoryError> {
2560        let mut nodes: Vec<GraphNode> = Vec::new();
2561        let mut edges: Vec<GraphEdge> = Vec::new();
2562        let mut visited: HashSet<Uuid> = HashSet::new();
2563        let mut queue: std::collections::VecDeque<(Uuid, u32)> = std::collections::VecDeque::new();
2564        let limit = req.limit_nodes as usize;
2565
2566        // Start from center_id or all registered records
2567        if let Some(center) = req.center_id {
2568            queue.push_back((center, 0));
2569        } else {
2570            // No center: return a sample of the graph
2571            let reg = self.coordinator.records_by_store().await;
2572            let all_ids: Vec<Uuid> = {
2573                let mut ids = Vec::new();
2574                for st in [
2575                    StoreType::Episodic,
2576                    StoreType::Semantic,
2577                    StoreType::Procedural,
2578                    StoreType::Emotional,
2579                ] {
2580                    if reg.contains_key(&st) {
2581                        let store_ids = dispatch_store!(self, st, list_ids())?;
2582                        ids.extend(store_ids.into_iter().take(limit));
2583                    }
2584                    if ids.len() >= limit {
2585                        break;
2586                    }
2587                }
2588                ids
2589            };
2590            for id in all_ids {
2591                queue.push_back((id, 0));
2592            }
2593        }
2594
2595        // BFS traversal
2596        while let Some((id, depth)) = queue.pop_front() {
2597            if !visited.insert(id) || nodes.len() >= limit {
2598                continue;
2599            }
2600
2601            if let Some((record, _store)) = self.get_store_record(&id).await? {
2602                nodes.push(GraphNode {
2603                    id: record.id,
2604                    store: record.store,
2605                    summary: record.content.summary.clone().or_else(|| {
2606                        record.text_content().map(|t| {
2607                            if t.len() > 80 {
2608                                format!("{}...", truncate_str(t, 80))
2609                            } else {
2610                                t.to_string()
2611                            }
2612                        })
2613                    }),
2614                    fidelity: record.fidelity.score,
2615                });
2616
2617                if depth < req.depth {
2618                    let assocs = self.coordinator.get_associations(&id).await?;
2619                    for assoc in assocs {
2620                        // Filter by edge_types if specified
2621                        if let Some(ref types) = req.edge_types {
2622                            let type_str = serde_json::to_value(assoc.association_type)
2623                                .ok()
2624                                .and_then(|v| v.as_str().map(|s| s.to_string()))
2625                                .unwrap_or_default();
2626                            if !types.iter().any(|t| t.to_lowercase() == type_str) {
2627                                continue;
2628                            }
2629                        }
2630
2631                        // Cap edges at 10x node limit to prevent unbounded growth
2632                        if edges.len() < limit * 10 {
2633                            edges.push(GraphEdge {
2634                                source: id,
2635                                target: assoc.target_id,
2636                                edge_type: assoc.association_type,
2637                                weight: assoc.weight,
2638                            });
2639                        }
2640
2641                        if !visited.contains(&assoc.target_id) {
2642                            queue.push_back((assoc.target_id, depth + 1));
2643                        }
2644                    }
2645                }
2646            }
2647        }
2648
2649        let total_nodes = nodes.len() as u32;
2650        Ok(RecallGraphResponse {
2651            nodes,
2652            edges,
2653            total_nodes,
2654        })
2655    }
2656
2657    // ─── CMP Lifecycle Operations ────────────────────────────────────
2658
2659    /// lifecycle.dream_tick — summarize unprocessed raw journal entries into episodic summaries.
2660    pub async fn lifecycle_dream_tick(
2661        &self,
2662        req: DreamTickRequest,
2663    ) -> Result<DreamTickResponse, CerememoryError> {
2664        let secrecy_levels = req.secrecy_levels.as_deref();
2665        let session_filter = req
2666            .session_id
2667            .as_ref()
2668            .map(|session_id| session_id.trim())
2669            .filter(|session_id| !session_id.is_empty())
2670            .map(str::to_string);
2671
2672        let raw_records = self.raw_journal.get_all().await?;
2673        let mut candidate_records = Vec::new();
2674
2675        for record in raw_records {
2676            if record.suppressed || Self::raw_record_processed(&record) {
2677                continue;
2678            }
2679            if !Self::raw_query_allowed_visibility(
2680                &record,
2681                req.include_private_scratch,
2682                req.include_sealed,
2683            ) {
2684                continue;
2685            }
2686            if !Self::raw_query_allowed_secrecy(&record, secrecy_levels) {
2687                continue;
2688            }
2689            if let Some(ref session_id) = session_filter {
2690                if &record.session_id != session_id {
2691                    continue;
2692                }
2693            }
2694            candidate_records.push(record);
2695        }
2696
2697        let mut groups_processed = 0u32;
2698        let mut raw_records_processed = 0u32;
2699        let mut episodic_summaries_created = 0u32;
2700        let mut semantic_nodes_created = 0u32;
2701        let max_groups = req.max_groups as usize;
2702
2703        for group in Self::infer_dream_groups(candidate_records)
2704            .into_iter()
2705            .take(max_groups)
2706        {
2707            if group.records.is_empty() {
2708                continue;
2709            }
2710
2711            groups_processed += 1;
2712            raw_records_processed += group.records.len() as u32;
2713
2714            if req.dry_run {
2715                episodic_summaries_created += 1;
2716                continue;
2717            }
2718
2719            let session_id = group.session_id.clone();
2720            let (texts, summary_stats) = Self::prepare_dream_summary_inputs(&group.records);
2721
2722            let summary_text = if texts.is_empty() {
2723                let redacted_total = summary_stats["redacted_total"].as_u64().unwrap_or(0);
2724                format!(
2725                    "Dream summary for session {}: {} raw record(s) preserved. {} redacted from summary.",
2726                    session_id,
2727                    group.records.len(),
2728                    redacted_total
2729                )
2730            } else {
2731                self.summarize_dream_group(
2732                    &session_id,
2733                    group.topic_hint.as_deref(),
2734                    &texts,
2735                    &summary_stats,
2736                )
2737                .await
2738            };
2739
2740            let dreamed_at = Utc::now();
2741            let mut summary_record =
2742                MemoryRecord::new_text(StoreType::Episodic, summary_text.clone());
2743            summary_record.content.summary = Some(if summary_text.len() > 160 {
2744                format!("{}...", truncate_str(&summary_text, 160))
2745            } else {
2746                summary_text.clone()
2747            });
2748            summary_record.metadata = Self::build_dream_summary_metadata(
2749                &session_id,
2750                group.raw_topic_id.as_deref(),
2751                group.topic_hint.as_deref(),
2752                group.inferred_topic,
2753                &group.records,
2754                dreamed_at,
2755                summary_stats.clone(),
2756            );
2757
2758            let promote_semantic = req.promote_semantic
2759                && Self::should_promote_dream_group(
2760                    group.raw_topic_id.as_deref(),
2761                    group.topic_hint.as_deref(),
2762                    &summary_stats,
2763                );
2764            let semantic_record = if promote_semantic {
2765                let mut semantic_record =
2766                    MemoryRecord::new_text(StoreType::Semantic, summary_text.clone());
2767                semantic_record.content.summary = summary_record.content.summary.clone();
2768                semantic_record.metadata = Self::build_dream_semantic_metadata(
2769                    &session_id,
2770                    group.raw_topic_id.as_deref(),
2771                    group.topic_hint.as_deref(),
2772                    group.inferred_topic,
2773                    &group.records,
2774                    summary_record.id,
2775                    dreamed_at,
2776                    summary_stats.clone(),
2777                );
2778                Some(semantic_record)
2779            } else {
2780                None
2781            };
2782
2783            if let Some(semantic_record) = &semantic_record {
2784                let assoc = Association {
2785                    target_id: semantic_record.id,
2786                    association_type: AssociationType::Semantic,
2787                    weight: 1.0,
2788                    created_at: dreamed_at,
2789                    last_co_activation: dreamed_at,
2790                };
2791                summary_record.associations.push(assoc);
2792                Self::attach_summary_semantic_link(&mut summary_record, semantic_record.id);
2793            }
2794
2795            let summary_id = summary_record.id;
2796            dispatch_store!(self, StoreType::Episodic, store(summary_record.clone()))?;
2797            self.coordinator
2798                .register(
2799                    summary_record.id,
2800                    StoreType::Episodic,
2801                    summary_record.associations.clone(),
2802                )
2803                .await;
2804            if let Err(e) = self.index_record(&summary_record) {
2805                warn!(error = %e, record_id = %summary_record.id, "Failed to index dream summary");
2806            }
2807
2808            let semantic_id = if let Some(mut semantic_record) = semantic_record {
2809                semantic_record.associations.push(Association {
2810                    target_id: summary_id,
2811                    association_type: AssociationType::Semantic,
2812                    weight: 1.0,
2813                    created_at: dreamed_at,
2814                    last_co_activation: dreamed_at,
2815                });
2816                let semantic_id = semantic_record.id;
2817                if let Err(err) =
2818                    dispatch_store!(self, StoreType::Semantic, store(semantic_record.clone()))
2819                {
2820                    if dispatch_store!(self, StoreType::Episodic, delete(&summary_id))? {
2821                        self.cleanup_deleted_records(&[(summary_id, StoreType::Episodic)])
2822                            .await?;
2823                    }
2824                    return Err(err);
2825                }
2826                self.coordinator
2827                    .register(
2828                        semantic_record.id,
2829                        StoreType::Semantic,
2830                        semantic_record.associations.clone(),
2831                    )
2832                    .await;
2833                if let Err(e) = self.index_record(&semantic_record) {
2834                    warn!(error = %e, record_id = %semantic_record.id, "Failed to index dream semantic summary");
2835                }
2836                semantic_nodes_created += 1;
2837                Some(semantic_id)
2838            } else {
2839                None
2840            };
2841
2842            let original_group = group.records.clone();
2843            let mut update_failed: Option<CerememoryError> = None;
2844            let mut updated_group = group.records.clone();
2845            for raw_record in &mut updated_group {
2846                Self::mark_raw_record_dream_processed(
2847                    raw_record,
2848                    summary_id,
2849                    semantic_id,
2850                    dreamed_at,
2851                );
2852                if let Err(err) = self.raw_journal.update(raw_record.clone()).await {
2853                    update_failed = Some(err);
2854                    break;
2855                }
2856            }
2857
2858            if let Some(err) = update_failed {
2859                for original in &original_group {
2860                    let _ = self.raw_journal.update(original.clone()).await;
2861                }
2862                let mut cleanup_targets = Vec::new();
2863                if dispatch_store!(self, StoreType::Episodic, delete(&summary_id))? {
2864                    cleanup_targets.push((summary_id, StoreType::Episodic));
2865                }
2866                if let Some(semantic_id) = semantic_id {
2867                    if dispatch_store!(self, StoreType::Semantic, delete(&semantic_id))? {
2868                        cleanup_targets.push((semantic_id, StoreType::Semantic));
2869                    }
2870                }
2871                if !cleanup_targets.is_empty() {
2872                    self.cleanup_deleted_records(&cleanup_targets).await?;
2873                }
2874                return Err(CerememoryError::Internal(format!(
2875                    "Dream tick failed while updating raw journal state: {err}"
2876                )));
2877            }
2878
2879            episodic_summaries_created += 1;
2880        }
2881
2882        Ok(DreamTickResponse {
2883            groups_processed,
2884            raw_records_processed,
2885            episodic_summaries_created,
2886            semantic_nodes_created,
2887        })
2888    }
2889
2890    /// lifecycle.consolidate — Smart Consolidation (CMP Spec §5.1).
2891    ///
2892    /// Phase 4 enhancements:
2893    /// - **Duplicate detection**: Vector similarity > 0.92 identifies near-duplicates.
2894    ///   Higher-fidelity record is kept; associations are merged.
2895    /// - **LLM summarization**: When a provider is configured, related episodic records
2896    ///   are summarized into a single semantic node (otherwise, truncation fallback).
2897    /// - **Relation extraction**: LLM extracts semantic relations, stored as associations.
2898    pub async fn lifecycle_consolidate(
2899        &self,
2900        req: ConsolidateRequest,
2901    ) -> Result<ConsolidateResponse, CerememoryError> {
2902        let ids = self.episodic.list_ids().await?;
2903        let mut processed = 0u32;
2904        let mut migrated = 0u32;
2905        let mut semantic_created = 0u32;
2906        let mut compressed = 0u32;
2907        let mut pruned = 0u32;
2908
2909        // Phase 1: Detect and merge near-duplicate records
2910        let mut duplicate_groups: Vec<(Uuid, Uuid)> = Vec::new();
2911        if !req.dry_run {
2912            let mut checked: HashSet<Uuid> = HashSet::new();
2913            for &id in &ids {
2914                if checked.contains(&id) {
2915                    continue;
2916                }
2917                // Use vector index to find similar records
2918                if let Some(record) = self.episodic.get(&id).await? {
2919                    if let Some(emb) = Self::primary_embedding(&record) {
2920                        if let Ok(hits) = self.vector_index.search(emb, 5) {
2921                            for hit in hits {
2922                                if hit.record_id != id
2923                                    && hit.similarity > 0.92
2924                                    && !checked.contains(&hit.record_id)
2925                                {
2926                                    let Some((_, hit_store)) =
2927                                        self.get_store_record(&hit.record_id).await?
2928                                    else {
2929                                        continue;
2930                                    };
2931                                    if hit_store != StoreType::Episodic {
2932                                        continue;
2933                                    }
2934                                    duplicate_groups.push((id, hit.record_id));
2935                                    checked.insert(hit.record_id);
2936                                }
2937                            }
2938                        }
2939                    }
2940                }
2941                checked.insert(id);
2942            }
2943
2944            // Merge duplicates: keep higher-fidelity, merge associations
2945            for (keep_id, remove_id) in &duplicate_groups {
2946                if let (Some(keep_rec), Some(remove_rec)) = (
2947                    self.episodic.get(keep_id).await?,
2948                    self.get_store_record(remove_id).await?,
2949                ) {
2950                    let (remove_record, remove_store) = remove_rec;
2951                    // Decide which to keep based on fidelity
2952                    let (actual_keep, actual_remove, actual_remove_store) =
2953                        if keep_rec.fidelity.score >= remove_record.fidelity.score {
2954                            (*keep_id, *remove_id, remove_store)
2955                        } else {
2956                            (*remove_id, *keep_id, StoreType::Episodic)
2957                        };
2958
2959                    // Merge associations from removed to kept
2960                    let removed_assocs = self.coordinator.get_associations(&actual_remove).await?;
2961                    for assoc in removed_assocs {
2962                        self.add_persisted_association(&actual_keep, assoc).await?;
2963                    }
2964
2965                    // Delete the duplicate
2966                    if dispatch_store!(self, actual_remove_store, delete(&actual_remove))? {
2967                        self.cleanup_deleted_records(&[(actual_remove, actual_remove_store)])
2968                            .await?;
2969                        compressed += 1;
2970                    }
2971                }
2972            }
2973        }
2974
2975        // Phase 2: Migrate eligible episodic records to semantic
2976        let remaining_records = self.episodic.get_all().await?;
2977
2978        for record in remaining_records {
2979            processed += 1;
2980            let age_hours = (Utc::now() - record.created_at).num_hours() as u32;
2981            if age_hours < req.min_age_hours {
2982                continue;
2983            }
2984            if record.access_count < req.min_access_count {
2985                continue;
2986            }
2987
2988            if req.dry_run {
2989                migrated += 1;
2990                continue;
2991            }
2992
2993            // Create semantic node
2994            let mut semantic_record = record.clone();
2995            semantic_record.store = StoreType::Semantic;
2996            semantic_record.id = Uuid::now_v7();
2997
2998            // Generate summary: use LLM if available, otherwise truncate
2999            if semantic_record.content.summary.is_none() {
3000                if let Some(ref provider) = self.llm_provider {
3001                    if let Some(text) = record.text_content() {
3002                        match provider.summarize(&[text.to_string()], 200).await {
3003                            Ok(summary) if !summary.is_empty() => {
3004                                semantic_record.content.summary = Some(summary);
3005                            }
3006                            _ => {
3007                                // Fallback to truncation
3008                                semantic_record.content.summary = Some(
3009                                    record
3010                                        .text_content()
3011                                        .map(|t| {
3012                                            if t.len() > 100 {
3013                                                format!("{}...", truncate_str(t, 100))
3014                                            } else {
3015                                                t.to_string()
3016                                            }
3017                                        })
3018                                        .unwrap_or_default(),
3019                                );
3020                            }
3021                        }
3022                    }
3023                } else {
3024                    semantic_record.content.summary = semantic_record.text_content().map(|t| {
3025                        if t.len() > 100 {
3026                            format!("{}...", truncate_str(t, 100))
3027                        } else {
3028                            t.to_string()
3029                        }
3030                    });
3031                }
3032            }
3033
3034            // Extract relations via LLM if available
3035            if let Some(ref provider) = self.llm_provider {
3036                if let Some(text) = record.text_content() {
3037                    if let Ok(relations) = provider.extract_relations(text).await {
3038                        for rel in relations {
3039                            // Store as metadata on the semantic record
3040                            if let serde_json::Value::Object(ref mut map) = semantic_record.metadata
3041                            {
3042                                let relations_arr = map
3043                                    .entry("extracted_relations".to_string())
3044                                    .or_insert_with(|| serde_json::json!([]));
3045                                if let serde_json::Value::Array(ref mut arr) = relations_arr {
3046                                    arr.push(serde_json::json!({
3047                                        "subject": rel.subject,
3048                                        "predicate": rel.predicate,
3049                                        "object": rel.object,
3050                                        "confidence": rel.confidence,
3051                                    }));
3052                                }
3053                            }
3054                        }
3055                    }
3056                }
3057            }
3058
3059            dispatch_store!(self, StoreType::Semantic, store(semantic_record.clone()))?;
3060            semantic_created += 1;
3061            self.coordinator
3062                .register(
3063                    semantic_record.id,
3064                    StoreType::Semantic,
3065                    semantic_record.associations.clone(),
3066                )
3067                .await;
3068
3069            if let Err(e) = self.index_record(&semantic_record) {
3070                warn!(error = %e, "Failed to index consolidated record");
3071            }
3072
3073            let assoc = Association {
3074                target_id: semantic_record.id,
3075                association_type: AssociationType::Semantic,
3076                weight: 1.0,
3077                created_at: Utc::now(),
3078                last_co_activation: Utc::now(),
3079            };
3080            self.add_persisted_association(&record.id, assoc).await?;
3081
3082            migrated += 1;
3083
3084            if record.fidelity.score < 0.1 && self.episodic.delete(&record.id).await? {
3085                self.cleanup_deleted_records(&[(record.id, StoreType::Episodic)])
3086                    .await?;
3087                pruned += 1;
3088            }
3089        }
3090
3091        info!(
3092            processed,
3093            migrated, semantic_created, compressed, pruned, "Smart consolidation completed"
3094        );
3095
3096        Ok(ConsolidateResponse {
3097            records_processed: processed,
3098            records_migrated: migrated,
3099            records_compressed: compressed,
3100            records_pruned: pruned,
3101            semantic_nodes_created: semantic_created,
3102        })
3103    }
3104
3105    /// lifecycle.decay_tick — Advance decay (CMP Spec §5.2).
3106    pub async fn lifecycle_decay_tick(
3107        &self,
3108        req: DecayTickRequest,
3109    ) -> Result<DecayTickResponse, CerememoryError> {
3110        let tick_secs = req.tick_duration_seconds.unwrap_or(3600) as f64;
3111
3112        let mut all_inputs = Vec::new();
3113        let mut record_stores: HashMap<Uuid, StoreType> = HashMap::new();
3114
3115        for store_type in [
3116            StoreType::Episodic,
3117            StoreType::Semantic,
3118            StoreType::Procedural,
3119            StoreType::Emotional,
3120        ] {
3121            let records = dispatch_store!(self, store_type, get_all())?;
3122            for record in records {
3123                all_inputs.push(DecayInput {
3124                    id: record.id,
3125                    fidelity: record.fidelity.clone(),
3126                    emotion: record.emotion.clone(),
3127                    last_accessed_at: record.last_accessed_at,
3128                    access_count: record.access_count,
3129                });
3130                record_stores.insert(record.id, store_type);
3131            }
3132        }
3133
3134        let decay = self.decay.clone();
3135        let result =
3136            tokio::task::spawn_blocking(move || decay.compute_tick(&all_inputs, tick_secs))
3137                .await
3138                .map_err(|e| CerememoryError::Internal(format!("Decay task failed: {e}")))?;
3139
3140        for output in &result.updates {
3141            if let Some(&store_type) = record_stores.get(&output.id) {
3142                if output.should_prune {
3143                    if dispatch_store!(self, store_type, delete(&output.id))? {
3144                        self.cleanup_deleted_records(&[(output.id, store_type)])
3145                            .await?;
3146                    }
3147                } else {
3148                    dispatch_store!(
3149                        self,
3150                        store_type,
3151                        update_fidelity(&output.id, output.new_fidelity.clone())
3152                    )?;
3153                }
3154            }
3155        }
3156
3157        // Notify evolution engine about fidelity distributions
3158        let mut fidelity_by_store: HashMap<StoreType, Vec<f64>> = HashMap::new();
3159        for output in &result.updates {
3160            if let Some(&store_type) = record_stores.get(&output.id) {
3161                fidelity_by_store
3162                    .entry(store_type)
3163                    .or_default()
3164                    .push(output.new_fidelity.score);
3165            }
3166        }
3167        for (store_type, scores) in &fidelity_by_store {
3168            self.evolution.observe_decay_tick(*store_type, scores);
3169        }
3170
3171        info!(
3172            updated = result.records_updated,
3173            below_threshold = result.records_below_threshold,
3174            pruned = result.records_pruned,
3175            "Decay tick completed"
3176        );
3177
3178        Ok(DecayTickResponse {
3179            records_updated: result.records_updated,
3180            records_below_threshold: result.records_below_threshold,
3181            records_pruned: result.records_pruned,
3182        })
3183    }
3184
3185    /// lifecycle.set_mode (CMP Spec §5.3).
3186    pub async fn lifecycle_set_mode(&self, req: SetModeRequest) -> Result<(), CerememoryError> {
3187        *self.recall_mode.write().await = req.mode;
3188        info!(mode = ?req.mode, "Recall mode changed");
3189        Ok(())
3190    }
3191
3192    /// lifecycle.forget — Delete memory records (CMP Spec §5.4).
3193    pub async fn lifecycle_forget(&self, req: ForgetRequest) -> Result<u32, CerememoryError> {
3194        if !req.confirm {
3195            return Err(CerememoryError::ForgetUnconfirmed);
3196        }
3197
3198        let ForgetRequest {
3199            record_ids,
3200            store,
3201            temporal_range,
3202            cascade,
3203            ..
3204        } = req;
3205        let mut deleted = 0u32;
3206        let mut delete_targets: HashMap<Uuid, StoreType> = HashMap::new();
3207
3208        if let Some(ids) = record_ids {
3209            for id in ids {
3210                if let Some((record, store_type)) = self.get_store_record(&id).await? {
3211                    delete_targets.insert(id, store_type);
3212                    if cascade {
3213                        for assoc in &record.associations {
3214                            if let Some((_, st)) = self.get_store_record(&assoc.target_id).await? {
3215                                delete_targets.insert(assoc.target_id, st);
3216                            }
3217                        }
3218                    }
3219                }
3220            }
3221        }
3222
3223        if let Some(store_type) = store {
3224            let ids = dispatch_store!(self, store_type, list_ids())?;
3225            for id in ids {
3226                delete_targets.insert(id, store_type);
3227            }
3228        }
3229
3230        if let Some(range) = temporal_range {
3231            for store_type in ALL_STORES {
3232                let records = dispatch_store!(self, store_type, get_all())?;
3233                for record in records {
3234                    if record.created_at >= range.start && record.created_at <= range.end {
3235                        delete_targets.insert(record.id, store_type);
3236                    }
3237                }
3238            }
3239        }
3240
3241        let mut deleted_records = Vec::new();
3242        for (id, store_type) in delete_targets {
3243            if dispatch_store!(self, store_type, delete(&id))? {
3244                deleted_records.push((id, store_type));
3245                deleted += 1;
3246            }
3247        }
3248        self.cleanup_deleted_records(&deleted_records).await?;
3249
3250        warn!(deleted, "Forget operation completed");
3251        Ok(deleted)
3252    }
3253
3254    /// lifecycle.export — Export records to a CMA archive with optional store
3255    /// filtering and encryption.
3256    pub async fn lifecycle_export(
3257        &self,
3258        req: ExportRequest,
3259    ) -> Result<(Vec<u8>, ExportResponse), CerememoryError> {
3260        match Self::normalize_export_format(&req.format)? {
3261            "cma" => {}
3262            _ => unreachable!("normalize_export_format only returns supported formats"),
3263        }
3264
3265        let records = self
3266            .collect_records_for_stores(req.stores.as_deref())
3267            .await?;
3268        let raw_records = if req.include_raw_journal {
3269            self.collect_all_raw_journal_records().await?
3270        } else {
3271            Vec::new()
3272        };
3273
3274        let encryption_key = if req.encrypt {
3275            let key_str = req.encryption_key.as_deref().ok_or_else(|| {
3276                CerememoryError::Validation(
3277                    "encryption_key is required when encrypt=true".to_string(),
3278                )
3279            })?;
3280            Some(cerememory_archive::crypto::derive_key(key_str))
3281        } else {
3282            None
3283        };
3284
3285        if req.include_raw_journal {
3286            cerememory_archive::export_bundle_filtered(
3287                &records,
3288                &raw_records,
3289                req.stores.as_deref(),
3290                encryption_key.as_ref(),
3291            )
3292        } else {
3293            cerememory_archive::export_filtered(
3294                &records,
3295                req.stores.as_deref(),
3296                encryption_key.as_ref(),
3297            )
3298        }
3299    }
3300
3301    /// lifecycle.import — Import records from a CMA archive with optional
3302    /// decryption and conflict resolution.
3303    pub async fn lifecycle_import(&self, req: ImportRequest) -> Result<u32, CerememoryError> {
3304        let data = req.archive_data.ok_or_else(|| {
3305            CerememoryError::Validation("Import requires archive_data".to_string())
3306        })?;
3307
3308        let decryption_key = req
3309            .decryption_key
3310            .as_deref()
3311            .map(cerememory_archive::crypto::derive_key);
3312        let bundle = cerememory_archive::import_bundle_with_key(&data, decryption_key.as_ref())?;
3313        let imported = match req.strategy {
3314            ImportStrategy::Merge => {
3315                let imported_curated = self
3316                    .import_records_with_conflict_resolution(
3317                        bundle.records,
3318                        req.conflict_resolution,
3319                    )
3320                    .await?;
3321                let imported_raw = self
3322                    .import_raw_records_with_conflict_resolution(
3323                        bundle.raw_records,
3324                        req.conflict_resolution,
3325                    )
3326                    .await?;
3327                imported_curated + imported_raw
3328            }
3329            ImportStrategy::Replace => {
3330                let snapshot = self.collect_records_for_stores(None).await?;
3331                let raw_snapshot = self.collect_all_raw_journal_records().await?;
3332
3333                if let Err(err) = self.clear_all_records().await {
3334                    if let Err(restore_err) = self.restore_records(&snapshot).await {
3335                        return Err(CerememoryError::ImportConflict(format!(
3336                            "Replace import failed while clearing existing records: {err}. Rollback failed: {restore_err}"
3337                        )));
3338                    }
3339                    return Err(err);
3340                }
3341                if let Err(err) = self.clear_raw_journal().await {
3342                    let _ = self.restore_records(&snapshot).await;
3343                    if let Err(restore_err) = self.restore_raw_journal(&raw_snapshot).await {
3344                        return Err(CerememoryError::ImportConflict(format!(
3345                            "Replace import failed while clearing raw journal: {err}. Rollback failed: {restore_err}"
3346                        )));
3347                    }
3348                    return Err(CerememoryError::ImportConflict(format!(
3349                        "Replace import failed while clearing raw journal: {err}"
3350                    )));
3351                }
3352
3353                match async {
3354                    let imported_curated = self
3355                        .import_records_with_conflict_resolution(
3356                            bundle.records,
3357                            req.conflict_resolution,
3358                        )
3359                        .await?;
3360                    let imported_raw = self
3361                        .import_raw_records_with_conflict_resolution(
3362                            bundle.raw_records,
3363                            req.conflict_resolution,
3364                        )
3365                        .await?;
3366                    Ok::<u32, CerememoryError>(imported_curated + imported_raw)
3367                }
3368                .await
3369                {
3370                    Ok(imported) => imported,
3371                    Err(err) => {
3372                        if let Err(restore_err) = self.restore_records(&snapshot).await {
3373                            return Err(CerememoryError::ImportConflict(format!(
3374                                "Replace import failed: {err}. Rollback failed: {restore_err}"
3375                            )));
3376                        }
3377                        if let Err(restore_err) = self.restore_raw_journal(&raw_snapshot).await {
3378                            return Err(CerememoryError::ImportConflict(format!(
3379                                "Replace import failed: {err}. Raw rollback failed: {restore_err}"
3380                            )));
3381                        }
3382                        return Err(err);
3383                    }
3384                }
3385            }
3386        };
3387
3388        info!(imported, strategy = ?req.strategy, "Import completed");
3389        Ok(imported)
3390    }
3391
3392    /// Import records from a serialized CMA archive (convenience method).
3393    ///
3394    /// Uses `KeepExisting` conflict resolution (skips duplicates).
3395    pub async fn import_records(&self, data: &[u8]) -> Result<u32, CerememoryError> {
3396        let records = cerememory_archive::import_records(data)?;
3397        let mut imported = 0u32;
3398
3399        for record in records {
3400            let store_type = record.store;
3401
3402            // Check for ID conflicts — skip if record already exists
3403            if self.get_store_record(&record.id).await?.is_some() {
3404                continue;
3405            }
3406
3407            // Store first (source of truth)
3408            dispatch_store!(self, store_type, store(record.clone()))?;
3409
3410            // Then coordinator + index (rebuildable)
3411            self.coordinator
3412                .register(record.id, store_type, record.associations.clone())
3413                .await;
3414            if let Err(e) = self.index_record(&record) {
3415                warn!(error = %e, record_id = %record.id, "Failed to index imported record");
3416            }
3417            imported += 1;
3418        }
3419
3420        info!(imported, "Import completed");
3421        Ok(imported)
3422    }
3423
3424    /// Collect all records for export (used by archive module).
3425    pub async fn collect_all_records(&self) -> Result<Vec<MemoryRecord>, CerememoryError> {
3426        self.collect_records_for_stores(None).await
3427    }
3428
3429    /// Collect records from specified stores, or all stores if `None`.
3430    pub async fn collect_records_for_stores(
3431        &self,
3432        stores: Option<&[StoreType]>,
3433    ) -> Result<Vec<MemoryRecord>, CerememoryError> {
3434        let target_stores = stores.unwrap_or(&ALL_STORES);
3435
3436        let mut records = Vec::new();
3437        let mut seen_stores = HashSet::with_capacity(target_stores.len());
3438        for &store_type in target_stores {
3439            if !seen_stores.insert(store_type) {
3440                continue;
3441            }
3442
3443            let store_records = dispatch_store!(self, store_type, get_all())?;
3444            records.extend(store_records);
3445        }
3446        Ok(records)
3447    }
3448
3449    // ─── CMP Introspect Operations ───────────────────────────────────
3450
3451    /// introspect.stats (CMP Spec §6.1).
3452    pub async fn introspect_stats(&self) -> Result<StatsResponse, CerememoryError> {
3453        let mut records_by_store = HashMap::new();
3454        let mut avg_fidelity_by_store = HashMap::new();
3455        let mut total_records = 0u32;
3456        let mut total_fidelity = 0.0f64;
3457        let mut dream_episodic_summaries = 0u32;
3458        let mut dream_semantic_nodes = 0u32;
3459        let mut last_dream_tick_at: Option<chrono::DateTime<Utc>> = None;
3460
3461        for store_type in ALL_STORES {
3462            let count = dispatch_store!(self, store_type, count())? as u32;
3463            records_by_store.insert(store_type, count);
3464            total_records += count;
3465
3466            if count > 0 {
3467                let records = dispatch_store!(self, store_type, get_all())?;
3468                let mut store_fidelity = 0.0f64;
3469                for record in &records {
3470                    store_fidelity += record.fidelity.score;
3471                    total_fidelity += record.fidelity.score;
3472
3473                    if let Some(kind) = record
3474                        .metadata
3475                        .get("_dream")
3476                        .and_then(|value| value.get("kind"))
3477                        .and_then(|value| value.as_str())
3478                    {
3479                        match kind {
3480                            "episodic_summary" if store_type == StoreType::Episodic => {
3481                                dream_episodic_summaries += 1;
3482                            }
3483                            "semantic_summary" if store_type == StoreType::Semantic => {
3484                                dream_semantic_nodes += 1;
3485                            }
3486                            _ => {}
3487                        }
3488                    }
3489                    if let Some(timestamp) = record
3490                        .metadata
3491                        .get("_origin")
3492                        .and_then(|value| value.get("dream_tick_at"))
3493                        .and_then(|value| value.as_str())
3494                        .and_then(|value| chrono::DateTime::parse_from_rfc3339(value).ok())
3495                        .map(|value| value.with_timezone(&Utc))
3496                    {
3497                        if last_dream_tick_at.is_none_or(|current| timestamp > current) {
3498                            last_dream_tick_at = Some(timestamp);
3499                        }
3500                    }
3501                }
3502                avg_fidelity_by_store.insert(store_type, store_fidelity / count as f64);
3503            }
3504        }
3505
3506        let raw_journal_all = self.raw_journal.get_all().await?;
3507        let raw_journal_records = raw_journal_all.len() as u32;
3508        let raw_journal_pending_dream = raw_journal_all
3509            .iter()
3510            .filter(|record| !Self::raw_record_processed(record))
3511            .count() as u32;
3512
3513        let avg_fidelity = if total_records > 0 {
3514            total_fidelity / total_records as f64
3515        } else {
3516            0.0
3517        };
3518
3519        Ok(StatsResponse {
3520            total_records,
3521            records_by_store,
3522            total_associations: self.coordinator.total_associations().await,
3523            avg_fidelity,
3524            avg_fidelity_by_store,
3525            oldest_record: None,
3526            newest_record: None,
3527            total_recall_count: 0,
3528            raw_journal_records,
3529            raw_journal_pending_dream,
3530            dream_episodic_summaries,
3531            dream_semantic_nodes,
3532            last_dream_tick_at,
3533            evolution_metrics: Some(self.evolution.get_metrics()),
3534            background_decay_enabled: self.is_background_decay_enabled().await,
3535            background_dream_enabled: self.is_background_dream_enabled().await,
3536        })
3537    }
3538
3539    /// introspect.record (CMP Spec §6.2).
3540    pub async fn introspect_record(
3541        &self,
3542        req: RecordIntrospectRequest,
3543    ) -> Result<MemoryRecord, CerememoryError> {
3544        let (record, _) = self
3545            .get_store_record(&req.record_id)
3546            .await?
3547            .ok_or_else(|| CerememoryError::RecordNotFound(req.record_id.to_string()))?;
3548
3549        Ok(record)
3550    }
3551
3552    /// introspect.decay_forecast — Forward-calculate fidelity (CMP Spec §6.3, OPTIONAL).
3553    pub async fn introspect_decay_forecast(
3554        &self,
3555        req: DecayForecastRequest,
3556    ) -> Result<DecayForecastResponse, CerememoryError> {
3557        let mut forecasts = Vec::with_capacity(req.record_ids.len());
3558
3559        for record_id in &req.record_ids {
3560            let (record, _) = self
3561                .get_store_record(record_id)
3562                .await?
3563                .ok_or_else(|| CerememoryError::RecordNotFound(record_id.to_string()))?;
3564
3565            let current_fidelity = record.fidelity.score;
3566            // Match real decay engine: baseline = max(last_accessed_at, last_decay_tick)
3567            let last_access_secs = record.last_accessed_at.timestamp() as f64;
3568            let last_tick_secs = record.fidelity.last_decay_tick.timestamp() as f64;
3569            let baseline_secs = last_access_secs.max(last_tick_secs);
3570            let forecast_secs = req.forecast_at.timestamp() as f64;
3571            let elapsed_secs = (forecast_secs - baseline_secs).max(0.0);
3572
3573            // Use per-record decay_rate (not global params), matching real decay engine
3574            let decay_exponent = record.fidelity.decay_rate;
3575            let emotion_mod = cerememory_decay::math::compute_emotion_mod(record.emotion.intensity);
3576            let forecasted_fidelity = cerememory_decay::math::compute_fidelity(
3577                current_fidelity,
3578                elapsed_secs,
3579                record.fidelity.stability,
3580                decay_exponent,
3581                emotion_mod,
3582            );
3583
3584            // Binary search for threshold crossing date
3585            let params = self.decay.params();
3586            let estimated_threshold_date = if current_fidelity > params.prune_threshold {
3587                Self::estimate_threshold_date(
3588                    &record,
3589                    decay_exponent,
3590                    params.prune_threshold,
3591                    emotion_mod,
3592                )
3593            } else {
3594                // Already below threshold
3595                None
3596            };
3597
3598            forecasts.push(DecayForecast {
3599                record_id: *record_id,
3600                current_fidelity,
3601                forecasted_fidelity,
3602                estimated_threshold_date,
3603            });
3604        }
3605
3606        Ok(DecayForecastResponse { forecasts })
3607    }
3608
3609    /// Binary search for the date when fidelity drops below the prune threshold.
3610    fn estimate_threshold_date(
3611        record: &MemoryRecord,
3612        decay_exponent: f64,
3613        prune_threshold: f64,
3614        emotion_mod: f64,
3615    ) -> Option<chrono::DateTime<Utc>> {
3616        // Match real decay engine: baseline = max(last_accessed_at, last_decay_tick)
3617        let base_time = record.last_accessed_at.max(record.fidelity.last_decay_tick);
3618        let f0 = record.fidelity.score;
3619        let stability = record.fidelity.stability;
3620
3621        // Search up to ~10 years in seconds
3622        let mut lo: f64 = 0.0;
3623        let mut hi: f64 = 315_360_000.0;
3624
3625        // Check if fidelity ever drops below threshold within search range
3626        let f_hi = cerememory_decay::math::compute_fidelity(
3627            f0,
3628            hi,
3629            stability,
3630            decay_exponent,
3631            emotion_mod,
3632        );
3633        if f_hi >= prune_threshold {
3634            return None; // Won't cross threshold within 10 years
3635        }
3636
3637        // Binary search (30 iterations gives sub-second precision over 10yr range)
3638        for _ in 0..30 {
3639            let mid = (lo + hi) / 2.0;
3640            let f_mid = cerememory_decay::math::compute_fidelity(
3641                f0,
3642                mid,
3643                stability,
3644                decay_exponent,
3645                emotion_mod,
3646            );
3647            if f_mid > prune_threshold {
3648                lo = mid;
3649            } else {
3650                hi = mid;
3651            }
3652        }
3653
3654        let threshold_secs = ((lo + hi) / 2.0) as i64;
3655        Some(base_time + chrono::Duration::seconds(threshold_secs))
3656    }
3657
    /// introspect.evolution — Return evolution engine metrics (CMP Spec §6.4, OPTIONAL).
    ///
    /// Returns whatever `self.evolution.get_metrics()` reports at call time.
    /// The visible code path never produces an error; the `Result` wrapper
    /// matches the other introspect operations.
    pub async fn introspect_evolution(&self) -> Result<EvolutionMetrics, CerememoryError> {
        Ok(self.evolution.get_metrics())
    }
3662}
3663
// Import truncate_str from core instead of duplicating it here.
3665use cerememory_core::truncate_str;
3666
3667/// Apply human-mode noise to content based on fidelity.
3668/// Only applies to text blocks — non-text modalities are returned unchanged.
3669fn apply_human_noise(content: &MemoryContent, fidelity: f64) -> MemoryContent {
3670    if fidelity >= 0.95 {
3671        return content.clone();
3672    }
3673
3674    let mut noised = content.clone();
3675    for block in &mut noised.blocks {
3676        if block.modality == Modality::Text {
3677            if let Ok(text) = std::str::from_utf8(&block.data) {
3678                let degraded = degrade_text(text, fidelity);
3679                block.data = degraded.into_bytes();
3680            }
3681        }
3682    }
3683    noised
3684}
3685
/// Degrade text based on fidelity level.
///
/// At fidelity `0.9` or above, or when `text` contains no
/// whitespace-separated words, the input is returned unchanged. Otherwise
/// every `stride`-th word, starting with the first, is replaced by `"..."`
/// and the words are re-joined with single spaces (original whitespace is
/// not preserved).
///
/// `stride` is `1 / min(1 - fidelity, 0.8)` truncated to an integer and never
/// less than 2, so at most every second word is replaced.
fn degrade_text(text: &str, fidelity: f64) -> String {
    if fidelity >= 0.9 {
        return text.to_owned();
    }

    let mut tokens = text.split_whitespace().peekable();
    if tokens.peek().is_none() {
        return text.to_owned();
    }

    let drop_ratio = f64::min(1.0 - fidelity, 0.8);
    let stride = f64::max(1.0 / drop_ratio, 2.0) as usize;

    tokens
        .enumerate()
        .map(|(idx, token)| if idx % stride == 0 { "..." } else { token })
        .collect::<Vec<&str>>()
        .join(" ")
}
3711
3712#[cfg(test)]
3713mod tests {
3714    use super::*;
3715
    /// Test helper: builds an engine via `CerememoryEngine::in_memory()` and
    /// panics if construction fails.
    async fn make_engine() -> CerememoryEngine {
        CerememoryEngine::in_memory().unwrap()
    }
3719
3720    fn text_store_req(text: &str, store: Option<StoreType>) -> EncodeStoreRequest {
3721        EncodeStoreRequest {
3722            header: None,
3723            content: MemoryContent {
3724                blocks: vec![ContentBlock {
3725                    modality: Modality::Text,
3726                    format: "text/plain".to_string(),
3727                    data: text.as_bytes().to_vec(),
3728                    embedding: None,
3729                }],
3730                summary: None,
3731            },
3732            store,
3733            emotion: None,
3734            context: None,
3735            metadata: None,
3736            associations: None,
3737        }
3738    }
3739
3740    fn structured_store_req(json: &str, store: Option<StoreType>) -> EncodeStoreRequest {
3741        EncodeStoreRequest {
3742            header: None,
3743            content: MemoryContent {
3744                blocks: vec![ContentBlock {
3745                    modality: Modality::Structured,
3746                    format: "application/json".to_string(),
3747                    data: json.as_bytes().to_vec(),
3748                    embedding: None,
3749                }],
3750                summary: None,
3751            },
3752            store,
3753            emotion: None,
3754            context: None,
3755            metadata: None,
3756            associations: None,
3757        }
3758    }
3759
3760    fn raw_text_record(session_id: &str, text: &str) -> RawJournalRecord {
3761        RawJournalRecord::new_text(
3762            session_id,
3763            RawSource::Conversation,
3764            RawSpeaker::User,
3765            RawVisibility::Normal,
3766            SecrecyLevel::Public,
3767            text,
3768        )
3769    }
3770
3771    fn raw_text_store_req(
3772        session_id: &str,
3773        text: &str,
3774        visibility: RawVisibility,
3775        secrecy_level: SecrecyLevel,
3776    ) -> EncodeStoreRawRequest {
3777        EncodeStoreRawRequest {
3778            header: None,
3779            session_id: session_id.to_string(),
3780            turn_id: None,
3781            topic_id: None,
3782            source: RawSource::Conversation,
3783            speaker: RawSpeaker::User,
3784            visibility,
3785            secrecy_level,
3786            content: MemoryContent {
3787                blocks: vec![ContentBlock {
3788                    modality: Modality::Text,
3789                    format: "text/plain".to_string(),
3790                    data: text.as_bytes().to_vec(),
3791                    embedding: None,
3792                }],
3793                summary: None,
3794            },
3795            metadata: None,
3796        }
3797    }
3798
3799    #[tokio::test]
3800    async fn raw_journal_append_and_get_roundtrip() {
3801        let engine = make_engine().await;
3802        let record = raw_text_record("sess-raw-1", "hello from raw journal");
3803        let id = record.id;
3804
3805        let stored_id = engine.append_raw_journal(record).await.unwrap();
3806        assert_eq!(stored_id, id);
3807
3808        let restored = engine.get_raw_journal_record(&id).await.unwrap().unwrap();
3809        assert_eq!(restored.session_id, "sess-raw-1");
3810        assert_eq!(restored.text_content(), Some("hello from raw journal"));
3811    }
3812
3813    #[tokio::test]
3814    async fn raw_journal_query_session_filters_records() {
3815        let engine = make_engine().await;
3816        engine
3817            .append_raw_journal(raw_text_record("sess-a", "first"))
3818            .await
3819            .unwrap();
3820        engine
3821            .append_raw_journal(raw_text_record("sess-b", "second"))
3822            .await
3823            .unwrap();
3824        engine
3825            .append_raw_journal(raw_text_record("sess-a", "third"))
3826            .await
3827            .unwrap();
3828
3829        let records = engine.query_raw_journal_by_session("sess-a").await.unwrap();
3830        assert_eq!(records.len(), 2);
3831        assert!(records.iter().all(|record| record.session_id == "sess-a"));
3832    }
3833
3834    #[tokio::test]
3835    async fn raw_journal_count_tracks_records() {
3836        let engine = make_engine().await;
3837        assert_eq!(engine.raw_journal_count().await.unwrap(), 0);
3838
3839        engine
3840            .append_raw_journal(raw_text_record("sess-a", "first"))
3841            .await
3842            .unwrap();
3843        engine
3844            .append_raw_journal(raw_text_record("sess-a", "second"))
3845            .await
3846            .unwrap();
3847
3848        assert_eq!(engine.raw_journal_count().await.unwrap(), 2);
3849    }
3850
3851    #[tokio::test]
3852    async fn encode_store_raw_and_recall_raw_roundtrip() {
3853        let engine = make_engine().await;
3854        let response = engine
3855            .encode_store_raw(raw_text_store_req(
3856                "sess-raw",
3857                "forensic memory",
3858                RawVisibility::Normal,
3859                SecrecyLevel::Public,
3860            ))
3861            .await
3862            .unwrap();
3863
3864        let recalled = engine
3865            .recall_raw_query(RecallRawQueryRequest {
3866                header: None,
3867                session_id: Some("sess-raw".to_string()),
3868                query: Some("forensic".to_string()),
3869                temporal: None,
3870                limit: 10,
3871                include_private_scratch: false,
3872                include_sealed: false,
3873                secrecy_levels: None,
3874            })
3875            .await
3876            .unwrap();
3877
3878        assert_eq!(recalled.total_candidates, 1);
3879        assert_eq!(recalled.records.len(), 1);
3880        assert_eq!(recalled.records[0].id, response.record_id);
3881        assert_eq!(recalled.records[0].text_content(), Some("forensic memory"));
3882    }
3883
3884    #[tokio::test]
3885    async fn recall_raw_defaults_exclude_private_scratch_and_secret() {
3886        let engine = make_engine().await;
3887        engine
3888            .encode_batch_store_raw(EncodeBatchStoreRawRequest {
3889                header: None,
3890                records: vec![
3891                    raw_text_store_req(
3892                        "sess-secure",
3893                        "public normal",
3894                        RawVisibility::Normal,
3895                        SecrecyLevel::Public,
3896                    ),
3897                    raw_text_store_req(
3898                        "sess-secure",
3899                        "private scratch",
3900                        RawVisibility::PrivateScratch,
3901                        SecrecyLevel::Sensitive,
3902                    ),
3903                    raw_text_store_req(
3904                        "sess-secure",
3905                        "sealed secret",
3906                        RawVisibility::Sealed,
3907                        SecrecyLevel::Secret,
3908                    ),
3909                ],
3910            })
3911            .await
3912            .unwrap();
3913
3914        let recalled = engine
3915            .recall_raw_query(RecallRawQueryRequest {
3916                header: None,
3917                session_id: Some("sess-secure".to_string()),
3918                query: None,
3919                temporal: None,
3920                limit: 10,
3921                include_private_scratch: false,
3922                include_sealed: false,
3923                secrecy_levels: None,
3924            })
3925            .await
3926            .unwrap();
3927
3928        assert_eq!(recalled.total_candidates, 1);
3929        assert_eq!(recalled.records[0].text_content(), Some("public normal"));
3930    }
3931
3932    #[tokio::test]
3933    async fn raw_journal_does_not_leak_into_normal_recall() {
3934        let engine = make_engine().await;
3935        engine
3936            .encode_store_raw(raw_text_store_req(
3937                "sess-leak",
3938                "hidden raw only",
3939                RawVisibility::Normal,
3940                SecrecyLevel::Public,
3941            ))
3942            .await
3943            .unwrap();
3944
3945        let response = engine
3946            .recall_query(RecallQueryRequest {
3947                header: None,
3948                cue: RecallCue {
3949                    text: Some("hidden raw".to_string()),
3950                    ..Default::default()
3951                },
3952                stores: None,
3953                limit: 10,
3954                min_fidelity: None,
3955                include_decayed: false,
3956                reconsolidate: false,
3957                activation_depth: 0,
3958                recall_mode: RecallMode::Perfect,
3959            })
3960            .await
3961            .unwrap();
3962
3963        assert_eq!(response.total_candidates, 0);
3964        assert!(response.memories.is_empty());
3965    }
3966
3967    #[tokio::test]
3968    async fn dream_tick_creates_episodic_summary_and_marks_raw_processed() {
3969        let engine = make_engine().await;
3970        let first = engine
3971            .encode_store_raw(raw_text_store_req(
3972                "sess-dream",
3973                "Discussed API timeout policy",
3974                RawVisibility::Normal,
3975                SecrecyLevel::Public,
3976            ))
3977            .await
3978            .unwrap();
3979        let second = engine
3980            .encode_store_raw(raw_text_store_req(
3981                "sess-dream",
3982                "Decided to keep retries idempotent-only",
3983                RawVisibility::Normal,
3984                SecrecyLevel::Public,
3985            ))
3986            .await
3987            .unwrap();
3988
3989        let resp = engine
3990            .lifecycle_dream_tick(DreamTickRequest {
3991                header: None,
3992                session_id: Some("sess-dream".to_string()),
3993                dry_run: false,
3994                max_groups: 10,
3995                include_private_scratch: false,
3996                include_sealed: false,
3997                promote_semantic: true,
3998                secrecy_levels: None,
3999            })
4000            .await
4001            .unwrap();
4002
4003        assert_eq!(resp.groups_processed, 1);
4004        assert_eq!(resp.raw_records_processed, 2);
4005        assert_eq!(resp.episodic_summaries_created, 1);
4006        assert_eq!(resp.semantic_nodes_created, 1);
4007
4008        let summaries = engine.episodic.get_all().await.unwrap();
4009        assert_eq!(summaries.len(), 1);
4010        let summary = &summaries[0];
4011        let semantic_records = engine.semantic.get_all().await.unwrap();
4012        assert_eq!(semantic_records.len(), 1);
4013        let semantic = &semantic_records[0];
4014        assert_eq!(summary.store, StoreType::Episodic);
4015        assert_eq!(summary.metadata["_origin"]["raw_session_id"], "sess-dream");
4016        assert_eq!(summary.metadata["_origin"]["raw_record_count"], 2);
4017        assert_eq!(
4018            summary.metadata["_origin"]["raw_record_ids"][0],
4019            first.record_id.to_string()
4020        );
4021        assert_eq!(
4022            summary.metadata["_origin"]["raw_record_ids"][1],
4023            second.record_id.to_string()
4024        );
4025
4026        let raw_one = engine
4027            .get_raw_journal_record(&first.record_id)
4028            .await
4029            .unwrap()
4030            .unwrap();
4031        let raw_two = engine
4032            .get_raw_journal_record(&second.record_id)
4033            .await
4034            .unwrap()
4035            .unwrap();
4036        assert_eq!(raw_one.derived_memory_ids, vec![summary.id, semantic.id]);
4037        assert_eq!(raw_two.derived_memory_ids, vec![summary.id, semantic.id]);
4038        assert_eq!(
4039            raw_one.metadata["_dream"]["last_summary_id"],
4040            serde_json::Value::String(summary.id.to_string())
4041        );
4042        assert!(raw_one.metadata["_dream"]["processed_at"].is_string());
4043    }
4044
4045    #[tokio::test]
4046    async fn dream_tick_is_idempotent_for_processed_raw_records() {
4047        let engine = make_engine().await;
4048        engine
4049            .encode_store_raw(raw_text_store_req(
4050                "sess-dream-repeat",
4051                "First raw note",
4052                RawVisibility::Normal,
4053                SecrecyLevel::Public,
4054            ))
4055            .await
4056            .unwrap();
4057
4058        let first = engine
4059            .lifecycle_dream_tick(DreamTickRequest {
4060                header: None,
4061                session_id: Some("sess-dream-repeat".to_string()),
4062                dry_run: false,
4063                max_groups: 10,
4064                include_private_scratch: false,
4065                include_sealed: false,
4066                promote_semantic: true,
4067                secrecy_levels: None,
4068            })
4069            .await
4070            .unwrap();
4071        let second = engine
4072            .lifecycle_dream_tick(DreamTickRequest {
4073                header: None,
4074                session_id: Some("sess-dream-repeat".to_string()),
4075                dry_run: false,
4076                max_groups: 10,
4077                include_private_scratch: false,
4078                include_sealed: false,
4079                promote_semantic: true,
4080                secrecy_levels: None,
4081            })
4082            .await
4083            .unwrap();
4084
4085        assert_eq!(first.episodic_summaries_created, 1);
4086        assert_eq!(first.semantic_nodes_created, 0);
4087        assert_eq!(second.episodic_summaries_created, 0);
4088        assert_eq!(second.semantic_nodes_created, 0);
4089        assert_eq!(engine.episodic.count().await.unwrap(), 1);
4090    }
4091
4092    #[tokio::test]
4093    async fn dream_tick_redacts_sealed_secret_and_private_scratch_content() {
4094        let engine = make_engine().await;
4095        engine
4096            .encode_batch_store_raw(EncodeBatchStoreRawRequest {
4097                header: None,
4098                records: vec![
4099                    raw_text_store_req(
4100                        "sess-dream-redact",
4101                        "Visible public note",
4102                        RawVisibility::Normal,
4103                        SecrecyLevel::Public,
4104                    ),
4105                    raw_text_store_req(
4106                        "sess-dream-redact",
4107                        "Private scratch hypothesis",
4108                        RawVisibility::PrivateScratch,
4109                        SecrecyLevel::Sensitive,
4110                    ),
4111                    raw_text_store_req(
4112                        "sess-dream-redact",
4113                        "Sealed customer secret",
4114                        RawVisibility::Sealed,
4115                        SecrecyLevel::Secret,
4116                    ),
4117                ],
4118            })
4119            .await
4120            .unwrap();
4121
4122        let resp = engine
4123            .lifecycle_dream_tick(DreamTickRequest {
4124                header: None,
4125                session_id: Some("sess-dream-redact".to_string()),
4126                dry_run: false,
4127                max_groups: 10,
4128                include_private_scratch: true,
4129                include_sealed: true,
4130                promote_semantic: true,
4131                secrecy_levels: Some(vec![
4132                    SecrecyLevel::Public,
4133                    SecrecyLevel::Sensitive,
4134                    SecrecyLevel::Secret,
4135                ]),
4136            })
4137            .await
4138            .unwrap();
4139
4140        assert_eq!(resp.groups_processed, 1);
4141        assert_eq!(resp.semantic_nodes_created, 0);
4142        let summaries = engine.episodic.get_all().await.unwrap();
4143        assert_eq!(summaries.len(), 1);
4144        let summary = &summaries[0];
4145
4146        let summary_text = summary.text_content().unwrap_or("");
4147        assert!(summary_text.contains("Visible public note"));
4148        assert!(!summary_text.contains("Private scratch hypothesis"));
4149        assert!(!summary_text.contains("Sealed customer secret"));
4150        assert!(summary_text.contains("Redacted raw records: 2"));
4151
4152        assert_eq!(
4153            summary.metadata["_dream"]["summary_stats"]["normal_records"],
4154            1
4155        );
4156        assert_eq!(
4157            summary.metadata["_dream"]["summary_stats"]["private_scratch_redacted"],
4158            1
4159        );
4160        assert_eq!(
4161            summary.metadata["_dream"]["summary_stats"]["secret_redacted"],
4162            1
4163        );
4164    }
4165
4166    #[tokio::test]
4167    async fn dream_tick_infers_multiple_topics_with_time_gap_and_lexical_shift() {
4168        let engine = make_engine().await;
4169        let base = Utc::now() - chrono::Duration::hours(2);
4170
4171        let mut first = raw_text_record("sess-topic-infer", "API timeout retries idempotent only");
4172        first.created_at = base;
4173        first.updated_at = base;
4174
4175        let mut second = raw_text_record("sess-topic-infer", "Backoff budget and timeout policy");
4176        second.created_at = base + chrono::Duration::minutes(5);
4177        second.updated_at = second.created_at;
4178
4179        let mut third = raw_text_record("sess-topic-infer", "Landing page hero typography palette");
4180        third.created_at = base + chrono::Duration::minutes(25);
4181        third.updated_at = third.created_at;
4182
4183        engine.append_raw_journal(first).await.unwrap();
4184        engine.append_raw_journal(second).await.unwrap();
4185        engine.append_raw_journal(third).await.unwrap();
4186
4187        let resp = engine
4188            .lifecycle_dream_tick(DreamTickRequest {
4189                header: None,
4190                session_id: Some("sess-topic-infer".to_string()),
4191                dry_run: false,
4192                max_groups: 10,
4193                include_private_scratch: false,
4194                include_sealed: false,
4195                promote_semantic: true,
4196                secrecy_levels: None,
4197            })
4198            .await
4199            .unwrap();
4200
4201        assert_eq!(resp.groups_processed, 2);
4202        assert_eq!(resp.episodic_summaries_created, 2);
4203        assert_eq!(resp.semantic_nodes_created, 1);
4204
4205        let summaries = engine.episodic.get_all().await.unwrap();
4206        assert_eq!(summaries.len(), 2);
4207        assert!(summaries.iter().all(|summary| {
4208            summary.metadata["_origin"]["raw_topic_inferred"] == serde_json::Value::Bool(true)
4209        }));
4210        assert!(summaries
4211            .iter()
4212            .all(|summary| { summary.metadata["_origin"]["raw_topic_hint"].is_string() }));
4213    }
4214
4215    #[tokio::test]
4216    async fn encode_recall_roundtrip() {
4217        let engine = make_engine().await;
4218
4219        let resp = engine
4220            .encode_store(text_store_req(
4221                "The quick brown fox",
4222                Some(StoreType::Episodic),
4223            ))
4224            .await
4225            .unwrap();
4226        assert_eq!(resp.store, StoreType::Episodic);
4227        assert_eq!(resp.initial_fidelity, 1.0);
4228
4229        let query = RecallQueryRequest {
4230            header: None,
4231            cue: RecallCue {
4232                text: Some("quick brown".to_string()),
4233                ..Default::default()
4234            },
4235            stores: None,
4236            limit: 10,
4237            min_fidelity: None,
4238            include_decayed: false,
4239            reconsolidate: true,
4240            activation_depth: 0,
4241            recall_mode: RecallMode::Perfect,
4242        };
4243
4244        let recall_resp = engine.recall_query(query).await.unwrap();
4245        assert!(!recall_resp.memories.is_empty());
4246        assert_eq!(
4247            recall_resp.memories[0].record.text_content(),
4248            Some("The quick brown fox")
4249        );
4250    }
4251
4252    #[tokio::test]
4253    async fn encode_store_persists_metadata_and_context() {
4254        let engine = make_engine().await;
4255
4256        let resp = engine
4257            .encode_store(EncodeStoreRequest {
4258                header: None,
4259                content: MemoryContent {
4260                    blocks: vec![ContentBlock {
4261                        modality: Modality::Text,
4262                        format: "text/plain".to_string(),
4263                        data: b"metadata roundtrip".to_vec(),
4264                        embedding: None,
4265                    }],
4266                    summary: None,
4267                },
4268                store: Some(StoreType::Episodic),
4269                emotion: None,
4270                context: Some(EncodeContext {
4271                    source: Some("chat".to_string()),
4272                    session_id: Some("sess-1".to_string()),
4273                    spatial: None,
4274                    temporal: None,
4275                }),
4276                metadata: Some(serde_json::json!({"tag": "important"})),
4277                associations: None,
4278            })
4279            .await
4280            .unwrap();
4281
4282        let record = engine
4283            .introspect_record(RecordIntrospectRequest {
4284                header: None,
4285                record_id: resp.record_id,
4286                include_history: false,
4287                include_associations: false,
4288                include_versions: false,
4289            })
4290            .await
4291            .unwrap();
4292        assert_eq!(record.metadata["tag"], "important");
4293        assert_eq!(record.metadata["_context"]["source"], "chat");
4294        assert_eq!(record.metadata["_context"]["session_id"], "sess-1");
4295    }
4296
4297    #[tokio::test]
4298    async fn tantivy_tokenized_search() {
4299        let engine = make_engine().await;
4300
4301        engine
4302            .encode_store(text_store_req(
4303                "The quick brown fox jumps over the lazy dog",
4304                Some(StoreType::Episodic),
4305            ))
4306            .await
4307            .unwrap();
4308
4309        // Tantivy should find "quick" via tokenized search
4310        let query = RecallQueryRequest {
4311            header: None,
4312            cue: RecallCue {
4313                text: Some("quick".to_string()),
4314                ..Default::default()
4315            },
4316            stores: None,
4317            limit: 10,
4318            min_fidelity: None,
4319            include_decayed: false,
4320            reconsolidate: false,
4321            activation_depth: 0,
4322            recall_mode: RecallMode::Perfect,
4323        };
4324
4325        let resp = engine.recall_query(query).await.unwrap();
4326        assert!(!resp.memories.is_empty());
4327        assert_eq!(
4328            resp.memories[0].record.text_content(),
4329            Some("The quick brown fox jumps over the lazy dog")
4330        );
4331    }
4332
4333    #[tokio::test]
4334    async fn vector_search_recall() {
4335        let engine = make_engine().await;
4336
4337        // Store with embedding
4338        let req = EncodeStoreRequest {
4339            header: None,
4340            content: MemoryContent {
4341                blocks: vec![ContentBlock {
4342                    modality: Modality::Text,
4343                    format: "text/plain".to_string(),
4344                    data: b"Cats are fluffy animals".to_vec(),
4345                    embedding: Some(vec![1.0, 0.0, 0.0]),
4346                }],
4347                summary: None,
4348            },
4349            store: Some(StoreType::Episodic),
4350            emotion: None,
4351            context: None,
4352            metadata: None,
4353            associations: None,
4354        };
4355        engine.encode_store(req).await.unwrap();
4356
4357        // Recall with embedding
4358        let query = RecallQueryRequest {
4359            header: None,
4360            cue: RecallCue {
4361                embedding: Some(vec![1.0, 0.1, 0.0]),
4362                ..Default::default()
4363            },
4364            stores: None,
4365            limit: 10,
4366            min_fidelity: None,
4367            include_decayed: false,
4368            reconsolidate: false,
4369            activation_depth: 0,
4370            recall_mode: RecallMode::Perfect,
4371        };
4372
4373        let resp = engine.recall_query(query).await.unwrap();
4374        assert!(!resp.memories.is_empty());
4375        assert_eq!(
4376            resp.memories[0].record.text_content(),
4377            Some("Cats are fluffy animals")
4378        );
4379    }
4380
4381    #[tokio::test]
4382    async fn hybrid_text_vector_search() {
4383        let engine = make_engine().await;
4384
4385        // Store two records, one with both text and embedding
4386        let req1 = EncodeStoreRequest {
4387            header: None,
4388            content: MemoryContent {
4389                blocks: vec![ContentBlock {
4390                    modality: Modality::Text,
4391                    format: "text/plain".to_string(),
4392                    data: b"Machine learning is fascinating".to_vec(),
4393                    embedding: Some(vec![1.0, 0.0, 0.0]),
4394                }],
4395                summary: None,
4396            },
4397            store: Some(StoreType::Episodic),
4398            emotion: None,
4399            context: None,
4400            metadata: None,
4401            associations: None,
4402        };
4403        engine.encode_store(req1).await.unwrap();
4404
4405        // Search with both text and embedding
4406        let query = RecallQueryRequest {
4407            header: None,
4408            cue: RecallCue {
4409                text: Some("machine learning".to_string()),
4410                embedding: Some(vec![1.0, 0.0, 0.0]),
4411                ..Default::default()
4412            },
4413            stores: None,
4414            limit: 10,
4415            min_fidelity: None,
4416            include_decayed: false,
4417            reconsolidate: false,
4418            activation_depth: 0,
4419            recall_mode: RecallMode::Perfect,
4420        };
4421
4422        let resp = engine.recall_query(query).await.unwrap();
4423        assert!(!resp.memories.is_empty());
4424    }
4425
4426    #[tokio::test]
4427    async fn recall_query_metadata_counts_temporal_only_searches() {
4428        let engine = make_engine().await;
4429
4430        engine
4431            .encode_store(text_store_req(
4432                "Temporal-only query metadata should count scanned records",
4433                Some(StoreType::Episodic),
4434            ))
4435            .await
4436            .unwrap();
4437
4438        let now = Utc::now();
4439        let resp = engine
4440            .recall_query(RecallQueryRequest {
4441                header: None,
4442                cue: RecallCue {
4443                    temporal: Some(TemporalRange {
4444                        start: now - chrono::Duration::minutes(1),
4445                        end: now + chrono::Duration::minutes(1),
4446                    }),
4447                    ..Default::default()
4448                },
4449                stores: None,
4450                limit: 10,
4451                min_fidelity: None,
4452                include_decayed: false,
4453                reconsolidate: false,
4454                activation_depth: 0,
4455                recall_mode: RecallMode::Perfect,
4456            })
4457            .await
4458            .unwrap();
4459
4460        let metadata = resp
4461            .query_metadata
4462            .expect("query metadata should be present");
4463        assert_eq!(metadata.total_records_scanned, 1);
4464        assert_eq!(resp.memories.len(), 1);
4465    }
4466
4467    #[tokio::test]
4468    async fn temporal_query_filters_activation_results_across_stores() {
4469        let engine = make_engine().await;
4470
4471        let mut old_semantic = MemoryRecord::new_text(StoreType::Semantic, "alpha out of range");
4472        old_semantic.created_at = Utc::now() - chrono::Duration::days(7);
4473        old_semantic.updated_at = old_semantic.created_at;
4474        old_semantic.last_accessed_at = old_semantic.created_at;
4475        engine.semantic.store(old_semantic.clone()).await.unwrap();
4476        engine
4477            .coordinator
4478            .register(
4479                old_semantic.id,
4480                StoreType::Semantic,
4481                old_semantic.associations.clone(),
4482            )
4483            .await;
4484        engine.index_record(&old_semantic).unwrap();
4485
4486        let current = MemoryRecord {
4487            associations: vec![Association {
4488                target_id: old_semantic.id,
4489                association_type: AssociationType::Semantic,
4490                weight: 0.9,
4491                created_at: Utc::now(),
4492                last_co_activation: Utc::now(),
4493            }],
4494            ..MemoryRecord::new_text(StoreType::Episodic, "alpha in range")
4495        };
4496        engine.episodic.store(current.clone()).await.unwrap();
4497        engine
4498            .coordinator
4499            .register(
4500                current.id,
4501                StoreType::Episodic,
4502                current.associations.clone(),
4503            )
4504            .await;
4505        engine.index_record(&current).unwrap();
4506
4507        let now = Utc::now();
4508        let resp = engine
4509            .recall_query(RecallQueryRequest {
4510                header: None,
4511                cue: RecallCue {
4512                    text: Some("alpha".to_string()),
4513                    temporal: Some(TemporalRange {
4514                        start: now - chrono::Duration::minutes(1),
4515                        end: now + chrono::Duration::minutes(1),
4516                    }),
4517                    ..Default::default()
4518                },
4519                stores: None,
4520                limit: 10,
4521                min_fidelity: None,
4522                include_decayed: false,
4523                reconsolidate: false,
4524                activation_depth: 1,
4525                recall_mode: RecallMode::Perfect,
4526            })
4527            .await
4528            .unwrap();
4529
4530        assert_eq!(resp.memories.len(), 1);
4531        assert_eq!(resp.memories[0].record.id, current.id);
4532    }
4533
4534    #[tokio::test]
4535    async fn encode_batch_with_associations() {
4536        let engine = make_engine().await;
4537
4538        let batch = EncodeBatchRequest {
4539            header: None,
4540            records: vec![
4541                text_store_req("First memory", Some(StoreType::Episodic)),
4542                text_store_req("Second memory", Some(StoreType::Episodic)),
4543                text_store_req("Third memory", Some(StoreType::Episodic)),
4544            ],
4545            infer_associations: true,
4546        };
4547
4548        let resp = engine.encode_batch(batch).await.unwrap();
4549        assert_eq!(resp.results.len(), 3);
4550        assert_eq!(resp.associations_inferred, 4);
4551    }
4552
4553    #[tokio::test]
4554    async fn encode_batch_associations_survive_rebuild() {
4555        let engine = make_engine().await;
4556
4557        let resp = engine
4558            .encode_batch(EncodeBatchRequest {
4559                header: None,
4560                records: vec![
4561                    text_store_req("Persisted first", Some(StoreType::Episodic)),
4562                    text_store_req("Persisted second", Some(StoreType::Episodic)),
4563                ],
4564                infer_associations: true,
4565            })
4566            .await
4567            .unwrap();
4568
4569        engine.rebuild_coordinator().await.unwrap();
4570
4571        let first = engine
4572            .introspect_record(RecordIntrospectRequest {
4573                header: None,
4574                record_id: resp.results[0].record_id,
4575                include_history: false,
4576                include_associations: true,
4577                include_versions: false,
4578            })
4579            .await
4580            .unwrap();
4581        assert_eq!(first.associations.len(), 1);
4582
4583        let assoc_resp = engine
4584            .recall_associate(RecallAssociateRequest {
4585                header: None,
4586                record_id: resp.results[0].record_id,
4587                association_types: Some(vec![AssociationType::Sequential]),
4588                depth: 1,
4589                min_weight: 0.1,
4590                limit: 10,
4591            })
4592            .await
4593            .unwrap();
4594        assert_eq!(assoc_resp.memories.len(), 1);
4595        assert_eq!(assoc_resp.memories[0].record.id, resp.results[1].record_id);
4596    }
4597
4598    #[tokio::test]
4599    async fn decay_tick_integration() {
4600        let engine = make_engine().await;
4601
4602        for i in 0..5 {
4603            engine
4604                .encode_store(text_store_req(
4605                    &format!("Memory {i}"),
4606                    Some(StoreType::Episodic),
4607                ))
4608                .await
4609                .unwrap();
4610        }
4611
4612        let resp = engine
4613            .lifecycle_decay_tick(DecayTickRequest {
4614                header: None,
4615                tick_duration_seconds: Some(86400),
4616            })
4617            .await
4618            .unwrap();
4619
4620        assert_eq!(resp.records_updated, 5);
4621    }
4622
4623    #[tokio::test]
4624    async fn forget_requires_confirmation() {
4625        let engine = make_engine().await;
4626
4627        let result = engine
4628            .lifecycle_forget(ForgetRequest {
4629                header: None,
4630                record_ids: None,
4631                store: None,
4632                temporal_range: None,
4633                cascade: false,
4634                confirm: false,
4635            })
4636            .await;
4637
4638        assert!(matches!(result, Err(CerememoryError::ForgetUnconfirmed)));
4639    }
4640
4641    #[tokio::test]
4642    async fn forget_deletes_records() {
4643        let engine = make_engine().await;
4644
4645        let resp = engine
4646            .encode_store(text_store_req("To forget", Some(StoreType::Episodic)))
4647            .await
4648            .unwrap();
4649
4650        let deleted = engine
4651            .lifecycle_forget(ForgetRequest {
4652                header: None,
4653                record_ids: Some(vec![resp.record_id]),
4654                store: None,
4655                temporal_range: None,
4656                cascade: false,
4657                confirm: true,
4658            })
4659            .await
4660            .unwrap();
4661
4662        assert_eq!(deleted, 1);
4663        let record = engine.get_store_record(&resp.record_id).await.unwrap();
4664        assert!(record.is_none());
4665
4666        // Also verify removed from text index
4667        let hits = engine.text_index.search("forget", None, 10).unwrap();
4668        assert!(hits.is_empty());
4669    }
4670
4671    #[tokio::test]
4672    async fn forget_temporal_range_deletes_matching_records() {
4673        let engine = make_engine().await;
4674
4675        let mut old_record = MemoryRecord::new_text(StoreType::Episodic, "old");
4676        old_record.created_at = Utc::now() - chrono::Duration::days(2);
4677        old_record.updated_at = old_record.created_at;
4678        old_record.last_accessed_at = old_record.created_at;
4679        engine.episodic.store(old_record.clone()).await.unwrap();
4680        engine
4681            .coordinator
4682            .register(
4683                old_record.id,
4684                StoreType::Episodic,
4685                old_record.associations.clone(),
4686            )
4687            .await;
4688        engine.index_record(&old_record).unwrap();
4689
4690        let current = MemoryRecord::new_text(StoreType::Episodic, "current");
4691        engine.episodic.store(current.clone()).await.unwrap();
4692        engine
4693            .coordinator
4694            .register(
4695                current.id,
4696                StoreType::Episodic,
4697                current.associations.clone(),
4698            )
4699            .await;
4700        engine.index_record(&current).unwrap();
4701
4702        let deleted = engine
4703            .lifecycle_forget(ForgetRequest {
4704                header: None,
4705                record_ids: None,
4706                store: None,
4707                temporal_range: Some(TemporalRange {
4708                    start: Utc::now() - chrono::Duration::minutes(1),
4709                    end: Utc::now() + chrono::Duration::minutes(1),
4710                }),
4711                cascade: false,
4712                confirm: true,
4713            })
4714            .await
4715            .unwrap();
4716
4717        assert_eq!(deleted, 1);
4718        assert!(engine
4719            .get_store_record(&current.id)
4720            .await
4721            .unwrap()
4722            .is_none());
4723        assert!(engine
4724            .get_store_record(&old_record.id)
4725            .await
4726            .unwrap()
4727            .is_some());
4728    }
4729
4730    #[tokio::test]
4731    async fn mode_switch() {
4732        let engine = make_engine().await;
4733
4734        engine
4735            .lifecycle_set_mode(SetModeRequest {
4736                header: None,
4737                mode: RecallMode::Perfect,
4738                scope: None,
4739            })
4740            .await
4741            .unwrap();
4742
4743        assert_eq!(*engine.recall_mode.read().await, RecallMode::Perfect);
4744    }
4745
4746    #[tokio::test]
4747    async fn introspect_stats() {
4748        let engine = make_engine().await;
4749
4750        engine
4751            .encode_store(text_store_req("Stats test", Some(StoreType::Episodic)))
4752            .await
4753            .unwrap();
4754        engine
4755            .encode_store_raw(raw_text_store_req(
4756                "sess-stats",
4757                "Raw stats note",
4758                RawVisibility::Normal,
4759                SecrecyLevel::Public,
4760            ))
4761            .await
4762            .unwrap();
4763
4764        let stats = engine.introspect_stats().await.unwrap();
4765        assert_eq!(stats.total_records, 1);
4766        assert_eq!(stats.records_by_store[&StoreType::Episodic], 1);
4767        assert!((stats.avg_fidelity - 1.0).abs() < f64::EPSILON);
4768        assert!(!stats.background_decay_enabled);
4769        assert!(!stats.background_dream_enabled);
4770        assert_eq!(stats.raw_journal_records, 1);
4771        assert_eq!(stats.raw_journal_pending_dream, 1);
4772    }
4773
4774    #[tokio::test]
4775    async fn auto_store_routing() {
4776        let engine = make_engine().await;
4777
4778        let resp1 = engine
4779            .encode_store(text_store_req("An event", None))
4780            .await
4781            .unwrap();
4782        assert_eq!(resp1.store, StoreType::Episodic);
4783
4784        let req = EncodeStoreRequest {
4785            header: None,
4786            content: MemoryContent {
4787                blocks: vec![ContentBlock {
4788                    modality: Modality::Text,
4789                    format: "text/plain".to_string(),
4790                    data: b"Rust is a systems language".to_vec(),
4791                    embedding: None,
4792                }],
4793                summary: Some("Rust programming facts".to_string()),
4794            },
4795            store: None,
4796            emotion: None,
4797            context: None,
4798            metadata: None,
4799            associations: None,
4800        };
4801        let resp2 = engine.encode_store(req).await.unwrap();
4802        assert_eq!(resp2.store, StoreType::Semantic);
4803    }
4804
4805    #[tokio::test]
4806    async fn human_mode_degrades_content() {
4807        let content = MemoryContent {
4808            blocks: vec![ContentBlock {
4809                modality: Modality::Text,
4810                format: "text/plain".to_string(),
4811                data: b"The quick brown fox jumps over the lazy dog".to_vec(),
4812                embedding: None,
4813            }],
4814            summary: None,
4815        };
4816
4817        let result = apply_human_noise(&content, 0.95);
4818        assert_eq!(result.blocks[0].data, content.blocks[0].data);
4819
4820        let result = apply_human_noise(&content, 0.3);
4821        let text = std::str::from_utf8(&result.blocks[0].data).unwrap();
4822        assert!(text.contains("..."));
4823    }
4824
4825    #[tokio::test]
4826    async fn encode_update() {
4827        let engine = make_engine().await;
4828
4829        let resp = engine
4830            .encode_store(text_store_req("Original", Some(StoreType::Episodic)))
4831            .await
4832            .unwrap();
4833
4834        engine
4835            .encode_update(EncodeUpdateRequest {
4836                header: None,
4837                record_id: resp.record_id,
4838                content: Some(MemoryContent {
4839                    blocks: vec![ContentBlock {
4840                        modality: Modality::Text,
4841                        format: "text/plain".to_string(),
4842                        data: b"Updated content".to_vec(),
4843                        embedding: None,
4844                    }],
4845                    summary: None,
4846                }),
4847                emotion: None,
4848                metadata: None,
4849            })
4850            .await
4851            .unwrap();
4852
4853        let record = engine
4854            .introspect_record(RecordIntrospectRequest {
4855                header: None,
4856                record_id: resp.record_id,
4857                include_history: false,
4858                include_associations: false,
4859                include_versions: false,
4860            })
4861            .await
4862            .unwrap();
4863
4864        assert_eq!(record.text_content(), Some("Updated content"));
4865
4866        // Verify text index was updated
4867        let hits = engine.text_index.search("Updated", None, 10).unwrap();
4868        assert_eq!(hits.len(), 1);
4869        let hits = engine.text_index.search("Original", None, 10).unwrap();
4870        assert!(hits.is_empty());
4871    }
4872
4873    #[tokio::test]
4874    async fn encode_update_refreshes_structured_index() {
4875        let engine = make_engine().await;
4876
4877        let resp = engine
4878            .encode_store(EncodeStoreRequest {
4879                header: None,
4880                content: MemoryContent {
4881                    blocks: vec![ContentBlock {
4882                        modality: Modality::Structured,
4883                        format: "application/json".to_string(),
4884                        data: br#"{"user":{"name":"Alice"}}"#.to_vec(),
4885                        embedding: None,
4886                    }],
4887                    summary: None,
4888                },
4889                store: Some(StoreType::Episodic),
4890                emotion: None,
4891                context: None,
4892                metadata: None,
4893                associations: None,
4894            })
4895            .await
4896            .unwrap();
4897
4898        assert_eq!(
4899            engine.text_index.search("Alice", None, 10).unwrap().len(),
4900            1
4901        );
4902
4903        engine
4904            .encode_update(EncodeUpdateRequest {
4905                header: None,
4906                record_id: resp.record_id,
4907                content: Some(MemoryContent {
4908                    blocks: vec![ContentBlock {
4909                        modality: Modality::Structured,
4910                        format: "application/json".to_string(),
4911                        data: br#"{"user":{"name":"Bob"}}"#.to_vec(),
4912                        embedding: None,
4913                    }],
4914                    summary: None,
4915                }),
4916                emotion: None,
4917                metadata: None,
4918            })
4919            .await
4920            .unwrap();
4921
4922        assert!(engine
4923            .text_index
4924            .search("Alice", None, 10)
4925            .unwrap()
4926            .is_empty());
4927        assert_eq!(engine.text_index.search("Bob", None, 10).unwrap().len(), 1);
4928    }
4929
4930    #[tokio::test]
4931    async fn rebuild_coordinator_reindexes_structured_records() {
4932        let engine = make_engine().await;
4933
4934        let resp = engine
4935            .encode_store(EncodeStoreRequest {
4936                header: None,
4937                content: MemoryContent {
4938                    blocks: vec![ContentBlock {
4939                        modality: Modality::Structured,
4940                        format: "application/json".to_string(),
4941                        data: br#"{"project":{"name":"Cerememory"}}"#.to_vec(),
4942                        embedding: None,
4943                    }],
4944                    summary: None,
4945                },
4946                store: Some(StoreType::Episodic),
4947                emotion: None,
4948                context: None,
4949                metadata: None,
4950                associations: None,
4951            })
4952            .await
4953            .unwrap();
4954
4955        engine.text_index.remove(resp.record_id).unwrap();
4956        assert!(engine
4957            .text_index
4958            .search("Cerememory", None, 10)
4959            .unwrap()
4960            .is_empty());
4961
4962        engine.rebuild_coordinator().await.unwrap();
4963
4964        assert_eq!(
4965            engine
4966                .text_index
4967                .search("Cerememory", None, 10)
4968                .unwrap()
4969                .len(),
4970            1
4971        );
4972    }
4973
4974    #[tokio::test]
4975    async fn background_decay_runs() {
4976        let config = EngineConfig {
4977            background_decay_interval_secs: Some(1), // 1 second for test
4978            ..EngineConfig::default()
4979        };
4980        let engine = Arc::new(CerememoryEngine::new(config).unwrap());
4981
4982        // Store a record
4983        engine
4984            .encode_store(text_store_req("Decay me", Some(StoreType::Episodic)))
4985            .await
4986            .unwrap();
4987
4988        engine.start_background_decay();
4989        assert!(engine.is_background_decay_enabled().await);
4990
4991        // Wait for at least one tick
4992        tokio::time::sleep(std::time::Duration::from_millis(2500)).await;
4993
4994        engine.stop_background_decay().await;
4995        assert!(!engine.is_background_decay_enabled().await);
4996    }
4997
4998    #[tokio::test]
4999    async fn background_decay_disabled_by_default() {
5000        let engine = Arc::new(make_engine().await);
5001        engine.start_background_decay(); // should be a no-op
5002        assert!(!engine.is_background_decay_enabled().await);
5003    }
5004
5005    #[tokio::test]
5006    async fn background_dream_runs() {
5007        let config = EngineConfig {
5008            background_dream_interval_secs: Some(1),
5009            ..EngineConfig::default()
5010        };
5011        let engine = Arc::new(CerememoryEngine::new(config).unwrap());
5012        engine
5013            .encode_store_raw(raw_text_store_req(
5014                "sess-bg-dream",
5015                "Background dream me",
5016                RawVisibility::Normal,
5017                SecrecyLevel::Public,
5018            ))
5019            .await
5020            .unwrap();
5021
5022        engine.start_background_dream();
5023        assert!(engine.is_background_dream_enabled().await);
5024
5025        tokio::time::sleep(std::time::Duration::from_millis(2500)).await;
5026
5027        engine.stop_background_dream().await;
5028        assert!(!engine.is_background_dream_enabled().await);
5029        assert_eq!(engine.episodic.count().await.unwrap(), 1);
5030    }
5031
5032    #[tokio::test]
5033    async fn background_dream_disabled_by_default() {
5034        let engine = Arc::new(make_engine().await);
5035        engine.start_background_dream();
5036        assert!(!engine.is_background_dream_enabled().await);
5037    }
5038
5039    #[tokio::test]
5040    async fn multimodal_store_and_retrieve() {
5041        let engine = make_engine().await;
5042
5043        // Store an image record with embedding
5044        let req = EncodeStoreRequest {
5045            header: None,
5046            content: MemoryContent {
5047                blocks: vec![ContentBlock {
5048                    modality: Modality::Image,
5049                    format: "image/png".to_string(),
5050                    data: vec![0x89, 0x50, 0x4E, 0x47], // PNG header bytes
5051                    embedding: Some(vec![0.5, 0.3, 0.8]),
5052                }],
5053                summary: Some("A photo of a sunset".to_string()),
5054            },
5055            store: Some(StoreType::Episodic),
5056            emotion: None,
5057            context: None,
5058            metadata: None,
5059            associations: None,
5060        };
5061
5062        let resp = engine.encode_store(req).await.unwrap();
5063
5064        // Retrieve by ID
5065        let record = engine
5066            .introspect_record(RecordIntrospectRequest {
5067                header: None,
5068                record_id: resp.record_id,
5069                include_history: false,
5070                include_associations: false,
5071                include_versions: false,
5072            })
5073            .await
5074            .unwrap();
5075
5076        assert_eq!(record.content.blocks[0].modality, Modality::Image);
5077        assert_eq!(record.content.blocks[0].data, vec![0x89, 0x50, 0x4E, 0x47]);
5078
5079        // Search by embedding should find it
5080        let query = RecallQueryRequest {
5081            header: None,
5082            cue: RecallCue {
5083                embedding: Some(vec![0.5, 0.3, 0.8]),
5084                ..Default::default()
5085            },
5086            stores: None,
5087            limit: 10,
5088            min_fidelity: None,
5089            include_decayed: false,
5090            reconsolidate: false,
5091            activation_depth: 0,
5092            recall_mode: RecallMode::Perfect,
5093        };
5094
5095        let recall_resp = engine.recall_query(query).await.unwrap();
5096        assert!(!recall_resp.memories.is_empty());
5097    }
5098
5099    #[tokio::test]
5100    async fn summary_only_records_are_text_searchable() {
5101        let engine = make_engine().await;
5102
5103        let resp = engine
5104            .encode_store(EncodeStoreRequest {
5105                header: None,
5106                content: MemoryContent {
5107                    blocks: vec![ContentBlock {
5108                        modality: Modality::Image,
5109                        format: "image/png".to_string(),
5110                        data: vec![0x89, 0x50, 0x4E, 0x47],
5111                        embedding: None,
5112                    }],
5113                    summary: Some("sunset skyline".to_string()),
5114                },
5115                store: Some(StoreType::Semantic),
5116                emotion: None,
5117                context: None,
5118                metadata: None,
5119                associations: None,
5120            })
5121            .await
5122            .unwrap();
5123
5124        let recalled = engine
5125            .recall_query(RecallQueryRequest {
5126                header: None,
5127                cue: RecallCue {
5128                    text: Some("skyline".to_string()),
5129                    ..Default::default()
5130                },
5131                stores: Some(vec![StoreType::Semantic]),
5132                limit: 10,
5133                min_fidelity: None,
5134                include_decayed: false,
5135                reconsolidate: false,
5136                activation_depth: 0,
5137                recall_mode: RecallMode::Perfect,
5138            })
5139            .await
5140            .unwrap();
5141
5142        assert_eq!(recalled.memories.len(), 1);
5143        assert_eq!(recalled.memories[0].record.id, resp.record_id);
5144    }
5145
5146    #[tokio::test]
5147    async fn multiple_text_blocks_are_all_indexed() {
5148        let engine = make_engine().await;
5149
5150        engine
5151            .encode_store(EncodeStoreRequest {
5152                header: None,
5153                content: MemoryContent {
5154                    blocks: vec![
5155                        ContentBlock {
5156                            modality: Modality::Text,
5157                            format: "text/plain".to_string(),
5158                            data: b"primary block".to_vec(),
5159                            embedding: None,
5160                        },
5161                        ContentBlock {
5162                            modality: Modality::Text,
5163                            format: "text/plain".to_string(),
5164                            data: b"secondary block".to_vec(),
5165                            embedding: None,
5166                        },
5167                    ],
5168                    summary: None,
5169                },
5170                store: Some(StoreType::Episodic),
5171                emotion: None,
5172                context: None,
5173                metadata: None,
5174                associations: None,
5175            })
5176            .await
5177            .unwrap();
5178
5179        let recalled = engine
5180            .recall_query(RecallQueryRequest {
5181                header: None,
5182                cue: RecallCue {
5183                    text: Some("secondary".to_string()),
5184                    ..Default::default()
5185                },
5186                stores: Some(vec![StoreType::Episodic]),
5187                limit: 10,
5188                min_fidelity: None,
5189                include_decayed: false,
5190                reconsolidate: false,
5191                activation_depth: 0,
5192                recall_mode: RecallMode::Perfect,
5193            })
5194            .await
5195            .unwrap();
5196
5197        assert_eq!(recalled.memories.len(), 1);
5198    }
5199
5200    #[tokio::test]
5201    async fn structured_recall_survives_rebuild() {
5202        let engine = make_engine().await;
5203        engine
5204            .encode_store(structured_store_req(
5205                r#"{"profile":{"city":"Tokyo","skills":["rust","python"]}}"#,
5206                Some(StoreType::Semantic),
5207            ))
5208            .await
5209            .unwrap();
5210
5211        let query = RecallQueryRequest {
5212            header: None,
5213            cue: RecallCue {
5214                text: Some("Tokyo".to_string()),
5215                ..Default::default()
5216            },
5217            stores: Some(vec![StoreType::Semantic]),
5218            limit: 10,
5219            min_fidelity: None,
5220            include_decayed: false,
5221            reconsolidate: false,
5222            activation_depth: 0,
5223            recall_mode: RecallMode::Perfect,
5224        };
5225
5226        let before = engine.recall_query(query.clone()).await.unwrap();
5227        assert_eq!(before.memories.len(), 1);
5228
5229        engine.rebuild_coordinator().await.unwrap();
5230
5231        let after = engine.recall_query(query).await.unwrap();
5232        assert_eq!(after.memories.len(), 1);
5233    }
5234
5235    #[tokio::test]
5236    async fn structured_update_rebuilds_text_index() {
5237        let engine = make_engine().await;
5238        let resp = engine
5239            .encode_store(structured_store_req(
5240                r#"{"profile":{"city":"Tokyo"}}"#,
5241                Some(StoreType::Semantic),
5242            ))
5243            .await
5244            .unwrap();
5245
5246        let tokyo_hits = engine
5247            .recall_query(RecallQueryRequest {
5248                header: None,
5249                cue: RecallCue {
5250                    text: Some("Tokyo".to_string()),
5251                    ..Default::default()
5252                },
5253                stores: Some(vec![StoreType::Semantic]),
5254                limit: 10,
5255                min_fidelity: None,
5256                include_decayed: false,
5257                reconsolidate: false,
5258                activation_depth: 0,
5259                recall_mode: RecallMode::Perfect,
5260            })
5261            .await
5262            .unwrap();
5263        assert_eq!(tokyo_hits.memories.len(), 1);
5264
5265        engine
5266            .encode_update(EncodeUpdateRequest {
5267                header: None,
5268                record_id: resp.record_id,
5269                content: Some(MemoryContent {
5270                    blocks: vec![ContentBlock {
5271                        modality: Modality::Structured,
5272                        format: "application/json".to_string(),
5273                        data: br#"{"profile":{"city":"Osaka"}}"#.to_vec(),
5274                        embedding: None,
5275                    }],
5276                    summary: None,
5277                }),
5278                emotion: None,
5279                metadata: None,
5280            })
5281            .await
5282            .unwrap();
5283
5284        let tokyo_hits = engine
5285            .recall_query(RecallQueryRequest {
5286                header: None,
5287                cue: RecallCue {
5288                    text: Some("Tokyo".to_string()),
5289                    ..Default::default()
5290                },
5291                stores: Some(vec![StoreType::Semantic]),
5292                limit: 10,
5293                min_fidelity: None,
5294                include_decayed: false,
5295                reconsolidate: false,
5296                activation_depth: 0,
5297                recall_mode: RecallMode::Perfect,
5298            })
5299            .await
5300            .unwrap();
5301        assert!(tokyo_hits.memories.is_empty());
5302
5303        let osaka_hits = engine
5304            .recall_query(RecallQueryRequest {
5305                header: None,
5306                cue: RecallCue {
5307                    text: Some("Osaka".to_string()),
5308                    ..Default::default()
5309                },
5310                stores: Some(vec![StoreType::Semantic]),
5311                limit: 10,
5312                min_fidelity: None,
5313                include_decayed: false,
5314                reconsolidate: false,
5315                activation_depth: 0,
5316                recall_mode: RecallMode::Perfect,
5317            })
5318            .await
5319            .unwrap();
5320        assert_eq!(osaka_hits.memories.len(), 1);
5321    }
5322
5323    #[tokio::test]
5324    async fn multimodal_image_recall_uses_provider_embedding() {
5325        let provider = Arc::new(MockLLMProvider::new(4));
5326        let engine = CerememoryEngine::new(EngineConfig {
5327            llm_provider: Some(provider),
5328            ..Default::default()
5329        })
5330        .unwrap();
5331
5332        let image = vec![0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A];
5333        engine
5334            .encode_store(EncodeStoreRequest {
5335                header: None,
5336                content: MemoryContent {
5337                    blocks: vec![ContentBlock {
5338                        modality: Modality::Image,
5339                        format: "image/png".to_string(),
5340                        data: image.clone(),
5341                        embedding: None,
5342                    }],
5343                    summary: Some("indexed image".to_string()),
5344                },
5345                store: Some(StoreType::Episodic),
5346                emotion: None,
5347                context: None,
5348                metadata: None,
5349                associations: None,
5350            })
5351            .await
5352            .unwrap();
5353
5354        let resp = engine
5355            .recall_query(RecallQueryRequest {
5356                header: None,
5357                cue: RecallCue {
5358                    image: Some(image),
5359                    ..Default::default()
5360                },
5361                stores: None,
5362                limit: 10,
5363                min_fidelity: None,
5364                include_decayed: false,
5365                reconsolidate: false,
5366                activation_depth: 0,
5367                recall_mode: RecallMode::Perfect,
5368            })
5369            .await
5370            .unwrap();
5371
5372        assert_eq!(resp.memories.len(), 1);
5373        assert_eq!(
5374            resp.memories[0].record.content.blocks[0].modality,
5375            Modality::Image
5376        );
5377    }
5378
5379    #[tokio::test]
5380    async fn multimodal_audio_recall_uses_provider_transcript() {
5381        let provider = Arc::new(MockLLMProvider::new(4));
5382        let engine = CerememoryEngine::new(EngineConfig {
5383            llm_provider: Some(provider),
5384            ..Default::default()
5385        })
5386        .unwrap();
5387
5388        let wav_bytes = b"RIFFabcdWAVE".to_vec();
5389        engine
5390            .encode_store(text_store_req("audio-12", Some(StoreType::Episodic)))
5391            .await
5392            .unwrap();
5393
5394        let resp = engine
5395            .recall_query(RecallQueryRequest {
5396                header: None,
5397                cue: RecallCue {
5398                    audio: Some(wav_bytes),
5399                    ..Default::default()
5400                },
5401                stores: None,
5402                limit: 10,
5403                min_fidelity: None,
5404                include_decayed: false,
5405                reconsolidate: false,
5406                activation_depth: 0,
5407                recall_mode: RecallMode::Perfect,
5408            })
5409            .await
5410            .unwrap();
5411
5412        assert!(!resp.memories.is_empty());
5413        assert_eq!(resp.memories[0].record.text_content(), Some("audio-12"));
5414    }
5415
5416    #[tokio::test]
5417    async fn encode_store_processes_all_multimodal_blocks() {
5418        let provider = Arc::new(MockLLMProvider::new(4));
5419        let engine = CerememoryEngine::new(EngineConfig {
5420            llm_provider: Some(provider),
5421            ..Default::default()
5422        })
5423        .unwrap();
5424
5425        let image_one = vec![0x89, b'P', b'N', b'G', 1, 2, 3, 4];
5426        let image_two = vec![0x89, b'P', b'N', b'G', 5, 6, 7, 8, 9];
5427        let audio_one = b"RIFFabcdWAVEone".to_vec();
5428        let audio_two = b"RIFFabcdWAVEtwoo".to_vec();
5429
5430        let resp = engine
5431            .encode_store(EncodeStoreRequest {
5432                header: None,
5433                content: MemoryContent {
5434                    blocks: vec![
5435                        ContentBlock {
5436                            modality: Modality::Image,
5437                            format: "image/png".to_string(),
5438                            data: image_one.clone(),
5439                            embedding: None,
5440                        },
5441                        ContentBlock {
5442                            modality: Modality::Audio,
5443                            format: "audio/wav".to_string(),
5444                            data: audio_one.clone(),
5445                            embedding: None,
5446                        },
5447                        ContentBlock {
5448                            modality: Modality::Image,
5449                            format: "image/png".to_string(),
5450                            data: image_two.clone(),
5451                            embedding: None,
5452                        },
5453                        ContentBlock {
5454                            modality: Modality::Audio,
5455                            format: "audio/wav".to_string(),
5456                            data: audio_two.clone(),
5457                            embedding: None,
5458                        },
5459                    ],
5460                    summary: Some("multimodal batch".to_string()),
5461                },
5462                store: Some(StoreType::Episodic),
5463                emotion: None,
5464                context: None,
5465                metadata: None,
5466                associations: None,
5467            })
5468            .await
5469            .unwrap();
5470
5471        let record = engine
5472            .introspect_record(RecordIntrospectRequest {
5473                header: None,
5474                record_id: resp.record_id,
5475                include_history: false,
5476                include_associations: false,
5477                include_versions: false,
5478            })
5479            .await
5480            .unwrap();
5481
5482        assert_eq!(record.content.blocks.len(), 6);
5483        assert_eq!(record.content.blocks[0].modality, Modality::Image);
5484        assert_eq!(record.content.blocks[2].modality, Modality::Image);
5485        assert_eq!(record.content.blocks[4].modality, Modality::Text);
5486        assert_eq!(record.content.blocks[5].modality, Modality::Text);
5487
5488        let image_one_embedding = record.content.blocks[0].embedding.as_ref().unwrap();
5489        let image_two_embedding = record.content.blocks[2].embedding.as_ref().unwrap();
5490        assert_eq!(image_one_embedding[0], image_one.len() as f32);
5491        assert_eq!(image_two_embedding[0], image_two.len() as f32);
5492
5493        assert_eq!(
5494            std::str::from_utf8(&record.content.blocks[4].data).unwrap(),
5495            format!("audio-{}", audio_one.len())
5496        );
5497        assert_eq!(
5498            std::str::from_utf8(&record.content.blocks[5].data).unwrap(),
5499            format!("audio-{}", audio_two.len())
5500        );
5501        assert!(record.content.blocks[4].embedding.is_some());
5502        assert!(record.content.blocks[5].embedding.is_some());
5503    }
5504
5505    #[tokio::test]
5506    async fn size_limit_validation() {
5507        let engine = make_engine().await;
5508
5509        // Text exceeding 1MB should fail
5510        let big_text = vec![b'A'; 1_048_577]; // 1MB + 1
5511        let req = EncodeStoreRequest {
5512            header: None,
5513            content: MemoryContent {
5514                blocks: vec![ContentBlock {
5515                    modality: Modality::Text,
5516                    format: "text/plain".to_string(),
5517                    data: big_text,
5518                    embedding: None,
5519                }],
5520                summary: None,
5521            },
5522            store: Some(StoreType::Episodic),
5523            emotion: None,
5524            context: None,
5525            metadata: None,
5526            associations: None,
5527        };
5528
5529        let result = engine.encode_store(req).await;
5530        assert!(matches!(
5531            result,
5532            Err(CerememoryError::ContentTooLarge { .. })
5533        ));
5534    }
5535
5536    // ─── Export/Import API tests ────────────────────────────────────
5537
5538    #[tokio::test]
5539    async fn lifecycle_export_returns_bytes_and_response() {
5540        let engine = make_engine().await;
5541
5542        engine
5543            .encode_store(text_store_req("Export me", Some(StoreType::Episodic)))
5544            .await
5545            .unwrap();
5546
5547        let (bytes, resp) = engine
5548            .lifecycle_export(ExportRequest {
5549                header: None,
5550                format: "cma".to_string(),
5551                stores: None,
5552                include_raw_journal: false,
5553                encrypt: false,
5554                encryption_key: None,
5555            })
5556            .await
5557            .unwrap();
5558
5559        assert_eq!(resp.record_count, 1);
5560        assert!(!bytes.is_empty());
5561    }
5562
5563    #[tokio::test]
5564    async fn lifecycle_export_accepts_jsonl_alias() {
5565        let engine = make_engine().await;
5566
5567        engine
5568            .encode_store(text_store_req("Export me too", Some(StoreType::Episodic)))
5569            .await
5570            .unwrap();
5571
5572        let (bytes, resp) = engine
5573            .lifecycle_export(ExportRequest {
5574                header: None,
5575                format: "jsonl".to_string(),
5576                stores: None,
5577                include_raw_journal: false,
5578                encrypt: false,
5579                encryption_key: None,
5580            })
5581            .await
5582            .unwrap();
5583
5584        assert_eq!(resp.record_count, 1);
5585        let records = cerememory_archive::import_records(&bytes).unwrap();
5586        assert_eq!(records.len(), 1);
5587    }
5588
5589    #[tokio::test]
5590    async fn lifecycle_export_rejects_unsupported_format() {
5591        let engine = make_engine().await;
5592
5593        let result = engine
5594            .lifecycle_export(ExportRequest {
5595                header: None,
5596                format: "zip".to_string(),
5597                stores: None,
5598                include_raw_journal: false,
5599                encrypt: false,
5600                encryption_key: None,
5601            })
5602            .await;
5603
5604        assert!(matches!(
5605            result,
5606            Err(CerememoryError::Validation(msg)) if msg.contains("Valid options: cma, jsonl")
5607        ));
5608    }
5609
5610    #[tokio::test]
5611    async fn lifecycle_export_store_filter() {
5612        let engine = make_engine().await;
5613
5614        engine
5615            .encode_store(text_store_req("Episodic", Some(StoreType::Episodic)))
5616            .await
5617            .unwrap();
5618        engine
5619            .encode_store(text_store_req("Semantic", Some(StoreType::Semantic)))
5620            .await
5621            .unwrap();
5622
5623        let (_, resp) = engine
5624            .lifecycle_export(ExportRequest {
5625                header: None,
5626                format: "cma".to_string(),
5627                stores: Some(vec![StoreType::Episodic]),
5628                include_raw_journal: false,
5629                encrypt: false,
5630                encryption_key: None,
5631            })
5632            .await
5633            .unwrap();
5634
5635        assert_eq!(resp.record_count, 1);
5636    }
5637
5638    #[tokio::test]
5639    async fn lifecycle_export_store_filter_deduplicates_requested_stores() {
5640        let engine = make_engine().await;
5641
5642        engine
5643            .encode_store(text_store_req("Episodic", Some(StoreType::Episodic)))
5644            .await
5645            .unwrap();
5646
5647        let (bytes, resp) = engine
5648            .lifecycle_export(ExportRequest {
5649                header: None,
5650                format: "cma".to_string(),
5651                stores: Some(vec![StoreType::Episodic, StoreType::Episodic]),
5652                include_raw_journal: false,
5653                encrypt: false,
5654                encryption_key: None,
5655            })
5656            .await
5657            .unwrap();
5658
5659        assert_eq!(resp.record_count, 1);
5660
5661        let records = cerememory_archive::import_records(&bytes).unwrap();
5662        assert_eq!(records.len(), 1);
5663        assert!(records.iter().all(|r| r.store == StoreType::Episodic));
5664    }
5665
5666    #[tokio::test]
5667    async fn lifecycle_export_encrypted_roundtrip() {
5668        let engine = make_engine().await;
5669
5670        engine
5671            .encode_store(text_store_req("Secret data", Some(StoreType::Episodic)))
5672            .await
5673            .unwrap();
5674
5675        let (encrypted_bytes, resp) = engine
5676            .lifecycle_export(ExportRequest {
5677                header: None,
5678                format: "cma".to_string(),
5679                stores: None,
5680                include_raw_journal: false,
5681                encrypt: true,
5682                encryption_key: Some("my-passphrase".to_string()),
5683            })
5684            .await
5685            .unwrap();
5686
5687        assert_eq!(resp.record_count, 1);
5688
5689        // Verify the encrypted data cannot be imported without the key
5690        let result = engine.import_records(&encrypted_bytes).await;
5691        assert!(result.is_err());
5692    }
5693
5694    #[tokio::test]
5695    async fn lifecycle_export_import_with_raw_journal_roundtrip() {
5696        let engine = make_engine().await;
5697        engine
5698            .encode_store(text_store_req("Curated memory", Some(StoreType::Episodic)))
5699            .await
5700            .unwrap();
5701        engine
5702            .encode_store_raw(raw_text_store_req(
5703                "sess-export-raw",
5704                "Raw transcript note",
5705                RawVisibility::Normal,
5706                SecrecyLevel::Public,
5707            ))
5708            .await
5709            .unwrap();
5710
5711        let (bytes, resp) = engine
5712            .lifecycle_export(ExportRequest {
5713                header: None,
5714                format: "cma".to_string(),
5715                stores: None,
5716                include_raw_journal: true,
5717                encrypt: false,
5718                encryption_key: None,
5719            })
5720            .await
5721            .unwrap();
5722        assert_eq!(resp.record_count, 2);
5723
5724        let target = make_engine().await;
5725        let imported = target
5726            .lifecycle_import(ImportRequest {
5727                header: None,
5728                archive_id: "bundle-roundtrip".to_string(),
5729                strategy: ImportStrategy::Merge,
5730                conflict_resolution: ConflictResolution::KeepExisting,
5731                decryption_key: None,
5732                archive_data: Some(bytes),
5733            })
5734            .await
5735            .unwrap();
5736
5737        assert_eq!(imported, 2);
5738        assert_eq!(target.episodic.count().await.unwrap(), 1);
5739        assert_eq!(target.raw_journal_count().await.unwrap(), 1);
5740    }
5741
5742    #[tokio::test]
5743    async fn lifecycle_import_with_archive_data() {
5744        let engine = make_engine().await;
5745
5746        // Store a record and export it
5747        engine
5748            .encode_store(text_store_req("Import me", Some(StoreType::Episodic)))
5749            .await
5750            .unwrap();
5751
5752        let (bytes, _) = engine
5753            .lifecycle_export(ExportRequest {
5754                header: None,
5755                format: "cma".to_string(),
5756                stores: None,
5757                include_raw_journal: false,
5758                encrypt: false,
5759                encryption_key: None,
5760            })
5761            .await
5762            .unwrap();
5763
5764        // Create a fresh engine and import
5765        let engine2 = make_engine().await;
5766        let imported = engine2
5767            .lifecycle_import(ImportRequest {
5768                header: None,
5769                archive_id: "test".to_string(),
5770                strategy: ImportStrategy::Merge,
5771                conflict_resolution: ConflictResolution::KeepExisting,
5772                decryption_key: None,
5773                archive_data: Some(bytes),
5774            })
5775            .await
5776            .unwrap();
5777
5778        assert_eq!(imported, 1);
5779        let stats = engine2.introspect_stats().await.unwrap();
5780        assert_eq!(stats.total_records, 1);
5781    }
5782
5783    #[tokio::test]
5784    async fn import_conflict_keep_existing() {
5785        let engine = make_engine().await;
5786
5787        // Store a record
5788        let resp = engine
5789            .encode_store(text_store_req("Original", Some(StoreType::Episodic)))
5790            .await
5791            .unwrap();
5792
5793        // Export it
5794        let (bytes, _) = engine
5795            .lifecycle_export(ExportRequest {
5796                header: None,
5797                format: "cma".to_string(),
5798                stores: None,
5799                include_raw_journal: false,
5800                encrypt: false,
5801                encryption_key: None,
5802            })
5803            .await
5804            .unwrap();
5805
5806        // Import with keep_existing — should skip the duplicate
5807        let imported = engine
5808            .lifecycle_import(ImportRequest {
5809                header: None,
5810                archive_id: "test".to_string(),
5811                strategy: ImportStrategy::Merge,
5812                conflict_resolution: ConflictResolution::KeepExisting,
5813                decryption_key: None,
5814                archive_data: Some(bytes),
5815            })
5816            .await
5817            .unwrap();
5818
5819        assert_eq!(imported, 0);
5820        let record = engine
5821            .introspect_record(RecordIntrospectRequest {
5822                header: None,
5823                record_id: resp.record_id,
5824                include_history: false,
5825                include_associations: false,
5826                include_versions: false,
5827            })
5828            .await
5829            .unwrap();
5830        assert_eq!(record.text_content(), Some("Original"));
5831    }
5832
5833    #[tokio::test]
5834    async fn import_conflict_keep_imported() {
5835        let engine = make_engine().await;
5836
5837        // Store a record
5838        let resp = engine
5839            .encode_store(text_store_req("Original text", Some(StoreType::Episodic)))
5840            .await
5841            .unwrap();
5842
5843        // Export
5844        let (bytes, _) = engine
5845            .lifecycle_export(ExportRequest {
5846                header: None,
5847                format: "cma".to_string(),
5848                stores: None,
5849                include_raw_journal: false,
5850                encrypt: false,
5851                encryption_key: None,
5852            })
5853            .await
5854            .unwrap();
5855
5856        // Import with keep_imported — should replace the existing record
5857        let imported = engine
5858            .lifecycle_import(ImportRequest {
5859                header: None,
5860                archive_id: "test".to_string(),
5861                strategy: ImportStrategy::Merge,
5862                conflict_resolution: ConflictResolution::KeepImported,
5863                decryption_key: None,
5864                archive_data: Some(bytes),
5865            })
5866            .await
5867            .unwrap();
5868
5869        assert_eq!(imported, 1);
5870        // Record should still exist (replaced with same data)
5871        let record = engine.get_store_record(&resp.record_id).await.unwrap();
5872        assert!(record.is_some());
5873    }
5874
5875    #[tokio::test]
5876    async fn import_conflict_keep_newer() {
5877        let engine = make_engine().await;
5878
5879        // Store a record
5880        let resp = engine
5881            .encode_store(text_store_req("First version", Some(StoreType::Episodic)))
5882            .await
5883            .unwrap();
5884
5885        // Export the archive (captures the record with its current timestamp)
5886        let (bytes, _) = engine
5887            .lifecycle_export(ExportRequest {
5888                header: None,
5889                format: "cma".to_string(),
5890                stores: None,
5891                include_raw_journal: false,
5892                encrypt: false,
5893                encryption_key: None,
5894            })
5895            .await
5896            .unwrap();
5897
5898        // Update the record so the in-store version is newer
5899        engine
5900            .encode_update(EncodeUpdateRequest {
5901                header: None,
5902                record_id: resp.record_id,
5903                content: Some(MemoryContent {
5904                    blocks: vec![ContentBlock {
5905                        modality: Modality::Text,
5906                        format: "text/plain".to_string(),
5907                        data: b"Updated version".to_vec(),
5908                        embedding: None,
5909                    }],
5910                    summary: None,
5911                }),
5912                emotion: None,
5913                metadata: None,
5914            })
5915            .await
5916            .unwrap();
5917
5918        // Import with keep_newer — the archive version is older, so it should be skipped
5919        let imported = engine
5920            .lifecycle_import(ImportRequest {
5921                header: None,
5922                archive_id: "test".to_string(),
5923                strategy: ImportStrategy::Merge,
5924                conflict_resolution: ConflictResolution::KeepNewer,
5925                decryption_key: None,
5926                archive_data: Some(bytes),
5927            })
5928            .await
5929            .unwrap();
5930
5931        assert_eq!(imported, 0);
5932        let record = engine
5933            .introspect_record(RecordIntrospectRequest {
5934                header: None,
5935                record_id: resp.record_id,
5936                include_history: false,
5937                include_associations: false,
5938                include_versions: false,
5939            })
5940            .await
5941            .unwrap();
5942        assert_eq!(record.text_content(), Some("Updated version"));
5943    }
5944
5945    #[tokio::test]
5946    async fn import_encrypted_archive() {
5947        let engine = make_engine().await;
5948
5949        engine
5950            .encode_store(text_store_req(
5951                "Encrypted import",
5952                Some(StoreType::Episodic),
5953            ))
5954            .await
5955            .unwrap();
5956
5957        let (encrypted, _) = engine
5958            .lifecycle_export(ExportRequest {
5959                header: None,
5960                format: "cma".to_string(),
5961                stores: None,
5962                include_raw_journal: false,
5963                encrypt: true,
5964                encryption_key: Some("pass123".to_string()),
5965            })
5966            .await
5967            .unwrap();
5968
5969        // Import into a fresh engine with the correct key
5970        let engine2 = make_engine().await;
5971        let imported = engine2
5972            .lifecycle_import(ImportRequest {
5973                header: None,
5974                archive_id: "test".to_string(),
5975                strategy: ImportStrategy::Merge,
5976                conflict_resolution: ConflictResolution::KeepExisting,
5977                decryption_key: Some("pass123".to_string()),
5978                archive_data: Some(encrypted),
5979            })
5980            .await
5981            .unwrap();
5982
5983        assert_eq!(imported, 1);
5984    }
5985
5986    #[tokio::test]
5987    async fn import_missing_archive_data() {
5988        let engine = make_engine().await;
5989
5990        let result = engine
5991            .lifecycle_import(ImportRequest {
5992                header: None,
5993                archive_id: "test".to_string(),
5994                strategy: ImportStrategy::Merge,
5995                conflict_resolution: ConflictResolution::KeepExisting,
5996                decryption_key: None,
5997                archive_data: None,
5998            })
5999            .await;
6000
6001        assert!(result.is_err());
6002        let err = format!("{:?}", result.unwrap_err());
6003        assert!(err.contains("archive_data"));
6004    }
6005
6006    #[tokio::test]
6007    async fn lifecycle_export_encrypt_without_key_fails() {
6008        let engine = make_engine().await;
6009
6010        engine
6011            .encode_store(text_store_req("Some data", Some(StoreType::Episodic)))
6012            .await
6013            .unwrap();
6014
6015        let result = engine
6016            .lifecycle_export(ExportRequest {
6017                header: None,
6018                format: "cma".to_string(),
6019                stores: None,
6020                include_raw_journal: false,
6021                encrypt: true,
6022                encryption_key: None,
6023            })
6024            .await;
6025
6026        assert!(result.is_err());
6027        let err = format!("{:?}", result.unwrap_err());
6028        assert!(err.contains("encryption_key is required"));
6029    }
6030
6031    #[tokio::test]
6032    async fn import_conflict_cross_store_keep_imported_count_stays_one() {
6033        let engine = make_engine().await;
6034
6035        // Store a record in Episodic
6036        let resp = engine
6037            .encode_store(text_store_req(
6038                "Original in episodic",
6039                Some(StoreType::Episodic),
6040            ))
6041            .await
6042            .unwrap();
6043        let record_id = resp.record_id;
6044
6045        // Export the archive (record is tagged as Episodic)
6046        let (bytes, _) = engine
6047            .lifecycle_export(ExportRequest {
6048                header: None,
6049                format: "cma".to_string(),
6050                stores: None,
6051                include_raw_journal: false,
6052                encrypt: false,
6053                encryption_key: None,
6054            })
6055            .await
6056            .unwrap();
6057
6058        // Import with KeepImported — the same record exists in Episodic,
6059        // and the archive also has it as Episodic. After conflict resolution
6060        // with KeepImported, the old record must be deleted first and then
6061        // the imported one stored. The total count must remain 1, not 2.
6062        let imported = engine
6063            .lifecycle_import(ImportRequest {
6064                header: None,
6065                archive_id: "test".to_string(),
6066                strategy: ImportStrategy::Merge,
6067                conflict_resolution: ConflictResolution::KeepImported,
6068                decryption_key: None,
6069                archive_data: Some(bytes),
6070            })
6071            .await
6072            .unwrap();
6073
6074        assert_eq!(imported, 1);
6075
6076        // Total count must be exactly 1 — proves delete-before-store worked
6077        let stats = engine.introspect_stats().await.unwrap();
6078        assert_eq!(stats.total_records, 1);
6079
6080        // The record should still be retrievable by its original ID
6081        let record = engine.get_store_record(&record_id).await.unwrap();
6082        assert!(record.is_some());
6083    }
6084
6085    #[tokio::test]
6086    async fn import_strategy_replace_replaces_existing_dataset() {
6087        let source = make_engine().await;
6088
6089        let imported_episode = source
6090            .encode_store(text_store_req(
6091                "Imported episodic",
6092                Some(StoreType::Episodic),
6093            ))
6094            .await;
6095        let imported_episode_id = imported_episode.unwrap().record_id;
6096        let imported_semantic = source
6097            .encode_store(text_store_req(
6098                "Imported semantic",
6099                Some(StoreType::Semantic),
6100            ))
6101            .await
6102            .unwrap()
6103            .record_id;
6104
6105        let (archive_data, _) = source
6106            .lifecycle_export(ExportRequest {
6107                header: None,
6108                format: "cma".to_string(),
6109                stores: None,
6110                include_raw_journal: false,
6111                encrypt: false,
6112                encryption_key: None,
6113            })
6114            .await
6115            .unwrap();
6116
6117        let target = make_engine().await;
6118        let old_id = target
6119            .encode_store(text_store_req("Old episodic", Some(StoreType::Episodic)))
6120            .await
6121            .unwrap()
6122            .record_id;
6123        target
6124            .encode_store(text_store_req("Old working", Some(StoreType::Working)))
6125            .await
6126            .unwrap();
6127
6128        let imported = target
6129            .lifecycle_import(ImportRequest {
6130                header: None,
6131                archive_id: "replace-test".to_string(),
6132                strategy: ImportStrategy::Replace,
6133                conflict_resolution: ConflictResolution::KeepNewer,
6134                decryption_key: None,
6135                archive_data: Some(archive_data),
6136            })
6137            .await
6138            .unwrap();
6139
6140        assert_eq!(imported, 2);
6141
6142        let stats = target.introspect_stats().await.unwrap();
6143        assert_eq!(stats.total_records, 2);
6144        assert_eq!(stats.records_by_store[&StoreType::Episodic], 1);
6145        assert_eq!(stats.records_by_store[&StoreType::Semantic], 1);
6146        assert_eq!(
6147            *stats
6148                .records_by_store
6149                .get(&StoreType::Working)
6150                .unwrap_or(&0),
6151            0
6152        );
6153
6154        assert!(target.get_store_record(&old_id).await.unwrap().is_none());
6155        assert!(target
6156            .get_store_record(&imported_episode_id)
6157            .await
6158            .unwrap()
6159            .is_some());
6160        assert!(target
6161            .get_store_record(&imported_semantic)
6162            .await
6163            .unwrap()
6164            .is_some());
6165    }
6166
6167    #[tokio::test]
6168    async fn import_strategy_replace_preserves_existing_data_on_invalid_archive() {
6169        let engine = make_engine().await;
6170        let old_id = engine
6171            .encode_store(text_store_req("Keep me", Some(StoreType::Episodic)))
6172            .await
6173            .unwrap()
6174            .record_id;
6175
6176        let result = engine
6177            .lifecycle_import(ImportRequest {
6178                header: None,
6179                archive_id: "invalid-replace".to_string(),
6180                strategy: ImportStrategy::Replace,
6181                conflict_resolution: ConflictResolution::KeepNewer,
6182                decryption_key: None,
6183                archive_data: Some(b"not-a-valid-cma-archive".to_vec()),
6184            })
6185            .await;
6186
6187        assert!(result.is_err());
6188        let stats = engine.introspect_stats().await.unwrap();
6189        assert_eq!(stats.total_records, 1);
6190        assert!(engine.get_store_record(&old_id).await.unwrap().is_some());
6191    }
6192
6193    // ─── recall.timeline tests ───────────────────────────────────────
6194
6195    #[tokio::test]
6196    async fn timeline_hour_granularity() {
6197        let engine = make_engine().await;
6198        engine
6199            .encode_store(text_store_req("Morning event", Some(StoreType::Episodic)))
6200            .await
6201            .unwrap();
6202
6203        let now = Utc::now();
6204        let resp = engine
6205            .recall_timeline(RecallTimelineRequest {
6206                header: None,
6207                range: TemporalRange {
6208                    start: now - chrono::Duration::hours(1),
6209                    end: now + chrono::Duration::hours(1),
6210                },
6211                granularity: TimeGranularity::Hour,
6212                min_fidelity: None,
6213                emotion_filter: None,
6214            })
6215            .await
6216            .unwrap();
6217
6218        assert!(!resp.buckets.is_empty());
6219        let total: u32 = resp.buckets.iter().map(|b| b.count).sum();
6220        assert!(total >= 1);
6221    }
6222
6223    #[tokio::test]
6224    async fn timeline_day_granularity() {
6225        let engine = make_engine().await;
6226        for i in 0..3 {
6227            engine
6228                .encode_store(text_store_req(
6229                    &format!("Day event {i}"),
6230                    Some(StoreType::Episodic),
6231                ))
6232                .await
6233                .unwrap();
6234        }
6235
6236        let now = Utc::now();
6237        let resp = engine
6238            .recall_timeline(RecallTimelineRequest {
6239                header: None,
6240                range: TemporalRange {
6241                    start: now - chrono::Duration::days(1),
6242                    end: now + chrono::Duration::days(1),
6243                },
6244                granularity: TimeGranularity::Day,
6245                min_fidelity: None,
6246                emotion_filter: None,
6247            })
6248            .await
6249            .unwrap();
6250
6251        let total: u32 = resp.buckets.iter().map(|b| b.count).sum();
6252        assert_eq!(total, 3);
6253    }
6254
6255    #[tokio::test]
6256    async fn timeline_empty_range() {
6257        let engine = make_engine().await;
6258        engine
6259            .encode_store(text_store_req("An event", Some(StoreType::Episodic)))
6260            .await
6261            .unwrap();
6262
6263        // Query a range far in the past
6264        let resp = engine
6265            .recall_timeline(RecallTimelineRequest {
6266                header: None,
6267                range: TemporalRange {
6268                    start: Utc::now() - chrono::Duration::days(365 * 10),
6269                    end: Utc::now() - chrono::Duration::days(365 * 9),
6270                },
6271                granularity: TimeGranularity::Day,
6272                min_fidelity: None,
6273                emotion_filter: None,
6274            })
6275            .await
6276            .unwrap();
6277
6278        assert!(resp.buckets.is_empty());
6279    }
6280
6281    #[tokio::test]
6282    async fn timeline_min_fidelity_filter() {
6283        let engine = make_engine().await;
6284        engine
6285            .encode_store(text_store_req(
6286                "High fidelity event",
6287                Some(StoreType::Episodic),
6288            ))
6289            .await
6290            .unwrap();
6291
6292        let now = Utc::now();
6293        let resp = engine
6294            .recall_timeline(RecallTimelineRequest {
6295                header: None,
6296                range: TemporalRange {
6297                    start: now - chrono::Duration::hours(1),
6298                    end: now + chrono::Duration::hours(1),
6299                },
6300                granularity: TimeGranularity::Hour,
6301                min_fidelity: Some(0.5),
6302                emotion_filter: None,
6303            })
6304            .await
6305            .unwrap();
6306
6307        // New records have fidelity 1.0, should pass filter
6308        let total: u32 = resp.buckets.iter().map(|b| b.count).sum();
6309        assert!(total >= 1);
6310    }
6311
6312    #[tokio::test]
6313    async fn timeline_emotion_filter() {
6314        let engine = make_engine().await;
6315
6316        // Store a record with high joy
6317        let req = EncodeStoreRequest {
6318            header: None,
6319            content: MemoryContent {
6320                blocks: vec![ContentBlock {
6321                    modality: Modality::Text,
6322                    format: "text/plain".to_string(),
6323                    data: b"Happy event".to_vec(),
6324                    embedding: None,
6325                }],
6326                summary: None,
6327            },
6328            store: Some(StoreType::Episodic),
6329            emotion: Some(EmotionVector {
6330                joy: 0.9,
6331                ..Default::default()
6332            }),
6333            context: None,
6334            metadata: None,
6335            associations: None,
6336        };
6337        engine.encode_store(req).await.unwrap();
6338
6339        // Store a record with high sadness
6340        let req2 = EncodeStoreRequest {
6341            header: None,
6342            content: MemoryContent {
6343                blocks: vec![ContentBlock {
6344                    modality: Modality::Text,
6345                    format: "text/plain".to_string(),
6346                    data: b"Sad event".to_vec(),
6347                    embedding: None,
6348                }],
6349                summary: None,
6350            },
6351            store: Some(StoreType::Episodic),
6352            emotion: Some(EmotionVector {
6353                sadness: 0.9,
6354                ..Default::default()
6355            }),
6356            context: None,
6357            metadata: None,
6358            associations: None,
6359        };
6360        engine.encode_store(req2).await.unwrap();
6361
6362        let now = Utc::now();
6363
6364        // Filter for joy — should only include the happy event
6365        let resp = engine
6366            .recall_timeline(RecallTimelineRequest {
6367                header: None,
6368                range: TemporalRange {
6369                    start: now - chrono::Duration::hours(1),
6370                    end: now + chrono::Duration::hours(1),
6371                },
6372                granularity: TimeGranularity::Hour,
6373                min_fidelity: None,
6374                emotion_filter: Some(EmotionVector {
6375                    joy: 1.0,
6376                    ..Default::default()
6377                }),
6378            })
6379            .await
6380            .unwrap();
6381
6382        let total: u32 = resp.buckets.iter().map(|b| b.count).sum();
6383        assert_eq!(total, 1, "Only the joyful event should match");
6384    }
6385
6386    #[tokio::test]
6387    async fn timeline_multi_store() {
6388        let engine = make_engine().await;
6389        engine
6390            .encode_store(text_store_req("Episodic event", Some(StoreType::Episodic)))
6391            .await
6392            .unwrap();
6393        engine
6394            .encode_store(text_store_req(
6395                "Procedural event",
6396                Some(StoreType::Procedural),
6397            ))
6398            .await
6399            .unwrap();
6400
6401        let now = Utc::now();
6402        let resp = engine
6403            .recall_timeline(RecallTimelineRequest {
6404                header: None,
6405                range: TemporalRange {
6406                    start: now - chrono::Duration::hours(1),
6407                    end: now + chrono::Duration::hours(1),
6408                },
6409                granularity: TimeGranularity::Hour,
6410                min_fidelity: None,
6411                emotion_filter: None,
6412            })
6413            .await
6414            .unwrap();
6415
6416        let total: u32 = resp.buckets.iter().map(|b| b.count).sum();
6417        assert!(total >= 2);
6418    }
6419
6420    // ─── recall.graph tests ──────────────────────────────────────────
6421
6422    #[tokio::test]
6423    async fn graph_centered_traversal() {
6424        let engine = make_engine().await;
6425
6426        // Create two linked records
6427        let r1 = engine
6428            .encode_store(text_store_req("Node A", Some(StoreType::Episodic)))
6429            .await
6430            .unwrap();
6431        let r2 = engine
6432            .encode_store(text_store_req("Node B", Some(StoreType::Episodic)))
6433            .await
6434            .unwrap();
6435
6436        // Add association
6437        let assoc = Association {
6438            target_id: r2.record_id,
6439            association_type: AssociationType::Semantic,
6440            weight: 0.9,
6441            created_at: Utc::now(),
6442            last_co_activation: Utc::now(),
6443        };
6444        engine
6445            .coordinator
6446            .add_association(&r1.record_id, assoc)
6447            .await
6448            .unwrap();
6449
6450        let resp = engine
6451            .recall_graph(RecallGraphRequest {
6452                header: None,
6453                center_id: Some(r1.record_id),
6454                depth: 2,
6455                edge_types: None,
6456                limit_nodes: 10,
6457            })
6458            .await
6459            .unwrap();
6460
6461        assert!(resp.nodes.len() >= 2);
6462        assert!(!resp.edges.is_empty());
6463    }
6464
6465    #[tokio::test]
6466    async fn graph_empty() {
6467        let engine = make_engine().await;
6468        let resp = engine
6469            .recall_graph(RecallGraphRequest {
6470                header: None,
6471                center_id: None,
6472                depth: 1,
6473                edge_types: None,
6474                limit_nodes: 10,
6475            })
6476            .await
6477            .unwrap();
6478
6479        assert!(resp.nodes.is_empty());
6480    }
6481
6482    #[tokio::test]
6483    async fn graph_depth_limiting() {
6484        let engine = make_engine().await;
6485        let r1 = engine
6486            .encode_store(text_store_req("Node 1", Some(StoreType::Episodic)))
6487            .await
6488            .unwrap();
6489
6490        let resp = engine
6491            .recall_graph(RecallGraphRequest {
6492                header: None,
6493                center_id: Some(r1.record_id),
6494                depth: 0,
6495                edge_types: None,
6496                limit_nodes: 10,
6497            })
6498            .await
6499            .unwrap();
6500
6501        // Depth 0 = only the center node, no traversal
6502        assert_eq!(resp.nodes.len(), 1);
6503        assert!(resp.edges.is_empty());
6504    }
6505
6506    // ─── introspect.decay_forecast tests ─────────────────────────────
6507
6508    #[tokio::test]
6509    async fn decay_forecast_future() {
6510        let engine = make_engine().await;
6511        let resp = engine
6512            .encode_store(text_store_req("Forecast me", Some(StoreType::Episodic)))
6513            .await
6514            .unwrap();
6515
6516        let forecast = engine
6517            .introspect_decay_forecast(DecayForecastRequest {
6518                header: None,
6519                record_ids: vec![resp.record_id],
6520                forecast_at: Utc::now() + chrono::Duration::days(30),
6521            })
6522            .await
6523            .unwrap();
6524
6525        assert_eq!(forecast.forecasts.len(), 1);
6526        assert!(forecast.forecasts[0].forecasted_fidelity < forecast.forecasts[0].current_fidelity);
6527        assert!(forecast.forecasts[0].forecasted_fidelity > 0.0);
6528    }
6529
6530    #[tokio::test]
6531    async fn decay_forecast_threshold_date() {
6532        let engine = make_engine().await;
6533        let resp = engine
6534            .encode_store(text_store_req("Will decay", Some(StoreType::Episodic)))
6535            .await
6536            .unwrap();
6537
6538        let forecast = engine
6539            .introspect_decay_forecast(DecayForecastRequest {
6540                header: None,
6541                record_ids: vec![resp.record_id],
6542                forecast_at: Utc::now() + chrono::Duration::days(365),
6543            })
6544            .await
6545            .unwrap();
6546
6547        // Should have an estimated threshold date
6548        assert!(forecast.forecasts[0].estimated_threshold_date.is_some());
6549        assert!(forecast.forecasts[0].estimated_threshold_date.unwrap() > Utc::now());
6550    }
6551
6552    #[tokio::test]
6553    async fn decay_forecast_multiple_records() {
6554        let engine = make_engine().await;
6555        let r1 = engine
6556            .encode_store(text_store_req("Record 1", Some(StoreType::Episodic)))
6557            .await
6558            .unwrap();
6559        let r2 = engine
6560            .encode_store(text_store_req("Record 2", Some(StoreType::Semantic)))
6561            .await
6562            .unwrap();
6563
6564        let forecast = engine
6565            .introspect_decay_forecast(DecayForecastRequest {
6566                header: None,
6567                record_ids: vec![r1.record_id, r2.record_id],
6568                forecast_at: Utc::now() + chrono::Duration::days(7),
6569            })
6570            .await
6571            .unwrap();
6572
6573        assert_eq!(forecast.forecasts.len(), 2);
6574    }
6575
6576    #[tokio::test]
6577    async fn decay_forecast_uses_per_record_decay_rate() {
6578        let engine = make_engine().await;
6579        let resp = engine
6580            .encode_store(text_store_req("Test record", Some(StoreType::Episodic)))
6581            .await
6582            .unwrap();
6583
6584        // Run a decay tick so last_decay_tick advances
6585        engine
6586            .lifecycle_decay_tick(DecayTickRequest {
6587                header: None,
6588                tick_duration_seconds: Some(3600),
6589            })
6590            .await
6591            .unwrap();
6592
6593        // Forecast should use the record's own decay_rate, not global params
6594        let forecast = engine
6595            .introspect_decay_forecast(DecayForecastRequest {
6596                header: None,
6597                record_ids: vec![resp.record_id],
6598                forecast_at: Utc::now() + chrono::Duration::days(30),
6599            })
6600            .await
6601            .unwrap();
6602
6603        assert_eq!(forecast.forecasts.len(), 1);
6604        // After a decay tick, last_decay_tick is updated, so the forecast baseline
6605        // should use max(last_accessed_at, last_decay_tick) — not just last_accessed_at
6606        assert!(forecast.forecasts[0].forecasted_fidelity > 0.0);
6607        assert!(forecast.forecasts[0].forecasted_fidelity < 1.0);
6608    }
6609
6610    // ─── introspect.evolution tests ──────────────────────────────────
6611
6612    #[tokio::test]
6613    async fn evolution_returns_metrics() {
6614        let engine = make_engine().await;
6615        let metrics = engine.introspect_evolution().await.unwrap();
6616        // Initially no adjustments
6617        assert!(metrics.parameter_adjustments.is_empty());
6618    }
6619
6620    #[tokio::test]
6621    async fn evolution_after_decay() {
6622        let engine = make_engine().await;
6623        for i in 0..5 {
6624            engine
6625                .encode_store(text_store_req(
6626                    &format!("Record {i}"),
6627                    Some(StoreType::Episodic),
6628                ))
6629                .await
6630                .unwrap();
6631        }
6632
6633        // Run a decay tick to feed evolution engine
6634        engine
6635            .lifecycle_decay_tick(DecayTickRequest {
6636                header: None,
6637                tick_duration_seconds: Some(86400),
6638            })
6639            .await
6640            .unwrap();
6641
6642        let metrics = engine.introspect_evolution().await.unwrap();
6643        // After decay observation, evolution engine may have patterns
6644        assert!(metrics.detected_patterns.is_empty() || !metrics.detected_patterns.is_empty());
6645    }
6646
6647    // ─── LLM provider tests ─────────────────────────────────────────
6648
6649    /// Mock LLM provider for testing auto-embed.
6650    struct MockLLMProvider {
6651        embed_dim: usize,
6652    }
6653
6654    impl MockLLMProvider {
6655        fn new(embed_dim: usize) -> Self {
6656            Self { embed_dim }
6657        }
6658    }
6659
6660    impl LLMProvider for MockLLMProvider {
6661        fn embed(
6662            &self,
6663            text: &str,
6664        ) -> std::pin::Pin<
6665            Box<dyn std::future::Future<Output = Result<Vec<f32>, CerememoryError>> + Send + '_>,
6666        > {
6667            let dim = self.embed_dim;
6668            let hash = text.len() as f32;
6669            Box::pin(async move {
6670                let mut v = vec![0.0f32; dim];
6671                v[0] = hash;
6672                v[1] = 1.0;
6673                Ok(v)
6674            })
6675        }
6676
6677        fn summarize(
6678            &self,
6679            texts: &[String],
6680            max_tokens: usize,
6681        ) -> std::pin::Pin<
6682            Box<dyn std::future::Future<Output = Result<String, CerememoryError>> + Send + '_>,
6683        > {
6684            let joined = texts.join("; ");
6685            let truncated = if joined.len() > max_tokens {
6686                format!("{}...", truncate_str(&joined, max_tokens))
6687            } else {
6688                joined
6689            };
6690            Box::pin(async move { Ok(truncated) })
6691        }
6692
6693        fn extract_relations(
6694            &self,
6695            text: &str,
6696        ) -> std::pin::Pin<
6697            Box<
6698                dyn std::future::Future<Output = Result<Vec<ExtractedRelation>, CerememoryError>>
6699                    + Send
6700                    + '_,
6701            >,
6702        > {
6703            let has_content = !text.is_empty();
6704            Box::pin(async move {
6705                if has_content {
6706                    Ok(vec![ExtractedRelation {
6707                        subject: "test".to_string(),
6708                        predicate: "is_a".to_string(),
6709                        object: "mock".to_string(),
6710                        confidence: 0.9,
6711                    }])
6712                } else {
6713                    Ok(Vec::new())
6714                }
6715            })
6716        }
6717
6718        fn embed_image(
6719            &self,
6720            data: &[u8],
6721            _format: &str,
6722        ) -> std::pin::Pin<
6723            Box<dyn std::future::Future<Output = Result<Vec<f32>, CerememoryError>> + Send + '_>,
6724        > {
6725            let dim = self.embed_dim;
6726            let hash = data.len() as f32;
6727            Box::pin(async move {
6728                let mut v = vec![0.0f32; dim];
6729                v[0] = hash;
6730                v[1] = 2.0;
6731                Ok(v)
6732            })
6733        }
6734
6735        fn transcribe_audio(
6736            &self,
6737            data: &[u8],
6738            _format: &str,
6739        ) -> std::pin::Pin<
6740            Box<dyn std::future::Future<Output = Result<String, CerememoryError>> + Send + '_>,
6741        > {
6742            let transcript = format!("audio-{}", data.len());
6743            Box::pin(async move { Ok(transcript) })
6744        }
6745
6746        fn capabilities(&self) -> ProviderCapabilities {
6747            ProviderCapabilities {
6748                text_embedding: true,
6749                image_embedding: true,
6750                audio_transcription: true,
6751            }
6752        }
6753    }
6754
6755    #[tokio::test]
6756    async fn engine_auto_embed_with_provider() {
6757        let provider = Arc::new(MockLLMProvider::new(4));
6758        let engine = CerememoryEngine::new(EngineConfig {
6759            llm_provider: Some(provider),
6760            ..Default::default()
6761        })
6762        .unwrap();
6763
6764        // Store without embedding — should auto-generate
6765        let resp = engine
6766            .encode_store(text_store_req("Auto embed me", Some(StoreType::Episodic)))
6767            .await
6768            .unwrap();
6769
6770        // Verify the embedding was generated and stored
6771        let record = engine
6772            .introspect_record(RecordIntrospectRequest {
6773                header: None,
6774                record_id: resp.record_id,
6775                include_history: false,
6776                include_associations: false,
6777                include_versions: false,
6778            })
6779            .await
6780            .unwrap();
6781
6782        assert!(record.content.blocks[0].embedding.is_some());
6783        let emb = record.content.blocks[0].embedding.as_ref().unwrap();
6784        assert_eq!(emb.len(), 4);
6785    }
6786
6787    #[tokio::test]
6788    async fn engine_auto_embeds_all_image_blocks() {
6789        let provider = Arc::new(MockLLMProvider::new(4));
6790        let engine = CerememoryEngine::new(EngineConfig {
6791            llm_provider: Some(provider),
6792            ..Default::default()
6793        })
6794        .unwrap();
6795
6796        let resp = engine
6797            .encode_store(EncodeStoreRequest {
6798                header: None,
6799                content: MemoryContent {
6800                    blocks: vec![
6801                        ContentBlock {
6802                            modality: Modality::Image,
6803                            format: "image/png".to_string(),
6804                            data: vec![1; 8],
6805                            embedding: None,
6806                        },
6807                        ContentBlock {
6808                            modality: Modality::Image,
6809                            format: "image/png".to_string(),
6810                            data: vec![2; 13],
6811                            embedding: None,
6812                        },
6813                    ],
6814                    summary: None,
6815                },
6816                store: Some(StoreType::Episodic),
6817                emotion: None,
6818                context: None,
6819                metadata: None,
6820                associations: None,
6821            })
6822            .await
6823            .unwrap();
6824
6825        let record = engine
6826            .introspect_record(RecordIntrospectRequest {
6827                header: None,
6828                record_id: resp.record_id,
6829                include_history: false,
6830                include_associations: false,
6831                include_versions: false,
6832            })
6833            .await
6834            .unwrap();
6835
6836        assert_eq!(record.content.blocks.len(), 2);
6837        assert_eq!(record.content.blocks[0].modality, Modality::Image);
6838        assert_eq!(record.content.blocks[1].modality, Modality::Image);
6839        assert_eq!(record.content.blocks[0].embedding.as_ref().unwrap()[0], 8.0);
6840        assert_eq!(
6841            record.content.blocks[1].embedding.as_ref().unwrap()[0],
6842            13.0
6843        );
6844        assert_eq!(record.content.blocks[0].embedding.as_ref().unwrap()[1], 2.0);
6845        assert_eq!(record.content.blocks[1].embedding.as_ref().unwrap()[1], 2.0);
6846    }
6847
6848    #[tokio::test]
6849    async fn engine_transcribes_all_audio_blocks_in_order() {
6850        let provider = Arc::new(MockLLMProvider::new(4));
6851        let engine = CerememoryEngine::new(EngineConfig {
6852            llm_provider: Some(provider),
6853            ..Default::default()
6854        })
6855        .unwrap();
6856
6857        let resp = engine
6858            .encode_store(EncodeStoreRequest {
6859                header: None,
6860                content: MemoryContent {
6861                    blocks: vec![
6862                        ContentBlock {
6863                            modality: Modality::Audio,
6864                            format: "audio/wav".to_string(),
6865                            data: vec![0; 12],
6866                            embedding: None,
6867                        },
6868                        ContentBlock {
6869                            modality: Modality::Audio,
6870                            format: "audio/wav".to_string(),
6871                            data: vec![1; 123],
6872                            embedding: None,
6873                        },
6874                    ],
6875                    summary: None,
6876                },
6877                store: Some(StoreType::Episodic),
6878                emotion: None,
6879                context: None,
6880                metadata: None,
6881                associations: None,
6882            })
6883            .await
6884            .unwrap();
6885
6886        let record = engine
6887            .introspect_record(RecordIntrospectRequest {
6888                header: None,
6889                record_id: resp.record_id,
6890                include_history: false,
6891                include_associations: false,
6892                include_versions: false,
6893            })
6894            .await
6895            .unwrap();
6896
6897        assert_eq!(record.content.blocks.len(), 4);
6898        assert_eq!(record.content.blocks[0].modality, Modality::Audio);
6899        assert_eq!(record.content.blocks[1].modality, Modality::Audio);
6900        assert_eq!(record.content.blocks[2].modality, Modality::Text);
6901        assert_eq!(record.content.blocks[3].modality, Modality::Text);
6902        assert_eq!(
6903            std::str::from_utf8(&record.content.blocks[2].data).unwrap(),
6904            "audio-12"
6905        );
6906        assert_eq!(
6907            std::str::from_utf8(&record.content.blocks[3].data).unwrap(),
6908            "audio-123"
6909        );
6910        assert_eq!(
6911            record.content.blocks[2].embedding.as_ref().unwrap()[0],
6912            "audio-12".len() as f32
6913        );
6914        assert_eq!(
6915            record.content.blocks[3].embedding.as_ref().unwrap()[0],
6916            "audio-123".len() as f32
6917        );
6918    }
6919
6920    #[tokio::test]
6921    async fn engine_no_provider_passthrough() {
6922        // Without provider, embedding should remain None
6923        let engine = make_engine().await;
6924        let resp = engine
6925            .encode_store(text_store_req("No provider", Some(StoreType::Episodic)))
6926            .await
6927            .unwrap();
6928
6929        let record = engine
6930            .introspect_record(RecordIntrospectRequest {
6931                header: None,
6932                record_id: resp.record_id,
6933                include_history: false,
6934                include_associations: false,
6935                include_versions: false,
6936            })
6937            .await
6938            .unwrap();
6939
6940        assert!(record.content.blocks[0].embedding.is_none());
6941    }
6942
6943    #[tokio::test]
6944    async fn engine_existing_embedding_not_overwritten() {
6945        let provider = Arc::new(MockLLMProvider::new(4));
6946        let engine = CerememoryEngine::new(EngineConfig {
6947            llm_provider: Some(provider),
6948            ..Default::default()
6949        })
6950        .unwrap();
6951
6952        // Store WITH an existing embedding
6953        let req = EncodeStoreRequest {
6954            header: None,
6955            content: MemoryContent {
6956                blocks: vec![ContentBlock {
6957                    modality: Modality::Text,
6958                    format: "text/plain".to_string(),
6959                    data: b"Has embedding".to_vec(),
6960                    embedding: Some(vec![9.0, 9.0, 9.0]),
6961                }],
6962                summary: None,
6963            },
6964            store: Some(StoreType::Episodic),
6965            emotion: None,
6966            context: None,
6967            metadata: None,
6968            associations: None,
6969        };
6970
6971        let resp = engine.encode_store(req).await.unwrap();
6972
6973        let record = engine
6974            .introspect_record(RecordIntrospectRequest {
6975                header: None,
6976                record_id: resp.record_id,
6977                include_history: false,
6978                include_associations: false,
6979                include_versions: false,
6980            })
6981            .await
6982            .unwrap();
6983
6984        // Should keep the original embedding, not overwrite with mock
6985        let emb = record.content.blocks[0].embedding.as_ref().unwrap();
6986        assert_eq!(emb, &vec![9.0, 9.0, 9.0]);
6987    }
6988
6989    #[tokio::test]
6990    async fn noop_provider_embed_returns_empty() {
6991        let provider = NoOpProvider;
6992        let result = provider.embed("test").await;
6993        assert!(result.unwrap().is_empty());
6994    }
6995
6996    #[tokio::test]
6997    async fn noop_provider_summarize_concatenates() {
6998        let provider = NoOpProvider;
6999        let texts = vec!["hello".to_string(), "world".to_string()];
7000        let result = provider.summarize(&texts, 100).await.unwrap();
7001        assert_eq!(result, "hello world");
7002    }
7003
7004    #[tokio::test]
7005    async fn noop_provider_extract_relations_empty() {
7006        let provider = NoOpProvider;
7007        let result = provider.extract_relations("test").await.unwrap();
7008        assert!(result.is_empty());
7009    }
7010
7011    #[tokio::test]
7012    async fn noop_provider_capabilities_are_disabled() {
7013        let provider = NoOpProvider;
7014        let caps = provider.capabilities();
7015        assert!(!caps.text_embedding);
7016        assert!(!caps.image_embedding);
7017        assert!(!caps.audio_transcription);
7018    }
7019
7020    #[tokio::test]
7021    async fn mock_provider_embed_roundtrip() {
7022        let provider = MockLLMProvider::new(8);
7023        let result = provider.embed("hello").await.unwrap();
7024        assert_eq!(result.len(), 8);
7025        assert!(result[0] > 0.0); // hash of "hello" length
7026    }
7027
7028    #[tokio::test]
7029    async fn mock_provider_summarize() {
7030        let provider = MockLLMProvider::new(4);
7031        let texts = vec!["one".to_string(), "two".to_string()];
7032        let result = provider.summarize(&texts, 100).await.unwrap();
7033        assert!(result.contains("one"));
7034        assert!(result.contains("two"));
7035    }
7036
7037    #[tokio::test]
7038    async fn mock_provider_extract_relations() {
7039        let provider = MockLLMProvider::new(4);
7040        let result = provider.extract_relations("some text").await.unwrap();
7041        assert_eq!(result.len(), 1);
7042        assert_eq!(result[0].predicate, "is_a");
7043    }
7044
7045    #[tokio::test]
7046    async fn auto_embed_enables_vector_search() {
7047        let provider = Arc::new(MockLLMProvider::new(4));
7048        let engine = CerememoryEngine::new(EngineConfig {
7049            llm_provider: Some(provider),
7050            ..Default::default()
7051        })
7052        .unwrap();
7053
7054        // Store text — auto-embed generates a vector
7055        engine
7056            .encode_store(text_store_req("Searchable text", Some(StoreType::Episodic)))
7057            .await
7058            .unwrap();
7059
7060        // Should be findable via vector search now
7061        let query = RecallQueryRequest {
7062            header: None,
7063            cue: RecallCue {
7064                embedding: Some(vec!["Searchable text".len() as f32, 1.0, 0.0, 0.0]),
7065                ..Default::default()
7066            },
7067            stores: None,
7068            limit: 10,
7069            min_fidelity: None,
7070            include_decayed: false,
7071            reconsolidate: false,
7072            activation_depth: 0,
7073            recall_mode: RecallMode::Perfect,
7074        };
7075
7076        let resp = engine.recall_query(query).await.unwrap();
7077        assert!(!resp.memories.is_empty());
7078    }
7079
7080    #[tokio::test]
7081    async fn image_recall_cue_uses_provider_embedding() {
7082        let provider = Arc::new(MockLLMProvider::new(4));
7083        let engine = CerememoryEngine::new(EngineConfig {
7084            llm_provider: Some(provider),
7085            ..Default::default()
7086        })
7087        .unwrap();
7088
7089        let image_bytes = vec![0x89, b'P', b'N', b'G', 0x0D, 0x0A, 0x1A, 0x0A, 1, 2, 3, 4];
7090        let resp = engine
7091            .encode_store(EncodeStoreRequest {
7092                header: None,
7093                content: MemoryContent {
7094                    blocks: vec![ContentBlock {
7095                        modality: Modality::Image,
7096                        format: "image/png".to_string(),
7097                        data: image_bytes.clone(),
7098                        embedding: None,
7099                    }],
7100                    summary: None,
7101                },
7102                store: Some(StoreType::Episodic),
7103                emotion: None,
7104                context: None,
7105                metadata: None,
7106                associations: None,
7107            })
7108            .await
7109            .unwrap();
7110
7111        let recalled = engine
7112            .recall_query(RecallQueryRequest {
7113                header: None,
7114                cue: RecallCue {
7115                    image: Some(image_bytes),
7116                    ..Default::default()
7117                },
7118                stores: None,
7119                limit: 10,
7120                min_fidelity: None,
7121                include_decayed: false,
7122                reconsolidate: false,
7123                activation_depth: 0,
7124                recall_mode: RecallMode::Perfect,
7125            })
7126            .await
7127            .unwrap();
7128
7129        assert_eq!(recalled.memories.len(), 1);
7130        assert_eq!(recalled.memories[0].record.id, resp.record_id);
7131    }
7132
7133    #[tokio::test]
7134    async fn audio_recall_cue_uses_transcription() {
7135        let provider = Arc::new(MockLLMProvider::new(4));
7136        let engine = CerememoryEngine::new(EngineConfig {
7137            llm_provider: Some(provider),
7138            ..Default::default()
7139        })
7140        .unwrap();
7141
7142        let audio_bytes = b"RIFFabcdWAVErest".to_vec();
7143        let resp = engine
7144            .encode_store(EncodeStoreRequest {
7145                header: None,
7146                content: MemoryContent {
7147                    blocks: vec![ContentBlock {
7148                        modality: Modality::Audio,
7149                        format: "audio/wav".to_string(),
7150                        data: audio_bytes.clone(),
7151                        embedding: None,
7152                    }],
7153                    summary: None,
7154                },
7155                store: Some(StoreType::Episodic),
7156                emotion: None,
7157                context: None,
7158                metadata: None,
7159                associations: None,
7160            })
7161            .await
7162            .unwrap();
7163
7164        let recalled = engine
7165            .recall_query(RecallQueryRequest {
7166                header: None,
7167                cue: RecallCue {
7168                    audio: Some(audio_bytes),
7169                    ..Default::default()
7170                },
7171                stores: None,
7172                limit: 10,
7173                min_fidelity: None,
7174                include_decayed: false,
7175                reconsolidate: false,
7176                activation_depth: 0,
7177                recall_mode: RecallMode::Perfect,
7178            })
7179            .await
7180            .unwrap();
7181
7182        assert_eq!(recalled.memories.len(), 1);
7183        assert_eq!(recalled.memories[0].record.id, resp.record_id);
7184    }
7185
7186    // ─── Smart Consolidation tests ───────────────────────────────────
7187
7188    #[tokio::test]
7189    async fn consolidation_basic_migration() {
7190        let engine = make_engine().await;
7191        for i in 0..3 {
7192            engine
7193                .encode_store(text_store_req(
7194                    &format!("Record {i}"),
7195                    Some(StoreType::Episodic),
7196                ))
7197                .await
7198                .unwrap();
7199        }
7200
7201        let resp = engine
7202            .lifecycle_consolidate(ConsolidateRequest {
7203                header: None,
7204                strategy: ConsolidationStrategy::Incremental,
7205                min_age_hours: 0,
7206                min_access_count: 0,
7207                dry_run: false,
7208            })
7209            .await
7210            .unwrap();
7211
7212        assert_eq!(resp.records_processed, 3);
7213        assert_eq!(resp.records_migrated, 3);
7214        assert_eq!(resp.semantic_nodes_created, 3);
7215    }
7216
7217    #[tokio::test]
7218    async fn consolidation_dry_run() {
7219        let engine = make_engine().await;
7220        engine
7221            .encode_store(text_store_req("Dry run test", Some(StoreType::Episodic)))
7222            .await
7223            .unwrap();
7224
7225        let resp = engine
7226            .lifecycle_consolidate(ConsolidateRequest {
7227                header: None,
7228                strategy: ConsolidationStrategy::Incremental,
7229                min_age_hours: 0,
7230                min_access_count: 0,
7231                dry_run: true,
7232            })
7233            .await
7234            .unwrap();
7235
7236        assert_eq!(resp.records_migrated, 1);
7237        // Semantic store should still be empty
7238        assert_eq!(engine.semantic.count().await.unwrap(), 0);
7239    }
7240
7241    #[tokio::test]
7242    async fn consolidation_with_llm_summarization() {
7243        let provider = Arc::new(MockLLMProvider::new(4));
7244        let engine = CerememoryEngine::new(EngineConfig {
7245            llm_provider: Some(provider),
7246            ..Default::default()
7247        })
7248        .unwrap();
7249
7250        engine
7251            .encode_store(text_store_req(
7252                "A very long piece of text that needs summarization for consolidation",
7253                Some(StoreType::Episodic),
7254            ))
7255            .await
7256            .unwrap();
7257
7258        let resp = engine
7259            .lifecycle_consolidate(ConsolidateRequest {
7260                header: None,
7261                strategy: ConsolidationStrategy::Incremental,
7262                min_age_hours: 0,
7263                min_access_count: 0,
7264                dry_run: false,
7265            })
7266            .await
7267            .unwrap();
7268
7269        assert_eq!(resp.records_migrated, 1);
7270
7271        // Check that the semantic record has a summary
7272        let sem_ids = engine.semantic.list_ids().await.unwrap();
7273        assert_eq!(sem_ids.len(), 1);
7274        let sem_record = engine.semantic.get(&sem_ids[0]).await.unwrap().unwrap();
7275        assert!(sem_record.content.summary.is_some());
7276    }
7277
7278    #[tokio::test]
7279    async fn consolidation_with_relation_extraction() {
7280        let provider = Arc::new(MockLLMProvider::new(4));
7281        let engine = CerememoryEngine::new(EngineConfig {
7282            llm_provider: Some(provider),
7283            ..Default::default()
7284        })
7285        .unwrap();
7286
7287        engine
7288            .encode_store(text_store_req(
7289                "Cats are mammals",
7290                Some(StoreType::Episodic),
7291            ))
7292            .await
7293            .unwrap();
7294
7295        engine
7296            .lifecycle_consolidate(ConsolidateRequest {
7297                header: None,
7298                strategy: ConsolidationStrategy::Incremental,
7299                min_age_hours: 0,
7300                min_access_count: 0,
7301                dry_run: false,
7302            })
7303            .await
7304            .unwrap();
7305
7306        // Check that extracted relations are in metadata
7307        let sem_ids = engine.semantic.list_ids().await.unwrap();
7308        let sem_record = engine.semantic.get(&sem_ids[0]).await.unwrap().unwrap();
7309        if let serde_json::Value::Object(ref map) = sem_record.metadata {
7310            let relations = map.get("extracted_relations");
7311            assert!(relations.is_some());
7312            if let Some(serde_json::Value::Array(arr)) = relations {
7313                assert!(!arr.is_empty());
7314            }
7315        }
7316    }
7317
7318    #[tokio::test]
7319    async fn consolidation_no_llm_fallback() {
7320        // Without LLM, summary should use truncation
7321        let engine = make_engine().await;
7322        engine
7323            .encode_store(text_store_req(
7324                "Short text without LLM",
7325                Some(StoreType::Episodic),
7326            ))
7327            .await
7328            .unwrap();
7329
7330        let resp = engine
7331            .lifecycle_consolidate(ConsolidateRequest {
7332                header: None,
7333                strategy: ConsolidationStrategy::Incremental,
7334                min_age_hours: 0,
7335                min_access_count: 0,
7336                dry_run: false,
7337            })
7338            .await
7339            .unwrap();
7340
7341        assert_eq!(resp.records_migrated, 1);
7342        let sem_ids = engine.semantic.list_ids().await.unwrap();
7343        let sem_record = engine.semantic.get(&sem_ids[0]).await.unwrap().unwrap();
7344        assert!(sem_record.content.summary.is_some());
7345        assert_eq!(
7346            sem_record.content.summary.as_deref(),
7347            Some("Short text without LLM")
7348        );
7349    }
7350
7351    #[tokio::test]
7352    async fn consolidation_compression_metrics() {
7353        let engine = make_engine().await;
7354        for i in 0..5 {
7355            engine
7356                .encode_store(text_store_req(
7357                    &format!("Test {i}"),
7358                    Some(StoreType::Episodic),
7359                ))
7360                .await
7361                .unwrap();
7362        }
7363
7364        let resp = engine
7365            .lifecycle_consolidate(ConsolidateRequest {
7366                header: None,
7367                strategy: ConsolidationStrategy::Full,
7368                min_age_hours: 0,
7369                min_access_count: 0,
7370                dry_run: false,
7371            })
7372            .await
7373            .unwrap();
7374
7375        assert_eq!(resp.records_processed, 5);
7376        assert!(resp.records_migrated > 0);
7377        // Pipeline should complete successfully regardless of duplicate count
7378        assert!(resp.records_processed > 0);
7379    }
7380
7381    #[tokio::test]
7382    async fn consolidation_min_age_filter() {
7383        let engine = make_engine().await;
7384        engine
7385            .encode_store(text_store_req("Fresh record", Some(StoreType::Episodic)))
7386            .await
7387            .unwrap();
7388
7389        let resp = engine
7390            .lifecycle_consolidate(ConsolidateRequest {
7391                header: None,
7392                strategy: ConsolidationStrategy::Incremental,
7393                min_age_hours: 24, // Record is brand new, won't pass
7394                min_access_count: 0,
7395                dry_run: false,
7396            })
7397            .await
7398            .unwrap();
7399
7400        assert_eq!(resp.records_migrated, 0);
7401    }
7402
7403    #[tokio::test]
7404    async fn consolidation_preserves_associations() {
7405        let engine = make_engine().await;
7406        let r1 = engine
7407            .encode_store(text_store_req("Memory A", Some(StoreType::Episodic)))
7408            .await
7409            .unwrap();
7410        let r2 = engine
7411            .encode_store(text_store_req("Memory B", Some(StoreType::Episodic)))
7412            .await
7413            .unwrap();
7414
7415        // Create association
7416        let assoc = Association {
7417            target_id: r2.record_id,
7418            association_type: AssociationType::Semantic,
7419            weight: 0.8,
7420            created_at: Utc::now(),
7421            last_co_activation: Utc::now(),
7422        };
7423        engine
7424            .coordinator
7425            .add_association(&r1.record_id, assoc)
7426            .await
7427            .unwrap();
7428
7429        engine
7430            .lifecycle_consolidate(ConsolidateRequest {
7431                header: None,
7432                strategy: ConsolidationStrategy::Incremental,
7433                min_age_hours: 0,
7434                min_access_count: 0,
7435                dry_run: false,
7436            })
7437            .await
7438            .unwrap();
7439
7440        // The original episodic record should have an association to the new semantic node
7441        let assocs = engine
7442            .coordinator
7443            .get_associations(&r1.record_id)
7444            .await
7445            .unwrap();
7446        assert!(assocs.len() >= 2); // original + consolidated
7447    }
7448
7449    #[tokio::test]
7450    async fn duplicate_detection_with_embeddings() {
7451        let provider = Arc::new(MockLLMProvider::new(4));
7452        let engine = CerememoryEngine::new(EngineConfig {
7453            llm_provider: Some(provider),
7454            ..Default::default()
7455        })
7456        .unwrap();
7457
7458        // Store two records with very similar text (mock embeddings will differ slightly)
7459        engine
7460            .encode_store(text_store_req("Hello world", Some(StoreType::Episodic)))
7461            .await
7462            .unwrap();
7463        engine
7464            .encode_store(text_store_req("Hello worlds", Some(StoreType::Episodic)))
7465            .await
7466            .unwrap();
7467
7468        let initial_count = engine.episodic.count().await.unwrap();
7469
7470        let resp = engine
7471            .lifecycle_consolidate(ConsolidateRequest {
7472                header: None,
7473                strategy: ConsolidationStrategy::Full,
7474                min_age_hours: 0,
7475                min_access_count: 0,
7476                dry_run: false,
7477            })
7478            .await
7479            .unwrap();
7480
7481        // Whether duplicates are detected depends on the mock embedding similarity
7482        // The test verifies the pipeline doesn't crash
7483        assert!(resp.records_processed > 0 || initial_count > 0);
7484    }
7485}