use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
// NOTE(review): not referenced in the visible part of this file; presumably the default
// weight given to the sequential component of batch scoring — confirm against its users.
const DEFAULT_BATCH_SEQUENTIAL_WEIGHT: f64 = 0.7;
/// Every memory store the engine owns, in a fixed iteration order.
/// Used for whole-engine sweeps such as `clear_all_records` and
/// `remove_deleted_targets_from_records`.
const ALL_STORES: [StoreType; 5] = [
StoreType::Episodic,
StoreType::Semantic,
StoreType::Procedural,
StoreType::Emotional,
StoreType::Working,
];
/// Scope guard that, when dropped, records the seconds elapsed since it was
/// created into the `metrics` histogram called `name` (see its `Drop` impl).
struct TimerGuard {
/// Histogram name passed to `metrics::histogram!`.
name: &'static str,
/// Instant captured when the guard was created.
start: std::time::Instant,
}
/// A batch of raw journal records from one session that is processed together
/// by the dream pass (built by `infer_dream_groups`).
struct DreamGroup {
/// Session all records in the group belong to.
session_id: String,
/// The records' explicit (trimmed, non-empty) `topic_id`, or `None` when the
/// grouping was inferred.
raw_topic_id: Option<String>,
/// Human-readable topic label: the explicit topic id, or a hint inferred from
/// frequent tokens in the group's text.
topic_hint: Option<String>,
/// `true` when the group was produced by heuristic segmentation rather than an
/// explicit topic id.
inferred_topic: bool,
/// Member records, sorted by `created_at` then `id`.
records: Vec<RawJournalRecord>,
}
impl TimerGuard {
fn new(name: &'static str) -> Self {
Self {
name,
start: std::time::Instant::now(),
}
}
}
impl Drop for TimerGuard {
    /// Reports the seconds elapsed since construction to the `name` histogram.
    fn drop(&mut self) {
        let elapsed_secs = self.start.elapsed().as_secs_f64();
        metrics::histogram!(self.name).record(elapsed_secs);
    }
}
/// Calls `$method($args...)` on the store field selected by `$store_type` and
/// awaits the result.
///
/// `$self` must have `episodic`, `semantic`, `procedural`, `emotional` and
/// `working` fields whose `$method` returns a future. `$store_type` is evaluated
/// once. The arguments are evaluated only inside the selected arm. The macro
/// expands to the awaited output, so callers typically follow it with `?`.
macro_rules! dispatch_store {
($self:expr, $store_type:expr, $method:ident ( $($arg:expr),* )) => {
match $store_type {
StoreType::Episodic => $self.episodic.$method($($arg),*).await,
StoreType::Semantic => $self.semantic.$method($($arg),*).await,
StoreType::Procedural => $self.procedural.$method($($arg),*).await,
StoreType::Emotional => $self.emotional.$method($($arg),*).await,
StoreType::Working => $self.working.$method($($arg),*).await,
}
};
}
use chrono::Utc;
use tracing::{info, warn};
use uuid::Uuid;
use cerememory_association::SpreadingActivationEngine;
use cerememory_core::error::CerememoryError;
use cerememory_core::protocol::*;
use cerememory_core::traits::*;
use cerememory_core::types::*;
use cerememory_decay::{DecayParams, PowerLawDecayEngine};
use cerememory_evolution::EvolutionEngine;
use cerememory_index::text_index::TextIndex;
use cerememory_index::vector_index::VectorIndex;
use cerememory_index::HippocampalCoordinator;
use cerememory_store_emotional::EmotionalStore;
use cerememory_store_episodic::EpisodicStore;
use cerememory_store_procedural::ProceduralStore;
use cerememory_store_raw::RawJournalStore;
use cerememory_store_semantic::SemanticStore;
use cerememory_store_working::WorkingMemoryStore;
/// Construction parameters for [`CerememoryEngine::new`].
///
/// Every `*_path` that is `None` makes the corresponding store or index live in memory.
pub struct EngineConfig {
/// Raw journal store location.
pub raw_journal_path: Option<String>,
/// Episodic store location.
pub episodic_path: Option<String>,
/// Semantic store location.
pub semantic_path: Option<String>,
/// Procedural store location.
pub procedural_path: Option<String>,
/// Emotional store location.
pub emotional_path: Option<String>,
/// Working-memory capacity; values below 1 are clamped to 1.
pub working_capacity: usize,
/// Parameters for the power-law decay engine.
pub decay_params: DecayParams,
/// Initial recall mode (can be changed at runtime).
pub recall_mode: RecallMode,
/// Text index directory. If opening it fails, the directory is deleted and recreated.
pub index_path: Option<String>,
/// Vector index location.
pub vector_index_path: Option<String>,
/// Period of the background decay task; `None` disables it.
pub background_decay_interval_secs: Option<u64>,
/// Period of the background dream task; `None` disables it.
pub background_dream_interval_secs: Option<u64>,
/// Passed to the vector index constructors; presumably the record count at which
/// the HNSW structure is used — confirm in `cerememory_index::vector_index`.
pub hnsw_threshold: usize,
/// Optional provider used to embed or transcribe recall cues.
pub llm_provider: Option<Arc<dyn LLMProvider>>,
}
impl Default for EngineConfig {
fn default() -> Self {
Self {
raw_journal_path: None,
episodic_path: None,
semantic_path: None,
procedural_path: None,
emotional_path: None,
working_capacity: 7,
decay_params: DecayParams::default(),
recall_mode: RecallMode::Human,
index_path: None,
vector_index_path: None,
background_decay_interval_secs: None,
background_dream_interval_secs: None,
hnsw_threshold: cerememory_index::vector_index::DEFAULT_HNSW_THRESHOLD,
llm_provider: None,
}
}
}
/// The memory engine. It owns every store, the indexes, the decay, activation and
/// evolution engines, and the background task state.
///
/// Fields are dropped in declaration order, so reordering them changes teardown order.
pub struct CerememoryEngine {
raw_journal: RawJournalStore,
episodic: EpisodicStore,
semantic: SemanticStore,
procedural: ProceduralStore,
emotional: EmotionalStore,
working: WorkingMemoryStore,
decay: PowerLawDecayEngine,
// Shares `coordinator` (same Arc) for spreading activation.
activation: SpreadingActivationEngine<HippocampalCoordinator>,
evolution: EvolutionEngine,
// Tracks which store owns each record id, plus its associations.
coordinator: Arc<HippocampalCoordinator>,
text_index: TextIndex,
vector_index: VectorIndex,
recall_mode: tokio::sync::RwLock<RecallMode>,
llm_provider: Option<Arc<dyn LLMProvider>>,
// `None` disables `start_background_decay`.
background_decay_interval_secs: Option<u64>,
// `Some` while the background decay task is running.
decay_state: tokio::sync::Mutex<Option<BackgroundDecayState>>,
// `None` disables `start_background_dream`.
background_dream_interval_secs: Option<u64>,
// `Some` while the background dream task is running.
dream_state: tokio::sync::Mutex<Option<BackgroundDreamState>>,
}
/// Control handles for a running background decay task.
struct BackgroundDecayState {
/// Sending `true` asks the task to stop.
shutdown_tx: tokio::sync::watch::Sender<bool>,
/// Supervisor task; awaited by `stop_background_decay`.
handle: tokio::task::JoinHandle<()>,
}
/// Control handles for a running background dream task.
struct BackgroundDreamState {
/// Sending `true` asks the task to stop.
shutdown_tx: tokio::sync::watch::Sender<bool>,
/// Supervisor task; awaited by `stop_background_dream`.
handle: tokio::task::JoinHandle<()>,
}
impl CerememoryEngine {
pub fn new(config: EngineConfig) -> Result<Self, CerememoryError> {
let raw_journal = match &config.raw_journal_path {
Some(p) => RawJournalStore::open(p)?,
None => RawJournalStore::open_in_memory()?,
};
let episodic = match &config.episodic_path {
Some(p) => EpisodicStore::open(p)?,
None => EpisodicStore::open_in_memory()?,
};
let semantic = match &config.semantic_path {
Some(p) => SemanticStore::open(p)?,
None => SemanticStore::open_in_memory()?,
};
let text_index = match &config.index_path {
Some(p) => match TextIndex::open(p) {
Ok(idx) => idx,
Err(e) => {
warn!(
error = %e,
path = %p,
"Corrupted text index detected, recreating (data will be repopulated via rebuild_coordinator)"
);
let _ = std::fs::remove_dir_all(p);
TextIndex::open(p).map_err(|e2| {
CerememoryError::Storage(format!(
"Failed to recreate text index after corruption: {e2}"
))
})?
}
},
None => TextIndex::open_in_memory()?,
};
let vector_index = match &config.vector_index_path {
Some(p) => VectorIndex::open_with_threshold(p, config.hnsw_threshold)?,
None => VectorIndex::open_in_memory_with_threshold(config.hnsw_threshold)?,
};
let procedural = match &config.procedural_path {
Some(p) => ProceduralStore::open(p)?,
None => ProceduralStore::open_in_memory()?,
};
let emotional = match &config.emotional_path {
Some(p) => EmotionalStore::open(p)?,
None => EmotionalStore::open_in_memory()?,
};
let coordinator = Arc::new(HippocampalCoordinator::new());
let activation = SpreadingActivationEngine::new(Arc::clone(&coordinator));
Ok(Self {
raw_journal,
episodic,
semantic,
procedural,
emotional,
working: WorkingMemoryStore::with_capacity(config.working_capacity.max(1)),
decay: PowerLawDecayEngine::new(config.decay_params),
activation,
evolution: EvolutionEngine::new(),
coordinator,
text_index,
vector_index,
recall_mode: tokio::sync::RwLock::new(config.recall_mode),
llm_provider: config.llm_provider,
background_decay_interval_secs: config.background_decay_interval_secs,
decay_state: tokio::sync::Mutex::new(None),
background_dream_interval_secs: config.background_dream_interval_secs,
dream_state: tokio::sync::Mutex::new(None),
})
}
/// Creates an engine whose stores and indexes all live in memory,
/// using [`EngineConfig::default`].
pub fn in_memory() -> Result<Self, CerememoryError> {
    let config = EngineConfig::default();
    Self::new(config)
}
/// Spawns the periodic decay task when `background_decay_interval_secs` is set.
///
/// The task calls `lifecycle_decay_tick` every `interval_secs` seconds. The interval's
/// immediate first tick is skipped. Tick errors are logged and the loop continues.
///
/// If the tick loop panics, it is restarted after an exponential backoff (1s, doubling,
/// capped at 60s). The backoff wait is interrupted by a shutdown request, so
/// `stop_background_decay` never has to sit it out.
///
/// Does nothing in any of these cases:
/// - no interval is configured;
/// - the task is already running;
/// - the state lock is currently held elsewhere. This is a sync fn, so it cannot
///   wait for the lock.
pub fn start_background_decay(self: &Arc<Self>) {
    let Some(interval_secs) = self.background_decay_interval_secs else {
        return;
    };
    let Ok(mut guard) = self.decay_state.try_lock() else {
        return;
    };
    if guard.is_some() {
        return;
    }
    let (tx, rx) = tokio::sync::watch::channel(false);
    let engine = Arc::clone(self);
    let handle = tokio::spawn(async move {
        const MAX_BACKOFF_SECS: u64 = 60;
        let mut shutdown_rx = rx;
        let mut backoff_secs = 1u64;
        loop {
            let engine_ref = Arc::clone(&engine);
            let mut rx_ref = shutdown_rx.clone();
            // Run the tick loop in its own task so that a panic surfaces here as a
            // JoinError instead of killing this supervisor.
            let result = tokio::spawn(async move {
                let mut interval =
                    tokio::time::interval(std::time::Duration::from_secs(interval_secs));
                // The first tick completes immediately; consume it so the first
                // decay runs one full interval after start.
                interval.tick().await;
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            let req = DecayTickRequest {
                                header: None,
                                tick_duration_seconds: Some(
                                    u32::try_from(interval_secs).unwrap_or(u32::MAX),
                                ),
                            };
                            if let Err(e) = engine_ref.lifecycle_decay_tick(req).await {
                                warn!(error = %e, "Background decay tick failed");
                            }
                        }
                        changed = rx_ref.changed() => {
                            // Err means the sender is gone and shutdown can never be
                            // signalled again. `changed()` would then be ready on every
                            // poll, so stop instead of spinning.
                            if changed.is_err() || *rx_ref.borrow() {
                                info!("Background decay stopped");
                                return;
                            }
                        }
                    }
                }
            })
            .await;
            if *shutdown_rx.borrow() {
                return;
            }
            match result {
                Ok(()) => return,
                Err(e) => {
                    tracing::error!(
                        error = %e,
                        backoff_secs,
                        "Background decay task panicked, restarting"
                    );
                    metrics::counter!("cerememory_decay_panics_total").increment(1);
                    let backoff =
                        tokio::time::sleep(std::time::Duration::from_secs(backoff_secs));
                    tokio::select! {
                        _ = backoff => {}
                        changed = shutdown_rx.changed() => {
                            if changed.is_err() || *shutdown_rx.borrow() {
                                info!("Background decay stopped");
                                return;
                            }
                        }
                    }
                    backoff_secs = (backoff_secs * 2).min(MAX_BACKOFF_SECS);
                }
            }
        }
    });
    *guard = Some(BackgroundDecayState {
        shutdown_tx: tx,
        handle,
    });
}
pub async fn stop_background_decay(&self) {
let state = {
let mut guard = self.decay_state.lock().await;
guard.take()
};
if let Some(state) = state {
let _ = state.shutdown_tx.send(true);
let _ = state.handle.await;
}
}
/// Reports whether a background decay task is currently registered
/// (started and not yet stopped).
pub async fn is_background_decay_enabled(&self) -> bool {
    matches!(*self.decay_state.lock().await, Some(_))
}
pub fn start_background_dream(self: &Arc<Self>) {
let Some(interval_secs) = self.background_dream_interval_secs else {
return;
};
let mut guard = match self.dream_state.try_lock() {
Ok(g) => g,
Err(_) => return,
};
if guard.is_some() {
return;
}
let (tx, rx) = tokio::sync::watch::channel(false);
let engine = Arc::clone(self);
let handle = tokio::spawn(async move {
let mut backoff_secs = 1u64;
const MAX_BACKOFF_SECS: u64 = 300;
loop {
let engine_ref = Arc::clone(&engine);
let mut rx_ref = rx.clone();
let result = tokio::spawn(async move {
let mut interval =
tokio::time::interval(std::time::Duration::from_secs(interval_secs));
interval.tick().await;
loop {
tokio::select! {
_ = interval.tick() => {
let req = DreamTickRequest {
header: None,
session_id: None,
dry_run: false,
max_groups: 50,
include_private_scratch: false,
include_sealed: false,
promote_semantic: true,
secrecy_levels: None,
};
if let Err(e) = engine_ref.lifecycle_dream_tick(req).await {
warn!(error = %e, "Background dream tick failed");
}
}
_ = rx_ref.changed() => {
if *rx_ref.borrow() {
info!("Background dream processing stopped");
return;
}
}
}
}
})
.await;
if *rx.borrow() {
return;
}
match result {
Ok(()) => return,
Err(e) => {
tracing::error!(
error = %e,
backoff_secs,
"Background dream task panicked, restarting"
);
tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await;
backoff_secs = (backoff_secs * 2).min(MAX_BACKOFF_SECS);
}
}
}
});
*guard = Some(BackgroundDreamState {
shutdown_tx: tx,
handle,
});
}
pub async fn stop_background_dream(&self) {
let state = {
let mut guard = self.dream_state.lock().await;
guard.take()
};
if let Some(state) = state {
let _ = state.shutdown_tx.send(true);
if let Err(e) = state.handle.await {
tracing::error!(error = %e, "Background dream task join failed");
}
}
}
/// Reports whether a background dream task is currently registered.
pub async fn is_background_dream_enabled(&self) -> bool {
    matches!(*self.dream_state.lock().await, Some(_))
}
pub async fn rebuild_coordinator(&self) -> Result<(), CerememoryError> {
self.vector_index.clear()?;
let mut entries = Vec::new();
let mut text_records = Vec::new();
for store_type in [
StoreType::Episodic,
StoreType::Semantic,
StoreType::Procedural,
StoreType::Emotional,
] {
let records = dispatch_store!(self, store_type, get_all())?;
for record in records {
entries.push((record.id, store_type, record.associations.clone()));
let text = Self::build_searchable_text(&record).unwrap_or_default();
if Self::has_indexable_content(&text, &record) {
text_records.push((
record.id,
store_type,
text,
record.content.summary.clone(),
));
}
if let Some(embedding) = Self::primary_embedding(&record) {
let _ = self.vector_index.upsert(record.id, embedding);
}
}
}
self.coordinator.rebuild(entries).await;
self.text_index.rebuild(&text_records)?;
self.vector_index.rebuild_hnsw()?;
info!(
records = self.coordinator.total_records().await,
hnsw_active = self.vector_index.is_hnsw_active(),
"Coordinator and indexes rebuilt from persistent stores"
);
Ok(())
}
/// A record is worth adding to the text index when it has searchable text, or
/// failing that, a summary that is not just whitespace.
fn has_indexable_content(searchable_text: &str, record: &MemoryRecord) -> bool {
    if !searchable_text.is_empty() {
        return true;
    }
    match record.content.summary.as_deref() {
        Some(summary) => !summary.trim().is_empty(),
        None => false,
    }
}
/// Joins (with `\n`) the indexable text of a record's content blocks:
/// - each text block is trimmed;
/// - each structured block is flattened from JSON to text;
/// - empty pieces and other modalities are skipped.
///
/// Blocks that fail to decode are logged and skipped. Returns `None` when nothing
/// indexable remains.
fn build_searchable_text(record: &MemoryRecord) -> Option<String> {
    let mut pieces: Vec<String> = Vec::new();
    for block in record.content.blocks.iter() {
        let piece = match block.modality {
            Modality::Text => match std::str::from_utf8(&block.data) {
                Ok(text) => Some(text.trim().to_string()),
                Err(error) => {
                    warn!(
                        error = %error,
                        record_id = %record.id,
                        "Skipping invalid UTF-8 text block during indexing"
                    );
                    None
                }
            },
            Modality::Structured => {
                match cerememory_index::structured_index::flatten_json_to_text(&block.data) {
                    Ok(flat) => Some(flat),
                    Err(error) => {
                        warn!(
                            error = %error,
                            record_id = %record.id,
                            "Skipping invalid structured block during indexing"
                        );
                        None
                    }
                }
            }
            _ => None,
        };
        if let Some(piece) = piece.filter(|candidate| !candidate.is_empty()) {
            pieces.push(piece);
        }
    }
    (!pieces.is_empty()).then(|| pieces.join("\n"))
}
/// Returns the embedding of the first content block that has one, if any.
fn primary_embedding(record: &MemoryRecord) -> Option<&[f32]> {
    for block in &record.content.blocks {
        if let Some(embedding) = block.embedding.as_deref() {
            return Some(embedding);
        }
    }
    None
}
/// Identifies an image payload by its magic bytes.
/// Returns `"png"`, `"jpeg"`, `"gif"` or `"webp"`, or `None` when unrecognized.
fn detect_image_format(data: &[u8]) -> Option<&'static str> {
    match data {
        [0x89, b'P', b'N', b'G', b'\r', b'\n', 0x1A, b'\n', ..] => Some("png"),
        [0xFF, 0xD8, 0xFF, ..] => Some("jpeg"),
        [b'G', b'I', b'F', b'8', b'7' | b'9', b'a', ..] => Some("gif"),
        // RIFF container: 4-byte size, then the form type.
        [b'R', b'I', b'F', b'F', _, _, _, _, b'W', b'E', b'B', b'P', ..] => Some("webp"),
        _ => None,
    }
}
/// Identifies an audio payload by its magic bytes.
///
/// Recognizes WAV, FLAC, Ogg, MP3 (either an ID3 tag or a bare frame-sync header),
/// MP4/M4A (an `ftyp` box) and WebM/Matroska (EBML header). Returns `None` for
/// anything else. Patterns are checked in this order.
fn detect_audio_format(data: &[u8]) -> Option<&'static str> {
    match data {
        [b'R', b'I', b'F', b'F', _, _, _, _, b'W', b'A', b'V', b'E', ..] => Some("wav"),
        [b'f', b'L', b'a', b'C', ..] => Some("flac"),
        [b'O', b'g', b'g', b'S', ..] => Some("ogg"),
        [b'I', b'D', b'3', ..] => Some("mp3"),
        // 11-bit MPEG audio frame sync: 0xFF followed by the top three bits set.
        [0xFF, second, ..] if (*second & 0xE0) == 0xE0 => Some("mp3"),
        // ISO BMFF: 4-byte box size, then `ftyp`, requiring at least 12 bytes.
        [_, _, _, _, b'f', b't', b'y', b'p', _, _, _, _, ..] => Some("mp4"),
        [0x1A, 0x45, 0xDF, 0xA3, ..] => Some("webm"),
        _ => None,
    }
}
/// Turns a [`RecallCue`] into the `(query_text, query_embedding)` pair used for recall.
///
/// - `text` is the cue's text, trimmed; `None` when absent or blank.
/// - `embedding` is the cue's own embedding when supplied; otherwise it is derived below.
/// - An image cue must be non-empty and at most `MAX_IMAGE_SIZE` bytes. It is sent to
///   the provider (`embed_image`) only when no embedding was supplied. That requires a
///   provider with `image_embedding` and a PNG, JPEG, GIF or WebP payload.
/// - An audio cue must be non-empty and at most `MAX_AUDIO_SIZE` bytes. It always
///   requires a provider with `audio_transcription` and a recognized format. The
///   trimmed transcript is appended to `text` on a new line. If there is no embedding
///   yet and the provider supports `text_embedding`, the transcript is embedded.
/// - Finally, if there is still no embedding but there is text and a provider with
///   `text_embedding`, the text is embedded; failures at this step are only logged.
///
/// # Errors
/// - `Validation` for empty cues, unrecognized formats or an empty transcript.
/// - `ContentTooLarge` for oversized cues.
/// - `ModalityUnsupported` when no suitable provider is configured.
/// - `Internal` for an empty image embedding.
/// - Any error from `embed_image`, `transcribe_audio`, or the audio-path `embed`.
async fn resolve_recall_cues(
&self,
cue: &RecallCue,
) -> Result<(Option<String>, Option<Vec<f32>>), CerememoryError> {
let mut text = cue
.text
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_owned);
let mut embedding = cue.embedding.clone();
// Image cue: always validated; only embedded when the caller gave no embedding.
if let Some(image) = cue.image.as_ref() {
if image.is_empty() {
return Err(CerememoryError::Validation(
"Recall image cue must not be empty".to_string(),
));
}
if image.len() > MAX_IMAGE_SIZE {
return Err(CerememoryError::ContentTooLarge {
size: image.len(),
limit: MAX_IMAGE_SIZE,
});
}
if embedding.is_none() {
let provider = self.llm_provider.as_ref().ok_or_else(|| {
CerememoryError::ModalityUnsupported(
"Image recall requires an LLM provider with image embedding support"
.to_string(),
)
})?;
let caps = provider.capabilities();
if !caps.image_embedding {
return Err(CerememoryError::ModalityUnsupported(
"Configured LLM provider does not support image recall".to_string(),
));
}
let format = Self::detect_image_format(image).ok_or_else(|| {
CerememoryError::Validation("Unsupported image recall cue format".to_string())
})?;
let generated = provider.embed_image(image, format).await?;
if generated.is_empty() {
return Err(CerememoryError::Internal(
"Image recall provider returned an empty embedding".to_string(),
));
}
embedding = Some(generated);
}
}
// Audio cue: always transcribed (even when an embedding already exists); the
// transcript extends the text query.
if let Some(audio) = cue.audio.as_ref() {
if audio.is_empty() {
return Err(CerememoryError::Validation(
"Recall audio cue must not be empty".to_string(),
));
}
if audio.len() > MAX_AUDIO_SIZE {
return Err(CerememoryError::ContentTooLarge {
size: audio.len(),
limit: MAX_AUDIO_SIZE,
});
}
let provider = self.llm_provider.as_ref().ok_or_else(|| {
CerememoryError::ModalityUnsupported(
"Audio recall requires an LLM provider with transcription support".to_string(),
)
})?;
let caps = provider.capabilities();
if !caps.audio_transcription {
return Err(CerememoryError::ModalityUnsupported(
"Configured LLM provider does not support audio recall".to_string(),
));
}
let format = Self::detect_audio_format(audio).ok_or_else(|| {
CerememoryError::Validation("Unsupported audio recall cue format".to_string())
})?;
let transcript = provider.transcribe_audio(audio, format).await?;
let transcript = transcript.trim();
if transcript.is_empty() {
return Err(CerememoryError::Validation(
"Audio recall cue produced an empty transcript".to_string(),
));
}
match &mut text {
Some(existing) => {
existing.push('\n');
existing.push_str(transcript);
}
None => text = Some(transcript.to_string()),
}
// NOTE(review): unlike the fallback below, an `embed` error here aborts the
// recall, and only the transcript (not the combined text) is embedded —
// confirm this asymmetry is intended.
if embedding.is_none() && caps.text_embedding {
let generated = provider.embed(transcript).await?;
if !generated.is_empty() {
embedding = Some(generated);
}
}
}
// Best-effort fallback: embed whatever text query we ended up with.
if embedding.is_none() {
if let (Some(provider), Some(query_text)) =
(self.llm_provider.as_ref(), text.as_deref())
{
if provider.capabilities().text_embedding {
match provider.embed(query_text).await {
Ok(generated) if !generated.is_empty() => embedding = Some(generated),
Ok(_) => warn!("Text recall cue embedding returned an empty vector"),
Err(error) => warn!(error = %error, "Failed to embed text recall cue"),
}
}
}
}
Ok((text, embedding))
}
/// Adds `record` to the text index (when it has indexable content) and
/// upserts its primary embedding, if any, into the vector index.
///
/// # Errors
/// Propagates the first index failure.
fn index_record(&self, record: &MemoryRecord) -> Result<(), CerememoryError> {
    let searchable = Self::build_searchable_text(record).unwrap_or_default();
    let summary = record.content.summary.as_deref();
    if Self::has_indexable_content(&searchable, record) {
        self.text_index
            .add(record.id, record.store, &searchable, summary)?;
    }
    let Some(embedding) = Self::primary_embedding(record) else {
        return Ok(());
    };
    self.vector_index.upsert(record.id, embedding)?;
    Ok(())
}
fn unindex_record(&self, id: Uuid) {
let _ = self.text_index.remove(id);
let _ = self.vector_index.remove(id);
}
/// Chooses the default store for new content: content with a summary goes to the
/// semantic store, everything else to the episodic store.
fn route_store(&self, content: &MemoryContent) -> StoreType {
    match content.summary {
        Some(_) => StoreType::Semantic,
        None => StoreType::Episodic,
    }
}
/// Looks up a record and the store holding it.
///
/// If the coordinator knows the owning store, only that store is consulted, and a
/// miss there returns `None`. Otherwise every store is probed in turn, starting
/// with working memory.
async fn get_store_record(
    &self,
    id: &Uuid,
) -> Result<Option<(MemoryRecord, StoreType)>, CerememoryError> {
    let known_store = self.coordinator.get_record_store_type(id).await?;
    if let Some(store_type) = known_store {
        let found = dispatch_store!(self, store_type, get(id))?;
        return Ok(found.map(|record| (record, store_type)));
    }
    let probe_order = [
        StoreType::Working,
        StoreType::Episodic,
        StoreType::Semantic,
        StoreType::Procedural,
        StoreType::Emotional,
    ];
    for store_type in probe_order {
        let found = dispatch_store!(self, store_type, get(id))?;
        if let Some(record) = found {
            return Ok(Some((record, store_type)));
        }
    }
    Ok(None)
}
/// Merges an optional encode context into optional user metadata:
/// - object metadata gets a `_context` key;
/// - non-object metadata is wrapped as `{ "_metadata": ..., "_context": ... }`;
/// - without metadata the result is `{ "_context": ... }`;
/// - without a context the metadata is returned as-is, or `{}` when absent.
///
/// A context that fails to serialize is treated as absent.
fn build_record_metadata(
    context: Option<EncodeContext>,
    metadata: Option<serde_json::Value>,
) -> serde_json::Value {
    let context_value = match context {
        Some(ctx) => serde_json::to_value(ctx).ok(),
        None => None,
    };
    let Some(context) = context_value else {
        return metadata.unwrap_or_else(|| serde_json::Value::Object(serde_json::Map::new()));
    };
    match metadata {
        Some(serde_json::Value::Object(mut map)) => {
            map.insert("_context".to_string(), context);
            serde_json::Value::Object(map)
        }
        Some(other) => serde_json::json!({
            "_metadata": other,
            "_context": context,
        }),
        None => serde_json::json!({ "_context": context }),
    }
}
/// Replaces a record's association list in its owning store, then mirrors the
/// same list into the coordinator.
async fn persist_associations_for_record(
    &self,
    record_id: &Uuid,
    store_type: StoreType,
    associations: Vec<Association>,
) -> Result<(), CerememoryError> {
    let for_store = associations.clone();
    dispatch_store!(self, store_type, replace_associations(record_id, for_store))?;
    self.coordinator
        .update_associations(record_id, associations)
        .await?;
    Ok(())
}
/// Adds `association` to a record and persists it.
///
/// The base list is the union of the coordinator's and the store's associations.
/// Two associations count as the same edge when their target and type match.
/// If the edge already exists, nothing is written.
///
/// # Errors
/// `RecordNotFound` when no store holds `record_id`; otherwise store or
/// coordinator failures.
async fn add_persisted_association(
    &self,
    record_id: &Uuid,
    association: Association,
) -> Result<(), CerememoryError> {
    let Some((record, store_type)) = self.get_store_record(record_id).await? else {
        return Err(CerememoryError::RecordNotFound(record_id.to_string()));
    };
    let same_edge = |a: &Association, b: &Association| {
        a.target_id == b.target_id && a.association_type == b.association_type
    };
    let mut merged = self.coordinator.get_associations(record_id).await?;
    for persisted in record.associations.iter() {
        let already_known = merged.iter().any(|known| same_edge(known, persisted));
        if !already_known {
            merged.push(persisted.clone());
        }
    }
    if merged.iter().any(|known| same_edge(known, &association)) {
        return Ok(());
    }
    merged.push(association);
    self.persist_associations_for_record(record_id, store_type, merged)
        .await
}
/// Removes, across every store, any association that points at one of `deleted_ids`.
/// Only records whose association list actually shrinks are re-persisted.
async fn remove_deleted_targets_from_records(
    &self,
    deleted_ids: &HashSet<Uuid>,
) -> Result<(), CerememoryError> {
    if deleted_ids.is_empty() {
        return Ok(());
    }
    for store_type in ALL_STORES {
        let records = dispatch_store!(self, store_type, get_all())?;
        for record in records {
            let kept: Vec<_> = record
                .associations
                .iter()
                .filter(|association| !deleted_ids.contains(&association.target_id))
                .cloned()
                .collect();
            let changed = kept.len() != record.associations.len();
            if changed {
                self.persist_associations_for_record(&record.id, store_type, kept)
                    .await?;
            }
        }
    }
    Ok(())
}
/// Post-deletion bookkeeping for records already removed from their stores:
/// - unregisters them from the coordinator;
/// - drops them from the indexes;
/// - strips associations elsewhere that point at them.
async fn cleanup_deleted_records(
    &self,
    deleted_records: &[(Uuid, StoreType)],
) -> Result<(), CerememoryError> {
    if deleted_records.is_empty() {
        return Ok(());
    }
    let mut deleted_ids = HashSet::with_capacity(deleted_records.len());
    for &(record_id, _) in deleted_records {
        self.coordinator.unregister(&record_id).await;
        self.unindex_record(record_id);
        deleted_ids.insert(record_id);
    }
    self.remove_deleted_targets_from_records(&deleted_ids).await
}
/// Maps a user-supplied export format to its canonical name.
///
/// Matching ignores surrounding whitespace and ASCII case. Both `cma` and `jsonl`
/// map to `"cma"`.
///
/// # Errors
/// `Validation` for any other value.
fn normalize_export_format(format: &str) -> Result<&'static str, CerememoryError> {
    let format = format.trim();
    let accepted = ["cma", "jsonl"]
        .iter()
        .any(|candidate| format.eq_ignore_ascii_case(candidate));
    if !accepted {
        return Err(CerememoryError::Validation(format!(
            "Unsupported export format '{format}'. Valid options: cma, jsonl"
        )));
    }
    Ok("cma")
}
/// Deletes each `(id, store)` target and runs cleanup for those that existed.
/// Returns how many records were actually deleted.
async fn delete_records(
    &self,
    delete_targets: Vec<(Uuid, StoreType)>,
) -> Result<u32, CerememoryError> {
    let mut removed: Vec<(Uuid, StoreType)> = Vec::new();
    for (id, store_type) in delete_targets {
        let was_deleted = dispatch_store!(self, store_type, delete(&id))?;
        if was_deleted {
            removed.push((id, store_type));
        }
    }
    self.cleanup_deleted_records(&removed).await?;
    Ok(removed.len() as u32)
}
async fn clear_all_records(&self) -> Result<u32, CerememoryError> {
let mut delete_targets = Vec::new();
for store_type in ALL_STORES {
for id in dispatch_store!(self, store_type, list_ids())? {
delete_targets.push((id, store_type));
}
}
self.delete_records(delete_targets).await
}
async fn restore_records(&self, records: &[MemoryRecord]) -> Result<(), CerememoryError> {
self.clear_all_records().await?;
for record in records {
let store_type = record.store;
dispatch_store!(self, store_type, store(record.clone()))?;
self.coordinator
.register(record.id, store_type, record.associations.clone())
.await;
let _ = self.index_record(record);
}
Ok(())
}
/// Imports `records`, resolving id collisions with existing records.
///
/// How a collision is resolved depends on `conflict_resolution`:
/// - `KeepExisting` skips the incoming record.
/// - `KeepImported` always overwrites.
/// - `KeepNewer` overwrites only when the import's `updated_at` is strictly later.
///
/// When the winning import belongs to a different store than the existing copy,
/// the old copy is deleted after the new one is stored.
///
/// Returns the number of records written.
///
/// # Errors
/// If the old cross-store copy cannot be removed (delete returned `false` or an
/// error), the newly stored copy is rolled back (best-effort) and an error is
/// returned. Records imported earlier in the batch remain imported.
async fn import_records_with_conflict_resolution(
    &self,
    records: Vec<MemoryRecord>,
    conflict_resolution: ConflictResolution,
) -> Result<u32, CerememoryError> {
    let mut imported = 0u32;
    for record in records {
        let store_type = record.store;
        let mut replaced_cross_store: Option<StoreType> = None;
        if let Some((existing, existing_store)) = self.get_store_record(&record.id).await? {
            match conflict_resolution {
                ConflictResolution::KeepExisting => continue,
                ConflictResolution::KeepImported => {
                    if existing_store != store_type {
                        replaced_cross_store = Some(existing_store);
                    }
                }
                ConflictResolution::KeepNewer => {
                    if record.updated_at <= existing.updated_at {
                        continue;
                    }
                    if existing_store != store_type {
                        replaced_cross_store = Some(existing_store);
                    }
                }
            }
        }
        dispatch_store!(self, store_type, store(record.clone()))?;
        if let Some(existing_store) = replaced_cross_store {
            // Both "not found" and a hard delete error must roll back the new copy.
            // Otherwise the record would live in two stores at once.
            let failure = match dispatch_store!(self, existing_store, delete(&record.id)) {
                Ok(true) => None,
                Ok(false) => Some(CerememoryError::ImportConflict(format!(
                    "Failed to replace cross-store record {} from {} to {}",
                    record.id, existing_store, store_type
                ))),
                Err(e) => Some(e),
            };
            if let Some(error) = failure {
                if let Err(e) = dispatch_store!(self, store_type, delete(&record.id)) {
                    warn!(
                        record_id = %record.id,
                        store = %store_type,
                        error = %e,
                        "Failed to clean up newly stored record during import conflict rollback"
                    );
                }
                return Err(error);
            }
        }
        self.coordinator
            .register(record.id, store_type, record.associations.clone())
            .await;
        if let Err(e) = self.index_record(&record) {
            warn!(error = %e, record_id = %record.id, "Failed to index imported record");
        }
        imported += 1;
    }
    Ok(imported)
}
/// Returns every record in the raw journal.
async fn collect_all_raw_journal_records(
    &self,
) -> Result<Vec<RawJournalRecord>, CerememoryError> {
    let everything = self.raw_journal.get_all().await?;
    Ok(everything)
}
async fn clear_raw_journal(&self) -> Result<u32, CerememoryError> {
let records = self.raw_journal.get_all().await?;
let mut deleted = 0u32;
for record in records {
if self.raw_journal.delete(&record.id).await? {
deleted += 1;
}
}
Ok(deleted)
}
/// Replaces the raw journal with `raw_records`: clears it, then appends each
/// record in order.
async fn restore_raw_journal(
    &self,
    raw_records: &[RawJournalRecord],
) -> Result<(), CerememoryError> {
    let _cleared = self.clear_raw_journal().await?;
    for record in raw_records.iter().cloned() {
        self.raw_journal.append(record).await?;
    }
    Ok(())
}
/// Imports raw journal records.
///
/// Records with new ids are appended. On an id collision:
/// - `KeepExisting` skips the record;
/// - `KeepImported` overwrites;
/// - `KeepNewer` overwrites only if the incoming `updated_at` is strictly later.
///
/// Returns the number of records written.
async fn import_raw_records_with_conflict_resolution(
    &self,
    raw_records: Vec<RawJournalRecord>,
    conflict_resolution: ConflictResolution,
) -> Result<u32, CerememoryError> {
    let mut imported = 0u32;
    for record in raw_records {
        let existing = self.raw_journal.get(&record.id).await?;
        let Some(existing) = existing else {
            self.raw_journal.append(record).await?;
            imported += 1;
            continue;
        };
        let overwrite = match conflict_resolution {
            ConflictResolution::KeepExisting => false,
            ConflictResolution::KeepImported => true,
            ConflictResolution::KeepNewer => record.updated_at > existing.updated_at,
        };
        if overwrite {
            self.raw_journal.update(record).await?;
            imported += 1;
        }
    }
    Ok(imported)
}
/// Appends `record` to the raw journal and returns the id the store reports.
pub async fn append_raw_journal(
    &self,
    record: RawJournalRecord,
) -> Result<Uuid, CerememoryError> {
    let id = self.raw_journal.append(record).await?;
    Ok(id)
}
/// Fetches one raw journal record by id (`None` if absent).
pub async fn get_raw_journal_record(
    &self,
    id: &Uuid,
) -> Result<Option<RawJournalRecord>, CerememoryError> {
    let found = self.raw_journal.get(id).await?;
    Ok(found)
}
/// Returns all raw journal records belonging to `session_id`.
pub async fn query_raw_journal_by_session(
    &self,
    session_id: &str,
) -> Result<Vec<RawJournalRecord>, CerememoryError> {
    let session_records = self.raw_journal.query_session(session_id).await?;
    Ok(session_records)
}
/// Returns the raw journal records of `session_id` within `[start, end]`, as
/// delegated to the store's `query_session_range` (boundary inclusivity is the
/// store's).
pub async fn query_raw_journal_session_range(
    &self,
    session_id: &str,
    start: chrono::DateTime<Utc>,
    end: chrono::DateTime<Utc>,
) -> Result<Vec<RawJournalRecord>, CerememoryError> {
    let in_range = self
        .raw_journal
        .query_session_range(session_id, start, end)
        .await?;
    Ok(in_range)
}
/// Number of records currently in the raw journal.
pub async fn raw_journal_count(&self) -> Result<usize, CerememoryError> {
    let total = self.raw_journal.count().await?;
    Ok(total)
}
/// Visibility filter for raw queries.
///
/// Normal records always pass; private-scratch and sealed records pass only when
/// the matching `include_*` flag is set.
fn raw_query_allowed_visibility(
    record: &RawJournalRecord,
    include_private_scratch: bool,
    include_sealed: bool,
) -> bool {
    match record.visibility {
        RawVisibility::Sealed => include_sealed,
        RawVisibility::PrivateScratch => include_private_scratch,
        RawVisibility::Normal => true,
    }
}
/// Secrecy filter for raw queries.
///
/// With an explicit level list, the record's level must be in it. Without one,
/// only `Public` and `Sensitive` records pass.
fn raw_query_allowed_secrecy(
    record: &RawJournalRecord,
    secrecy_levels: Option<&[SecrecyLevel]>,
) -> bool {
    if let Some(levels) = secrecy_levels {
        return levels.contains(&record.secrecy_level);
    }
    matches!(
        record.secrecy_level,
        SecrecyLevel::Public | SecrecyLevel::Sensitive
    )
}
fn raw_record_processed(record: &RawJournalRecord) -> bool {
record
.metadata
.get("_dream")
.and_then(|value| value.get("processed_at"))
.and_then(|value| value.as_str())
.is_some()
}
fn mark_raw_record_dream_processed(
record: &mut RawJournalRecord,
summary_id: Uuid,
semantic_id: Option<Uuid>,
dreamed_at: chrono::DateTime<Utc>,
) {
record.updated_at = dreamed_at;
if !record.derived_memory_ids.contains(&summary_id) {
record.derived_memory_ids.push(summary_id);
}
if let Some(semantic_id) = semantic_id {
if !record.derived_memory_ids.contains(&semantic_id) {
record.derived_memory_ids.push(semantic_id);
}
}
let metadata = if let serde_json::Value::Object(map) = &mut record.metadata {
map
} else {
record.metadata = serde_json::json!({});
match &mut record.metadata {
serde_json::Value::Object(map) => map,
_ => unreachable!("metadata initialized as object"),
}
};
let dream = metadata
.entry("_dream".to_string())
.or_insert_with(|| serde_json::json!({}));
if let serde_json::Value::Object(map) = dream {
map.insert(
"processed_at".to_string(),
serde_json::Value::String(dreamed_at.to_rfc3339()),
);
map.insert(
"last_summary_id".to_string(),
serde_json::Value::String(summary_id.to_string()),
);
}
let derived = metadata
.entry("_derived".to_string())
.or_insert_with(|| serde_json::json!({}));
if let serde_json::Value::Object(map) = derived {
let summary_entry = map
.entry("episodic_summary_ids".to_string())
.or_insert_with(|| serde_json::json!([]));
if let serde_json::Value::Array(ids) = summary_entry {
let summary_value = serde_json::Value::String(summary_id.to_string());
if !ids.iter().any(|existing| existing == &summary_value) {
ids.push(summary_value);
}
}
if let Some(semantic_id) = semantic_id {
let semantic_entry = map
.entry("semantic_ids".to_string())
.or_insert_with(|| serde_json::json!([]));
if let serde_json::Value::Array(ids) = semantic_entry {
let semantic_value = serde_json::Value::String(semantic_id.to_string());
if !ids.iter().any(|existing| existing == &semantic_value) {
ids.push(semantic_value);
}
}
}
}
}
/// Reads `summary_stats[key]` as a `u64`, treating a missing or non-integer value as 0.
fn dream_stat_u64(summary_stats: &serde_json::Value, key: &str) -> u64 {
    summary_stats
        .get(key)
        .map_or(0, |value| value.as_u64().unwrap_or(0))
}
/// A dream group is promoted to semantic memory only if both hold:
/// - it contains at least two normal (non-redacted) records;
/// - it has a non-empty explicit topic id or topic hint.
fn should_promote_dream_group(
    raw_topic_id: Option<&str>,
    topic_hint: Option<&str>,
    summary_stats: &serde_json::Value,
) -> bool {
    let carries_signal = |value: Option<&str>| value.is_some_and(|topic| !topic.is_empty());
    let topic_signal = carries_signal(raw_topic_id) || carries_signal(topic_hint);
    topic_signal && Self::dream_stat_u64(summary_stats, "normal_records") >= 2
}
/// Records `semantic_id` under `metadata._derived.semantic_ids` of a summary
/// record (deduplicated).
///
/// Missing containers are created. If the metadata or an existing container has
/// the wrong JSON type, nothing is changed.
fn attach_summary_semantic_link(summary_record: &mut MemoryRecord, semantic_id: Uuid) {
    let serde_json::Value::Object(map) = &mut summary_record.metadata else {
        return;
    };
    let derived = map
        .entry("_derived".to_string())
        .or_insert_with(|| serde_json::json!({}));
    let serde_json::Value::Object(derived_map) = derived else {
        return;
    };
    let semantic_ids = derived_map
        .entry("semantic_ids".to_string())
        .or_insert_with(|| serde_json::json!([]));
    let serde_json::Value::Array(ids) = semantic_ids else {
        return;
    };
    let semantic_value = serde_json::Value::String(semantic_id.to_string());
    if !ids.contains(&semantic_value) {
        ids.push(semantic_value);
    }
}
/// Splits raw records into summarizable text plus redaction statistics.
///
/// Records are classified as follows:
/// - `Secret` records are always redacted, whatever their visibility.
/// - Otherwise, sealed and private-scratch records are redacted.
/// - Normal records count toward `normal_records` and contribute their text, if any.
fn prepare_dream_summary_inputs(
    raw_records: &[RawJournalRecord],
) -> (Vec<String>, serde_json::Value) {
    let mut texts: Vec<String> = Vec::new();
    let (mut normal_count, mut private_scratch_redacted, mut sealed_redacted, mut secret_redacted) =
        (0u32, 0u32, 0u32, 0u32);
    for record in raw_records {
        if matches!(record.secrecy_level, SecrecyLevel::Secret) {
            secret_redacted += 1;
            continue;
        }
        match record.visibility {
            RawVisibility::Sealed => sealed_redacted += 1,
            RawVisibility::PrivateScratch => private_scratch_redacted += 1,
            RawVisibility::Normal => {
                texts.extend(record.text_content().map(|text| text.to_string()));
                normal_count += 1;
            }
        }
    }
    let stats = serde_json::json!({
        "normal_records": normal_count,
        "private_scratch_redacted": private_scratch_redacted,
        "sealed_redacted": sealed_redacted,
        "secret_redacted": secret_redacted,
        "redacted_total": private_scratch_redacted + sealed_redacted + secret_redacted,
    });
    (texts, stats)
}
/// Extracts lowercase topic tokens from `text`.
///
/// The text is split on every non-alphanumeric character. Tokens shorter than three
/// *characters* and common stopwords are dropped. The length is counted in chars,
/// not bytes; previously a two-character CJK or accented token (e.g. "日本", "éé")
/// passed the three-byte check.
fn tokenize_topic_text(text: &str) -> HashSet<String> {
    const STOPWORDS: &[&str] = &[
        "the", "and", "for", "with", "that", "this", "from", "into", "only", "then", "than",
        "have", "has", "had", "were", "was", "are", "but", "not", "you", "your", "our", "raw",
        "note", "notes", "session", "topic", "summary", "record", "records",
    ];
    const MIN_TOKEN_CHARS: usize = 3;
    text.split(|ch: char| !ch.is_alphanumeric())
        .filter(|token| !token.is_empty())
        .map(str::to_lowercase)
        .filter(|token| token.chars().count() >= MIN_TOKEN_CHARS)
        .filter(|token| !STOPWORDS.contains(&token.as_str()))
        .collect()
}
/// Jaccard similarity of two token sets.
///
/// If either set is empty, it carries no topic signal and the result is 1.0, so an
/// empty side never triggers a topic split.
fn topic_token_overlap(left: &HashSet<String>, right: &HashSet<String>) -> f64 {
    if left.is_empty() || right.is_empty() {
        return 1.0;
    }
    let shared = left.intersection(right).count();
    let combined = left.union(right).count();
    // Both sets are non-empty here, so `combined >= 1` and the division is safe.
    shared as f64 / combined as f64
}
/// Builds a topic hint from the three most frequent tokens of the group's
/// normal, non-secret records, joined with `+`.
///
/// Ties are broken alphabetically. Returns `None` when no tokens are found.
fn infer_topic_hint(raw_records: &[RawJournalRecord]) -> Option<String> {
    let mut frequency: HashMap<String, u32> = HashMap::new();
    let eligible = raw_records.iter().filter(|record| {
        matches!(record.visibility, RawVisibility::Normal)
            && !matches!(record.secrecy_level, SecrecyLevel::Secret)
    });
    for text in eligible.filter_map(|record| record.text_content()) {
        for token in Self::tokenize_topic_text(text) {
            *frequency.entry(token).or_default() += 1;
        }
    }
    let mut ranked: Vec<(String, u32)> = frequency.into_iter().collect();
    // Tokens are unique keys, so an unstable sort yields the same order as a stable one.
    ranked.sort_unstable_by(|(token_a, count_a), (token_b, count_b)| {
        count_b.cmp(count_a).then_with(|| token_a.cmp(token_b))
    });
    let top: Vec<String> = ranked.into_iter().take(3).map(|(token, _)| token).collect();
    (!top.is_empty()).then(|| top.join("+"))
}
/// Partitions raw journal records into "dream groups" to be summarized.
///
/// Records are first bucketed by calendar day (`created_at.date_naive()`) and
/// session:
/// * records with a non-blank `topic_id` form one group per
///   `(day, session, trimmed topic_id)`;
/// * all other records of a `(day, session)` are ordered in time and cut into
///   inferred-topic segments. A new segment starts after a gap of more than
///   45 minutes, or after more than 10 minutes when the incoming record's
///   tokens overlap the running segment's tokens by less than 0.08 (Jaccard).
///
/// Every inferred segment gets a hint from [`Self::infer_topic_hint`], falling
/// back to `auto-YYYYMMDD-N` (N = segment index within that day/session) when
/// no eligible text exists. Groups are returned ordered by their first
/// record's timestamp, then session id, then topic hint.
fn infer_dream_groups(records: Vec<RawJournalRecord>) -> Vec<DreamGroup> {
    /// Orders records by creation time, with the id as a deterministic tiebreak.
    fn sort_chronologically(records: &mut [RawJournalRecord]) {
        records.sort_by(|a, b| {
            a.created_at
                .cmp(&b.created_at)
                .then_with(|| a.id.cmp(&b.id))
        });
    }

    let mut explicit_groups: BTreeMap<
        (chrono::NaiveDate, String, String),
        Vec<RawJournalRecord>,
    > = BTreeMap::new();
    let mut inferred_sources: BTreeMap<(chrono::NaiveDate, String), Vec<RawJournalRecord>> =
        BTreeMap::new();
    for record in records {
        let date = record.created_at.date_naive();
        if let Some(topic_id) = record
            .topic_id
            .as_ref()
            .map(|topic| topic.trim())
            .filter(|topic| !topic.is_empty())
        {
            explicit_groups
                .entry((date, record.session_id.clone(), topic_id.to_string()))
                .or_default()
                .push(record);
        } else {
            inferred_sources
                .entry((date, record.session_id.clone()))
                .or_default()
                .push(record);
        }
    }

    let mut groups = Vec::new();
    for ((_, session_id, topic_id), mut records) in explicit_groups {
        sort_chronologically(&mut records);
        groups.push(DreamGroup {
            session_id,
            raw_topic_id: Some(topic_id.clone()),
            topic_hint: Some(topic_id),
            inferred_topic: false,
            records,
        });
    }

    for ((date, session_id), mut records) in inferred_sources {
        sort_chronologically(&mut records);
        // Hint for the segment at `index`. It is derived from the segment's
        // text, or else a day/index placeholder. The same rule applies to
        // every segment; previously only the last segment got the fallback.
        let segment_hint = |segment: &[RawJournalRecord], index: u32| {
            Self::infer_topic_hint(segment)
                .or_else(|| Some(format!("auto-{}-{index}", date.format("%Y%m%d"))))
        };
        let mut current: Vec<RawJournalRecord> = Vec::new();
        let mut current_tokens: HashSet<String> = HashSet::new();
        let mut segment_index = 0u32;
        for record in records {
            let record_tokens = record
                .text_content()
                .map(Self::tokenize_topic_text)
                .unwrap_or_default();
            // Only possible when the segment already has a record to compare against.
            let should_split = current.last().is_some_and(|previous: &RawJournalRecord| {
                let gap = record.created_at.signed_duration_since(previous.created_at);
                let overlap = Self::topic_token_overlap(&current_tokens, &record_tokens);
                gap > chrono::Duration::minutes(45)
                    || (gap > chrono::Duration::minutes(10) && overlap < 0.08)
            });
            if should_split {
                let topic_hint = segment_hint(current.as_slice(), segment_index);
                groups.push(DreamGroup {
                    session_id: session_id.clone(),
                    raw_topic_id: None,
                    topic_hint,
                    inferred_topic: true,
                    records: std::mem::take(&mut current),
                });
                current_tokens.clear();
                segment_index += 1;
            }
            current_tokens.extend(record_tokens);
            current.push(record);
        }
        if !current.is_empty() {
            let topic_hint = segment_hint(current.as_slice(), segment_index);
            groups.push(DreamGroup {
                session_id: session_id.clone(),
                raw_topic_id: None,
                topic_hint,
                inferred_topic: true,
                records: current,
            });
        }
    }

    groups.sort_by(|left, right| {
        let left_time = left.records.first().map(|record| record.created_at);
        let right_time = right.records.first().map(|record| record.created_at);
        left_time
            .cmp(&right_time)
            .then_with(|| left.session_id.cmp(&right.session_id))
            .then_with(|| left.topic_hint.cmp(&right.topic_hint))
    });
    groups
}
async fn summarize_dream_group(
&self,
session_id: &str,
topic_hint: Option<&str>,
texts: &[String],
summary_stats: &serde_json::Value,
) -> String {
if let Some(provider) = &self.llm_provider {
match provider.summarize(texts, 300).await {
Ok(summary) if !summary.trim().is_empty() => {
let redacted = summary_stats["redacted_total"].as_u64().unwrap_or(0);
if redacted > 0 {
return format!(
"{summary}\n\n[{} raw record(s) redacted from summary]",
redacted
);
}
return summary;
}
Ok(_) | Err(_) => {}
}
}
let heading = match topic_hint.filter(|topic| !topic.is_empty()) {
Some(topic) => format!("Dream summary for session {session_id} / topic {topic}:"),
None => format!("Dream summary for session {session_id}:"),
};
let snippets: Vec<String> = texts
.iter()
.take(8)
.map(|text| {
if text.len() > 160 {
format!("{}...", truncate_str(text, 160))
} else {
text.clone()
}
})
.collect();
let body = snippets.join(" | ");
let redacted_total = summary_stats["redacted_total"].as_u64().unwrap_or(0);
let suffix = if redacted_total > 0 {
format!(" [Redacted raw records: {redacted_total}]")
} else {
String::new()
};
let combined = format!("{heading} {body}{suffix}");
if combined.len() > 1200 {
format!("{}...", truncate_str(&combined, 1200))
} else {
combined
}
}
/// Builds the metadata attached to an episodic dream-summary record.
///
/// `_origin` carries provenance:
/// * session, topic id and hint, and whether the topic was inferred;
/// * the source raw record ids and their count;
/// * the tick timestamp;
/// * the timestamps of the first and last raw records as `range_start` /
///   `range_end`.
///
/// `_dream` tags the record as an `episodic_summary` sourced from
/// `raw_journal` and embeds `summary_stats` unchanged.
///
/// NOTE(review): `range_start`/`range_end` are a true range only if
/// `raw_records` is chronologically ordered. `infer_dream_groups` produces
/// sorted groups, but confirm for any other caller.
fn build_dream_summary_metadata(
    session_id: &str,
    raw_topic_id: Option<&str>,
    topic_hint: Option<&str>,
    inferred_topic: bool,
    raw_records: &[RawJournalRecord],
    dreamed_at: chrono::DateTime<Utc>,
    summary_stats: serde_json::Value,
) -> serde_json::Value {
    let mut raw_ids: Vec<String> = Vec::with_capacity(raw_records.len());
    for record in raw_records {
        raw_ids.push(record.id.to_string());
    }
    let (range_start, range_end) = match (raw_records.first(), raw_records.last()) {
        (Some(first), Some(last)) => (
            Some(first.created_at.to_rfc3339()),
            Some(last.created_at.to_rfc3339()),
        ),
        _ => (None, None),
    };
    let origin = serde_json::json!({
        "raw_session_id": session_id,
        "raw_topic_id": raw_topic_id,
        "raw_topic_hint": topic_hint,
        "raw_topic_inferred": inferred_topic,
        "raw_record_ids": raw_ids,
        "dream_tick_at": dreamed_at.to_rfc3339(),
        "raw_record_count": raw_records.len(),
        "range_start": range_start,
        "range_end": range_end
    });
    let dream = serde_json::json!({
        "kind": "episodic_summary",
        "source": "raw_journal",
        "summary_stats": summary_stats
    });
    serde_json::json!({
        "_origin": origin,
        "_dream": dream
    })
}
/// Builds the metadata attached to a semantic node promoted from a dream summary.
///
/// `_origin` mirrors the episodic summary's provenance, minus the time range.
/// It adds `raw_summary_record_id`, the id of the episodic summary this node
/// was derived from. `_dream.kind` is `semantic_summary`, and `summary_stats`
/// is embedded unchanged.
// Each argument is a distinct provenance field written verbatim into the JSON.
#[allow(clippy::too_many_arguments)]
fn build_dream_semantic_metadata(
    session_id: &str,
    raw_topic_id: Option<&str>,
    topic_hint: Option<&str>,
    inferred_topic: bool,
    raw_records: &[RawJournalRecord],
    summary_record_id: Uuid,
    dreamed_at: chrono::DateTime<Utc>,
    summary_stats: serde_json::Value,
) -> serde_json::Value {
    let raw_ids = raw_records
        .iter()
        .map(|record| record.id.to_string())
        .collect::<Vec<String>>();
    let origin = serde_json::json!({
        "raw_session_id": session_id,
        "raw_topic_id": raw_topic_id,
        "raw_topic_hint": topic_hint,
        "raw_topic_inferred": inferred_topic,
        "raw_record_ids": raw_ids,
        "dream_tick_at": dreamed_at.to_rfc3339(),
        "raw_record_count": raw_records.len(),
        "raw_summary_record_id": summary_record_id.to_string()
    });
    let dream = serde_json::json!({
        "kind": "semantic_summary",
        "source": "raw_journal",
        "summary_stats": summary_stats
    });
    serde_json::json!({
        "_origin": origin,
        "_dream": dream
    })
}
/// Appends one raw journal record built from `req`.
///
/// The new record gets:
/// * a fresh UUID v7 id;
/// * `created_at == updated_at`, from a single clock read;
/// * no derived memory ids;
/// * `suppressed = false`.
///
/// Missing metadata defaults to `{}`.
///
/// # Errors
/// Returns the error from `RawJournalRecord::validate` or from the journal append.
pub async fn encode_store_raw(
    &self,
    req: EncodeStoreRawRequest,
) -> Result<EncodeStoreRawResponse, CerememoryError> {
    let metadata = req.metadata.unwrap_or_else(|| serde_json::json!({}));
    // One clock read, so a brand-new record never looks "updated after creation".
    let now = Utc::now();
    let record = RawJournalRecord {
        id: Uuid::now_v7(),
        session_id: req.session_id,
        turn_id: req.turn_id,
        topic_id: req.topic_id,
        source: req.source,
        speaker: req.speaker,
        visibility: req.visibility,
        secrecy_level: req.secrecy_level,
        created_at: now,
        updated_at: now,
        content: req.content,
        metadata,
        derived_memory_ids: Vec::new(),
        suppressed: false,
    };
    record.validate()?;
    let record_id = self.raw_journal.append(record.clone()).await?;
    Ok(EncodeStoreRawResponse {
        record_id,
        session_id: record.session_id,
        visibility: record.visibility,
        secrecy_level: record.secrecy_level,
    })
}
/// Appends each raw record of `req.records`, in order, via [`Self::encode_store_raw`].
///
/// Stops at the first failure and returns that error. The batch is not
/// atomic: records appended before the failing one stay in the journal.
pub async fn encode_batch_store_raw(
    &self,
    req: EncodeBatchStoreRawRequest,
) -> Result<EncodeBatchStoreRawResponse, CerememoryError> {
    let requests = req.records;
    let mut results = Vec::with_capacity(requests.len());
    for raw_request in requests {
        let response = self.encode_store_raw(raw_request).await?;
        results.push(response);
    }
    Ok(EncodeBatchStoreRawResponse { results })
}
/// Queries the raw journal.
///
/// Candidate selection:
/// * non-blank `query`: `search_text`, optionally scoped to `session_id`,
///   fetching up to `5 * limit` hits;
/// * no query, `session_id` set: that session's records, range-limited by the
///   store when `temporal` is given;
/// * otherwise: every journal record.
///
/// A blank or whitespace-only `query` is treated as absent.
///
/// Every candidate is then filtered. The following are dropped:
/// * suppressed records;
/// * records with disallowed visibility or secrecy;
/// * records outside `session_id` or outside `temporal` (inclusive);
/// * records not matching the trimmed, lowercased query.
///
/// Survivors are ordered by `(created_at, id)`. `total_candidates` is the
/// survivor count before truncating to `limit`.
pub async fn recall_raw_query(
    &self,
    req: RecallRawQueryRequest,
) -> Result<RecallRawQueryResponse, CerememoryError> {
    let secrecy_levels = req.secrecy_levels.as_deref();
    // Normalize once: a whitespace-only query means "no text query". Otherwise a
    // blank query plus a session would fall through to `get_all()`.
    let query = req
        .query
        .as_deref()
        .map(str::trim)
        .filter(|query| !query.is_empty());
    let query_lower = query.map(str::to_lowercase);
    let session_filter = req
        .session_id
        .as_ref()
        .map(|session_id| session_id.trim())
        .filter(|session_id| !session_id.is_empty())
        .map(str::to_string);
    let search_limit = (req.limit as usize).saturating_mul(5);
    let mut records = match (query, &session_filter, &req.temporal) {
        (Some(query), Some(session_id), _) => {
            self.raw_journal
                .search_text(query, Some(session_id), search_limit)
                .await?
        }
        (Some(query), None, _) => {
            self.raw_journal
                .search_text(query, None, search_limit)
                .await?
        }
        (None, Some(session_id), Some(temporal)) => {
            self.raw_journal
                .query_session_range(session_id, temporal.start, temporal.end)
                .await?
        }
        (None, Some(session_id), None) => self.raw_journal.query_session(session_id).await?,
        (None, None, _) => self.raw_journal.get_all().await?,
    };
    records.retain(|record| {
        if record.suppressed {
            return false;
        }
        if !Self::raw_query_allowed_visibility(
            record,
            req.include_private_scratch,
            req.include_sealed,
        ) {
            return false;
        }
        if !Self::raw_query_allowed_secrecy(record, secrecy_levels) {
            return false;
        }
        // Scope checks run on every path: full-text search knows nothing about
        // the time range, so it must be re-applied here.
        if let Some(ref session_id) = session_filter {
            if record.session_id != *session_id {
                return false;
            }
        }
        if let Some(ref temporal) = req.temporal {
            if record.created_at < temporal.start || record.created_at > temporal.end {
                return false;
            }
        }
        if let Some(ref query_lower) = query_lower {
            if !record.matches_text(query_lower) {
                return false;
            }
        }
        true
    });
    records.sort_by(|a, b| {
        a.created_at
            .cmp(&b.created_at)
            .then_with(|| a.id.cmp(&b.id))
    });
    let total_candidates = records.len() as u32;
    records.truncate(req.limit as usize);
    Ok(RecallRawQueryResponse {
        records,
        total_candidates,
    })
}
pub async fn encode_store(
&self,
req: EncodeStoreRequest,
) -> Result<EncodeStoreResponse, CerememoryError> {
let _timer = TimerGuard::new("cerememory_encode_duration_seconds");
let EncodeStoreRequest {
content,
store,
emotion,
context,
metadata,
associations,
..
} = req;
let store_type = store.unwrap_or_else(|| self.route_store(&content));
let mut record = MemoryRecord {
id: Uuid::now_v7(),
store: store_type,
created_at: Utc::now(),
updated_at: Utc::now(),
last_accessed_at: Utc::now(),
access_count: 0,
content,
fidelity: FidelityState::default(),
emotion: emotion.unwrap_or_default(),
associations: Vec::new(),
metadata: Self::build_record_metadata(context, metadata),
version: 1,
};
let mut assoc_count = 0u32;
if let Some(manual) = associations {
for ma in manual {
record.associations.push(Association {
target_id: ma.target_id,
association_type: ma.association_type,
weight: ma.weight,
created_at: Utc::now(),
last_co_activation: Utc::now(),
});
assoc_count += 1;
}
}
if let Some(ref provider) = self.llm_provider {
let caps = provider.capabilities();
let has_text_embedding = record
.content
.blocks
.iter()
.any(|b| b.modality == Modality::Text && b.embedding.is_some());
if !has_text_embedding && caps.text_embedding {
if let Some(text) = record.text_content().map(|s| s.to_string()) {
match provider.embed(&text).await {
Ok(embedding) if !embedding.is_empty() => {
if let Some(block) = record
.content
.blocks
.iter_mut()
.find(|b| b.modality == Modality::Text)
{
block.embedding = Some(embedding);
}
}
Ok(_) => {}
Err(e) => {
warn!(error = %e, "LLM text auto-embed failed, continuing without embedding");
}
}
}
}
let image_tasks: Vec<(usize, Vec<u8>, String)> = if caps.image_embedding {
record
.content
.blocks
.iter()
.enumerate()
.filter(|(_, b)| b.modality == Modality::Image && b.embedding.is_none())
.map(|(i, b)| (i, b.data.clone(), b.format.clone()))
.collect()
} else {
Vec::new()
};
let audio_tasks: Vec<(Vec<u8>, String)> = if caps.audio_transcription {
record
.content
.blocks
.iter()
.filter(|b| b.modality == Modality::Audio)
.map(|b| (b.data.clone(), b.format.clone()))
.collect()
} else {
Vec::new()
};
if !image_tasks.is_empty() {
let image_results: Vec<(usize, Result<Vec<f32>, _>)> =
futures::future::join_all(image_tasks.iter().map(|(idx, data, fmt)| {
let idx = *idx;
async move { (idx, provider.embed_image(data, fmt).await) }
}))
.await;
for (idx, result) in image_results {
match result {
Ok(embedding) if !embedding.is_empty() => {
record.content.blocks[idx].embedding = Some(embedding);
}
Ok(_) => {}
Err(e) => {
warn!(error = %e, "LLM image auto-embed failed, continuing without embedding");
}
}
}
}
if !audio_tasks.is_empty() {
let audio_results: Vec<Result<String, _>> =
futures::future::join_all(audio_tasks.iter().map(|(data, fmt)| async move {
provider.transcribe_audio(data, fmt).await
}))
.await;
let mut new_text_blocks = Vec::new();
for result in audio_results {
match result {
Ok(transcript) if !transcript.is_empty() => {
let mut text_block = ContentBlock {
modality: Modality::Text,
format: "text/plain".to_string(),
data: transcript.as_bytes().to_vec(),
embedding: None,
};
if caps.text_embedding {
match provider.embed(&transcript).await {
Ok(emb) if !emb.is_empty() => {
text_block.embedding = Some(emb);
}
Err(error) => {
warn!(error = %error, "LLM transcript auto-embed failed, continuing without embedding");
}
Ok(_) => {}
}
}
new_text_blocks.push(text_block);
}
Ok(_) => {}
Err(e) => {
warn!(error = %e, "LLM audio transcription failed, continuing without transcript");
}
}
}
record.content.blocks.extend(new_text_blocks);
}
}
record.validate()?;
let id = record.id;
let fidelity = record.fidelity.score;
if store_type == StoreType::Working {
let (_, evicted) = self.working.store_with_eviction(record.clone()).await?;
if let Some(evicted_id) = evicted {
self.cleanup_deleted_records(&[(evicted_id, StoreType::Working)])
.await?;
}
} else {
dispatch_store!(self, store_type, store(record.clone()))?;
}
self.coordinator
.register(id, store_type, record.associations.clone())
.await;
if let Err(e) = self.index_record(&record) {
warn!(error = %e, record_id = %id, "Failed to index record, will be indexed on rebuild");
}
info!(record_id = %id, store = %store_type, "Encoded memory record");
metrics::counter!("cerememory_encode_total", "store" => store_type.to_string())
.increment(1);
Ok(EncodeStoreResponse {
record_id: id,
store: store_type,
initial_fidelity: fidelity,
associations_created: assoc_count,
})
}
/// Encodes up to 1000 records, in order, via [`Self::encode_store`].
///
/// With `infer_associations`, each record is linked to the previous one by a
/// pair of `Sequential` associations (forward and backward) of weight
/// `DEFAULT_BATCH_SEQUENTIAL_WEIGHT`. Both links count toward
/// `associations_inferred`.
///
/// # Errors
/// Returns `Validation` when the batch exceeds 1000 records; otherwise the
/// first encode / association error. The batch is not atomic: records and
/// links written before a failure remain.
pub async fn encode_batch(
    &self,
    req: EncodeBatchRequest,
) -> Result<EncodeBatchResponse, CerememoryError> {
    const MAX_BATCH_SIZE: usize = 1000;
    if req.records.len() > MAX_BATCH_SIZE {
        return Err(CerememoryError::Validation(format!(
            "Batch size {} exceeds maximum of {MAX_BATCH_SIZE}",
            req.records.len()
        )));
    }
    let infer_associations = req.infer_associations;
    let mut results = Vec::with_capacity(req.records.len());
    let mut total_inferred = 0u32;
    let mut prev_id: Option<Uuid> = None;
    for store_req in req.records {
        let resp = self.encode_store(store_req).await?;
        if infer_associations {
            if let Some(prev) = prev_id {
                // Both directions of a link share one timestamp.
                let now = Utc::now();
                let sequential_to = move |target_id: Uuid| Association {
                    target_id,
                    association_type: AssociationType::Sequential,
                    weight: DEFAULT_BATCH_SEQUENTIAL_WEIGHT,
                    created_at: now,
                    last_co_activation: now,
                };
                self.add_persisted_association(&prev, sequential_to(resp.record_id))
                    .await?;
                self.add_persisted_association(&resp.record_id, sequential_to(prev))
                    .await?;
                total_inferred += 2;
            }
        }
        prev_id = Some(resp.record_id);
        results.push(resp);
    }
    Ok(EncodeBatchResponse {
        results,
        associations_inferred: total_inferred,
    })
}
pub async fn encode_update(&self, req: EncodeUpdateRequest) -> Result<(), CerememoryError> {
let (mut record, store_type) = self
.get_store_record(&req.record_id)
.await?
.ok_or_else(|| CerememoryError::RecordNotFound(req.record_id.to_string()))?;
record.apply_updates(
req.content.clone(),
req.emotion.clone(),
req.metadata.clone(),
);
record.validate()?;
dispatch_store!(
self,
store_type,
update_record(
&req.record_id,
req.content.clone(),
req.emotion,
req.metadata
)
)?;
if req.content.is_some() {
let text = Self::build_searchable_text(&record).unwrap_or_default();
if Self::has_indexable_content(&text, &record) {
if let Err(e) = self.text_index.update(
req.record_id,
store_type,
&text,
record.content.summary.as_deref(),
) {
warn!(error = %e, record_id = %req.record_id, "Failed to update text index");
}
} else if let Err(e) = self.text_index.remove(req.record_id) {
warn!(error = %e, record_id = %req.record_id, "Failed to clear text index");
}
let _ = self.vector_index.remove(req.record_id);
if let Some(embedding) = Self::primary_embedding(&record) {
if let Err(e) = self.vector_index.upsert(req.record_id, embedding) {
warn!(error = %e, record_id = %req.record_id, "Failed to update vector index");
}
}
}
Ok(())
}
pub async fn recall_query(
&self,
req: RecallQueryRequest,
) -> Result<RecallQueryResponse, CerememoryError> {
let _timer = TimerGuard::new("cerememory_recall_duration_seconds");
let mode = *self.recall_mode.read().await;
let recall_mode = req.recall_mode;
let effective_mode = if mode == RecallMode::Perfect {
RecallMode::Perfect
} else {
recall_mode
};
let (cue_text, cue_embedding) = self.resolve_recall_cues(&req.cue).await?;
let stores = req.stores.clone().unwrap_or_else(|| {
vec![
StoreType::Episodic,
StoreType::Semantic,
StoreType::Procedural,
StoreType::Emotional,
StoreType::Working,
]
});
let mut candidates: Vec<(MemoryRecord, f64)> = Vec::new();
let mut seen_ids: HashSet<Uuid> = HashSet::new();
let mut text_scores: HashMap<Uuid, f64> = HashMap::new();
let mut vec_scores: HashMap<Uuid, f64> = HashMap::new();
if let Some(ref text) = cue_text {
let search_limit = req.limit as usize * 3;
match self.text_index.search(text, Some(&stores), search_limit) {
Ok(hits) => {
for hit in hits {
text_scores.insert(hit.record_id, hit.score as f64);
}
}
Err(e) => {
warn!(error = %e, "Text index search failed, falling back to store query");
for store_type in &stores {
let results = dispatch_store!(
self,
*store_type,
query_text(text, req.limit as usize * 2)
)?;
for record in results {
text_scores.insert(record.id, record.fidelity.score);
}
}
}
}
}
if let Some(ref embedding) = cue_embedding {
if let Ok(hits) = self.vector_index.search(embedding, req.limit as usize * 3) {
for hit in hits {
if hit.similarity > 0.0 {
vec_scores.insert(hit.record_id, hit.similarity as f64);
}
}
}
}
let all_ids: HashSet<Uuid> = text_scores
.keys()
.chain(vec_scores.keys())
.copied()
.collect();
let mut scanned_ids = all_ids.clone();
let mut total_records_scanned = scanned_ids.len() as u32;
let mut fidelity_filtered: u32 = 0;
for id in all_ids {
if !seen_ids.insert(id) {
continue;
}
if let Some((record, _)) = self.get_store_record(&id).await? {
if !stores.contains(&record.store) {
continue;
}
if let Some(min_f) = req.min_fidelity {
if record.fidelity.score < min_f && !req.include_decayed {
fidelity_filtered += 1;
continue;
}
}
let ts = text_scores.get(&id);
let vs = vec_scores.get(&id);
let score = match (ts, vs) {
(Some(&t), Some(&v)) => t * 0.6 + v * 0.4,
(Some(&t), None) => t,
(None, Some(&v)) => v,
(None, None) => 0.0,
};
candidates.push((record, score));
}
}
if cue_text.is_none() && cue_embedding.is_none() {
}
if let Some(ref temporal) = req.cue.temporal {
for store_type in &stores {
let records = dispatch_store!(self, *store_type, get_all())?;
for record in records {
if record.created_at < temporal.start || record.created_at > temporal.end {
continue;
}
if scanned_ids.insert(record.id) {
total_records_scanned += 1;
}
if !seen_ids.insert(record.id) {
continue;
}
if let Some(min_f) = req.min_fidelity {
if record.fidelity.score < min_f && !req.include_decayed {
fidelity_filtered += 1;
continue;
}
}
let score = record.fidelity.score;
candidates.push((record, score));
}
}
}
let mut activated_ids: HashMap<Uuid, f64> = HashMap::new();
if req.activation_depth > 0 && !candidates.is_empty() {
let top_id = candidates
.iter()
.max_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal))
.map(|(r, _)| r.id);
if let Some(source_id) = top_id {
if let Ok(activated) = self
.activation
.activate(&source_id, req.activation_depth, 0.05)
.await
{
for act in &activated {
activated_ids.insert(act.record_id, act.activation_level);
if seen_ids.insert(act.record_id) {
if let Some((record, _store)) =
self.get_store_record(&act.record_id).await?
{
if !stores.contains(&record.store) {
continue;
}
if let Some(ref temporal) = req.cue.temporal {
if record.created_at < temporal.start
|| record.created_at > temporal.end
{
continue;
}
}
if let Some(min_f) = req.min_fidelity {
if record.fidelity.score < min_f && !req.include_decayed {
fidelity_filtered += 1;
continue;
}
}
candidates.push((record, act.activation_level * 0.5));
}
}
}
}
}
}
for (record, relevance) in &mut candidates {
if let Some(activation) = activated_ids.get(&record.id) {
*relevance += activation * 0.3;
}
}
if let Some(ref temporal) = req.cue.temporal {
candidates.retain(|(record, _)| {
record.created_at >= temporal.start && record.created_at <= temporal.end
});
}
candidates.retain(|(record, _)| stores.contains(&record.store));
if let Some(min_fidelity) = req.min_fidelity {
let before = candidates.len();
candidates
.retain(|(record, _)| req.include_decayed || record.fidelity.score >= min_fidelity);
fidelity_filtered += (before - candidates.len()) as u32;
}
candidates.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
let total_candidates = candidates.len() as u32;
candidates.truncate(req.limit as usize);
let mut memories = Vec::with_capacity(candidates.len());
for (mut record, relevance) in candidates {
if req.reconsolidate {
record.access_count += 1;
record.last_accessed_at = Utc::now();
let new_stability = self.decay.boost_stability(record.fidelity.stability);
record.fidelity.stability = new_stability;
record.fidelity.reinforcement_count += 1;
if let Some(store_type) = self.coordinator.get_record_store_type(&record.id).await?
{
if let Err(e) = dispatch_store!(
self,
store_type,
update_fidelity(&record.id, record.fidelity.clone())
) {
warn!(record_id = %record.id, error = %e, "Failed to update fidelity during reconsolidation");
}
if let Err(e) = dispatch_store!(
self,
store_type,
update_access(&record.id, record.access_count, record.last_accessed_at)
) {
warn!(record_id = %record.id, error = %e, "Failed to update access during reconsolidation");
}
}
}
let rendered_content = match effective_mode {
RecallMode::Perfect => record.content.clone(),
RecallMode::Human => apply_human_noise(&record.content, record.fidelity.score),
};
memories.push(RecalledMemory {
activation_path: activated_ids.get(&record.id).map(|_| vec![record.id]),
relevance_score: relevance,
rendered_content,
record,
});
}
if !memories.is_empty() {
let hit_count = memories.iter().filter(|m| m.relevance_score > 0.0).count();
let hit_rate = hit_count as f64 / memories.len() as f64;
let store = memories[0].record.store;
self.evolution.observe_recall(store, hit_rate);
}
metrics::counter!("cerememory_recall_total").increment(1);
Ok(RecallQueryResponse {
memories,
activation_trace: None,
total_candidates,
query_metadata: Some(QueryMetadata {
total_records_scanned,
stores_searched: stores,
fidelity_filtered,
}),
})
}
pub async fn recall_associate(
&self,
req: RecallAssociateRequest,
) -> Result<RecallAssociateResponse, CerememoryError> {
self.get_store_record(&req.record_id)
.await?
.ok_or_else(|| CerememoryError::RecordNotFound(req.record_id.to_string()))?;
let activated = self
.activation
.activate(&req.record_id, req.depth, req.min_weight)
.await?;
let source_assocs = if req.association_types.is_some() {
Some(self.coordinator.get_associations(&req.record_id).await?)
} else {
None
};
let mut memories = Vec::new();
for act in activated.iter().take(req.limit as usize) {
if let Some(types) = &req.association_types {
let assocs = source_assocs.as_ref().expect("pre-fetched above");
let matches = assocs
.iter()
.any(|a| a.target_id == act.record_id && types.contains(&a.association_type));
if !matches && act.path.len() <= 2 {
continue;
}
}
if let Some((record, _)) = self.get_store_record(&act.record_id).await? {
memories.push(RecalledMemory {
rendered_content: record.content.clone(),
relevance_score: act.activation_level,
activation_path: Some(act.path.clone()),
record,
});
}
}
Ok(RecallAssociateResponse {
total_candidates: memories.len() as u32,
memories,
})
}
pub async fn recall_timeline(
&self,
req: RecallTimelineRequest,
) -> Result<RecallTimelineResponse, CerememoryError> {
let records = self
.episodic
.query_temporal_range(req.range.start, req.range.end)
.await?;
let mut bucket_map: std::collections::BTreeMap<i64, Vec<MemoryRecord>> =
std::collections::BTreeMap::new();
for record in records {
if let Some(min_f) = req.min_fidelity {
if record.fidelity.score < min_f {
continue;
}
}
if let Some(ref filter) = req.emotion_filter {
if !Self::emotion_matches(&record.emotion, filter) {
continue;
}
}
let bucket_key = Self::bucket_key(record.created_at, req.granularity);
bucket_map.entry(bucket_key).or_default().push(record);
}
for store_type in [
StoreType::Semantic,
StoreType::Procedural,
StoreType::Emotional,
] {
let records = dispatch_store!(self, store_type, get_all())?;
for record in records {
if record.created_at >= req.range.start && record.created_at <= req.range.end {
if let Some(min_f) = req.min_fidelity {
if record.fidelity.score < min_f {
continue;
}
}
if let Some(ref filter) = req.emotion_filter {
if !Self::emotion_matches(&record.emotion, filter) {
continue;
}
}
let bucket_key = Self::bucket_key(record.created_at, req.granularity);
bucket_map.entry(bucket_key).or_default().push(record);
}
}
}
let buckets: Vec<TimelineBucket> = bucket_map
.into_iter()
.map(|(key, records)| {
let (start, end) = Self::bucket_range(key, req.granularity);
let count = records.len() as u32;
let memories = records
.into_iter()
.map(|record| RecalledMemory {
relevance_score: record.fidelity.score,
rendered_content: record.content.clone(),
activation_path: None,
record,
})
.collect();
TimelineBucket {
start,
end,
memories,
count,
}
})
.collect();
Ok(RecallTimelineResponse { buckets })
}
/// Returns `true` when two emotion vectors point in a similar direction.
///
/// Both are treated as 8-dimensional vectors in this component order: joy,
/// trust, fear, surprise, sadness, disgust, anger, anticipation. They are
/// compared by cosine similarity, and the match requires a similarity
/// strictly greater than 0.5. A vector whose norm is below `f64::EPSILON` on
/// either side never matches.
fn emotion_matches(record_emotion: &EmotionVector, filter: &EmotionVector) -> bool {
    fn components(e: &EmotionVector) -> [f64; 8] {
        [
            e.joy,
            e.trust,
            e.fear,
            e.surprise,
            e.sadness,
            e.disgust,
            e.anger,
            e.anticipation,
        ]
    }
    let record_vec = components(record_emotion);
    let filter_vec = components(filter);
    let norm = |v: &[f64; 8]| v.iter().map(|x| x * x).sum::<f64>().sqrt();
    let (record_norm, filter_norm) = (norm(&record_vec), norm(&filter_vec));
    if record_norm < f64::EPSILON || filter_norm < f64::EPSILON {
        return false;
    }
    let dot: f64 = record_vec
        .iter()
        .zip(filter_vec.iter())
        .map(|(a, b)| a * b)
        .sum();
    dot / (record_norm * filter_norm) > 0.5
}
/// Maps a timestamp to an integer bucket index for `granularity`.
///
/// Minute, hour, day and week buckets are fixed-width windows counted from the
/// Unix epoch. Floor (Euclidean) division is used, so instants before 1970
/// land in negative buckets; truncating division would merge them into
/// bucket 0, whose range does not contain them.
///
/// The epoch fell on a Thursday, so week buckets run Thursday 00:00 to
/// Thursday 00:00 UTC. Month buckets are `year * 12 + (month - 1)`.
fn bucket_key(ts: chrono::DateTime<Utc>, granularity: TimeGranularity) -> i64 {
    use chrono::Datelike;
    let secs = ts.timestamp();
    match granularity {
        TimeGranularity::Minute => secs.div_euclid(60),
        TimeGranularity::Hour => secs.div_euclid(3600),
        TimeGranularity::Day => secs.div_euclid(86400),
        TimeGranularity::Week => secs.div_euclid(604800),
        TimeGranularity::Month => i64::from(ts.year()) * 12 + (i64::from(ts.month()) - 1),
    }
}
/// Inverse of `bucket_key`: returns the `[start, end)` instants of bucket `key`.
///
/// Fixed-width granularities map back to `key * width` seconds since the epoch.
/// Month keys (`year * 12 + month - 1`) are decoded with Euclidean division.
/// Negative keys (years below 0) therefore still yield a valid month, instead
/// of the invalid "month 0" that `%` produced.
///
/// Instants chrono cannot represent fall back to `DateTime::default()`.
fn bucket_range(
    key: i64,
    granularity: TimeGranularity,
) -> (chrono::DateTime<Utc>, chrono::DateTime<Utc>) {
    use chrono::TimeZone;
    let width_secs: Option<i64> = match granularity {
        TimeGranularity::Minute => Some(60),
        TimeGranularity::Hour => Some(3600),
        TimeGranularity::Day => Some(86400),
        TimeGranularity::Week => Some(604800),
        TimeGranularity::Month => None,
    };
    if let Some(secs) = width_secs {
        let bucket_start = |bucket: i64| {
            Utc.timestamp_opt(bucket * secs, 0)
                .single()
                .unwrap_or_default()
        };
        return (bucket_start(key), bucket_start(key + 1));
    }
    let year = key.div_euclid(12);
    let month = key.rem_euclid(12) + 1;
    let month_start = |year: i64, month: i64| {
        Utc.with_ymd_and_hms(year as i32, month as u32, 1, 0, 0, 0)
            .single()
            .unwrap_or_default()
    };
    let (next_year, next_month) = if month == 12 {
        (year + 1, 1)
    } else {
        (year, month + 1)
    };
    (month_start(year, month), month_start(next_year, next_month))
}
/// Builds an association graph by breadth-first traversal.
///
/// Traversal starts from `req.center_id` when given. Otherwise it is seeded
/// with record ids from the episodic, semantic, procedural and emotional
/// stores (only those registered with the coordinator), taking at most
/// `limit_nodes` ids per store and stopping once `limit_nodes` seeds exist.
///
/// Each visited node at depth `< req.depth` contributes its outgoing
/// associations as edges and enqueues their targets. When `req.edge_types` is
/// set, each requested type string is lowercased and compared with the
/// association type's serde string form.
///
/// At most `limit_nodes` nodes and `10 * limit_nodes` edges are returned. Edges
/// may point at records that are not in `nodes`.
pub async fn recall_graph(
    &self,
    req: RecallGraphRequest,
) -> Result<RecallGraphResponse, CerememoryError> {
    let limit = req.limit_nodes as usize;
    let mut nodes: Vec<GraphNode> = Vec::new();
    let mut edges: Vec<GraphEdge> = Vec::new();
    let mut visited: HashSet<Uuid> = HashSet::new();
    let mut queue: std::collections::VecDeque<(Uuid, u32)> = std::collections::VecDeque::new();
    match req.center_id {
        Some(center) => queue.push_back((center, 0)),
        None => {
            let registered = self.coordinator.records_by_store().await;
            let mut seeds: Vec<Uuid> = Vec::new();
            for store_type in [
                StoreType::Episodic,
                StoreType::Semantic,
                StoreType::Procedural,
                StoreType::Emotional,
            ] {
                if registered.contains_key(&store_type) {
                    let store_ids = dispatch_store!(self, store_type, list_ids())?;
                    seeds.extend(store_ids.into_iter().take(limit));
                }
                if seeds.len() >= limit {
                    break;
                }
            }
            queue.extend(seeds.into_iter().map(|id| (id, 0)));
        }
    }
    while let Some((id, depth)) = queue.pop_front() {
        // Once the node budget is spent nothing else can be added.
        if nodes.len() >= limit {
            break;
        }
        if !visited.insert(id) {
            continue;
        }
        let Some((record, _)) = self.get_store_record(&id).await? else {
            continue;
        };
        let summary = record.content.summary.clone().or_else(|| {
            record.text_content().map(|text| {
                if text.len() > 80 {
                    format!("{}...", truncate_str(text, 80))
                } else {
                    text.to_string()
                }
            })
        });
        nodes.push(GraphNode {
            id: record.id,
            store: record.store,
            summary,
            fidelity: record.fidelity.score,
        });
        if depth >= req.depth {
            continue;
        }
        for assoc in self.coordinator.get_associations(&id).await? {
            if let Some(ref wanted) = req.edge_types {
                let type_str = serde_json::to_value(assoc.association_type)
                    .ok()
                    .and_then(|v| v.as_str().map(|s| s.to_string()))
                    .unwrap_or_default();
                if !wanted.iter().any(|t| t.to_lowercase() == type_str) {
                    continue;
                }
            }
            if edges.len() < limit * 10 {
                edges.push(GraphEdge {
                    source: id,
                    target: assoc.target_id,
                    edge_type: assoc.association_type,
                    weight: assoc.weight,
                });
            }
            if !visited.contains(&assoc.target_id) {
                queue.push_back((assoc.target_id, depth + 1));
            }
        }
    }
    let total_nodes = nodes.len() as u32;
    Ok(RecallGraphResponse {
        nodes,
        edges,
        total_nodes,
    })
}
pub async fn lifecycle_dream_tick(
&self,
req: DreamTickRequest,
) -> Result<DreamTickResponse, CerememoryError> {
let secrecy_levels = req.secrecy_levels.as_deref();
let session_filter = req
.session_id
.as_ref()
.map(|session_id| session_id.trim())
.filter(|session_id| !session_id.is_empty())
.map(str::to_string);
let raw_records = self.raw_journal.get_all().await?;
let mut candidate_records = Vec::new();
for record in raw_records {
if record.suppressed || Self::raw_record_processed(&record) {
continue;
}
if !Self::raw_query_allowed_visibility(
&record,
req.include_private_scratch,
req.include_sealed,
) {
continue;
}
if !Self::raw_query_allowed_secrecy(&record, secrecy_levels) {
continue;
}
if let Some(ref session_id) = session_filter {
if &record.session_id != session_id {
continue;
}
}
candidate_records.push(record);
}
let mut groups_processed = 0u32;
let mut raw_records_processed = 0u32;
let mut episodic_summaries_created = 0u32;
let mut semantic_nodes_created = 0u32;
let max_groups = req.max_groups as usize;
for group in Self::infer_dream_groups(candidate_records)
.into_iter()
.take(max_groups)
{
if group.records.is_empty() {
continue;
}
groups_processed += 1;
raw_records_processed += group.records.len() as u32;
if req.dry_run {
episodic_summaries_created += 1;
continue;
}
let session_id = group.session_id.clone();
let (texts, summary_stats) = Self::prepare_dream_summary_inputs(&group.records);
let summary_text = if texts.is_empty() {
let redacted_total = summary_stats["redacted_total"].as_u64().unwrap_or(0);
format!(
"Dream summary for session {}: {} raw record(s) preserved. {} redacted from summary.",
session_id,
group.records.len(),
redacted_total
)
} else {
self.summarize_dream_group(
&session_id,
group.topic_hint.as_deref(),
&texts,
&summary_stats,
)
.await
};
let dreamed_at = Utc::now();
let mut summary_record =
MemoryRecord::new_text(StoreType::Episodic, summary_text.clone());
summary_record.content.summary = Some(if summary_text.len() > 160 {
format!("{}...", truncate_str(&summary_text, 160))
} else {
summary_text.clone()
});
summary_record.metadata = Self::build_dream_summary_metadata(
&session_id,
group.raw_topic_id.as_deref(),
group.topic_hint.as_deref(),
group.inferred_topic,
&group.records,
dreamed_at,
summary_stats.clone(),
);
let promote_semantic = req.promote_semantic
&& Self::should_promote_dream_group(
group.raw_topic_id.as_deref(),
group.topic_hint.as_deref(),
&summary_stats,
);
let semantic_record = if promote_semantic {
let mut semantic_record =
MemoryRecord::new_text(StoreType::Semantic, summary_text.clone());
semantic_record.content.summary = summary_record.content.summary.clone();
semantic_record.metadata = Self::build_dream_semantic_metadata(
&session_id,
group.raw_topic_id.as_deref(),
group.topic_hint.as_deref(),
group.inferred_topic,
&group.records,
summary_record.id,
dreamed_at,
summary_stats.clone(),
);
Some(semantic_record)
} else {
None
};
if let Some(semantic_record) = &semantic_record {
let assoc = Association {
target_id: semantic_record.id,
association_type: AssociationType::Semantic,
weight: 1.0,
created_at: dreamed_at,
last_co_activation: dreamed_at,
};
summary_record.associations.push(assoc);
Self::attach_summary_semantic_link(&mut summary_record, semantic_record.id);
}
let summary_id = summary_record.id;
dispatch_store!(self, StoreType::Episodic, store(summary_record.clone()))?;
self.coordinator
.register(
summary_record.id,
StoreType::Episodic,
summary_record.associations.clone(),
)
.await;
if let Err(e) = self.index_record(&summary_record) {
warn!(error = %e, record_id = %summary_record.id, "Failed to index dream summary");
}
let semantic_id = if let Some(mut semantic_record) = semantic_record {
semantic_record.associations.push(Association {
target_id: summary_id,
association_type: AssociationType::Semantic,
weight: 1.0,
created_at: dreamed_at,
last_co_activation: dreamed_at,
});
let semantic_id = semantic_record.id;
if let Err(err) =
dispatch_store!(self, StoreType::Semantic, store(semantic_record.clone()))
{
if dispatch_store!(self, StoreType::Episodic, delete(&summary_id))? {
self.cleanup_deleted_records(&[(summary_id, StoreType::Episodic)])
.await?;
}
return Err(err);
}
self.coordinator
.register(
semantic_record.id,
StoreType::Semantic,
semantic_record.associations.clone(),
)
.await;
if let Err(e) = self.index_record(&semantic_record) {
warn!(error = %e, record_id = %semantic_record.id, "Failed to index dream semantic summary");
}
semantic_nodes_created += 1;
Some(semantic_id)
} else {
None
};
let original_group = group.records.clone();
let mut update_failed: Option<CerememoryError> = None;
let mut updated_group = group.records.clone();
for raw_record in &mut updated_group {
Self::mark_raw_record_dream_processed(
raw_record,
summary_id,
semantic_id,
dreamed_at,
);
if let Err(err) = self.raw_journal.update(raw_record.clone()).await {
update_failed = Some(err);
break;
}
}
if let Some(err) = update_failed {
for original in &original_group {
let _ = self.raw_journal.update(original.clone()).await;
}
let mut cleanup_targets = Vec::new();
if dispatch_store!(self, StoreType::Episodic, delete(&summary_id))? {
cleanup_targets.push((summary_id, StoreType::Episodic));
}
if let Some(semantic_id) = semantic_id {
if dispatch_store!(self, StoreType::Semantic, delete(&semantic_id))? {
cleanup_targets.push((semantic_id, StoreType::Semantic));
}
}
if !cleanup_targets.is_empty() {
self.cleanup_deleted_records(&cleanup_targets).await?;
}
return Err(CerememoryError::Internal(format!(
"Dream tick failed while updating raw journal state: {err}"
)));
}
episodic_summaries_created += 1;
}
Ok(DreamTickResponse {
groups_processed,
raw_records_processed,
episodic_summaries_created,
semantic_nodes_created,
})
}
pub async fn lifecycle_consolidate(
&self,
req: ConsolidateRequest,
) -> Result<ConsolidateResponse, CerememoryError> {
let ids = self.episodic.list_ids().await?;
let mut processed = 0u32;
let mut migrated = 0u32;
let mut semantic_created = 0u32;
let mut compressed = 0u32;
let mut pruned = 0u32;
let mut duplicate_groups: Vec<(Uuid, Uuid)> = Vec::new();
if !req.dry_run {
let mut checked: HashSet<Uuid> = HashSet::new();
for &id in &ids {
if checked.contains(&id) {
continue;
}
if let Some(record) = self.episodic.get(&id).await? {
if let Some(emb) = Self::primary_embedding(&record) {
if let Ok(hits) = self.vector_index.search(emb, 5) {
for hit in hits {
if hit.record_id != id
&& hit.similarity > 0.92
&& !checked.contains(&hit.record_id)
{
let Some((_, hit_store)) =
self.get_store_record(&hit.record_id).await?
else {
continue;
};
if hit_store != StoreType::Episodic {
continue;
}
duplicate_groups.push((id, hit.record_id));
checked.insert(hit.record_id);
}
}
}
}
}
checked.insert(id);
}
for (keep_id, remove_id) in &duplicate_groups {
if let (Some(keep_rec), Some(remove_rec)) = (
self.episodic.get(keep_id).await?,
self.get_store_record(remove_id).await?,
) {
let (remove_record, remove_store) = remove_rec;
let (actual_keep, actual_remove, actual_remove_store) =
if keep_rec.fidelity.score >= remove_record.fidelity.score {
(*keep_id, *remove_id, remove_store)
} else {
(*remove_id, *keep_id, StoreType::Episodic)
};
let removed_assocs = self.coordinator.get_associations(&actual_remove).await?;
for assoc in removed_assocs {
self.add_persisted_association(&actual_keep, assoc).await?;
}
if dispatch_store!(self, actual_remove_store, delete(&actual_remove))? {
self.cleanup_deleted_records(&[(actual_remove, actual_remove_store)])
.await?;
compressed += 1;
}
}
}
}
let remaining_records = self.episodic.get_all().await?;
for record in remaining_records {
processed += 1;
let age_hours = (Utc::now() - record.created_at).num_hours() as u32;
if age_hours < req.min_age_hours {
continue;
}
if record.access_count < req.min_access_count {
continue;
}
if req.dry_run {
migrated += 1;
continue;
}
let mut semantic_record = record.clone();
semantic_record.store = StoreType::Semantic;
semantic_record.id = Uuid::now_v7();
if semantic_record.content.summary.is_none() {
if let Some(ref provider) = self.llm_provider {
if let Some(text) = record.text_content() {
match provider.summarize(&[text.to_string()], 200).await {
Ok(summary) if !summary.is_empty() => {
semantic_record.content.summary = Some(summary);
}
_ => {
semantic_record.content.summary = Some(
record
.text_content()
.map(|t| {
if t.len() > 100 {
format!("{}...", truncate_str(t, 100))
} else {
t.to_string()
}
})
.unwrap_or_default(),
);
}
}
}
} else {
semantic_record.content.summary = semantic_record.text_content().map(|t| {
if t.len() > 100 {
format!("{}...", truncate_str(t, 100))
} else {
t.to_string()
}
});
}
}
if let Some(ref provider) = self.llm_provider {
if let Some(text) = record.text_content() {
if let Ok(relations) = provider.extract_relations(text).await {
for rel in relations {
if let serde_json::Value::Object(ref mut map) = semantic_record.metadata
{
let relations_arr = map
.entry("extracted_relations".to_string())
.or_insert_with(|| serde_json::json!([]));
if let serde_json::Value::Array(ref mut arr) = relations_arr {
arr.push(serde_json::json!({
"subject": rel.subject,
"predicate": rel.predicate,
"object": rel.object,
"confidence": rel.confidence,
}));
}
}
}
}
}
}
dispatch_store!(self, StoreType::Semantic, store(semantic_record.clone()))?;
semantic_created += 1;
self.coordinator
.register(
semantic_record.id,
StoreType::Semantic,
semantic_record.associations.clone(),
)
.await;
if let Err(e) = self.index_record(&semantic_record) {
warn!(error = %e, "Failed to index consolidated record");
}
let assoc = Association {
target_id: semantic_record.id,
association_type: AssociationType::Semantic,
weight: 1.0,
created_at: Utc::now(),
last_co_activation: Utc::now(),
};
self.add_persisted_association(&record.id, assoc).await?;
migrated += 1;
if record.fidelity.score < 0.1 && self.episodic.delete(&record.id).await? {
self.cleanup_deleted_records(&[(record.id, StoreType::Episodic)])
.await?;
pruned += 1;
}
}
info!(
processed,
migrated, semantic_created, compressed, pruned, "Smart consolidation completed"
);
Ok(ConsolidateResponse {
records_processed: processed,
records_migrated: migrated,
records_compressed: compressed,
records_pruned: pruned,
semantic_nodes_created: semantic_created,
})
}
pub async fn lifecycle_decay_tick(
&self,
req: DecayTickRequest,
) -> Result<DecayTickResponse, CerememoryError> {
let tick_secs = req.tick_duration_seconds.unwrap_or(3600) as f64;
let mut all_inputs = Vec::new();
let mut record_stores: HashMap<Uuid, StoreType> = HashMap::new();
for store_type in [
StoreType::Episodic,
StoreType::Semantic,
StoreType::Procedural,
StoreType::Emotional,
] {
let records = dispatch_store!(self, store_type, get_all())?;
for record in records {
all_inputs.push(DecayInput {
id: record.id,
fidelity: record.fidelity.clone(),
emotion: record.emotion.clone(),
last_accessed_at: record.last_accessed_at,
access_count: record.access_count,
});
record_stores.insert(record.id, store_type);
}
}
let decay = self.decay.clone();
let result =
tokio::task::spawn_blocking(move || decay.compute_tick(&all_inputs, tick_secs))
.await
.map_err(|e| CerememoryError::Internal(format!("Decay task failed: {e}")))?;
for output in &result.updates {
if let Some(&store_type) = record_stores.get(&output.id) {
if output.should_prune {
if dispatch_store!(self, store_type, delete(&output.id))? {
self.cleanup_deleted_records(&[(output.id, store_type)])
.await?;
}
} else {
dispatch_store!(
self,
store_type,
update_fidelity(&output.id, output.new_fidelity.clone())
)?;
}
}
}
let mut fidelity_by_store: HashMap<StoreType, Vec<f64>> = HashMap::new();
for output in &result.updates {
if let Some(&store_type) = record_stores.get(&output.id) {
fidelity_by_store
.entry(store_type)
.or_default()
.push(output.new_fidelity.score);
}
}
for (store_type, scores) in &fidelity_by_store {
self.evolution.observe_decay_tick(*store_type, scores);
}
info!(
updated = result.records_updated,
below_threshold = result.records_below_threshold,
pruned = result.records_pruned,
"Decay tick completed"
);
Ok(DecayTickResponse {
records_updated: result.records_updated,
records_below_threshold: result.records_below_threshold,
records_pruned: result.records_pruned,
})
}
/// Replaces the engine-wide recall mode with `req.mode` and logs the change.
/// Always succeeds.
pub async fn lifecycle_set_mode(&self, req: SetModeRequest) -> Result<(), CerememoryError> {
    let new_mode = req.mode;
    {
        // The write guard is released at the end of this scope, before logging.
        let mut current_mode = self.recall_mode.write().await;
        *current_mode = new_mode;
    }
    info!(mode = ?new_mode, "Recall mode changed");
    Ok(())
}
pub async fn lifecycle_forget(&self, req: ForgetRequest) -> Result<u32, CerememoryError> {
if !req.confirm {
return Err(CerememoryError::ForgetUnconfirmed);
}
let ForgetRequest {
record_ids,
store,
temporal_range,
cascade,
..
} = req;
let mut deleted = 0u32;
let mut delete_targets: HashMap<Uuid, StoreType> = HashMap::new();
if let Some(ids) = record_ids {
for id in ids {
if let Some((record, store_type)) = self.get_store_record(&id).await? {
delete_targets.insert(id, store_type);
if cascade {
for assoc in &record.associations {
if let Some((_, st)) = self.get_store_record(&assoc.target_id).await? {
delete_targets.insert(assoc.target_id, st);
}
}
}
}
}
}
if let Some(store_type) = store {
let ids = dispatch_store!(self, store_type, list_ids())?;
for id in ids {
delete_targets.insert(id, store_type);
}
}
if let Some(range) = temporal_range {
for store_type in ALL_STORES {
let records = dispatch_store!(self, store_type, get_all())?;
for record in records {
if record.created_at >= range.start && record.created_at <= range.end {
delete_targets.insert(record.id, store_type);
}
}
}
}
let mut deleted_records = Vec::new();
for (id, store_type) in delete_targets {
if dispatch_store!(self, store_type, delete(&id))? {
deleted_records.push((id, store_type));
deleted += 1;
}
}
self.cleanup_deleted_records(&deleted_records).await?;
warn!(deleted, "Forget operation completed");
Ok(deleted)
}
/// Exports curated records as a `cma` archive.
///
/// Curated records are optionally restricted to `req.stores`. When
/// `req.include_raw_journal` is set, every raw-journal record is included as well.
/// When `req.encrypt` is true, the archive is encrypted with a key derived from
/// `req.encryption_key`.
///
/// # Errors
/// - An unsupported `req.format` (from `normalize_export_format`).
/// - `Validation` when `encrypt` is true and the key is missing or empty. An empty
///   passphrase would make the derived key trivially reproducible.
/// - Store or archive errors.
pub async fn lifecycle_export(
    &self,
    req: ExportRequest,
) -> Result<(Vec<u8>, ExportResponse), CerememoryError> {
    match Self::normalize_export_format(&req.format)? {
        "cma" => {}
        _ => unreachable!("normalize_export_format only returns supported formats"),
    }
    // Validate the (cheap) key parameters before scanning every store, so a
    // malformed request fails fast.
    let passphrase = if req.encrypt {
        let key_str = req.encryption_key.as_deref().ok_or_else(|| {
            CerememoryError::Validation(
                "encryption_key is required when encrypt=true".to_string(),
            )
        })?;
        if key_str.is_empty() {
            return Err(CerememoryError::Validation(
                "encryption_key must not be empty when encrypt=true".to_string(),
            ));
        }
        Some(key_str)
    } else {
        None
    };
    let records = self
        .collect_records_for_stores(req.stores.as_deref())
        .await?;
    let raw_records = if req.include_raw_journal {
        self.collect_all_raw_journal_records().await?
    } else {
        Vec::new()
    };
    let encryption_key = passphrase.map(cerememory_archive::crypto::derive_key);
    if req.include_raw_journal {
        cerememory_archive::export_bundle_filtered(
            &records,
            &raw_records,
            req.stores.as_deref(),
            encryption_key.as_ref(),
        )
    } else {
        cerememory_archive::export_filtered(
            &records,
            req.stores.as_deref(),
            encryption_key.as_ref(),
        )
    }
}
/// Imports an archive into the engine.
///
/// `req.archive_data` is required. A `req.decryption_key`, when present, is passed
/// through `cerememory_archive::crypto::derive_key`.
///
/// - `Merge`: curated and raw-journal records are imported next to the existing
///   data, with conflicts resolved per `req.conflict_resolution`.
/// - `Replace`: curated records and the raw journal are snapshotted, cleared, and
///   the archive is imported. On failure, the snapshots are restored. Every
///   restore step is attempted even if an earlier one fails. Any restore failure
///   is reported in an `ImportConflict` error together with the original error.
///
/// Returns the number of curated plus raw records imported.
pub async fn lifecycle_import(&self, req: ImportRequest) -> Result<u32, CerememoryError> {
    // Describes which rollback steps failed; `None` means all succeeded.
    fn rollback_failures(
        curated: Option<impl std::fmt::Display>,
        raw: Option<impl std::fmt::Display>,
    ) -> Option<String> {
        let mut parts = Vec::new();
        if let Some(err) = curated {
            parts.push(format!("Rollback failed: {err}"));
        }
        if let Some(err) = raw {
            parts.push(format!("Raw rollback failed: {err}"));
        }
        (!parts.is_empty()).then(|| parts.join(". "))
    }

    let data = req.archive_data.ok_or_else(|| {
        CerememoryError::Validation("Import requires archive_data".to_string())
    })?;
    let decryption_key = req
        .decryption_key
        .as_deref()
        .map(cerememory_archive::crypto::derive_key);
    let bundle = cerememory_archive::import_bundle_with_key(&data, decryption_key.as_ref())?;
    let imported = match req.strategy {
        ImportStrategy::Merge => {
            let imported_curated = self
                .import_records_with_conflict_resolution(
                    bundle.records,
                    req.conflict_resolution,
                )
                .await?;
            let imported_raw = self
                .import_raw_records_with_conflict_resolution(
                    bundle.raw_records,
                    req.conflict_resolution,
                )
                .await?;
            imported_curated + imported_raw
        }
        ImportStrategy::Replace => {
            let snapshot = self.collect_records_for_stores(None).await?;
            let raw_snapshot = self.collect_all_raw_journal_records().await?;
            if let Err(err) = self.clear_all_records().await {
                // The raw journal has not been touched yet; only curated stores
                // need restoring.
                if let Err(restore_err) = self.restore_records(&snapshot).await {
                    return Err(CerememoryError::ImportConflict(format!(
                        "Replace import failed while clearing existing records: {err}. Rollback failed: {restore_err}"
                    )));
                }
                return Err(err);
            }
            if let Err(err) = self.clear_raw_journal().await {
                // Restore both snapshots. A curated-restore failure must be reported,
                // not swallowed.
                let curated_failure = self.restore_records(&snapshot).await.err();
                let raw_failure = self.restore_raw_journal(&raw_snapshot).await.err();
                let message = match rollback_failures(curated_failure, raw_failure) {
                    Some(detail) => format!(
                        "Replace import failed while clearing raw journal: {err}. {detail}"
                    ),
                    None => format!("Replace import failed while clearing raw journal: {err}"),
                };
                return Err(CerememoryError::ImportConflict(message));
            }
            let import_result = async {
                let imported_curated = self
                    .import_records_with_conflict_resolution(
                        bundle.records,
                        req.conflict_resolution,
                    )
                    .await?;
                let imported_raw = self
                    .import_raw_records_with_conflict_resolution(
                        bundle.raw_records,
                        req.conflict_resolution,
                    )
                    .await?;
                Ok::<u32, CerememoryError>(imported_curated + imported_raw)
            }
            .await;
            match import_result {
                Ok(imported) => imported,
                Err(err) => {
                    // Attempt both restores, so a curated-store failure does not also
                    // leave the raw journal empty.
                    // NOTE(review): partially imported records may still be present if
                    // `restore_records` does not overwrite/clear — confirm its semantics.
                    let curated_failure = self.restore_records(&snapshot).await.err();
                    let raw_failure = self.restore_raw_journal(&raw_snapshot).await.err();
                    return Err(match rollback_failures(curated_failure, raw_failure) {
                        Some(detail) => CerememoryError::ImportConflict(format!(
                            "Replace import failed: {err}. {detail}"
                        )),
                        None => err,
                    });
                }
            }
        }
    };
    info!(imported, strategy = ?req.strategy, "Import completed");
    Ok(imported)
}
pub async fn import_records(&self, data: &[u8]) -> Result<u32, CerememoryError> {
let records = cerememory_archive::import_records(data)?;
let mut imported = 0u32;
for record in records {
let store_type = record.store;
if self.get_store_record(&record.id).await?.is_some() {
continue;
}
dispatch_store!(self, store_type, store(record.clone()))?;
self.coordinator
.register(record.id, store_type, record.associations.clone())
.await;
if let Err(e) = self.index_record(&record) {
warn!(error = %e, record_id = %record.id, "Failed to index imported record");
}
imported += 1;
}
info!(imported, "Import completed");
Ok(imported)
}
pub async fn collect_all_records(&self) -> Result<Vec<MemoryRecord>, CerememoryError> {
self.collect_records_for_stores(None).await
}
/// Gathers all records from the requested stores, in request order.
///
/// `None` selects every store, in `ALL_STORES` order. A store type listed more
/// than once is read only once, at its first position.
pub async fn collect_records_for_stores(
    &self,
    stores: Option<&[StoreType]>,
) -> Result<Vec<MemoryRecord>, CerememoryError> {
    let requested: &[StoreType] = match stores {
        Some(list) => list,
        None => &ALL_STORES,
    };
    let mut visited = HashSet::with_capacity(requested.len());
    let distinct: Vec<StoreType> = requested
        .iter()
        .copied()
        .filter(|store_type| visited.insert(*store_type))
        .collect();
    let mut collected = Vec::new();
    for store_type in distinct {
        let batch = dispatch_store!(self, store_type, get_all())?;
        collected.extend(batch);
    }
    Ok(collected)
}
pub async fn introspect_stats(&self) -> Result<StatsResponse, CerememoryError> {
let mut records_by_store = HashMap::new();
let mut avg_fidelity_by_store = HashMap::new();
let mut total_records = 0u32;
let mut total_fidelity = 0.0f64;
let mut dream_episodic_summaries = 0u32;
let mut dream_semantic_nodes = 0u32;
let mut last_dream_tick_at: Option<chrono::DateTime<Utc>> = None;
for store_type in ALL_STORES {
let count = dispatch_store!(self, store_type, count())? as u32;
records_by_store.insert(store_type, count);
total_records += count;
if count > 0 {
let records = dispatch_store!(self, store_type, get_all())?;
let mut store_fidelity = 0.0f64;
for record in &records {
store_fidelity += record.fidelity.score;
total_fidelity += record.fidelity.score;
if let Some(kind) = record
.metadata
.get("_dream")
.and_then(|value| value.get("kind"))
.and_then(|value| value.as_str())
{
match kind {
"episodic_summary" if store_type == StoreType::Episodic => {
dream_episodic_summaries += 1;
}
"semantic_summary" if store_type == StoreType::Semantic => {
dream_semantic_nodes += 1;
}
_ => {}
}
}
if let Some(timestamp) = record
.metadata
.get("_origin")
.and_then(|value| value.get("dream_tick_at"))
.and_then(|value| value.as_str())
.and_then(|value| chrono::DateTime::parse_from_rfc3339(value).ok())
.map(|value| value.with_timezone(&Utc))
{
if last_dream_tick_at.is_none_or(|current| timestamp > current) {
last_dream_tick_at = Some(timestamp);
}
}
}
avg_fidelity_by_store.insert(store_type, store_fidelity / count as f64);
}
}
let raw_journal_all = self.raw_journal.get_all().await?;
let raw_journal_records = raw_journal_all.len() as u32;
let raw_journal_pending_dream = raw_journal_all
.iter()
.filter(|record| !Self::raw_record_processed(record))
.count() as u32;
let avg_fidelity = if total_records > 0 {
total_fidelity / total_records as f64
} else {
0.0
};
Ok(StatsResponse {
total_records,
records_by_store,
total_associations: self.coordinator.total_associations().await,
avg_fidelity,
avg_fidelity_by_store,
oldest_record: None,
newest_record: None,
total_recall_count: 0,
raw_journal_records,
raw_journal_pending_dream,
dream_episodic_summaries,
dream_semantic_nodes,
last_dream_tick_at,
evolution_metrics: Some(self.evolution.get_metrics()),
background_decay_enabled: self.is_background_decay_enabled().await,
background_dream_enabled: self.is_background_dream_enabled().await,
})
}
/// Fetches one record by id, via `get_store_record`.
///
/// # Errors
/// `RecordNotFound` when no record has `req.record_id`; store errors otherwise.
pub async fn introspect_record(
    &self,
    req: RecordIntrospectRequest,
) -> Result<MemoryRecord, CerememoryError> {
    match self.get_store_record(&req.record_id).await? {
        Some((found, _store)) => Ok(found),
        None => Err(CerememoryError::RecordNotFound(req.record_id.to_string())),
    }
}
/// Forecasts each requested record's fidelity at `req.forecast_at`.
///
/// Decay is measured from the later of the record's last access and its last
/// decay tick, using the record's `decay_rate`, its `stability`, and an emotion
/// modifier derived from `emotion.intensity`. For records still above the prune
/// threshold, the estimated date of crossing it is included as well.
///
/// # Errors
/// `RecordNotFound` for the first id that does not exist; store errors otherwise.
pub async fn introspect_decay_forecast(
    &self,
    req: DecayForecastRequest,
) -> Result<DecayForecastResponse, CerememoryError> {
    // Loop-invariant values are read once, so every forecast in the response is
    // judged against the same threshold.
    let prune_threshold = self.decay.params().prune_threshold;
    let forecast_secs = req.forecast_at.timestamp() as f64;
    let mut forecasts = Vec::with_capacity(req.record_ids.len());
    for record_id in &req.record_ids {
        let (record, _) = self
            .get_store_record(record_id)
            .await?
            .ok_or_else(|| CerememoryError::RecordNotFound(record_id.to_string()))?;
        let current_fidelity = record.fidelity.score;
        let last_access_secs = record.last_accessed_at.timestamp() as f64;
        let last_tick_secs = record.fidelity.last_decay_tick.timestamp() as f64;
        let baseline_secs = last_access_secs.max(last_tick_secs);
        // A forecast time before the baseline is treated as "no time elapsed".
        let elapsed_secs = (forecast_secs - baseline_secs).max(0.0);
        let decay_exponent = record.fidelity.decay_rate;
        let emotion_mod = cerememory_decay::math::compute_emotion_mod(record.emotion.intensity);
        let forecasted_fidelity = cerememory_decay::math::compute_fidelity(
            current_fidelity,
            elapsed_secs,
            record.fidelity.stability,
            decay_exponent,
            emotion_mod,
        );
        let estimated_threshold_date = if current_fidelity > prune_threshold {
            Self::estimate_threshold_date(&record, decay_exponent, prune_threshold, emotion_mod)
        } else {
            None
        };
        forecasts.push(DecayForecast {
            record_id: *record_id,
            current_fidelity,
            forecasted_fidelity,
            estimated_threshold_date,
        });
    }
    Ok(DecayForecastResponse { forecasts })
}
/// Estimates when `record`'s fidelity first drops to `prune_threshold`.
///
/// Time is measured from the later of `last_accessed_at` and
/// `fidelity.last_decay_tick`. The crossing point is located by bisection with
/// `cerememory_decay::math::compute_fidelity` over a ten-year horizon. With 30
/// halvings, the result is precise to roughly 0.3 s.
///
/// Returns `None` when fidelity is still at or above the threshold at the end of
/// the horizon, or when the decay model yields NaN there.
fn estimate_threshold_date(
    record: &MemoryRecord,
    decay_exponent: f64,
    prune_threshold: f64,
    emotion_mod: f64,
) -> Option<chrono::DateTime<Utc>> {
    // Search horizon: 10 * 365 days, in seconds.
    const HORIZON_SECS: f64 = 315_360_000.0;
    // Each step halves the bracket; 2^-30 of the horizon is about 0.29 s.
    const BISECTION_STEPS: u32 = 30;

    let base_time = record.last_accessed_at.max(record.fidelity.last_decay_tick);
    let f0 = record.fidelity.score;
    let stability = record.fidelity.stability;
    let fidelity_after = |elapsed_secs: f64| {
        cerememory_decay::math::compute_fidelity(
            f0,
            elapsed_secs,
            stability,
            decay_exponent,
            emotion_mod,
        )
    };

    let f_horizon = fidelity_after(HORIZON_SECS);
    // NaN fails every comparison. Without this guard, bisection would collapse
    // to zero elapsed time and report `base_time` as the crossing date.
    if f_horizon.is_nan() || f_horizon >= prune_threshold {
        return None;
    }

    let (mut lo, mut hi) = (0.0_f64, HORIZON_SECS);
    for _ in 0..BISECTION_STEPS {
        let mid = (lo + hi) / 2.0;
        if fidelity_after(mid) > prune_threshold {
            lo = mid;
        } else {
            hi = mid;
        }
    }
    let threshold_secs = ((lo + hi) / 2.0) as i64;
    Some(base_time + chrono::Duration::seconds(threshold_secs))
}
/// Returns the evolution engine's current metrics. Always `Ok`.
pub async fn introspect_evolution(&self) -> Result<EvolutionMetrics, CerememoryError> {
    let metrics = self.evolution.get_metrics();
    Ok(metrics)
}
}
use cerememory_core::truncate_str;
/// Returns a copy of `content` with its text blocks degraded for `fidelity`.
///
/// At `fidelity >= 0.95` the content is cloned unchanged. Otherwise every
/// `Modality::Text` block whose bytes are valid UTF-8 is rewritten with
/// `degrade_text`. Non-text blocks, invalid UTF-8 data, embeddings and the
/// summary are left untouched.
fn apply_human_noise(content: &MemoryContent, fidelity: f64) -> MemoryContent {
    if fidelity >= 0.95 {
        return content.clone();
    }
    let mut degraded_content = content.clone();
    degraded_content
        .blocks
        .iter_mut()
        .filter(|blk| blk.modality == Modality::Text)
        .for_each(|blk| {
            if let Ok(original) = std::str::from_utf8(&blk.data) {
                let rewritten = degrade_text(original, fidelity);
                blk.data = rewritten.into_bytes();
            }
        });
    degraded_content
}
/// Simulates lossy recall by replacing every `stride`-th word with `...`.
///
/// `stride = max(2, 1 / min(1 - fidelity, 0.8))`, truncated to an integer. The
/// first word is always replaced. `text` is returned unchanged when
/// `fidelity >= 0.9` or when it contains no words. Otherwise words are rejoined
/// with single spaces, so the original whitespace is not preserved.
fn degrade_text(text: &str, fidelity: f64) -> String {
    if fidelity >= 0.9 {
        return text.to_string();
    }
    let tokens: Vec<&str> = text.split_whitespace().collect();
    if tokens.is_empty() {
        return text.to_string();
    }
    let loss = f64::min(1.0 - fidelity, 0.8);
    let stride = f64::max(1.0 / loss, 2.0) as usize;
    tokens
        .iter()
        .enumerate()
        .map(|(idx, &token)| if idx % stride == 0 { "..." } else { token })
        .collect::<Vec<&str>>()
        .join(" ")
}
#[cfg(test)]
mod tests {
use super::*;
/// Builds a fresh in-memory engine for one test; panics if construction fails.
async fn make_engine() -> CerememoryEngine {
CerememoryEngine::in_memory().unwrap()
}
/// Builds an `EncodeStoreRequest` holding one `text/plain` block containing `text`.
///
/// The request targets `store`, or lets the engine choose when `None`. Header,
/// emotion, context, metadata and associations are left unset.
fn text_store_req(text: &str, store: Option<StoreType>) -> EncodeStoreRequest {
    let plain_block = ContentBlock {
        modality: Modality::Text,
        format: String::from("text/plain"),
        data: Vec::from(text.as_bytes()),
        embedding: None,
    };
    let content = MemoryContent {
        blocks: vec![plain_block],
        summary: None,
    };
    EncodeStoreRequest {
        header: None,
        content,
        store,
        emotion: None,
        context: None,
        metadata: None,
        associations: None,
    }
}
/// Builds an `EncodeStoreRequest` holding one `application/json` structured block
/// whose bytes are `json`.
///
/// The request targets `store`. All other optional fields are left unset.
fn structured_store_req(json: &str, store: Option<StoreType>) -> EncodeStoreRequest {
    let json_block = ContentBlock {
        modality: Modality::Structured,
        format: String::from("application/json"),
        data: Vec::from(json.as_bytes()),
        embedding: None,
    };
    let content = MemoryContent {
        blocks: vec![json_block],
        summary: None,
    };
    EncodeStoreRequest {
        header: None,
        content,
        store,
        emotion: None,
        context: None,
        metadata: None,
        associations: None,
    }
}
/// Builds a raw-journal text record for `session_id`.
///
/// The record is a conversation turn spoken by the user, with normal visibility
/// and public secrecy.
fn raw_text_record(session_id: &str, text: &str) -> RawJournalRecord {
    let (source, speaker) = (RawSource::Conversation, RawSpeaker::User);
    let (visibility, secrecy) = (RawVisibility::Normal, SecrecyLevel::Public);
    RawJournalRecord::new_text(session_id, source, speaker, visibility, secrecy, text)
}
/// Builds an `EncodeStoreRawRequest` for a user conversation turn in `session_id`.
///
/// The request carries `text` as one `text/plain` block with the given visibility
/// and secrecy level. Turn id, topic id, header and metadata are left unset.
fn raw_text_store_req(
    session_id: &str,
    text: &str,
    visibility: RawVisibility,
    secrecy_level: SecrecyLevel,
) -> EncodeStoreRawRequest {
    let text_block = ContentBlock {
        modality: Modality::Text,
        format: String::from("text/plain"),
        data: Vec::from(text.as_bytes()),
        embedding: None,
    };
    EncodeStoreRawRequest {
        header: None,
        session_id: String::from(session_id),
        turn_id: None,
        topic_id: None,
        source: RawSource::Conversation,
        speaker: RawSpeaker::User,
        visibility,
        secrecy_level,
        content: MemoryContent {
            blocks: vec![text_block],
            summary: None,
        },
        metadata: None,
    }
}
/// Appending a raw record returns its own id, and reading it back by that id
/// yields the same session and text.
#[tokio::test]
async fn raw_journal_append_and_get_roundtrip() {
let engine = make_engine().await;
let record = raw_text_record("sess-raw-1", "hello from raw journal");
// Capture the id before `record` is moved into the journal.
let id = record.id;
let stored_id = engine.append_raw_journal(record).await.unwrap();
assert_eq!(stored_id, id);
let restored = engine.get_raw_journal_record(&id).await.unwrap().unwrap();
assert_eq!(restored.session_id, "sess-raw-1");
assert_eq!(restored.text_content(), Some("hello from raw journal"));
}
/// Querying by session returns only that session's raw records: two of the three
/// appended records belong to "sess-a".
#[tokio::test]
async fn raw_journal_query_session_filters_records() {
let engine = make_engine().await;
engine
.append_raw_journal(raw_text_record("sess-a", "first"))
.await
.unwrap();
engine
.append_raw_journal(raw_text_record("sess-b", "second"))
.await
.unwrap();
engine
.append_raw_journal(raw_text_record("sess-a", "third"))
.await
.unwrap();
let records = engine.query_raw_journal_by_session("sess-a").await.unwrap();
assert_eq!(records.len(), 2);
assert!(records.iter().all(|record| record.session_id == "sess-a"));
}
/// The raw-journal count starts at zero and reflects every appended record.
#[tokio::test]
async fn raw_journal_count_tracks_records() {
let engine = make_engine().await;
assert_eq!(engine.raw_journal_count().await.unwrap(), 0);
engine
.append_raw_journal(raw_text_record("sess-a", "first"))
.await
.unwrap();
engine
.append_raw_journal(raw_text_record("sess-a", "second"))
.await
.unwrap();
assert_eq!(engine.raw_journal_count().await.unwrap(), 2);
}
/// A raw record stored via `encode_store_raw` is found by `recall_raw_query` when
/// filtering on its session and a word it contains.
#[tokio::test]
async fn encode_store_raw_and_recall_raw_roundtrip() {
let engine = make_engine().await;
let response = engine
.encode_store_raw(raw_text_store_req(
"sess-raw",
"forensic memory",
RawVisibility::Normal,
SecrecyLevel::Public,
))
.await
.unwrap();
let recalled = engine
.recall_raw_query(RecallRawQueryRequest {
header: None,
session_id: Some("sess-raw".to_string()),
query: Some("forensic".to_string()),
temporal: None,
limit: 10,
include_private_scratch: false,
include_sealed: false,
secrecy_levels: None,
})
.await
.unwrap();
// Exactly the one stored record matches, with its id and text intact.
assert_eq!(recalled.total_candidates, 1);
assert_eq!(recalled.records.len(), 1);
assert_eq!(recalled.records[0].id, response.record_id);
assert_eq!(recalled.records[0].text_content(), Some("forensic memory"));
}
/// With private scratch and sealed records not requested and no explicit secrecy
/// levels, raw recall returns only the normal/public record. The private-scratch
/// and sealed/secret records are filtered out.
#[tokio::test]
async fn recall_raw_defaults_exclude_private_scratch_and_secret() {
let engine = make_engine().await;
engine
.encode_batch_store_raw(EncodeBatchStoreRawRequest {
header: None,
records: vec![
raw_text_store_req(
"sess-secure",
"public normal",
RawVisibility::Normal,
SecrecyLevel::Public,
),
raw_text_store_req(
"sess-secure",
"private scratch",
RawVisibility::PrivateScratch,
SecrecyLevel::Sensitive,
),
raw_text_store_req(
"sess-secure",
"sealed secret",
RawVisibility::Sealed,
SecrecyLevel::Secret,
),
],
})
.await
.unwrap();
let recalled = engine
.recall_raw_query(RecallRawQueryRequest {
header: None,
session_id: Some("sess-secure".to_string()),
query: None,
temporal: None,
limit: 10,
include_private_scratch: false,
include_sealed: false,
secrecy_levels: None,
})
.await
.unwrap();
assert_eq!(recalled.total_candidates, 1);
assert_eq!(recalled.records[0].text_content(), Some("public normal"));
}
/// Raw-journal content must not surface through the curated `recall_query` path,
/// even when the cue text matches it.
#[tokio::test]
async fn raw_journal_does_not_leak_into_normal_recall() {
let engine = make_engine().await;
engine
.encode_store_raw(raw_text_store_req(
"sess-leak",
"hidden raw only",
RawVisibility::Normal,
SecrecyLevel::Public,
))
.await
.unwrap();
let response = engine
.recall_query(RecallQueryRequest {
header: None,
cue: RecallCue {
text: Some("hidden raw".to_string()),
..Default::default()
},
stores: None,
limit: 10,
min_fidelity: None,
include_decayed: false,
reconsolidate: false,
activation_depth: 0,
recall_mode: RecallMode::Perfect,
})
.await
.unwrap();
assert_eq!(response.total_candidates, 0);
assert!(response.memories.is_empty());
}
/// A dream tick over one session with two raw notes checks four outcomes:
/// - it creates one episodic summary and one promoted semantic node;
/// - it records the raw provenance, in append order, in the summary's `_origin`
///   metadata;
/// - it links both raw records to the derived ids;
/// - it stamps both raw records' `_dream` metadata.
#[tokio::test]
async fn dream_tick_creates_episodic_summary_and_marks_raw_processed() {
let engine = make_engine().await;
let first = engine
.encode_store_raw(raw_text_store_req(
"sess-dream",
"Discussed API timeout policy",
RawVisibility::Normal,
SecrecyLevel::Public,
))
.await
.unwrap();
let second = engine
.encode_store_raw(raw_text_store_req(
"sess-dream",
"Decided to keep retries idempotent-only",
RawVisibility::Normal,
SecrecyLevel::Public,
))
.await
.unwrap();
let resp = engine
.lifecycle_dream_tick(DreamTickRequest {
header: None,
session_id: Some("sess-dream".to_string()),
dry_run: false,
max_groups: 10,
include_private_scratch: false,
include_sealed: false,
promote_semantic: true,
secrecy_levels: None,
})
.await
.unwrap();
assert_eq!(resp.groups_processed, 1);
assert_eq!(resp.raw_records_processed, 2);
assert_eq!(resp.episodic_summaries_created, 1);
assert_eq!(resp.semantic_nodes_created, 1);
let summaries = engine.episodic.get_all().await.unwrap();
assert_eq!(summaries.len(), 1);
let summary = &summaries[0];
let semantic_records = engine.semantic.get_all().await.unwrap();
assert_eq!(semantic_records.len(), 1);
let semantic = &semantic_records[0];
assert_eq!(summary.store, StoreType::Episodic);
// Provenance: session id, record count, and raw ids in the order they were stored.
assert_eq!(summary.metadata["_origin"]["raw_session_id"], "sess-dream");
assert_eq!(summary.metadata["_origin"]["raw_record_count"], 2);
assert_eq!(
summary.metadata["_origin"]["raw_record_ids"][0],
first.record_id.to_string()
);
assert_eq!(
summary.metadata["_origin"]["raw_record_ids"][1],
second.record_id.to_string()
);
let raw_one = engine
.get_raw_journal_record(&first.record_id)
.await
.unwrap()
.unwrap();
let raw_two = engine
.get_raw_journal_record(&second.record_id)
.await
.unwrap()
.unwrap();
// Each raw record points back at the summary first, then the semantic node.
assert_eq!(raw_one.derived_memory_ids, vec![summary.id, semantic.id]);
assert_eq!(raw_two.derived_memory_ids, vec![summary.id, semantic.id]);
assert_eq!(
raw_one.metadata["_dream"]["last_summary_id"],
serde_json::Value::String(summary.id.to_string())
);
assert!(raw_one.metadata["_dream"]["processed_at"].is_string());
}
/// Running the dream tick twice over the same session produces a summary only
/// once. The second tick finds no unprocessed raw records.
#[tokio::test]
async fn dream_tick_is_idempotent_for_processed_raw_records() {
let engine = make_engine().await;
engine
.encode_store_raw(raw_text_store_req(
"sess-dream-repeat",
"First raw note",
RawVisibility::Normal,
SecrecyLevel::Public,
))
.await
.unwrap();
let first = engine
.lifecycle_dream_tick(DreamTickRequest {
header: None,
session_id: Some("sess-dream-repeat".to_string()),
dry_run: false,
max_groups: 10,
include_private_scratch: false,
include_sealed: false,
promote_semantic: true,
secrecy_levels: None,
})
.await
.unwrap();
let second = engine
.lifecycle_dream_tick(DreamTickRequest {
header: None,
session_id: Some("sess-dream-repeat".to_string()),
dry_run: false,
max_groups: 10,
include_private_scratch: false,
include_sealed: false,
promote_semantic: true,
secrecy_levels: None,
})
.await
.unwrap();
// The assertions pin that this single-note group is not promoted to semantic
// even though `promote_semantic` is true.
assert_eq!(first.episodic_summaries_created, 1);
assert_eq!(first.semantic_nodes_created, 0);
assert_eq!(second.episodic_summaries_created, 0);
assert_eq!(second.semantic_nodes_created, 0);
assert_eq!(engine.episodic.count().await.unwrap(), 1);
}
/// All three raw records are selected by the dream tick, including private
/// scratch, sealed and every secrecy level. Even so:
/// - the summary text contains only the public note plus a redaction count;
/// - the per-category redaction counters appear in
///   `_dream.summary_stats`;
/// - no semantic node is promoted.
#[tokio::test]
async fn dream_tick_redacts_sealed_secret_and_private_scratch_content() {
let engine = make_engine().await;
engine
.encode_batch_store_raw(EncodeBatchStoreRawRequest {
header: None,
records: vec![
raw_text_store_req(
"sess-dream-redact",
"Visible public note",
RawVisibility::Normal,
SecrecyLevel::Public,
),
raw_text_store_req(
"sess-dream-redact",
"Private scratch hypothesis",
RawVisibility::PrivateScratch,
SecrecyLevel::Sensitive,
),
raw_text_store_req(
"sess-dream-redact",
"Sealed customer secret",
RawVisibility::Sealed,
SecrecyLevel::Secret,
),
],
})
.await
.unwrap();
let resp = engine
.lifecycle_dream_tick(DreamTickRequest {
header: None,
session_id: Some("sess-dream-redact".to_string()),
dry_run: false,
max_groups: 10,
include_private_scratch: true,
include_sealed: true,
promote_semantic: true,
secrecy_levels: Some(vec![
SecrecyLevel::Public,
SecrecyLevel::Sensitive,
SecrecyLevel::Secret,
]),
})
.await
.unwrap();
assert_eq!(resp.groups_processed, 1);
assert_eq!(resp.semantic_nodes_created, 0);
let summaries = engine.episodic.get_all().await.unwrap();
assert_eq!(summaries.len(), 1);
let summary = &summaries[0];
let summary_text = summary.text_content().unwrap_or("");
assert!(summary_text.contains("Visible public note"));
assert!(!summary_text.contains("Private scratch hypothesis"));
assert!(!summary_text.contains("Sealed customer secret"));
assert!(summary_text.contains("Redacted raw records: 2"));
assert_eq!(
summary.metadata["_dream"]["summary_stats"]["normal_records"],
1
);
assert_eq!(
summary.metadata["_dream"]["summary_stats"]["private_scratch_redacted"],
1
);
assert_eq!(
summary.metadata["_dream"]["summary_stats"]["secret_redacted"],
1
);
}
/// Three raw notes in one session without explicit topic ids.
///
/// The first two are 5 minutes apart and share "timeout" vocabulary. The third
/// arrives 20 minutes later on an unrelated subject. The dream tick must infer two
/// topic groups, mark both summaries `raw_topic_inferred`, and give each a string
/// topic hint.
#[tokio::test]
async fn dream_tick_infers_multiple_topics_with_time_gap_and_lexical_shift() {
let engine = make_engine().await;
// Timestamps are set explicitly so the gap between notes is deterministic.
let base = Utc::now() - chrono::Duration::hours(2);
let mut first = raw_text_record("sess-topic-infer", "API timeout retries idempotent only");
first.created_at = base;
first.updated_at = base;
let mut second = raw_text_record("sess-topic-infer", "Backoff budget and timeout policy");
second.created_at = base + chrono::Duration::minutes(5);
second.updated_at = second.created_at;
let mut third = raw_text_record("sess-topic-infer", "Landing page hero typography palette");
third.created_at = base + chrono::Duration::minutes(25);
third.updated_at = third.created_at;
engine.append_raw_journal(first).await.unwrap();
engine.append_raw_journal(second).await.unwrap();
engine.append_raw_journal(third).await.unwrap();
let resp = engine
.lifecycle_dream_tick(DreamTickRequest {
header: None,
session_id: Some("sess-topic-infer".to_string()),
dry_run: false,
max_groups: 10,
include_private_scratch: false,
include_sealed: false,
promote_semantic: true,
secrecy_levels: None,
})
.await
.unwrap();
assert_eq!(resp.groups_processed, 2);
assert_eq!(resp.episodic_summaries_created, 2);
assert_eq!(resp.semantic_nodes_created, 1);
let summaries = engine.episodic.get_all().await.unwrap();
assert_eq!(summaries.len(), 2);
assert!(summaries.iter().all(|summary| {
summary.metadata["_origin"]["raw_topic_inferred"] == serde_json::Value::Bool(true)
}));
assert!(summaries
.iter()
.all(|summary| { summary.metadata["_origin"]["raw_topic_hint"].is_string() }));
}
#[tokio::test]
async fn encode_recall_roundtrip() {
    // A freshly stored episodic record starts at full fidelity and is returned
    // first when recalled by a substring of its text.
    let engine = make_engine().await;
    let stored = engine
        .encode_store(text_store_req("The quick brown fox", Some(StoreType::Episodic)))
        .await
        .unwrap();
    assert_eq!(stored.store, StoreType::Episodic);
    assert_eq!(stored.initial_fidelity, 1.0);

    let recalled = engine
        .recall_query(RecallQueryRequest {
            header: None,
            cue: RecallCue {
                text: Some("quick brown".to_string()),
                ..Default::default()
            },
            stores: None,
            limit: 10,
            min_fidelity: None,
            include_decayed: false,
            reconsolidate: true,
            activation_depth: 0,
            recall_mode: RecallMode::Perfect,
        })
        .await
        .unwrap();
    assert!(!recalled.memories.is_empty());
    let top = &recalled.memories[0];
    assert_eq!(top.record.text_content(), Some("The quick brown fox"));
}
#[tokio::test]
async fn encode_store_persists_metadata_and_context() {
    // Caller-supplied metadata keys survive storage, and the encode context is
    // nested under the reserved `_context` key of the same metadata object.
    let engine = make_engine().await;
    let request = EncodeStoreRequest {
        header: None,
        content: MemoryContent {
            blocks: vec![ContentBlock {
                modality: Modality::Text,
                format: "text/plain".to_string(),
                data: b"metadata roundtrip".to_vec(),
                embedding: None,
            }],
            summary: None,
        },
        store: Some(StoreType::Episodic),
        emotion: None,
        context: Some(EncodeContext {
            source: Some("chat".to_string()),
            session_id: Some("sess-1".to_string()),
            spatial: None,
            temporal: None,
        }),
        metadata: Some(serde_json::json!({"tag": "important"})),
        associations: None,
    };
    let stored = engine.encode_store(request).await.unwrap();

    let record = engine
        .introspect_record(RecordIntrospectRequest {
            header: None,
            record_id: stored.record_id,
            include_history: false,
            include_associations: false,
            include_versions: false,
        })
        .await
        .unwrap();
    let context = &record.metadata["_context"];
    assert_eq!(record.metadata["tag"], "important");
    assert_eq!(context["source"], "chat");
    assert_eq!(context["session_id"], "sess-1");
}
#[tokio::test]
async fn tantivy_tokenized_search() {
    // A single-word cue must match a longer sentence: the text index works on
    // tokens rather than requiring the whole stored string.
    let engine = make_engine().await;
    let sentence = "The quick brown fox jumps over the lazy dog";
    engine
        .encode_store(text_store_req(sentence, Some(StoreType::Episodic)))
        .await
        .unwrap();

    let cue = RecallCue {
        text: Some("quick".to_string()),
        ..Default::default()
    };
    let resp = engine
        .recall_query(RecallQueryRequest {
            header: None,
            cue,
            stores: None,
            limit: 10,
            min_fidelity: None,
            include_decayed: false,
            reconsolidate: false,
            activation_depth: 0,
            recall_mode: RecallMode::Perfect,
        })
        .await
        .unwrap();
    assert!(!resp.memories.is_empty());
    let top = &resp.memories[0];
    assert_eq!(top.record.text_content(), Some(sentence));
}
#[tokio::test]
async fn vector_search_recall() {
    // An embedding-only cue close to the stored embedding must find the record.
    let engine = make_engine().await;
    let stored_text = "Cats are fluffy animals";
    let content = MemoryContent {
        blocks: vec![ContentBlock {
            modality: Modality::Text,
            format: "text/plain".to_string(),
            data: stored_text.as_bytes().to_vec(),
            embedding: Some(vec![1.0, 0.0, 0.0]),
        }],
        summary: None,
    };
    engine
        .encode_store(EncodeStoreRequest {
            header: None,
            content,
            store: Some(StoreType::Episodic),
            emotion: None,
            context: None,
            metadata: None,
            associations: None,
        })
        .await
        .unwrap();

    let resp = engine
        .recall_query(RecallQueryRequest {
            header: None,
            cue: RecallCue {
                // Slightly off-axis from the stored vector, but nearest to it.
                embedding: Some(vec![1.0, 0.1, 0.0]),
                ..Default::default()
            },
            stores: None,
            limit: 10,
            min_fidelity: None,
            include_decayed: false,
            reconsolidate: false,
            activation_depth: 0,
            recall_mode: RecallMode::Perfect,
        })
        .await
        .unwrap();
    assert!(!resp.memories.is_empty());
    let top = &resp.memories[0];
    assert_eq!(top.record.text_content(), Some(stored_text));
}
#[tokio::test]
async fn hybrid_text_vector_search() {
    // A cue carrying both text and an embedding must find a record that matches on
    // both signals. Only one record exists, so the top hit must be exactly that
    // record with its content intact; asserting this (rather than mere
    // non-emptiness) catches a hybrid path that returns a wrong or degraded hit.
    let engine = make_engine().await;
    let stored_text = "Machine learning is fascinating";
    let req = EncodeStoreRequest {
        header: None,
        content: MemoryContent {
            blocks: vec![ContentBlock {
                modality: Modality::Text,
                format: "text/plain".to_string(),
                data: stored_text.as_bytes().to_vec(),
                embedding: Some(vec![1.0, 0.0, 0.0]),
            }],
            summary: None,
        },
        store: Some(StoreType::Episodic),
        emotion: None,
        context: None,
        metadata: None,
        associations: None,
    };
    let stored = engine.encode_store(req).await.unwrap();
    let query = RecallQueryRequest {
        header: None,
        cue: RecallCue {
            text: Some("machine learning".to_string()),
            embedding: Some(vec![1.0, 0.0, 0.0]),
            ..Default::default()
        },
        stores: None,
        limit: 10,
        min_fidelity: None,
        include_decayed: false,
        reconsolidate: false,
        activation_depth: 0,
        recall_mode: RecallMode::Perfect,
    };
    let resp = engine.recall_query(query).await.unwrap();
    assert!(!resp.memories.is_empty());
    assert_eq!(resp.memories[0].record.id, stored.record_id);
    assert_eq!(resp.memories[0].record.text_content(), Some(stored_text));
}
#[tokio::test]
async fn recall_query_metadata_counts_temporal_only_searches() {
    // A cue with only a temporal window (no text/embedding) must still report the
    // records it scanned in the query metadata.
    let engine = make_engine().await;
    engine
        .encode_store(text_store_req(
            "Temporal-only query metadata should count scanned records",
            Some(StoreType::Episodic),
        ))
        .await
        .unwrap();

    // +/- one minute around an instant taken just after the store.
    let now = Utc::now();
    let window = TemporalRange {
        start: now - chrono::Duration::minutes(1),
        end: now + chrono::Duration::minutes(1),
    };
    let resp = engine
        .recall_query(RecallQueryRequest {
            header: None,
            cue: RecallCue {
                temporal: Some(window),
                ..Default::default()
            },
            stores: None,
            limit: 10,
            min_fidelity: None,
            include_decayed: false,
            reconsolidate: false,
            activation_depth: 0,
            recall_mode: RecallMode::Perfect,
        })
        .await
        .unwrap();

    let metadata = resp
        .query_metadata
        .as_ref()
        .expect("query metadata should be present");
    assert_eq!(metadata.total_records_scanned, 1);
    assert_eq!(resp.memories.len(), 1);
}
#[tokio::test]
async fn temporal_query_filters_activation_results_across_stores() {
    // A week-old semantic record is associated with a fresh episodic record. Both
    // match the text cue "alpha", and with activation_depth 1 the old record is
    // reachable through the association, but the temporal window around "now"
    // must still exclude it from the final results.
    let engine = make_engine().await;

    let mut old_semantic = MemoryRecord::new_text(StoreType::Semantic, "alpha out of range");
    old_semantic.created_at = Utc::now() - chrono::Duration::days(7);
    old_semantic.updated_at = old_semantic.created_at;
    old_semantic.last_accessed_at = old_semantic.created_at;
    engine.semantic.store(old_semantic.clone()).await.unwrap();
    engine
        .coordinator
        .register(
            old_semantic.id,
            StoreType::Semantic,
            old_semantic.associations.clone(),
        )
        .await;
    engine.index_record(&old_semantic).unwrap();

    let current = MemoryRecord {
        associations: vec![Association {
            target_id: old_semantic.id,
            association_type: AssociationType::Semantic,
            weight: 0.9,
            created_at: Utc::now(),
            last_co_activation: Utc::now(),
        }],
        ..MemoryRecord::new_text(StoreType::Episodic, "alpha in range")
    };
    engine.episodic.store(current.clone()).await.unwrap();
    engine
        .coordinator
        .register(
            current.id,
            StoreType::Episodic,
            current.associations.clone(),
        )
        .await;
    engine.index_record(&current).unwrap();

    let now = Utc::now();
    let resp = engine
        .recall_query(RecallQueryRequest {
            header: None,
            cue: RecallCue {
                text: Some("alpha".to_string()),
                temporal: Some(TemporalRange {
                    start: now - chrono::Duration::minutes(1),
                    end: now + chrono::Duration::minutes(1),
                }),
                ..Default::default()
            },
            stores: None,
            limit: 10,
            min_fidelity: None,
            include_decayed: false,
            reconsolidate: false,
            activation_depth: 1,
            recall_mode: RecallMode::Perfect,
        })
        .await
        .unwrap();
    assert_eq!(resp.memories.len(), 1);
    assert_eq!(resp.memories[0].record.id, current.id);
}
#[tokio::test]
async fn encode_batch_with_associations() {
    // Batch-encoding three records with inference enabled yields four inferred
    // associations.
    // NOTE(review): presumably one link in each direction between neighbouring
    // records (2 pairs x 2); confirm against the batch inference logic.
    let engine = make_engine().await;
    let records = ["First memory", "Second memory", "Third memory"]
        .iter()
        .map(|text| text_store_req(text, Some(StoreType::Episodic)))
        .collect();
    let resp = engine
        .encode_batch(EncodeBatchRequest {
            header: None,
            records,
            infer_associations: true,
        })
        .await
        .unwrap();
    assert_eq!(resp.results.len(), 3);
    assert_eq!(resp.associations_inferred, 4);
}
#[tokio::test]
async fn encode_batch_associations_survive_rebuild() {
    // Associations inferred during a batch encode are persisted with the records,
    // so rebuilding the coordinator from the stores keeps the sequential link
    // from the first record to the second.
    let engine = make_engine().await;
    let batch = engine
        .encode_batch(EncodeBatchRequest {
            header: None,
            records: vec![
                text_store_req("Persisted first", Some(StoreType::Episodic)),
                text_store_req("Persisted second", Some(StoreType::Episodic)),
            ],
            infer_associations: true,
        })
        .await
        .unwrap();
    let first_id = batch.results[0].record_id;
    let second_id = batch.results[1].record_id;

    engine.rebuild_coordinator().await.unwrap();

    let introspected = engine
        .introspect_record(RecordIntrospectRequest {
            header: None,
            record_id: first_id,
            include_history: false,
            include_associations: true,
            include_versions: false,
        })
        .await
        .unwrap();
    assert_eq!(introspected.associations.len(), 1);

    let neighbours = engine
        .recall_associate(RecallAssociateRequest {
            header: None,
            record_id: first_id,
            association_types: Some(vec![AssociationType::Sequential]),
            depth: 1,
            min_weight: 0.1,
            limit: 10,
        })
        .await
        .unwrap();
    assert_eq!(neighbours.memories.len(), 1);
    assert_eq!(neighbours.memories[0].record.id, second_id);
}
#[tokio::test]
async fn decay_tick_integration() {
    // A manual decay tick covering one simulated day updates every stored record.
    let engine = make_engine().await;
    for n in 0..5 {
        let text = format!("Memory {n}");
        engine
            .encode_store(text_store_req(&text, Some(StoreType::Episodic)))
            .await
            .unwrap();
    }

    let one_day_secs = 24 * 60 * 60;
    let resp = engine
        .lifecycle_decay_tick(DecayTickRequest {
            header: None,
            tick_duration_seconds: Some(one_day_secs),
        })
        .await
        .unwrap();
    assert_eq!(resp.records_updated, 5);
}
#[tokio::test]
async fn forget_requires_confirmation() {
    // A forget request without `confirm: true` is rejected before anything else
    // is considered, even when it selects nothing at all.
    let engine = make_engine().await;
    let unconfirmed = ForgetRequest {
        header: None,
        record_ids: None,
        store: None,
        temporal_range: None,
        cascade: false,
        confirm: false,
    };
    let outcome = engine.lifecycle_forget(unconfirmed).await;
    assert!(matches!(outcome, Err(CerememoryError::ForgetUnconfirmed)));
}
#[tokio::test]
async fn forget_deletes_records() {
    // Forgetting a record by id removes it from its store and from the text index.
    let engine = make_engine().await;
    let record_id = engine
        .encode_store(text_store_req("To forget", Some(StoreType::Episodic)))
        .await
        .unwrap()
        .record_id;

    let deleted = engine
        .lifecycle_forget(ForgetRequest {
            header: None,
            record_ids: Some(vec![record_id]),
            store: None,
            temporal_range: None,
            cascade: false,
            confirm: true,
        })
        .await
        .unwrap();
    assert_eq!(deleted, 1);

    let remaining = engine.get_store_record(&record_id).await.unwrap();
    assert!(remaining.is_none());
    let index_hits = engine.text_index.search("forget", None, 10).unwrap();
    assert!(index_hits.is_empty());
}
#[tokio::test]
async fn forget_temporal_range_deletes_matching_records() {
    // Only records created inside the requested window are deleted: a record
    // back-dated by two days must survive a forget over "now +/- 1 minute",
    // while a record created just now is removed.
    let engine = make_engine().await;

    let mut old_record = MemoryRecord::new_text(StoreType::Episodic, "old");
    old_record.created_at = Utc::now() - chrono::Duration::days(2);
    old_record.updated_at = old_record.created_at;
    old_record.last_accessed_at = old_record.created_at;
    engine.episodic.store(old_record.clone()).await.unwrap();
    engine
        .coordinator
        .register(
            old_record.id,
            StoreType::Episodic,
            old_record.associations.clone(),
        )
        .await;
    engine.index_record(&old_record).unwrap();

    let current = MemoryRecord::new_text(StoreType::Episodic, "current");
    engine.episodic.store(current.clone()).await.unwrap();
    engine
        .coordinator
        .register(
            current.id,
            StoreType::Episodic,
            current.associations.clone(),
        )
        .await;
    engine.index_record(&current).unwrap();

    // Measure both bounds from one instant, as the other temporal tests do.
    let now = Utc::now();
    let deleted = engine
        .lifecycle_forget(ForgetRequest {
            header: None,
            record_ids: None,
            store: None,
            temporal_range: Some(TemporalRange {
                start: now - chrono::Duration::minutes(1),
                end: now + chrono::Duration::minutes(1),
            }),
            cascade: false,
            confirm: true,
        })
        .await
        .unwrap();
    assert_eq!(deleted, 1);
    assert!(engine
        .get_store_record(&current.id)
        .await
        .unwrap()
        .is_none());
    assert!(engine
        .get_store_record(&old_record.id)
        .await
        .unwrap()
        .is_some());
}
#[tokio::test]
async fn mode_switch() {
    // Setting the recall mode through the lifecycle API updates the engine's
    // shared recall-mode state.
    let engine = make_engine().await;
    let request = SetModeRequest {
        header: None,
        mode: RecallMode::Perfect,
        scope: None,
    };
    engine.lifecycle_set_mode(request).await.unwrap();

    let active_mode = engine.recall_mode.read().await;
    assert_eq!(*active_mode, RecallMode::Perfect);
}
#[tokio::test]
async fn introspect_stats() {
    // Stats keep curated records and raw journal entries in separate counters.
    let engine = make_engine().await;
    engine
        .encode_store(text_store_req("Stats test", Some(StoreType::Episodic)))
        .await
        .unwrap();
    engine
        .encode_store_raw(raw_text_store_req(
            "sess-stats",
            "Raw stats note",
            RawVisibility::Normal,
            SecrecyLevel::Public,
        ))
        .await
        .unwrap();

    let stats = engine.introspect_stats().await.unwrap();

    // Curated memory: the raw entry is not counted here.
    assert_eq!(stats.total_records, 1);
    assert_eq!(stats.records_by_store[&StoreType::Episodic], 1);
    assert!((stats.avg_fidelity - 1.0).abs() < f64::EPSILON);

    // make_engine does not start any background worker.
    assert!(!stats.background_decay_enabled);
    assert!(!stats.background_dream_enabled);

    // Raw journal: one entry, not yet consumed by a dream tick.
    assert_eq!(stats.raw_journal_records, 1);
    assert_eq!(stats.raw_journal_pending_dream, 1);
}
#[tokio::test]
async fn auto_store_routing() {
    // Without an explicit store, the engine picks one from the request content.
    let engine = make_engine().await;

    // Plain event-like text with no summary lands in the episodic store.
    let event = engine
        .encode_store(text_store_req("An event", None))
        .await
        .unwrap();
    assert_eq!(event.store, StoreType::Episodic);

    // Fact-like text carrying a summary lands in the semantic store.
    let fact = EncodeStoreRequest {
        header: None,
        content: MemoryContent {
            blocks: vec![ContentBlock {
                modality: Modality::Text,
                format: "text/plain".to_string(),
                data: b"Rust is a systems language".to_vec(),
                embedding: None,
            }],
            summary: Some("Rust programming facts".to_string()),
        },
        store: None,
        emotion: None,
        context: None,
        metadata: None,
        associations: None,
    };
    let routed = engine.encode_store(fact).await.unwrap();
    assert_eq!(routed.store, StoreType::Semantic);
}
#[tokio::test]
async fn human_mode_degrades_content() {
    // Human-mode noise leaves high-fidelity text untouched and inserts ellipses
    // into low-fidelity text.
    let sentence = b"The quick brown fox jumps over the lazy dog";
    let content = MemoryContent {
        blocks: vec![ContentBlock {
            modality: Modality::Text,
            format: "text/plain".to_string(),
            data: sentence.to_vec(),
            embedding: None,
        }],
        summary: None,
    };

    let crisp = apply_human_noise(&content, 0.95);
    assert_eq!(crisp.blocks[0].data, content.blocks[0].data);

    let faded = apply_human_noise(&content, 0.3);
    let faded_text = std::str::from_utf8(&faded.blocks[0].data).unwrap();
    assert!(faded_text.contains("..."));
}
#[tokio::test]
async fn encode_update() {
    // Replacing a record's content updates the stored text and swaps the indexed
    // terms: the new text is searchable and the old text no longer is.
    let engine = make_engine().await;
    let record_id = engine
        .encode_store(text_store_req("Original", Some(StoreType::Episodic)))
        .await
        .unwrap()
        .record_id;

    let replacement = MemoryContent {
        blocks: vec![ContentBlock {
            modality: Modality::Text,
            format: "text/plain".to_string(),
            data: b"Updated content".to_vec(),
            embedding: None,
        }],
        summary: None,
    };
    engine
        .encode_update(EncodeUpdateRequest {
            header: None,
            record_id,
            content: Some(replacement),
            emotion: None,
            metadata: None,
        })
        .await
        .unwrap();

    let record = engine
        .introspect_record(RecordIntrospectRequest {
            header: None,
            record_id,
            include_history: false,
            include_associations: false,
            include_versions: false,
        })
        .await
        .unwrap();
    assert_eq!(record.text_content(), Some("Updated content"));

    let count_hits = |term: &str| engine.text_index.search(term, None, 10).unwrap().len();
    assert_eq!(count_hits("Updated"), 1);
    assert_eq!(count_hits("Original"), 0);
}
#[tokio::test]
async fn encode_update_refreshes_structured_index() {
let engine = make_engine().await;
let resp = engine
.encode_store(EncodeStoreRequest {
header: None,
content: MemoryContent {
blocks: vec![ContentBlock {
modality: Modality::Structured,
format: "application/json".to_string(),
data: br#"{"user":{"name":"Alice"}}"#.to_vec(),
embedding: None,
}],
summary: None,
},
store: Some(StoreType::Episodic),
emotion: None,
context: None,
metadata: None,
associations: None,
})
.await
.unwrap();
assert_eq!(
engine.text_index.search("Alice", None, 10).unwrap().len(),
1
);
engine
.encode_update(EncodeUpdateRequest {
header: None,
record_id: resp.record_id,
content: Some(MemoryContent {
blocks: vec![ContentBlock {
modality: Modality::Structured,
format: "application/json".to_string(),
data: br#"{"user":{"name":"Bob"}}"#.to_vec(),
embedding: None,
}],
summary: None,
}),
emotion: None,
metadata: None,
})
.await
.unwrap();
assert!(engine
.text_index
.search("Alice", None, 10)
.unwrap()
.is_empty());
assert_eq!(engine.text_index.search("Bob", None, 10).unwrap().len(), 1);
}
#[tokio::test]
async fn rebuild_coordinator_reindexes_structured_records() {
    // After the text-index entry of a structured record is removed by hand,
    // rebuilding the coordinator from the stores re-extracts and re-indexes it.
    let engine = make_engine().await;
    let stored = engine
        .encode_store(EncodeStoreRequest {
            header: None,
            content: MemoryContent {
                blocks: vec![ContentBlock {
                    modality: Modality::Structured,
                    format: "application/json".to_string(),
                    data: br#"{"project":{"name":"Cerememory"}}"#.to_vec(),
                    embedding: None,
                }],
                summary: None,
            },
            store: Some(StoreType::Episodic),
            emotion: None,
            context: None,
            metadata: None,
            associations: None,
        })
        .await
        .unwrap();

    // Simulate a stale index.
    engine.text_index.remove(stored.record_id).unwrap();
    let hits = |term: &str| engine.text_index.search(term, None, 10).unwrap().len();
    assert_eq!(hits("Cerememory"), 0);

    engine.rebuild_coordinator().await.unwrap();
    assert_eq!(hits("Cerememory"), 1);
}
#[tokio::test]
async fn background_decay_runs() {
    // With a one-second interval configured, the background decay worker can be
    // started, runs for a couple of intervals, and reports disabled once stopped.
    let interval_config = EngineConfig {
        background_decay_interval_secs: Some(1),
        ..EngineConfig::default()
    };
    let engine = Arc::new(CerememoryEngine::new(interval_config).unwrap());
    engine
        .encode_store(text_store_req("Decay me", Some(StoreType::Episodic)))
        .await
        .unwrap();

    engine.start_background_decay();
    assert!(engine.is_background_decay_enabled().await);

    // Long enough for roughly two ticks of the one-second interval.
    let tick_window = std::time::Duration::from_millis(2500);
    tokio::time::sleep(tick_window).await;

    engine.stop_background_decay().await;
    assert!(!engine.is_background_decay_enabled().await);
}
#[tokio::test]
async fn background_decay_disabled_by_default() {
    // Starting the decay worker on a default engine leaves it disabled
    // (presumably because make_engine configures no decay interval, unlike
    // `background_decay_runs`).
    let engine = Arc::new(make_engine().await);
    engine.start_background_decay();
    let enabled = engine.is_background_decay_enabled().await;
    assert!(!enabled);
}
#[tokio::test]
async fn background_dream_runs() {
    // With a one-second dream interval, the background worker consolidates the
    // pending raw entry into one episodic summary before it is stopped.
    let interval_config = EngineConfig {
        background_dream_interval_secs: Some(1),
        ..EngineConfig::default()
    };
    let engine = Arc::new(CerememoryEngine::new(interval_config).unwrap());
    let raw_note = raw_text_store_req(
        "sess-bg-dream",
        "Background dream me",
        RawVisibility::Normal,
        SecrecyLevel::Public,
    );
    engine.encode_store_raw(raw_note).await.unwrap();

    engine.start_background_dream();
    assert!(engine.is_background_dream_enabled().await);

    let tick_window = std::time::Duration::from_millis(2500);
    tokio::time::sleep(tick_window).await;

    engine.stop_background_dream().await;
    assert!(!engine.is_background_dream_enabled().await);
    assert_eq!(engine.episodic.count().await.unwrap(), 1);
}
#[tokio::test]
async fn background_dream_disabled_by_default() {
    // Starting the dream worker on a default engine leaves it disabled
    // (presumably because make_engine configures no dream interval, unlike
    // `background_dream_runs`).
    let engine = Arc::new(make_engine().await);
    engine.start_background_dream();
    let enabled = engine.is_background_dream_enabled().await;
    assert!(!enabled);
}
#[tokio::test]
async fn multimodal_store_and_retrieve() {
    // Image content with a caller-provided embedding is stored byte-for-byte and
    // can be recalled with an identical embedding cue.
    let engine = make_engine().await;
    let png_magic = vec![0x89, 0x50, 0x4E, 0x47];
    let image_embedding = vec![0.5, 0.3, 0.8];

    let stored = engine
        .encode_store(EncodeStoreRequest {
            header: None,
            content: MemoryContent {
                blocks: vec![ContentBlock {
                    modality: Modality::Image,
                    format: "image/png".to_string(),
                    data: png_magic.clone(),
                    embedding: Some(image_embedding.clone()),
                }],
                summary: Some("A photo of a sunset".to_string()),
            },
            store: Some(StoreType::Episodic),
            emotion: None,
            context: None,
            metadata: None,
            associations: None,
        })
        .await
        .unwrap();

    let record = engine
        .introspect_record(RecordIntrospectRequest {
            header: None,
            record_id: stored.record_id,
            include_history: false,
            include_associations: false,
            include_versions: false,
        })
        .await
        .unwrap();
    let block = &record.content.blocks[0];
    assert_eq!(block.modality, Modality::Image);
    assert_eq!(block.data, png_magic);

    let recalled = engine
        .recall_query(RecallQueryRequest {
            header: None,
            cue: RecallCue {
                embedding: Some(image_embedding),
                ..Default::default()
            },
            stores: None,
            limit: 10,
            min_fidelity: None,
            include_decayed: false,
            reconsolidate: false,
            activation_depth: 0,
            recall_mode: RecallMode::Perfect,
        })
        .await
        .unwrap();
    assert!(!recalled.memories.is_empty());
}
#[tokio::test]
async fn summary_only_records_are_text_searchable() {
    // A record with no text block (only an image) is still text-searchable
    // through its summary.
    let engine = make_engine().await;
    let image_only = MemoryContent {
        blocks: vec![ContentBlock {
            modality: Modality::Image,
            format: "image/png".to_string(),
            data: vec![0x89, 0x50, 0x4E, 0x47],
            embedding: None,
        }],
        summary: Some("sunset skyline".to_string()),
    };
    let stored = engine
        .encode_store(EncodeStoreRequest {
            header: None,
            content: image_only,
            store: Some(StoreType::Semantic),
            emotion: None,
            context: None,
            metadata: None,
            associations: None,
        })
        .await
        .unwrap();

    let recalled = engine
        .recall_query(RecallQueryRequest {
            header: None,
            cue: RecallCue {
                text: Some("skyline".to_string()),
                ..Default::default()
            },
            stores: Some(vec![StoreType::Semantic]),
            limit: 10,
            min_fidelity: None,
            include_decayed: false,
            reconsolidate: false,
            activation_depth: 0,
            recall_mode: RecallMode::Perfect,
        })
        .await
        .unwrap();
    assert_eq!(recalled.memories.len(), 1);
    assert_eq!(recalled.memories[0].record.id, stored.record_id);
}
#[tokio::test]
async fn multiple_text_blocks_are_all_indexed() {
    // Every text block is indexed, not just the first: a cue matching only the
    // second block still finds the record.
    let engine = make_engine().await;
    let blocks = ["primary block", "secondary block"]
        .iter()
        .map(|text| ContentBlock {
            modality: Modality::Text,
            format: "text/plain".to_string(),
            data: text.as_bytes().to_vec(),
            embedding: None,
        })
        .collect();
    engine
        .encode_store(EncodeStoreRequest {
            header: None,
            content: MemoryContent {
                blocks,
                summary: None,
            },
            store: Some(StoreType::Episodic),
            emotion: None,
            context: None,
            metadata: None,
            associations: None,
        })
        .await
        .unwrap();

    let recalled = engine
        .recall_query(RecallQueryRequest {
            header: None,
            cue: RecallCue {
                text: Some("secondary".to_string()),
                ..Default::default()
            },
            stores: Some(vec![StoreType::Episodic]),
            limit: 10,
            min_fidelity: None,
            include_decayed: false,
            reconsolidate: false,
            activation_depth: 0,
            recall_mode: RecallMode::Perfect,
        })
        .await
        .unwrap();
    assert_eq!(recalled.memories.len(), 1);
}
#[tokio::test]
async fn structured_recall_survives_rebuild() {
    // A value nested inside JSON content is recallable by text both before and
    // after the coordinator is rebuilt from the stores.
    let engine = make_engine().await;
    engine
        .encode_store(structured_store_req(
            r#"{"profile":{"city":"Tokyo","skills":["rust","python"]}}"#,
            Some(StoreType::Semantic),
        ))
        .await
        .unwrap();

    let tokyo_query = RecallQueryRequest {
        header: None,
        cue: RecallCue {
            text: Some("Tokyo".to_string()),
            ..Default::default()
        },
        stores: Some(vec![StoreType::Semantic]),
        limit: 10,
        min_fidelity: None,
        include_decayed: false,
        reconsolidate: false,
        activation_depth: 0,
        recall_mode: RecallMode::Perfect,
    };

    let hits_before = engine
        .recall_query(tokyo_query.clone())
        .await
        .unwrap()
        .memories
        .len();
    assert_eq!(hits_before, 1);

    engine.rebuild_coordinator().await.unwrap();

    let hits_after = engine.recall_query(tokyo_query).await.unwrap().memories.len();
    assert_eq!(hits_after, 1);
}
#[tokio::test]
async fn structured_update_rebuilds_text_index() {
let engine = make_engine().await;
let resp = engine
.encode_store(structured_store_req(
r#"{"profile":{"city":"Tokyo"}}"#,
Some(StoreType::Semantic),
))
.await
.unwrap();
let tokyo_hits = engine
.recall_query(RecallQueryRequest {
header: None,
cue: RecallCue {
text: Some("Tokyo".to_string()),
..Default::default()
},
stores: Some(vec![StoreType::Semantic]),
limit: 10,
min_fidelity: None,
include_decayed: false,
reconsolidate: false,
activation_depth: 0,
recall_mode: RecallMode::Perfect,
})
.await
.unwrap();
assert_eq!(tokyo_hits.memories.len(), 1);
engine
.encode_update(EncodeUpdateRequest {
header: None,
record_id: resp.record_id,
content: Some(MemoryContent {
blocks: vec![ContentBlock {
modality: Modality::Structured,
format: "application/json".to_string(),
data: br#"{"profile":{"city":"Osaka"}}"#.to_vec(),
embedding: None,
}],
summary: None,
}),
emotion: None,
metadata: None,
})
.await
.unwrap();
let tokyo_hits = engine
.recall_query(RecallQueryRequest {
header: None,
cue: RecallCue {
text: Some("Tokyo".to_string()),
..Default::default()
},
stores: Some(vec![StoreType::Semantic]),
limit: 10,
min_fidelity: None,
include_decayed: false,
reconsolidate: false,
activation_depth: 0,
recall_mode: RecallMode::Perfect,
})
.await
.unwrap();
assert!(tokyo_hits.memories.is_empty());
let osaka_hits = engine
.recall_query(RecallQueryRequest {
header: None,
cue: RecallCue {
text: Some("Osaka".to_string()),
..Default::default()
},
stores: Some(vec![StoreType::Semantic]),
limit: 10,
min_fidelity: None,
include_decayed: false,
reconsolidate: false,
activation_depth: 0,
recall_mode: RecallMode::Perfect,
})
.await
.unwrap();
assert_eq!(osaka_hits.memories.len(), 1);
}
#[tokio::test]
async fn multimodal_image_recall_uses_provider_embedding() {
    // With an LLM provider configured, an image stored without an embedding can be
    // recalled by an image cue carrying the same bytes.
    let mock = Arc::new(MockLLMProvider::new(4));
    let engine = CerememoryEngine::new(EngineConfig {
        llm_provider: Some(mock),
        ..Default::default()
    })
    .unwrap();
    let png_bytes = vec![0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A];

    let image_content = MemoryContent {
        blocks: vec![ContentBlock {
            modality: Modality::Image,
            format: "image/png".to_string(),
            data: png_bytes.clone(),
            embedding: None,
        }],
        summary: Some("indexed image".to_string()),
    };
    engine
        .encode_store(EncodeStoreRequest {
            header: None,
            content: image_content,
            store: Some(StoreType::Episodic),
            emotion: None,
            context: None,
            metadata: None,
            associations: None,
        })
        .await
        .unwrap();

    let resp = engine
        .recall_query(RecallQueryRequest {
            header: None,
            cue: RecallCue {
                image: Some(png_bytes),
                ..Default::default()
            },
            stores: None,
            limit: 10,
            min_fidelity: None,
            include_decayed: false,
            reconsolidate: false,
            activation_depth: 0,
            recall_mode: RecallMode::Perfect,
        })
        .await
        .unwrap();
    assert_eq!(resp.memories.len(), 1);
    let top_block = &resp.memories[0].record.content.blocks[0];
    assert_eq!(top_block.modality, Modality::Image);
}
#[tokio::test]
async fn multimodal_audio_recall_uses_provider_transcript() {
    // The mock provider transcribes audio as "audio-<byte length>". Storing that
    // exact text lets an audio cue find the record through its transcript.
    let mock = Arc::new(MockLLMProvider::new(4));
    let engine = CerememoryEngine::new(EngineConfig {
        llm_provider: Some(mock),
        ..Default::default()
    })
    .unwrap();
    let wav_bytes = b"RIFFabcdWAVE".to_vec();
    let transcript = format!("audio-{}", wav_bytes.len());

    engine
        .encode_store(text_store_req(&transcript, Some(StoreType::Episodic)))
        .await
        .unwrap();

    let resp = engine
        .recall_query(RecallQueryRequest {
            header: None,
            cue: RecallCue {
                audio: Some(wav_bytes),
                ..Default::default()
            },
            stores: None,
            limit: 10,
            min_fidelity: None,
            include_decayed: false,
            reconsolidate: false,
            activation_depth: 0,
            recall_mode: RecallMode::Perfect,
        })
        .await
        .unwrap();
    assert!(!resp.memories.is_empty());
    assert_eq!(
        resp.memories[0].record.text_content(),
        Some(transcript.as_str())
    );
}
#[tokio::test]
async fn encode_store_processes_all_multimodal_blocks() {
let provider = Arc::new(MockLLMProvider::new(4));
let engine = CerememoryEngine::new(EngineConfig {
llm_provider: Some(provider),
..Default::default()
})
.unwrap();
let image_one = vec![0x89, b'P', b'N', b'G', 1, 2, 3, 4];
let image_two = vec![0x89, b'P', b'N', b'G', 5, 6, 7, 8, 9];
let audio_one = b"RIFFabcdWAVEone".to_vec();
let audio_two = b"RIFFabcdWAVEtwoo".to_vec();
let resp = engine
.encode_store(EncodeStoreRequest {
header: None,
content: MemoryContent {
blocks: vec![
ContentBlock {
modality: Modality::Image,
format: "image/png".to_string(),
data: image_one.clone(),
embedding: None,
},
ContentBlock {
modality: Modality::Audio,
format: "audio/wav".to_string(),
data: audio_one.clone(),
embedding: None,
},
ContentBlock {
modality: Modality::Image,
format: "image/png".to_string(),
data: image_two.clone(),
embedding: None,
},
ContentBlock {
modality: Modality::Audio,
format: "audio/wav".to_string(),
data: audio_two.clone(),
embedding: None,
},
],
summary: Some("multimodal batch".to_string()),
},
store: Some(StoreType::Episodic),
emotion: None,
context: None,
metadata: None,
associations: None,
})
.await
.unwrap();
let record = engine
.introspect_record(RecordIntrospectRequest {
header: None,
record_id: resp.record_id,
include_history: false,
include_associations: false,
include_versions: false,
})
.await
.unwrap();
assert_eq!(record.content.blocks.len(), 6);
assert_eq!(record.content.blocks[0].modality, Modality::Image);
assert_eq!(record.content.blocks[2].modality, Modality::Image);
assert_eq!(record.content.blocks[4].modality, Modality::Text);
assert_eq!(record.content.blocks[5].modality, Modality::Text);
let image_one_embedding = record.content.blocks[0].embedding.as_ref().unwrap();
let image_two_embedding = record.content.blocks[2].embedding.as_ref().unwrap();
assert_eq!(image_one_embedding[0], image_one.len() as f32);
assert_eq!(image_two_embedding[0], image_two.len() as f32);
assert_eq!(
std::str::from_utf8(&record.content.blocks[4].data).unwrap(),
format!("audio-{}", audio_one.len())
);
assert_eq!(
std::str::from_utf8(&record.content.blocks[5].data).unwrap(),
format!("audio-{}", audio_two.len())
);
assert!(record.content.blocks[4].embedding.is_some());
assert!(record.content.blocks[5].embedding.is_some());
}
#[tokio::test]
async fn size_limit_validation() {
    // Content one byte past 1 MiB is rejected with ContentTooLarge.
    // NOTE(review): presumably the limit is exactly 1 MiB; confirm in the encoder.
    let engine = make_engine().await;
    let oversized = vec![b'A'; 1024 * 1024 + 1];
    let request = EncodeStoreRequest {
        header: None,
        content: MemoryContent {
            blocks: vec![ContentBlock {
                modality: Modality::Text,
                format: "text/plain".to_string(),
                data: oversized,
                embedding: None,
            }],
            summary: None,
        },
        store: Some(StoreType::Episodic),
        emotion: None,
        context: None,
        metadata: None,
        associations: None,
    };

    let outcome = engine.encode_store(request).await;
    assert!(matches!(
        outcome,
        Err(CerememoryError::ContentTooLarge { .. })
    ));
}
#[tokio::test]
async fn lifecycle_export_returns_bytes_and_response() {
    // A CMA export returns both the archive bytes and a response that counts the
    // exported records.
    let engine = make_engine().await;
    engine
        .encode_store(text_store_req("Export me", Some(StoreType::Episodic)))
        .await
        .unwrap();

    let request = ExportRequest {
        header: None,
        format: "cma".to_string(),
        stores: None,
        include_raw_journal: false,
        encrypt: false,
        encryption_key: None,
    };
    let (archive, summary) = engine.lifecycle_export(request).await.unwrap();
    assert_eq!(summary.record_count, 1);
    assert!(!archive.is_empty());
}
#[tokio::test]
async fn lifecycle_export_accepts_jsonl_alias() {
    // "jsonl" is accepted as an export format, and its output can be read back by
    // the archive importer.
    let engine = make_engine().await;
    engine
        .encode_store(text_store_req("Export me too", Some(StoreType::Episodic)))
        .await
        .unwrap();

    let request = ExportRequest {
        header: None,
        format: "jsonl".to_string(),
        stores: None,
        include_raw_journal: false,
        encrypt: false,
        encryption_key: None,
    };
    let (archive, summary) = engine.lifecycle_export(request).await.unwrap();
    assert_eq!(summary.record_count, 1);

    let decoded = cerememory_archive::import_records(&archive).unwrap();
    assert_eq!(decoded.len(), 1);
}
#[tokio::test]
async fn lifecycle_export_rejects_unsupported_format() {
    // An unknown export format yields a validation error that lists the accepted
    // formats.
    let engine = make_engine().await;
    let request = ExportRequest {
        header: None,
        format: "zip".to_string(),
        stores: None,
        include_raw_journal: false,
        encrypt: false,
        encryption_key: None,
    };

    let outcome = engine.lifecycle_export(request).await;
    assert!(matches!(
        outcome,
        Err(CerememoryError::Validation(msg)) if msg.contains("Valid options: cma, jsonl")
    ));
}
#[tokio::test]
async fn lifecycle_export_store_filter() {
    // A store filter limits the export to records from the listed stores.
    let engine = make_engine().await;
    for (text, store) in [
        ("Episodic", StoreType::Episodic),
        ("Semantic", StoreType::Semantic),
    ] {
        engine
            .encode_store(text_store_req(text, Some(store)))
            .await
            .unwrap();
    }

    let (_, summary) = engine
        .lifecycle_export(ExportRequest {
            header: None,
            format: "cma".to_string(),
            stores: Some(vec![StoreType::Episodic]),
            include_raw_journal: false,
            encrypt: false,
            encryption_key: None,
        })
        .await
        .unwrap();
    assert_eq!(summary.record_count, 1);
}
#[tokio::test]
async fn lifecycle_export_store_filter_deduplicates_requested_stores() {
    // Listing the same store twice in the filter must not export its records twice.
    let engine = make_engine().await;
    engine
        .encode_store(text_store_req("Episodic", Some(StoreType::Episodic)))
        .await
        .unwrap();

    let duplicated_filter = vec![StoreType::Episodic, StoreType::Episodic];
    let (archive, summary) = engine
        .lifecycle_export(ExportRequest {
            header: None,
            format: "cma".to_string(),
            stores: Some(duplicated_filter),
            include_raw_journal: false,
            encrypt: false,
            encryption_key: None,
        })
        .await
        .unwrap();
    assert_eq!(summary.record_count, 1);

    let decoded = cerememory_archive::import_records(&archive).unwrap();
    assert_eq!(decoded.len(), 1);
    assert!(decoded
        .iter()
        .all(|record| record.store == StoreType::Episodic));
}
#[tokio::test]
async fn lifecycle_export_encrypted_roundtrip() {
    // An encrypted export still reports its record count, and the ciphertext
    // cannot be imported as a plain, unencrypted archive.
    let engine = make_engine().await;
    engine
        .encode_store(text_store_req("Secret data", Some(StoreType::Episodic)))
        .await
        .unwrap();

    let request = ExportRequest {
        header: None,
        format: "cma".to_string(),
        stores: None,
        include_raw_journal: false,
        encrypt: true,
        encryption_key: Some("my-passphrase".to_string()),
    };
    let (ciphertext, summary) = engine.lifecycle_export(request).await.unwrap();
    assert_eq!(summary.record_count, 1);

    let plain_import = engine.import_records(&ciphertext).await;
    assert!(plain_import.is_err());
}
#[tokio::test]
async fn lifecycle_export_import_with_raw_journal_roundtrip() {
    // With `include_raw_journal`, both the curated record and the raw journal
    // entry must travel to a fresh engine.
    let source = make_engine().await;
    source
        .encode_store(text_store_req("Curated memory", Some(StoreType::Episodic)))
        .await
        .unwrap();
    let raw_note = raw_text_store_req(
        "sess-export-raw",
        "Raw transcript note",
        RawVisibility::Normal,
        SecrecyLevel::Public,
    );
    source.encode_store_raw(raw_note).await.unwrap();

    let export = ExportRequest {
        header: None,
        format: "cma".to_string(),
        stores: None,
        include_raw_journal: true,
        encrypt: false,
        encryption_key: None,
    };
    let (bundle, summary) = source.lifecycle_export(export).await.unwrap();
    assert_eq!(summary.record_count, 2);

    let target = make_engine().await;
    let import = ImportRequest {
        header: None,
        archive_id: "bundle-roundtrip".to_string(),
        strategy: ImportStrategy::Merge,
        conflict_resolution: ConflictResolution::KeepExisting,
        decryption_key: None,
        archive_data: Some(bundle),
    };
    let restored = target.lifecycle_import(import).await.unwrap();
    assert_eq!(restored, 2);
    assert_eq!(target.episodic.count().await.unwrap(), 1);
    assert_eq!(target.raw_journal_count().await.unwrap(), 1);
}
#[tokio::test]
async fn lifecycle_import_with_archive_data() {
    // An archive passed inline via `archive_data` is imported into an empty engine.
    let exporter = make_engine().await;
    exporter
        .encode_store(text_store_req("Import me", Some(StoreType::Episodic)))
        .await
        .unwrap();
    let export = ExportRequest {
        header: None,
        format: "cma".to_string(),
        stores: None,
        include_raw_journal: false,
        encrypt: false,
        encryption_key: None,
    };
    let (payload, _) = exporter.lifecycle_export(export).await.unwrap();

    let importer = make_engine().await;
    let import = ImportRequest {
        header: None,
        archive_id: "test".to_string(),
        strategy: ImportStrategy::Merge,
        conflict_resolution: ConflictResolution::KeepExisting,
        decryption_key: None,
        archive_data: Some(payload),
    };
    let count = importer.lifecycle_import(import).await.unwrap();
    assert_eq!(count, 1);

    let totals = importer.introspect_stats().await.unwrap();
    assert_eq!(totals.total_records, 1);
}
#[tokio::test]
async fn import_conflict_keep_existing() {
    // Re-importing an engine's own export with KeepExisting must skip every
    // conflicting record and leave the stored content untouched.
    let engine = make_engine().await;
    let original_id = engine
        .encode_store(text_store_req("Original", Some(StoreType::Episodic)))
        .await
        .unwrap()
        .record_id;
    let export = ExportRequest {
        header: None,
        format: "cma".to_string(),
        stores: None,
        include_raw_journal: false,
        encrypt: false,
        encryption_key: None,
    };
    let (snapshot, _) = engine.lifecycle_export(export).await.unwrap();

    let import = ImportRequest {
        header: None,
        archive_id: "test".to_string(),
        strategy: ImportStrategy::Merge,
        conflict_resolution: ConflictResolution::KeepExisting,
        decryption_key: None,
        archive_data: Some(snapshot),
    };
    let written = engine.lifecycle_import(import).await.unwrap();
    assert_eq!(written, 0);

    let introspection = RecordIntrospectRequest {
        header: None,
        record_id: original_id,
        include_history: false,
        include_associations: false,
        include_versions: false,
    };
    let stored = engine.introspect_record(introspection).await.unwrap();
    assert_eq!(stored.text_content(), Some("Original"));
}
#[tokio::test]
async fn import_conflict_keep_imported() {
    // With KeepImported, a conflicting archive record overwrites the local one.
    let engine = make_engine().await;
    let original_id = engine
        .encode_store(text_store_req("Original text", Some(StoreType::Episodic)))
        .await
        .unwrap()
        .record_id;
    let export = ExportRequest {
        header: None,
        format: "cma".to_string(),
        stores: None,
        include_raw_journal: false,
        encrypt: false,
        encryption_key: None,
    };
    let (snapshot, _) = engine.lifecycle_export(export).await.unwrap();

    let import = ImportRequest {
        header: None,
        archive_id: "test".to_string(),
        strategy: ImportStrategy::Merge,
        conflict_resolution: ConflictResolution::KeepImported,
        decryption_key: None,
        archive_data: Some(snapshot),
    };
    let written = engine.lifecycle_import(import).await.unwrap();
    assert_eq!(written, 1);

    let found = engine.get_store_record(&original_id).await.unwrap();
    assert!(found.is_some());
}
#[tokio::test]
async fn import_conflict_keep_newer() {
    // The record is updated after the export, so KeepNewer must reject the stale
    // archived copy and keep the updated text.
    let engine = make_engine().await;
    let target_id = engine
        .encode_store(text_store_req("First version", Some(StoreType::Episodic)))
        .await
        .unwrap()
        .record_id;
    let export = ExportRequest {
        header: None,
        format: "cma".to_string(),
        stores: None,
        include_raw_journal: false,
        encrypt: false,
        encryption_key: None,
    };
    let (stale_snapshot, _) = engine.lifecycle_export(export).await.unwrap();

    let revised = MemoryContent {
        blocks: vec![ContentBlock {
            modality: Modality::Text,
            format: "text/plain".to_string(),
            data: b"Updated version".to_vec(),
            embedding: None,
        }],
        summary: None,
    };
    let update = EncodeUpdateRequest {
        header: None,
        record_id: target_id,
        content: Some(revised),
        emotion: None,
        metadata: None,
    };
    engine.encode_update(update).await.unwrap();

    let import = ImportRequest {
        header: None,
        archive_id: "test".to_string(),
        strategy: ImportStrategy::Merge,
        conflict_resolution: ConflictResolution::KeepNewer,
        decryption_key: None,
        archive_data: Some(stale_snapshot),
    };
    let written = engine.lifecycle_import(import).await.unwrap();
    assert_eq!(written, 0);

    let introspection = RecordIntrospectRequest {
        header: None,
        record_id: target_id,
        include_history: false,
        include_associations: false,
        include_versions: false,
    };
    let current = engine.introspect_record(introspection).await.unwrap();
    assert_eq!(current.text_content(), Some("Updated version"));
}
#[tokio::test]
async fn import_encrypted_archive() {
    // An archive exported with a passphrase imports when the same passphrase
    // is supplied as `decryption_key`.
    let exporter = make_engine().await;
    exporter
        .encode_store(text_store_req(
            "Encrypted import",
            Some(StoreType::Episodic),
        ))
        .await
        .unwrap();
    let export = ExportRequest {
        header: None,
        format: "cma".to_string(),
        stores: None,
        include_raw_journal: false,
        encrypt: true,
        encryption_key: Some("pass123".to_string()),
    };
    let (ciphertext, _) = exporter.lifecycle_export(export).await.unwrap();

    let importer = make_engine().await;
    let import = ImportRequest {
        header: None,
        archive_id: "test".to_string(),
        strategy: ImportStrategy::Merge,
        conflict_resolution: ConflictResolution::KeepExisting,
        decryption_key: Some("pass123".to_string()),
        archive_data: Some(ciphertext),
    };
    let count = importer.lifecycle_import(import).await.unwrap();
    assert_eq!(count, 1);
}
#[tokio::test]
async fn import_missing_archive_data() {
    // Without `archive_data` there is nothing to import; the error must name the field.
    let engine = make_engine().await;
    let request = ImportRequest {
        header: None,
        archive_id: "test".to_string(),
        strategy: ImportStrategy::Merge,
        conflict_resolution: ConflictResolution::KeepExisting,
        decryption_key: None,
        archive_data: None,
    };
    let outcome = engine.lifecycle_import(request).await;
    assert!(outcome.is_err());
    let failure = outcome.unwrap_err();
    let rendered = format!("{failure:?}");
    assert!(rendered.contains("archive_data"));
}
#[tokio::test]
async fn lifecycle_export_encrypt_without_key_fails() {
    // Asking for encryption without a key is a request error.
    let engine = make_engine().await;
    engine
        .encode_store(text_store_req("Some data", Some(StoreType::Episodic)))
        .await
        .unwrap();
    let request = ExportRequest {
        header: None,
        format: "cma".to_string(),
        stores: None,
        include_raw_journal: false,
        encrypt: true,
        encryption_key: None,
    };
    let outcome = engine.lifecycle_export(request).await;
    assert!(outcome.is_err());
    let failure = outcome.unwrap_err();
    let rendered = format!("{failure:?}");
    assert!(rendered.contains("encryption_key is required"));
}
#[tokio::test]
async fn import_conflict_cross_store_keep_imported_count_stays_one() {
    // Overwriting a record with its own archived copy must not duplicate it.
    let engine = make_engine().await;
    let record_id = engine
        .encode_store(text_store_req(
            "Original in episodic",
            Some(StoreType::Episodic),
        ))
        .await
        .unwrap()
        .record_id;
    let export = ExportRequest {
        header: None,
        format: "cma".to_string(),
        stores: None,
        include_raw_journal: false,
        encrypt: false,
        encryption_key: None,
    };
    let (snapshot, _) = engine.lifecycle_export(export).await.unwrap();

    let import = ImportRequest {
        header: None,
        archive_id: "test".to_string(),
        strategy: ImportStrategy::Merge,
        conflict_resolution: ConflictResolution::KeepImported,
        decryption_key: None,
        archive_data: Some(snapshot),
    };
    let written = engine.lifecycle_import(import).await.unwrap();
    assert_eq!(written, 1);

    let totals = engine.introspect_stats().await.unwrap();
    assert_eq!(totals.total_records, 1);
    let found = engine.get_store_record(&record_id).await.unwrap();
    assert!(found.is_some());
}
#[tokio::test]
async fn import_strategy_replace_replaces_existing_dataset() {
    // Replace must wipe every pre-existing record (across all stores) and leave
    // exactly the archive's contents behind.
    let source = make_engine().await;
    let imported_episode_id = source
        .encode_store(text_store_req(
            "Imported episodic",
            Some(StoreType::Episodic),
        ))
        .await
        .unwrap()
        .record_id;
    let imported_semantic_id = source
        .encode_store(text_store_req(
            "Imported semantic",
            Some(StoreType::Semantic),
        ))
        .await
        .unwrap()
        .record_id;
    let export = ExportRequest {
        header: None,
        format: "cma".to_string(),
        stores: None,
        include_raw_journal: false,
        encrypt: false,
        encryption_key: None,
    };
    let (archive_data, _) = source.lifecycle_export(export).await.unwrap();

    let target = make_engine().await;
    let stale_id = target
        .encode_store(text_store_req("Old episodic", Some(StoreType::Episodic)))
        .await
        .unwrap()
        .record_id;
    target
        .encode_store(text_store_req("Old working", Some(StoreType::Working)))
        .await
        .unwrap();

    let import = ImportRequest {
        header: None,
        archive_id: "replace-test".to_string(),
        strategy: ImportStrategy::Replace,
        conflict_resolution: ConflictResolution::KeepNewer,
        decryption_key: None,
        archive_data: Some(archive_data),
    };
    let written = target.lifecycle_import(import).await.unwrap();
    assert_eq!(written, 2);

    let totals = target.introspect_stats().await.unwrap();
    assert_eq!(totals.total_records, 2);
    let per_store = &totals.records_by_store;
    assert_eq!(per_store[&StoreType::Episodic], 1);
    assert_eq!(per_store[&StoreType::Semantic], 1);
    let working_count = per_store.get(&StoreType::Working).copied().unwrap_or(0);
    assert_eq!(working_count, 0);

    let stale = target.get_store_record(&stale_id).await.unwrap();
    assert!(stale.is_none());
    let episode = target.get_store_record(&imported_episode_id).await.unwrap();
    assert!(episode.is_some());
    let semantic = target.get_store_record(&imported_semantic_id).await.unwrap();
    assert!(semantic.is_some());
}
#[tokio::test]
async fn import_strategy_replace_preserves_existing_data_on_invalid_archive() {
    // A corrupt archive must be rejected before Replace clears anything.
    let engine = make_engine().await;
    let kept_id = engine
        .encode_store(text_store_req("Keep me", Some(StoreType::Episodic)))
        .await
        .unwrap()
        .record_id;
    let garbage = b"not-a-valid-cma-archive".to_vec();
    let import = ImportRequest {
        header: None,
        archive_id: "invalid-replace".to_string(),
        strategy: ImportStrategy::Replace,
        conflict_resolution: ConflictResolution::KeepNewer,
        decryption_key: None,
        archive_data: Some(garbage),
    };
    let outcome = engine.lifecycle_import(import).await;
    assert!(outcome.is_err());

    let totals = engine.introspect_stats().await.unwrap();
    assert_eq!(totals.total_records, 1);
    let survivor = engine.get_store_record(&kept_id).await.unwrap();
    assert!(survivor.is_some());
}
#[tokio::test]
async fn timeline_hour_granularity() {
    // A record encoded just now falls into some hourly bucket of a +/-1h window.
    let engine = make_engine().await;
    engine
        .encode_store(text_store_req("Morning event", Some(StoreType::Episodic)))
        .await
        .unwrap();
    let anchor = Utc::now();
    let window = TemporalRange {
        start: anchor - chrono::Duration::hours(1),
        end: anchor + chrono::Duration::hours(1),
    };
    let request = RecallTimelineRequest {
        header: None,
        range: window,
        granularity: TimeGranularity::Hour,
        min_fidelity: None,
        emotion_filter: None,
    };
    let timeline = engine.recall_timeline(request).await.unwrap();
    assert!(!timeline.buckets.is_empty());
    let bucketed: u32 = timeline.buckets.iter().map(|bucket| bucket.count).sum();
    assert!(bucketed >= 1);
}
#[tokio::test]
async fn timeline_day_granularity() {
    // Three records encoded now must all be counted in a +/-1 day daily timeline.
    let engine = make_engine().await;
    for n in 0..3 {
        let text = format!("Day event {n}");
        engine
            .encode_store(text_store_req(&text, Some(StoreType::Episodic)))
            .await
            .unwrap();
    }
    let anchor = Utc::now();
    let window = TemporalRange {
        start: anchor - chrono::Duration::days(1),
        end: anchor + chrono::Duration::days(1),
    };
    let request = RecallTimelineRequest {
        header: None,
        range: window,
        granularity: TimeGranularity::Day,
        min_fidelity: None,
        emotion_filter: None,
    };
    let timeline = engine.recall_timeline(request).await.unwrap();
    let bucketed: u32 = timeline.buckets.iter().map(|bucket| bucket.count).sum();
    assert_eq!(bucketed, 3);
}
#[tokio::test]
async fn timeline_empty_range() {
    // A window roughly a decade in the past contains no freshly-encoded records.
    let engine = make_engine().await;
    engine
        .encode_store(text_store_req("An event", Some(StoreType::Episodic)))
        .await
        .unwrap();
    let ten_years_ago = Utc::now() - chrono::Duration::days(365 * 10);
    let nine_years_ago = Utc::now() - chrono::Duration::days(365 * 9);
    let request = RecallTimelineRequest {
        header: None,
        range: TemporalRange {
            start: ten_years_ago,
            end: nine_years_ago,
        },
        granularity: TimeGranularity::Day,
        min_fidelity: None,
        emotion_filter: None,
    };
    let timeline = engine.recall_timeline(request).await.unwrap();
    assert!(timeline.buckets.is_empty());
}
#[tokio::test]
async fn timeline_min_fidelity_filter() {
    // A brand-new record clears a 0.5 fidelity floor and still shows up.
    let engine = make_engine().await;
    engine
        .encode_store(text_store_req(
            "High fidelity event",
            Some(StoreType::Episodic),
        ))
        .await
        .unwrap();
    let anchor = Utc::now();
    let window = TemporalRange {
        start: anchor - chrono::Duration::hours(1),
        end: anchor + chrono::Duration::hours(1),
    };
    let request = RecallTimelineRequest {
        header: None,
        range: window,
        granularity: TimeGranularity::Hour,
        min_fidelity: Some(0.5),
        emotion_filter: None,
    };
    let timeline = engine.recall_timeline(request).await.unwrap();
    let bucketed: u32 = timeline.buckets.iter().map(|bucket| bucket.count).sum();
    assert!(bucketed >= 1);
}
#[tokio::test]
async fn timeline_emotion_filter() {
let engine = make_engine().await;
let req = EncodeStoreRequest {
header: None,
content: MemoryContent {
blocks: vec![ContentBlock {
modality: Modality::Text,
format: "text/plain".to_string(),
data: b"Happy event".to_vec(),
embedding: None,
}],
summary: None,
},
store: Some(StoreType::Episodic),
emotion: Some(EmotionVector {
joy: 0.9,
..Default::default()
}),
context: None,
metadata: None,
associations: None,
};
engine.encode_store(req).await.unwrap();
let req2 = EncodeStoreRequest {
header: None,
content: MemoryContent {
blocks: vec![ContentBlock {
modality: Modality::Text,
format: "text/plain".to_string(),
data: b"Sad event".to_vec(),
embedding: None,
}],
summary: None,
},
store: Some(StoreType::Episodic),
emotion: Some(EmotionVector {
sadness: 0.9,
..Default::default()
}),
context: None,
metadata: None,
associations: None,
};
engine.encode_store(req2).await.unwrap();
let now = Utc::now();
let resp = engine
.recall_timeline(RecallTimelineRequest {
header: None,
range: TemporalRange {
start: now - chrono::Duration::hours(1),
end: now + chrono::Duration::hours(1),
},
granularity: TimeGranularity::Hour,
min_fidelity: None,
emotion_filter: Some(EmotionVector {
joy: 1.0,
..Default::default()
}),
})
.await
.unwrap();
let total: u32 = resp.buckets.iter().map(|b| b.count).sum();
assert_eq!(total, 1, "Only the joyful event should match");
}
#[tokio::test]
async fn timeline_multi_store() {
    // Records from different stores all contribute to the same timeline.
    let engine = make_engine().await;
    let seeds = [
        ("Episodic event", StoreType::Episodic),
        ("Procedural event", StoreType::Procedural),
    ];
    for (text, store) in seeds {
        engine
            .encode_store(text_store_req(text, Some(store)))
            .await
            .unwrap();
    }
    let anchor = Utc::now();
    let request = RecallTimelineRequest {
        header: None,
        range: TemporalRange {
            start: anchor - chrono::Duration::hours(1),
            end: anchor + chrono::Duration::hours(1),
        },
        granularity: TimeGranularity::Hour,
        min_fidelity: None,
        emotion_filter: None,
    };
    let timeline = engine.recall_timeline(request).await.unwrap();
    let bucketed: u32 = timeline.buckets.iter().map(|bucket| bucket.count).sum();
    assert!(bucketed >= 2);
}
#[tokio::test]
async fn graph_centered_traversal() {
    // Linking A -> B and traversing from A must surface both nodes and the edge.
    let engine = make_engine().await;
    let node_a = engine
        .encode_store(text_store_req("Node A", Some(StoreType::Episodic)))
        .await
        .unwrap()
        .record_id;
    let node_b = engine
        .encode_store(text_store_req("Node B", Some(StoreType::Episodic)))
        .await
        .unwrap()
        .record_id;
    let link = Association {
        target_id: node_b,
        association_type: AssociationType::Semantic,
        weight: 0.9,
        created_at: Utc::now(),
        last_co_activation: Utc::now(),
    };
    engine
        .coordinator
        .add_association(&node_a, link)
        .await
        .unwrap();

    let request = RecallGraphRequest {
        header: None,
        center_id: Some(node_a),
        depth: 2,
        edge_types: None,
        limit_nodes: 10,
    };
    let graph = engine.recall_graph(request).await.unwrap();
    assert!(graph.nodes.len() >= 2);
    assert!(!graph.edges.is_empty());
}
#[tokio::test]
async fn graph_empty() {
    // An empty engine with no centre yields an empty graph.
    let engine = make_engine().await;
    let request = RecallGraphRequest {
        header: None,
        center_id: None,
        depth: 1,
        edge_types: None,
        limit_nodes: 10,
    };
    let graph = engine.recall_graph(request).await.unwrap();
    assert!(graph.nodes.is_empty());
}
#[tokio::test]
async fn graph_depth_limiting() {
    // Depth 0 returns only the centre node and no edges.
    let engine = make_engine().await;
    let centre = engine
        .encode_store(text_store_req("Node 1", Some(StoreType::Episodic)))
        .await
        .unwrap()
        .record_id;
    let request = RecallGraphRequest {
        header: None,
        center_id: Some(centre),
        depth: 0,
        edge_types: None,
        limit_nodes: 10,
    };
    let graph = engine.recall_graph(request).await.unwrap();
    assert_eq!(graph.nodes.len(), 1);
    assert!(graph.edges.is_empty());
}
#[tokio::test]
async fn decay_forecast_future() {
    // Thirty days out, fidelity is predicted to have dropped but not vanished.
    let engine = make_engine().await;
    let id = engine
        .encode_store(text_store_req("Forecast me", Some(StoreType::Episodic)))
        .await
        .unwrap()
        .record_id;
    let request = DecayForecastRequest {
        header: None,
        record_ids: vec![id],
        forecast_at: Utc::now() + chrono::Duration::days(30),
    };
    let report = engine.introspect_decay_forecast(request).await.unwrap();
    assert_eq!(report.forecasts.len(), 1);
    let entry = &report.forecasts[0];
    assert!(entry.forecasted_fidelity < entry.current_fidelity);
    assert!(entry.forecasted_fidelity > 0.0);
}
#[tokio::test]
async fn decay_forecast_threshold_date() {
    // Over a one-year horizon the forecast should predict when the record
    // crosses the decay threshold, and that moment must lie in the future.
    let engine = make_engine().await;
    let resp = engine
        .encode_store(text_store_req("Will decay", Some(StoreType::Episodic)))
        .await
        .unwrap();
    let forecast = engine
        .introspect_decay_forecast(DecayForecastRequest {
            header: None,
            record_ids: vec![resp.record_id],
            forecast_at: Utc::now() + chrono::Duration::days(365),
        })
        .await
        .unwrap();
    // Check the length first so a missing forecast fails with a clear message
    // rather than an index-out-of-bounds panic.
    assert_eq!(forecast.forecasts.len(), 1);
    let threshold = forecast.forecasts[0]
        .estimated_threshold_date
        .expect("a one-year forecast should yield an estimated threshold date");
    assert!(threshold > Utc::now());
}
#[tokio::test]
async fn decay_forecast_multiple_records() {
    // One forecast entry is produced per requested record, across stores.
    let engine = make_engine().await;
    let mut ids = Vec::new();
    for (text, store) in [
        ("Record 1", StoreType::Episodic),
        ("Record 2", StoreType::Semantic),
    ] {
        let stored = engine
            .encode_store(text_store_req(text, Some(store)))
            .await
            .unwrap();
        ids.push(stored.record_id);
    }
    let request = DecayForecastRequest {
        header: None,
        record_ids: ids,
        forecast_at: Utc::now() + chrono::Duration::days(7),
    };
    let report = engine.introspect_decay_forecast(request).await.unwrap();
    assert_eq!(report.forecasts.len(), 2);
}
#[tokio::test]
async fn decay_forecast_uses_per_record_decay_rate() {
    // After a one-hour decay tick, the 30-day forecast stays strictly in (0, 1).
    let engine = make_engine().await;
    let id = engine
        .encode_store(text_store_req("Test record", Some(StoreType::Episodic)))
        .await
        .unwrap()
        .record_id;
    let tick = DecayTickRequest {
        header: None,
        tick_duration_seconds: Some(3600),
    };
    engine.lifecycle_decay_tick(tick).await.unwrap();

    let request = DecayForecastRequest {
        header: None,
        record_ids: vec![id],
        forecast_at: Utc::now() + chrono::Duration::days(30),
    };
    let report = engine.introspect_decay_forecast(request).await.unwrap();
    assert_eq!(report.forecasts.len(), 1);
    let predicted = report.forecasts[0].forecasted_fidelity;
    assert!(predicted > 0.0);
    assert!(predicted < 1.0);
}
#[tokio::test]
async fn evolution_returns_metrics() {
    // A fresh engine has not adjusted any parameters yet.
    let report = make_engine().await.introspect_evolution().await.unwrap();
    assert!(report.parameter_adjustments.is_empty());
}
#[tokio::test]
async fn evolution_after_decay() {
    // Smoke test: evolution introspection must still succeed after a full-day
    // decay tick over several records. Which patterns get detected depends on
    // the evolution heuristics, so the contents are intentionally not asserted.
    // The previous `is_empty() || !is_empty()` check was a tautology that could
    // never fail.
    let engine = make_engine().await;
    for i in 0..5 {
        engine
            .encode_store(text_store_req(
                &format!("Record {i}"),
                Some(StoreType::Episodic),
            ))
            .await
            .unwrap();
    }
    engine
        .lifecycle_decay_tick(DecayTickRequest {
            header: None,
            tick_duration_seconds: Some(86400),
        })
        .await
        .unwrap();
    let _metrics = engine.introspect_evolution().await.unwrap();
}
/// Deterministic test double for [`LLMProvider`].
///
/// Embeddings are `embed_dim`-long vectors. Component 0 holds the input length
/// in bytes. Component 1 tags the modality: `1.0` for text, `2.0` for images.
/// All other components are zero.
///
/// Other methods:
/// - Summaries join the inputs with `"; "`.
/// - Transcripts are `"audio-<byte len>"`.
/// - Relation extraction yields one fixed triple for non-empty text.
struct MockLLMProvider {
    embed_dim: usize,
}
impl MockLLMProvider {
    /// Creates a mock that produces `embed_dim`-dimensional embeddings.
    ///
    /// # Panics
    /// Panics if `embed_dim < 2`. The first two components carry the length
    /// feature and the modality tag, so smaller vectors cannot be built.
    fn new(embed_dim: usize) -> Self {
        assert!(
            embed_dim >= 2,
            "MockLLMProvider needs embed_dim >= 2 (got {embed_dim})"
        );
        Self { embed_dim }
    }

    /// Builds `[len as f32, tag, 0.0, ...]` with `dim` components (`dim >= 2`).
    fn mock_embedding(dim: usize, len: usize, tag: f32) -> Vec<f32> {
        let mut v = vec![0.0f32; dim];
        v[0] = len as f32;
        v[1] = tag;
        v
    }
}
impl LLMProvider for MockLLMProvider {
    /// Text embedding: component 0 is the text's byte length, component 1 is `1.0`.
    fn embed(
        &self,
        text: &str,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Vec<f32>, CerememoryError>> + Send + '_>,
    > {
        let embedding = Self::mock_embedding(self.embed_dim, text.len(), 1.0);
        Box::pin(async move { Ok(embedding) })
    }
    /// Joins `texts` with `"; "`. When the result is longer than `max_tokens`
    /// bytes, it is shortened via `truncate_str` and `"..."` is appended.
    /// NOTE(review): `max_tokens` is compared against byte length here, not a
    /// real token count.
    fn summarize(
        &self,
        texts: &[String],
        max_tokens: usize,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<String, CerememoryError>> + Send + '_>,
    > {
        let joined = texts.join("; ");
        let truncated = if joined.len() > max_tokens {
            format!("{}...", truncate_str(&joined, max_tokens))
        } else {
            joined
        };
        Box::pin(async move { Ok(truncated) })
    }
    /// Returns a single fixed `test is_a mock` relation for non-empty text and
    /// nothing for empty text.
    fn extract_relations(
        &self,
        text: &str,
    ) -> std::pin::Pin<
        Box<
            dyn std::future::Future<Output = Result<Vec<ExtractedRelation>, CerememoryError>>
                + Send
                + '_,
        >,
    > {
        let has_content = !text.is_empty();
        Box::pin(async move {
            if has_content {
                Ok(vec![ExtractedRelation {
                    subject: "test".to_string(),
                    predicate: "is_a".to_string(),
                    object: "mock".to_string(),
                    confidence: 0.9,
                }])
            } else {
                Ok(Vec::new())
            }
        })
    }
    /// Image embedding: component 0 is the image byte length, component 1 is `2.0`.
    fn embed_image(
        &self,
        data: &[u8],
        _format: &str,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Vec<f32>, CerememoryError>> + Send + '_>,
    > {
        let embedding = Self::mock_embedding(self.embed_dim, data.len(), 2.0);
        Box::pin(async move { Ok(embedding) })
    }
    /// "Transcribes" audio as `audio-<byte len>`.
    fn transcribe_audio(
        &self,
        data: &[u8],
        _format: &str,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<String, CerememoryError>> + Send + '_>,
    > {
        let transcript = format!("audio-{}", data.len());
        Box::pin(async move { Ok(transcript) })
    }
    /// The mock advertises every capability.
    fn capabilities(&self) -> ProviderCapabilities {
        ProviderCapabilities {
            text_embedding: true,
            image_embedding: true,
            audio_transcription: true,
        }
    }
}
#[tokio::test]
async fn engine_auto_embed_with_provider() {
    // With a provider configured, a text block without an embedding is
    // embedded automatically at the provider's dimensionality.
    let provider = Arc::new(MockLLMProvider::new(4));
    let config = EngineConfig {
        llm_provider: Some(provider),
        ..Default::default()
    };
    let engine = CerememoryEngine::new(config).unwrap();
    let id = engine
        .encode_store(text_store_req("Auto embed me", Some(StoreType::Episodic)))
        .await
        .unwrap()
        .record_id;
    let introspection = RecordIntrospectRequest {
        header: None,
        record_id: id,
        include_history: false,
        include_associations: false,
        include_versions: false,
    };
    let stored = engine.introspect_record(introspection).await.unwrap();
    let first_embedding = stored.content.blocks[0].embedding.as_ref();
    assert!(first_embedding.is_some());
    assert_eq!(first_embedding.unwrap().len(), 4);
}
#[tokio::test]
async fn engine_auto_embeds_all_image_blocks() {
    // Every image block, not just the first, gets its own provider embedding.
    let provider = Arc::new(MockLLMProvider::new(4));
    let config = EngineConfig {
        llm_provider: Some(provider),
        ..Default::default()
    };
    let engine = CerememoryEngine::new(config).unwrap();

    let png_block = |fill: u8, len: usize| ContentBlock {
        modality: Modality::Image,
        format: "image/png".to_string(),
        data: vec![fill; len],
        embedding: None,
    };
    let request = EncodeStoreRequest {
        header: None,
        content: MemoryContent {
            blocks: vec![png_block(1, 8), png_block(2, 13)],
            summary: None,
        },
        store: Some(StoreType::Episodic),
        emotion: None,
        context: None,
        metadata: None,
        associations: None,
    };
    let id = engine.encode_store(request).await.unwrap().record_id;

    let introspection = RecordIntrospectRequest {
        header: None,
        record_id: id,
        include_history: false,
        include_associations: false,
        include_versions: false,
    };
    let stored = engine.introspect_record(introspection).await.unwrap();
    let blocks = &stored.content.blocks;
    assert_eq!(blocks.len(), 2);
    assert_eq!(blocks[0].modality, Modality::Image);
    assert_eq!(blocks[1].modality, Modality::Image);
    let first = blocks[0].embedding.as_ref().unwrap();
    let second = blocks[1].embedding.as_ref().unwrap();
    assert_eq!(first[0], 8.0);
    assert_eq!(second[0], 13.0);
    assert_eq!(first[1], 2.0);
    assert_eq!(second[1], 2.0);
}
#[tokio::test]
async fn engine_transcribes_all_audio_blocks_in_order() {
    // Each audio block yields an appended, embedded text transcript, in the
    // same order as the audio blocks.
    let provider = Arc::new(MockLLMProvider::new(4));
    let config = EngineConfig {
        llm_provider: Some(provider),
        ..Default::default()
    };
    let engine = CerememoryEngine::new(config).unwrap();

    let wav_block = |fill: u8, len: usize| ContentBlock {
        modality: Modality::Audio,
        format: "audio/wav".to_string(),
        data: vec![fill; len],
        embedding: None,
    };
    let request = EncodeStoreRequest {
        header: None,
        content: MemoryContent {
            blocks: vec![wav_block(0, 12), wav_block(1, 123)],
            summary: None,
        },
        store: Some(StoreType::Episodic),
        emotion: None,
        context: None,
        metadata: None,
        associations: None,
    };
    let id = engine.encode_store(request).await.unwrap().record_id;

    let introspection = RecordIntrospectRequest {
        header: None,
        record_id: id,
        include_history: false,
        include_associations: false,
        include_versions: false,
    };
    let stored = engine.introspect_record(introspection).await.unwrap();
    let blocks = &stored.content.blocks;
    assert_eq!(blocks.len(), 4);
    let expected_modalities = [
        Modality::Audio,
        Modality::Audio,
        Modality::Text,
        Modality::Text,
    ];
    for (block, modality) in blocks.iter().zip(expected_modalities) {
        assert_eq!(block.modality, modality);
    }
    let transcripts = ["audio-12", "audio-123"];
    for (offset, transcript) in transcripts.iter().enumerate() {
        assert_eq!(
            std::str::from_utf8(&blocks[2 + offset].data).unwrap(),
            *transcript
        );
    }
    for (offset, transcript) in transcripts.iter().enumerate() {
        assert_eq!(
            blocks[2 + offset].embedding.as_ref().unwrap()[0],
            transcript.len() as f32
        );
    }
}
#[tokio::test]
async fn engine_no_provider_passthrough() {
    // Without a provider, content is stored as-is and no embedding is invented.
    let engine = make_engine().await;
    let id = engine
        .encode_store(text_store_req("No provider", Some(StoreType::Episodic)))
        .await
        .unwrap()
        .record_id;
    let introspection = RecordIntrospectRequest {
        header: None,
        record_id: id,
        include_history: false,
        include_associations: false,
        include_versions: false,
    };
    let stored = engine.introspect_record(introspection).await.unwrap();
    assert!(stored.content.blocks[0].embedding.is_none());
}
#[tokio::test]
async fn engine_existing_embedding_not_overwritten() {
    // A caller-supplied embedding wins over the provider's auto-embedding.
    let provider = Arc::new(MockLLMProvider::new(4));
    let config = EngineConfig {
        llm_provider: Some(provider),
        ..Default::default()
    };
    let engine = CerememoryEngine::new(config).unwrap();

    let pre_embedded = ContentBlock {
        modality: Modality::Text,
        format: "text/plain".to_string(),
        data: b"Has embedding".to_vec(),
        embedding: Some(vec![9.0, 9.0, 9.0]),
    };
    let request = EncodeStoreRequest {
        header: None,
        content: MemoryContent {
            blocks: vec![pre_embedded],
            summary: None,
        },
        store: Some(StoreType::Episodic),
        emotion: None,
        context: None,
        metadata: None,
        associations: None,
    };
    let id = engine.encode_store(request).await.unwrap().record_id;

    let introspection = RecordIntrospectRequest {
        header: None,
        record_id: id,
        include_history: false,
        include_associations: false,
        include_versions: false,
    };
    let stored = engine.introspect_record(introspection).await.unwrap();
    let kept = stored.content.blocks[0].embedding.as_ref().unwrap();
    assert_eq!(kept, &vec![9.0, 9.0, 9.0]);
}
#[tokio::test]
async fn noop_provider_embed_returns_empty() {
    // The no-op provider never produces embedding components.
    let noop = NoOpProvider;
    let embedding = noop.embed("test").await.unwrap();
    assert!(embedding.is_empty());
}
#[tokio::test]
async fn noop_provider_summarize_concatenates() {
    // The no-op "summary" is just the inputs joined by a single space.
    let noop = NoOpProvider;
    let inputs = ["hello".to_string(), "world".to_string()];
    let summary = noop.summarize(&inputs, 100).await.unwrap();
    assert_eq!(summary, "hello world");
}
#[tokio::test]
async fn noop_provider_extract_relations_empty() {
    // The no-op provider extracts no relations.
    let noop = NoOpProvider;
    let relations = noop.extract_relations("test").await.unwrap();
    assert!(relations.is_empty());
}
#[tokio::test]
async fn noop_provider_capabilities_are_disabled() {
    // The no-op provider advertises no capabilities at all.
    let caps = NoOpProvider.capabilities();
    let flags = [
        caps.text_embedding,
        caps.image_embedding,
        caps.audio_transcription,
    ];
    for enabled in flags {
        assert!(!enabled);
    }
}
#[tokio::test]
async fn mock_provider_embed_roundtrip() {
    // The mock puts the text's byte length in component 0 and the text tag
    // `1.0` in component 1. Every remaining component is zero.
    let provider = MockLLMProvider::new(8);
    let result = provider.embed("hello").await.unwrap();
    assert_eq!(result.len(), 8);
    assert_eq!(result[0], "hello".len() as f32);
    assert_eq!(result[1], 1.0);
    assert!(result[2..].iter().all(|&x| x == 0.0));
}
#[tokio::test]
async fn mock_provider_summarize() {
    // Under the length budget, the mock summary contains every input.
    let mock = MockLLMProvider::new(4);
    let inputs = ["one".to_string(), "two".to_string()];
    let summary = mock.summarize(&inputs, 100).await.unwrap();
    for word in ["one", "two"] {
        assert!(summary.contains(word));
    }
}
#[tokio::test]
async fn mock_provider_extract_relations() {
    // Non-empty text yields the mock's single fixed `is_a` relation.
    let mock = MockLLMProvider::new(4);
    let relations = mock.extract_relations("some text").await.unwrap();
    assert_eq!(relations.len(), 1);
    let only = &relations[0];
    assert_eq!(only.predicate, "is_a");
}
#[tokio::test]
async fn auto_embed_enables_vector_search() {
    // An auto-embedded record is findable by a query vector equal to the mock
    // embedding of the same text.
    let provider = Arc::new(MockLLMProvider::new(4));
    let config = EngineConfig {
        llm_provider: Some(provider),
        ..Default::default()
    };
    let engine = CerememoryEngine::new(config).unwrap();
    let text = "Searchable text";
    engine
        .encode_store(text_store_req(text, Some(StoreType::Episodic)))
        .await
        .unwrap();

    let probe = vec![text.len() as f32, 1.0, 0.0, 0.0];
    let query = RecallQueryRequest {
        header: None,
        cue: RecallCue {
            embedding: Some(probe),
            ..Default::default()
        },
        stores: None,
        limit: 10,
        min_fidelity: None,
        include_decayed: false,
        reconsolidate: false,
        activation_depth: 0,
        recall_mode: RecallMode::Perfect,
    };
    let hits = engine.recall_query(query).await.unwrap();
    assert!(!hits.memories.is_empty());
}
#[tokio::test]
async fn image_recall_cue_uses_provider_embedding() {
    // Recalling with the same image bytes as a cue finds the stored image record.
    let provider = Arc::new(MockLLMProvider::new(4));
    let config = EngineConfig {
        llm_provider: Some(provider),
        ..Default::default()
    };
    let engine = CerememoryEngine::new(config).unwrap();
    let png = vec![0x89, b'P', b'N', b'G', 0x0D, 0x0A, 0x1A, 0x0A, 1, 2, 3, 4];

    let image_block = ContentBlock {
        modality: Modality::Image,
        format: "image/png".to_string(),
        data: png.clone(),
        embedding: None,
    };
    let store_request = EncodeStoreRequest {
        header: None,
        content: MemoryContent {
            blocks: vec![image_block],
            summary: None,
        },
        store: Some(StoreType::Episodic),
        emotion: None,
        context: None,
        metadata: None,
        associations: None,
    };
    let stored_id = engine.encode_store(store_request).await.unwrap().record_id;

    let query = RecallQueryRequest {
        header: None,
        cue: RecallCue {
            image: Some(png),
            ..Default::default()
        },
        stores: None,
        limit: 10,
        min_fidelity: None,
        include_decayed: false,
        reconsolidate: false,
        activation_depth: 0,
        recall_mode: RecallMode::Perfect,
    };
    let hits = engine.recall_query(query).await.unwrap();
    assert_eq!(hits.memories.len(), 1);
    assert_eq!(hits.memories[0].record.id, stored_id);
}
#[tokio::test]
async fn audio_recall_cue_uses_transcription() {
    // Recalling with the same audio bytes as a cue finds the stored audio record.
    let provider = Arc::new(MockLLMProvider::new(4));
    let config = EngineConfig {
        llm_provider: Some(provider),
        ..Default::default()
    };
    let engine = CerememoryEngine::new(config).unwrap();
    let wav = b"RIFFabcdWAVErest".to_vec();

    let audio_block = ContentBlock {
        modality: Modality::Audio,
        format: "audio/wav".to_string(),
        data: wav.clone(),
        embedding: None,
    };
    let store_request = EncodeStoreRequest {
        header: None,
        content: MemoryContent {
            blocks: vec![audio_block],
            summary: None,
        },
        store: Some(StoreType::Episodic),
        emotion: None,
        context: None,
        metadata: None,
        associations: None,
    };
    let stored_id = engine.encode_store(store_request).await.unwrap().record_id;

    let query = RecallQueryRequest {
        header: None,
        cue: RecallCue {
            audio: Some(wav),
            ..Default::default()
        },
        stores: None,
        limit: 10,
        min_fidelity: None,
        include_decayed: false,
        reconsolidate: false,
        activation_depth: 0,
        recall_mode: RecallMode::Perfect,
    };
    let hits = engine.recall_query(query).await.unwrap();
    assert_eq!(hits.memories.len(), 1);
    assert_eq!(hits.memories[0].record.id, stored_id);
}
#[tokio::test]
async fn consolidation_basic_migration() {
    // With no age or access thresholds, every episodic record is migrated into
    // its own semantic node.
    let engine = make_engine().await;
    for n in 0..3 {
        let text = format!("Record {n}");
        engine
            .encode_store(text_store_req(&text, Some(StoreType::Episodic)))
            .await
            .unwrap();
    }
    let request = ConsolidateRequest {
        header: None,
        strategy: ConsolidationStrategy::Incremental,
        min_age_hours: 0,
        min_access_count: 0,
        dry_run: false,
    };
    let outcome = engine.lifecycle_consolidate(request).await.unwrap();
    assert_eq!(outcome.records_processed, 3);
    assert_eq!(outcome.records_migrated, 3);
    assert_eq!(outcome.semantic_nodes_created, 3);
}
#[tokio::test]
async fn consolidation_dry_run() {
    // A dry run reports what would migrate but writes nothing to the semantic store.
    let engine = make_engine().await;
    engine
        .encode_store(text_store_req("Dry run test", Some(StoreType::Episodic)))
        .await
        .unwrap();
    let request = ConsolidateRequest {
        header: None,
        strategy: ConsolidationStrategy::Incremental,
        min_age_hours: 0,
        min_access_count: 0,
        dry_run: true,
    };
    let outcome = engine.lifecycle_consolidate(request).await.unwrap();
    assert_eq!(outcome.records_migrated, 1);
    let semantic_total = engine.semantic.count().await.unwrap();
    assert_eq!(semantic_total, 0);
}
#[tokio::test]
async fn consolidation_with_llm_summarization() {
    // With a provider configured, the consolidated semantic record carries a summary.
    let provider = Arc::new(MockLLMProvider::new(4));
    let config = EngineConfig {
        llm_provider: Some(provider),
        ..Default::default()
    };
    let engine = CerememoryEngine::new(config).unwrap();
    engine
        .encode_store(text_store_req(
            "A very long piece of text that needs summarization for consolidation",
            Some(StoreType::Episodic),
        ))
        .await
        .unwrap();
    let request = ConsolidateRequest {
        header: None,
        strategy: ConsolidationStrategy::Incremental,
        min_age_hours: 0,
        min_access_count: 0,
        dry_run: false,
    };
    let outcome = engine.lifecycle_consolidate(request).await.unwrap();
    assert_eq!(outcome.records_migrated, 1);

    let semantic_ids = engine.semantic.list_ids().await.unwrap();
    assert_eq!(semantic_ids.len(), 1);
    let migrated = engine
        .semantic
        .get(&semantic_ids[0])
        .await
        .unwrap()
        .unwrap();
    assert!(migrated.content.summary.is_some());
}
// Consolidating a record with an LLM provider configured should leave an
// `extracted_relations` entry in the resulting semantic record's metadata.
#[tokio::test]
async fn consolidation_with_relation_extraction() {
    let provider = Arc::new(MockLLMProvider::new(4));
    let engine = CerememoryEngine::new(EngineConfig {
        llm_provider: Some(provider),
        ..Default::default()
    })
    .unwrap();
    engine
        .encode_store(text_store_req("Cats are mammals", Some(StoreType::Episodic)))
        .await
        .unwrap();
    engine
        .lifecycle_consolidate(ConsolidateRequest {
            header: None,
            strategy: ConsolidationStrategy::Incremental,
            min_age_hours: 0,
            min_access_count: 0,
            dry_run: false,
        })
        .await
        .unwrap();

    let sem_ids = engine.semantic.list_ids().await.unwrap();
    // Fail with a clear message rather than an index-out-of-bounds panic below.
    assert_eq!(
        sem_ids.len(),
        1,
        "expected exactly one consolidated semantic record"
    );
    let sem_record = engine.semantic.get(&sem_ids[0]).await.unwrap().unwrap();

    // A non-object metadata value must fail the test; silently skipping the
    // checks below would let a metadata regression pass unnoticed.
    let map = match &sem_record.metadata {
        serde_json::Value::Object(map) => map,
        other => panic!("semantic record metadata should be a JSON object, got {other:?}"),
    };
    let relations = map
        .get("extracted_relations")
        .expect("consolidation should record `extracted_relations` in metadata");
    // Only an array representation is checked for emptiness; other shapes are
    // tolerated. NOTE(review): tighten if the format is guaranteed to be an array.
    if let serde_json::Value::Array(arr) = relations {
        assert!(!arr.is_empty(), "extracted_relations should not be empty");
    }
}
// Without an LLM provider, consolidation still produces a summary; for this
// short input the summary is expected to equal the original text verbatim.
#[tokio::test]
async fn consolidation_no_llm_fallback() {
    let engine = make_engine().await;
    engine
        .encode_store(text_store_req(
            "Short text without LLM",
            Some(StoreType::Episodic),
        ))
        .await
        .unwrap();
    let resp = engine
        .lifecycle_consolidate(ConsolidateRequest {
            header: None,
            strategy: ConsolidationStrategy::Incremental,
            min_age_hours: 0,
            min_access_count: 0,
            dry_run: false,
        })
        .await
        .unwrap();
    assert_eq!(resp.records_migrated, 1);

    let sem_ids = engine.semantic.list_ids().await.unwrap();
    // Guard the index below with a descriptive failure.
    assert_eq!(
        sem_ids.len(),
        1,
        "expected exactly one consolidated semantic record"
    );
    let sem_record = engine.semantic.get(&sem_ids[0]).await.unwrap().unwrap();
    // Comparing against `Some(..)` already covers the "summary is present" case.
    assert_eq!(
        sem_record.content.summary.as_deref(),
        Some("Short text without LLM")
    );
}
// Full consolidation over five fresh episodic records should process all of them
// and migrate at least one.
// NOTE(review): despite the name, only the processed/migrated counters are
// checked; no compression-specific response field is asserted here.
#[tokio::test]
async fn consolidation_compression_metrics() {
    let engine = make_engine().await;
    for i in 0..5 {
        engine
            .encode_store(text_store_req(
                &format!("Test {i}"),
                Some(StoreType::Episodic),
            ))
            .await
            .unwrap();
    }
    let resp = engine
        .lifecycle_consolidate(ConsolidateRequest {
            header: None,
            strategy: ConsolidationStrategy::Full,
            min_age_hours: 0,
            min_access_count: 0,
            dry_run: false,
        })
        .await
        .unwrap();
    // Exact equality already implies "processed > 0"; no separate check needed.
    assert_eq!(resp.records_processed, 5);
    assert!(resp.records_migrated > 0);
    // Migrated records are expected to be a subset of the processed ones.
    assert!(
        resp.records_migrated <= resp.records_processed,
        "cannot migrate more records than were processed"
    );
}
// Records younger than `min_age_hours` are left alone: a freshly stored record
// with a 24-hour minimum age should neither be migrated nor reach the semantic store.
#[tokio::test]
async fn consolidation_min_age_filter() {
    let engine = make_engine().await;
    engine
        .encode_store(text_store_req("Fresh record", Some(StoreType::Episodic)))
        .await
        .unwrap();
    let resp = engine
        .lifecycle_consolidate(ConsolidateRequest {
            header: None,
            strategy: ConsolidationStrategy::Incremental,
            // The record was just created, so it is far younger than this threshold.
            min_age_hours: 24,
            min_access_count: 0,
            dry_run: false,
        })
        .await
        .unwrap();
    assert_eq!(resp.records_migrated, 0);
    // This is a real (non-dry) run, so also confirm nothing was written.
    assert_eq!(engine.semantic.count().await.unwrap(), 0);
}
// A manual association from record A to record B is added before consolidating.
// Afterwards the test expects A to have at least two associations: the manual
// one plus at least one more (presumably added by consolidation — confirm).
#[tokio::test]
async fn consolidation_preserves_associations() {
    let engine = make_engine().await;
    let first = engine
        .encode_store(text_store_req("Memory A", Some(StoreType::Episodic)))
        .await
        .unwrap();
    let second = engine
        .encode_store(text_store_req("Memory B", Some(StoreType::Episodic)))
        .await
        .unwrap();

    // Manually link A -> B before consolidating.
    let link = Association {
        target_id: second.record_id,
        association_type: AssociationType::Semantic,
        weight: 0.8,
        created_at: Utc::now(),
        last_co_activation: Utc::now(),
    };
    engine
        .coordinator
        .add_association(&first.record_id, link)
        .await
        .unwrap();

    let request = ConsolidateRequest {
        header: None,
        strategy: ConsolidationStrategy::Incremental,
        min_age_hours: 0,
        min_access_count: 0,
        dry_run: false,
    };
    engine.lifecycle_consolidate(request).await.unwrap();

    let links_after = engine
        .coordinator
        .get_associations(&first.record_id)
        .await
        .unwrap();
    assert!(links_after.len() >= 2);
}
// Full consolidation over two near-duplicate texts, with an LLM provider
// configured, should run successfully and process the stored records.
// NOTE(review): no duplicate-merge outcome is asserted; extend once the expected
// merge behaviour is pinned down.
#[tokio::test]
async fn duplicate_detection_with_embeddings() {
    let provider = Arc::new(MockLLMProvider::new(4));
    let engine = CerememoryEngine::new(EngineConfig {
        llm_provider: Some(provider),
        ..Default::default()
    })
    .unwrap();
    engine
        .encode_store(text_store_req("Hello world", Some(StoreType::Episodic)))
        .await
        .unwrap();
    engine
        .encode_store(text_store_req("Hello worlds", Some(StoreType::Episodic)))
        .await
        .unwrap();
    let initial_count = engine.episodic.count().await.unwrap();
    assert!(
        initial_count > 0,
        "records should be stored before consolidation"
    );
    let resp = engine
        .lifecycle_consolidate(ConsolidateRequest {
            header: None,
            strategy: ConsolidationStrategy::Full,
            min_age_hours: 0,
            min_access_count: 0,
            dry_run: false,
        })
        .await
        .unwrap();
    // Checked separately: combining the two conditions with `||` would let a
    // non-empty store hide a consolidation run that processed nothing.
    assert!(
        resp.records_processed > 0,
        "full consolidation should process the stored records"
    );
}
}