use crate::memory_core::decay::DecayConfig;
use crate::memory_core::embed::Embedder;
use crate::memory_core::palace::Drawer;
use crate::memory_core::retrieval::{PalaceHandle, shared_embedder};
use crate::memory_core::store::vector::VectorStore;
use anyhow::{Context, Result};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use uuid::Uuid;
/// Tuning parameters for a [`Dreamer`] and the passes of its dream cycle.
#[derive(Debug, Clone)]
pub struct DreamConfig {
    /// Seconds without activity before the dreamer counts as idle. The background loops also
    /// use it as their polling interval (clamped there to at least one second).
    pub idle_secs: u64,
    /// Minimum vector-search score at which the dedup pass merges two drawers.
    pub dedup_threshold: f32,
    /// Effective-importance ceiling (inclusive) below which sufficiently old drawers are pruned.
    pub prune_importance: f32,
    /// Time budget in milliseconds shared by the passes of one dream cycle; passes check it
    /// between items, so it is a soft limit.
    pub max_cycle_ms: u64,
}

impl Default for DreamConfig {
    /// Five-minute idle window, 0.95 dedup similarity, 0.05 prune floor, one-minute budget.
    fn default() -> Self {
        let idle_secs = 5 * 60;
        let max_cycle_ms = 60 * 1_000;
        DreamConfig {
            idle_secs,
            dedup_threshold: 0.95,
            prune_importance: 0.05,
            max_cycle_ms,
        }
    }
}
/// Counters describing the outcome of a single dream cycle.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct DreamStats {
/// Number of near-duplicate pairs the dedup pass merged.
pub merged: usize,
/// Number of drawers the prune pass selected for removal.
pub pruned: usize,
/// Number of distinct keywords in the closet index rebuilt at the end of the cycle.
pub closets_updated: usize,
/// Vector-index entries removed by the compact pass (orphans plus any rebuild shrinkage).
pub compacted: usize,
/// Milliseconds spent on the passes and the flush; measured before the WAL checkpoint and
/// before the stats are persisted.
pub duration_ms: u64,
}
/// Record of the most recent dream cycle, stored as JSON in a palace data directory.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersistedDreamStats {
/// UTC time at which the record was created, at the end of the cycle.
pub last_run_at: chrono::DateTime<chrono::Utc>,
/// Counters of that cycle; `flatten` writes their fields at the top level of the JSON
/// object, next to `last_run_at`.
#[serde(flatten)]
pub stats: DreamStats,
}
impl PersistedDreamStats {
pub const FILE_NAME: &'static str = "dream_stats.json";
pub fn load(data_dir: &Path) -> Result<Option<Self>> {
let path = data_dir.join(Self::FILE_NAME);
if !path.exists() {
return Ok(None);
}
let raw =
std::fs::read_to_string(&path).with_context(|| format!("read {}", path.display()))?;
let parsed: Self =
serde_json::from_str(&raw).with_context(|| format!("parse {}", path.display()))?;
Ok(Some(parsed))
}
pub fn save(&self, data_dir: &Path) -> Result<()> {
let path = data_dir.join(Self::FILE_NAME);
let raw = serde_json::to_string_pretty(self).context("serialize dream stats")?;
std::fs::write(&path, raw).with_context(|| format!("write {}", path.display()))?;
Ok(())
}
}
/// Background maintenance worker for a palace.
///
/// Once the palace has been idle for `config.idle_secs`, it runs dream cycles that merge
/// near-duplicate drawers, prune faded ones, compact the vector index and rebuild the closet
/// keyword index.
pub struct Dreamer {
/// Tuning parameters for idle detection and the dream passes.
pub config: DreamConfig,
/// Unix time in seconds of the last `touch()` (or of construction).
/// NOTE(review): the `Arc` is never cloned in this file; confirm whether sharing is needed.
last_activity: Arc<AtomicU64>,
}
impl Dreamer {
    /// Creates a dreamer whose idle clock starts now.
    ///
    /// A new dreamer is not idle until a full `config.idle_secs` has passed without a
    /// [`touch`](Self::touch).
    pub fn new(config: DreamConfig) -> Self {
        Self {
            config,
            last_activity: Arc::new(AtomicU64::new(now_secs())),
        }
    }

    /// Records activity, resetting the idle clock.
    pub fn touch(&self) {
        self.last_activity.store(now_secs(), Ordering::Relaxed);
    }

    /// Returns `true` once at least `config.idle_secs` seconds have passed since the last
    /// [`touch`](Self::touch), or since construction if there was none.
    pub fn is_idle(&self) -> bool {
        let last = self.last_activity.load(Ordering::Relaxed);
        now_secs().saturating_sub(last) >= self.config.idle_secs
    }

    /// Spawns a never-ending Tokio task that wakes every `config.idle_secs` (at least one
    /// second) and runs a [`dream_cycle`](Self::dream_cycle) whenever the dreamer is idle.
    ///
    /// Cycle results are logged and the loop keeps going after failures. Use
    /// [`start_with_shutdown`](Self::start_with_shutdown) for a loop that can be stopped.
    pub fn start(self: Arc<Self>, handle: Arc<PalaceHandle>) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            let interval = Duration::from_secs(self.config.idle_secs.max(1));
            loop {
                tokio::time::sleep(interval).await;
                if self.is_idle() {
                    self.run_logged_cycle(&handle).await;
                }
            }
        })
    }

    /// Like [`start`](Self::start), but the task returns once `shutdown` holds `true` or its
    /// sender is dropped.
    ///
    /// The signal is only observed while waiting between cycles; a cycle that is already
    /// running completes first.
    pub fn start_with_shutdown(
        self: Arc<Self>,
        handle: Arc<PalaceHandle>,
        mut shutdown: tokio::sync::watch::Receiver<bool>,
    ) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            let interval = Duration::from_secs(self.config.idle_secs.max(1));
            loop {
                tokio::select! {
                    _ = tokio::time::sleep(interval) => {}
                    res = shutdown.changed() => {
                        if res.is_err() || *shutdown.borrow() {
                            tracing::info!(palace = %handle.id, "dreamer shutting down");
                            return;
                        }
                    }
                }
                // The sleep may have won the race against an already-requested shutdown.
                if *shutdown.borrow() {
                    tracing::info!(palace = %handle.id, "dreamer shutting down");
                    return;
                }
                if self.is_idle() {
                    self.run_logged_cycle(&handle).await;
                }
            }
        })
    }

    /// Runs one dream cycle and logs its outcome.
    ///
    /// Errors are logged rather than returned, so the background loops keep running.
    async fn run_logged_cycle(&self, handle: &Arc<PalaceHandle>) {
        match self.dream_cycle(handle).await {
            Ok(stats) => tracing::info!(
                palace = %handle.id,
                merged = stats.merged,
                pruned = stats.pruned,
                compacted = stats.compacted,
                closets_updated = stats.closets_updated,
                duration_ms = stats.duration_ms,
                "dream cycle complete"
            ),
            Err(e) => tracing::warn!(palace = %handle.id, "dream cycle failed: {e:#}"),
        }
    }

    /// Runs one full dream cycle.
    ///
    /// The steps, in order, are:
    /// 1. dedup, prune and compact passes, sharing a `config.max_cycle_ms` budget;
    /// 2. closet index refresh;
    /// 3. flush of the handle;
    /// 4. WAL checkpoint of the knowledge graph;
    /// 5. persisting the stats to `dream_stats.json` when the handle has a data directory.
    ///
    /// # Errors
    /// Fails if the dedup, prune or compact pass fails. Flush, checkpoint and persistence
    /// failures are only logged.
    pub async fn dream_cycle(&self, handle: &Arc<PalaceHandle>) -> Result<DreamStats> {
        let started = std::time::Instant::now();
        let budget = Duration::from_millis(self.config.max_cycle_ms);
        let merged = self
            .dedup_pass(handle, started, budget)
            .await
            .context("dream dedup pass")?;
        let pruned = self
            .prune_pass(handle, started, budget)
            .await
            .context("dream prune pass")?;
        let compacted = self
            .compact_pass(handle, started, budget)
            .await
            .context("dream compact pass")?;
        let closets_updated = self.refresh_closets(handle);
        if let Err(e) = handle.flush() {
            tracing::warn!("dream flush failed: {e:#}");
        }
        let stats = DreamStats {
            merged,
            pruned,
            closets_updated,
            compacted,
            // `as_millis` is u128; saturate instead of silently truncating.
            duration_ms: u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX),
        };
        match handle.kg.checkpoint() {
            Ok((wal, done)) => {
                tracing::debug!(
                    palace = %handle.id,
                    wal_pages = wal,
                    checkpointed = done,
                    "WAL checkpoint complete"
                );
            }
            Err(e) => {
                tracing::warn!(
                    palace = %handle.id,
                    error = %e,
                    "WAL checkpoint failed (non-fatal)"
                );
            }
        }
        if let Some(data_dir) = handle.data_dir.as_ref() {
            let persisted = PersistedDreamStats {
                last_run_at: chrono::Utc::now(),
                stats: stats.clone(),
            };
            if let Err(e) = persisted.save(data_dir) {
                tracing::warn!(palace = %handle.id, "persist dream_stats.json failed: {e:#}");
            }
        }
        Ok(stats)
    }

    /// Removes vector-index entries whose id has no matching drawer.
    ///
    /// If the index still holds more than `2 * drawers + 1` entries afterwards, the index is
    /// rebuilt from the drawers.
    ///
    /// Returns the number of entries removed (orphans plus any shrinkage from a rebuild).
    /// Individual remove failures are logged and skipped.
    async fn compact_pass(
        &self,
        handle: &Arc<PalaceHandle>,
        started: std::time::Instant,
        budget: Duration,
    ) -> Result<usize> {
        let drawer_ids: std::collections::HashSet<Uuid> =
            handle.drawers.read().iter().map(|d| d.id).collect();
        let vector_ids = handle.vector_store.all_ids();
        let mut removed: usize = 0;
        for vid in vector_ids {
            if started.elapsed() >= budget {
                break;
            }
            if drawer_ids.contains(&vid) {
                continue;
            }
            match handle.vector_store.remove(vid).await {
                Ok(()) => removed += 1,
                Err(e) => tracing::warn!(?vid, "dream compact: vector remove failed: {e:#}"),
            }
        }
        let drawer_count = drawer_ids.len();
        let index_size_after = handle.vector_store.index_size();
        let bloated =
            drawer_count > 0 && index_size_after > drawer_count.saturating_mul(2) + 1;
        // A rebuild begins by resetting the whole index, so never start one once the budget
        // is spent. The bloat persists, so the next cycle will retry.
        if bloated && started.elapsed() < budget {
            let rebuilt = rebuild_index_from_drawers(handle, started, budget)
                .await
                .context("dream compact rebuild")?;
            removed = removed.saturating_add(index_size_after.saturating_sub(rebuilt));
        }
        Ok(removed)
    }

    /// Merges near-duplicate drawers.
    ///
    /// Every drawer is embedded, its top 3 vector neighbours are searched, and it is merged
    /// with the first neighbour scoring at least `config.dedup_threshold`. Of each pair, the
    /// drawer with lower importance is forgotten and folded into the other; on a tie, the
    /// drawer being scanned survives. Each drawer takes part in at most one merge as the
    /// scanning side.
    ///
    /// Returns the number of merges that actually happened. A pair whose `forget` fails is
    /// logged and not merged.
    async fn dedup_pass(
        &self,
        handle: &Arc<PalaceHandle>,
        started: std::time::Instant,
        budget: Duration,
    ) -> Result<usize> {
        let snapshot: Vec<Drawer> = handle.drawers.read().clone();
        if snapshot.len() < 2 {
            return Ok(0);
        }
        let embedder = shared_embedder()
            .await
            .context("acquire shared embedder for dream dedup")?;
        let contents: Vec<String> = snapshot.iter().map(|d| d.content.clone()).collect();
        let vectors = embedder
            .embed_batch(&contents)
            .await
            .context("batch embed drawers for dream dedup")?;
        if vectors.len() != snapshot.len() {
            anyhow::bail!(
                "embedder returned {} vectors for {} drawers",
                vectors.len(),
                snapshot.len()
            );
        }
        // Index the snapshot once instead of scanning it linearly for every search hit.
        let by_id: HashMap<Uuid, &Drawer> = snapshot.iter().map(|d| (d.id, d)).collect();
        let mut merges: usize = 0;
        let mut already_removed: std::collections::HashSet<Uuid> =
            std::collections::HashSet::new();
        for (drawer, query_vec) in snapshot.iter().zip(vectors.iter()) {
            if started.elapsed() >= budget {
                break;
            }
            if already_removed.contains(&drawer.id) {
                continue;
            }
            let hits = handle.vector_store.search(query_vec, 3).await?;
            for hit in hits {
                if hit.drawer_id == drawer.id || already_removed.contains(&hit.drawer_id) {
                    continue;
                }
                if hit.score < self.config.dedup_threshold {
                    continue;
                }
                let Some(&hit_drawer) = by_id.get(&hit.drawer_id) else {
                    continue;
                };
                let (survivor, loser) = if drawer.importance >= hit_drawer.importance {
                    (drawer, hit_drawer)
                } else {
                    (hit_drawer, drawer)
                };
                // Forget first: if that fails, the survivor must not absorb text that still
                // lives on in a separate drawer.
                if let Err(e) = handle.forget(loser.id).await {
                    tracing::warn!(drawer = ?loser.id, "dream dedup: forget failed: {e:#}");
                    continue;
                }
                merge_into(handle, survivor, loser);
                already_removed.insert(loser.id);
                merges += 1;
                break;
            }
        }
        Ok(merges)
    }

    /// Forgets drawers that are older than 30 days and whose effective importance (after
    /// decay and accumulated boost) is at or below `config.prune_importance`.
    ///
    /// Returns the number of drawers actually forgotten. Failures are logged and not counted.
    async fn prune_pass(
        &self,
        handle: &Arc<PalaceHandle>,
        started: std::time::Instant,
        budget: Duration,
    ) -> Result<usize> {
        const MIN_AGE_DAYS: f32 = 30.0;
        let snapshot: Vec<Drawer> = handle.drawers.read().clone();
        let mut victims: Vec<Uuid> = Vec::new();
        for drawer in &snapshot {
            if started.elapsed() >= budget {
                break;
            }
            let age = DecayConfig::age_days(drawer.created_at);
            let boost = drawer.accumulated_boost(&handle.decay_config);
            let eff = handle
                .decay_config
                .effective_importance(drawer.importance, age, boost);
            // `<=` so a drawer that decayed exactly to the floor is still prunable.
            if eff <= self.config.prune_importance && age > MIN_AGE_DAYS {
                victims.push(drawer.id);
            }
        }
        let mut pruned: usize = 0;
        for id in victims {
            match handle.forget(id).await {
                Ok(_) => pruned += 1,
                Err(e) => tracing::warn!(?id, "dream prune: forget failed: {e:#}"),
            }
        }
        Ok(pruned)
    }

    /// Rebuilds the closet index from scratch and swaps it in.
    ///
    /// The index maps each keyword (see `extract_keywords`) to the ids of the drawers that
    /// contain it. Returns the number of distinct keywords.
    fn refresh_closets(&self, handle: &Arc<PalaceHandle>) -> usize {
        let snapshot: Vec<Drawer> = handle.drawers.read().clone();
        let mut new_index: HashMap<String, Vec<Uuid>> = HashMap::new();
        for drawer in snapshot.iter() {
            for kw in extract_keywords(&drawer.content) {
                new_index.entry(kw).or_default().push(drawer.id);
            }
        }
        let count = new_index.len();
        let mut closets = handle.closets.write();
        *closets = new_index;
        count
    }
}
async fn rebuild_index_from_drawers(
handle: &Arc<PalaceHandle>,
started: std::time::Instant,
budget: Duration,
) -> Result<usize> {
let snapshot: Vec<Drawer> = handle.drawers.read().clone();
handle
.vector_store
.reset()
.context("reset vector index for rebuild")?;
if snapshot.is_empty() {
return Ok(0);
}
let embedder = shared_embedder()
.await
.context("acquire shared embedder for dream rebuild")?;
let mut rebuilt: usize = 0;
for drawer in snapshot.iter() {
if started.elapsed() >= budget {
break;
}
let vecs = embedder
.embed_batch(std::slice::from_ref(&drawer.content))
.await
.with_context(|| format!("re-embed drawer {}", drawer.id))?;
if let Some(v) = vecs.into_iter().next() {
handle
.vector_store
.upsert(drawer.id, v)
.await
.with_context(|| format!("re-upsert drawer {}", drawer.id))?;
rebuilt += 1;
}
}
Ok(rebuilt)
}
/// Folds `loser` into the live copy of `survivor` held in `handle.drawers`.
///
/// Effects on the survivor:
/// - It keeps the higher of the two importances.
/// - It gains every loser tag it does not already have.
/// - The loser's text is appended after a `"\n\nAlso: "` separator, unless the survivor
///   already contains it verbatim (exact duplicates would otherwise just double the text).
///
/// The appended part is clipped so merging never grows the content beyond 500 bytes. The
/// survivor's own pre-existing text is never cut, and clipping always lands on a UTF-8
/// character boundary; `String::truncate` panics otherwise.
///
/// Does nothing if `survivor` is no longer in the drawer list. Only the drawer list is
/// edited; the vector store is not touched.
fn merge_into(handle: &Arc<PalaceHandle>, survivor: &Drawer, loser: &Drawer) {
    const MERGED_CONTENT_MAX_BYTES: usize = 500;
    let mut drawers = handle.drawers.write();
    let Some(target) = drawers.iter_mut().find(|d| d.id == survivor.id) else {
        return;
    };
    if !target.content.contains(loser.content.as_str()) {
        // Capping at the current length when it already exceeds the limit means only the
        // appended text is ever dropped.
        let cap = MERGED_CONTENT_MAX_BYTES.max(target.content.len());
        target.content.push_str("\n\nAlso: ");
        target.content.push_str(&loser.content);
        truncate_at_char_boundary(&mut target.content, cap);
    }
    target.importance = target.importance.max(loser.importance);
    for tag in &loser.tags {
        if !target.tags.contains(tag) {
            target.tags.push(tag.clone());
        }
    }
}

/// Shortens `s` to at most `max_len` bytes.
///
/// The cut backs off to the nearest preceding UTF-8 character boundary, so it never panics on
/// multi-byte characters.
fn truncate_at_char_boundary(s: &mut String, max_len: usize) {
    if s.len() <= max_len {
        return;
    }
    let mut cut = max_len;
    // Index 0 is always a boundary, so this terminates.
    while !s.is_char_boundary(cut) {
        cut -= 1;
    }
    s.truncate(cut);
}
/// Common words that are never used as closet keywords.
const STOP_WORDS: &[&str] = &[
    "the", "a", "an", "is", "are", "was", "were", "be", "been", "being", "of", "in", "on", "at",
    "to", "for", "with", "and", "or", "but", "not", "no", "yes", "i", "you", "he", "she", "it",
    "we", "they", "this", "that", "these", "those", "as", "by", "from", "into", "over", "under",
    "if", "then", "than", "so", "do", "does", "did", "have", "has", "had", "will", "would",
    "shall", "should", "can", "could", "may", "might", "must", "about", "any", "all", "some",
    "more", "most", "such",
];

/// Extracts the distinct keywords of `content`, in order of first appearance.
///
/// Processing steps:
/// 1. Split the text on whitespace.
/// 2. Reduce each piece to its alphanumeric characters and lowercase it.
/// 3. Drop results shorter than 3 bytes (UTF-8 length, not character count) and entries of
///    `STOP_WORDS`.
/// 4. Drop repeats of a keyword already returned.
pub fn extract_keywords(content: &str) -> Vec<String> {
    let mut emitted: std::collections::HashSet<String> = std::collections::HashSet::new();
    content
        .split_whitespace()
        .map(|word| {
            word.chars()
                .filter(|ch| ch.is_alphanumeric())
                .flat_map(char::to_lowercase)
                .collect::<String>()
        })
        .filter(|kw| kw.len() >= 3 && !STOP_WORDS.contains(&kw.as_str()))
        .filter(|kw| emitted.insert(kw.clone()))
        .collect()
}
/// Current wall-clock time in whole seconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
fn now_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
// NOTE(review): this alias appears to exist only so that the `parking_lot::RwLock` import is
// referenced; nothing else in this file names `RwLock`. `dead_code` is allowed because the
// alias itself is never used. Consider removing both the alias and the import.
#[allow(dead_code)]
type _PalaceHandleRef = RwLock<()>;
// Tests for the dreamer. The async tests run whole dream cycles against a palace opened in a
// temporary directory (see `open_test_handle`).
#[cfg(test)]
mod tests {
use super::*;
use crate::memory_core::palace::{Palace, PalaceId, RoomType};
use crate::memory_core::retrieval::PalaceHandle;
use chrono::{Duration as ChronoDuration, Utc};
use tempfile::tempdir;
// Pins the default DreamConfig values.
#[test]
fn dream_config_defaults() {
let cfg = DreamConfig::default();
assert_eq!(cfg.idle_secs, 300);
assert!((cfg.dedup_threshold - 0.95).abs() < 1e-6);
assert!((cfg.prune_importance - 0.05).abs() < 1e-6);
assert_eq!(cfg.max_cycle_ms, 60_000);
}
// A new dreamer is not idle; back-dating its last activity by 10s (> idle_secs = 2) makes it
// idle, and touch() makes it non-idle again.
#[test]
fn dreamer_touch_resets_idle() {
let dreamer = Dreamer::new(DreamConfig {
idle_secs: 2,
..DreamConfig::default()
});
assert!(!dreamer.is_idle(), "fresh dreamer should not be idle yet");
dreamer
.last_activity
.store(now_secs().saturating_sub(10), Ordering::Relaxed);
assert!(dreamer.is_idle(), "should be idle after 10s simulated wait");
dreamer.touch();
assert!(!dreamer.is_idle(), "touch should reset idle clock");
}
// Opens a fresh palace called `name` whose data directory lives under a new temp dir.
async fn open_test_handle(name: &str) -> Arc<PalaceHandle> {
let dir = tempdir().unwrap();
let palace = Palace {
id: PalaceId::new(name),
name: name.into(),
description: None,
created_at: Utc::now(),
data_dir: dir.path().join(name),
};
std::fs::create_dir_all(&palace.data_dir).unwrap();
let handle = PalaceHandle::open(&palace).unwrap();
// Leak the TempDir guard so the directory is not deleted when `dir` goes out of scope
// while the handle still uses it. NOTE(review): the directory is never cleaned up.
std::mem::forget(dir);
handle
}
// Two drawers with identical content must be merged into one.
#[tokio::test]
async fn dream_cycle_merges_duplicates() {
let handle = open_test_handle("dream-merge").await;
handle
.remember(
"Rust uses HNSW for vector search".into(),
RoomType::Backend,
vec!["rust".into()],
0.7,
)
.await
.unwrap();
handle
.remember(
"Rust uses HNSW for vector search".into(),
RoomType::Backend,
vec!["rust".into()],
0.6,
)
.await
.unwrap();
assert_eq!(handle.drawers.read().len(), 2);
let dreamer = Dreamer::new(DreamConfig::default());
let stats = dreamer.dream_cycle(&handle).await.unwrap();
assert_eq!(stats.merged, 1, "expected exactly one merge");
assert_eq!(handle.drawers.read().len(), 1, "expected dedup to 1 drawer");
}
// A low-importance drawer back-dated to 60 days (past the 30-day minimum age) is pruned.
#[tokio::test]
async fn dream_cycle_prunes_low_importance() {
let handle = open_test_handle("dream-prune").await;
handle
.remember(
"very stale fact nobody cares about".into(),
RoomType::General,
vec![],
0.01,
)
.await
.unwrap();
{
let mut drawers = handle.drawers.write();
for d in drawers.iter_mut() {
d.created_at = Utc::now() - ChronoDuration::days(60);
}
}
assert_eq!(handle.drawers.read().len(), 1);
let dreamer = Dreamer::new(DreamConfig::default());
let stats = dreamer.dream_cycle(&handle).await.unwrap();
assert_eq!(stats.pruned, 1, "expected exactly one prune");
assert!(
handle.drawers.read().is_empty(),
"low-importance aged drawer should be removed"
);
}
// Regression test: importance equal to `prune_importance` must still be prunable (inclusive
// comparison in the prune pass).
#[tokio::test]
async fn dream_cycle_prunes_at_floor_importance() {
let handle = open_test_handle("dream-prune-floor").await;
handle
.remember(
"drawer that decays to the floor".into(),
RoomType::General,
vec![],
0.05,
)
.await
.unwrap();
{
let mut drawers = handle.drawers.write();
for d in drawers.iter_mut() {
d.created_at = Utc::now() - ChronoDuration::days(60);
}
}
assert_eq!(handle.drawers.read().len(), 1);
let dreamer = Dreamer::new(DreamConfig::default());
let stats = dreamer.dream_cycle(&handle).await.unwrap();
assert_eq!(
stats.pruned, 1,
"drawer at floor importance + aged > 30d must be prunable (was unsatisfiable under strict `<`)"
);
assert!(handle.drawers.read().is_empty());
}
// Sending `true` on the shutdown channel must end the background loop well before its
// 10-second polling interval elapses.
#[tokio::test]
async fn dreamer_shutdown_terminates_loop() {
let handle = open_test_handle("dream-shutdown").await;
let dreamer = Arc::new(Dreamer::new(DreamConfig {
idle_secs: 10,
..DreamConfig::default()
}));
let (tx, rx) = tokio::sync::watch::channel(false);
let join = dreamer.clone().start_with_shutdown(handle, rx);
tokio::task::yield_now().await;
tx.send(true).expect("send shutdown signal");
let outcome = tokio::time::timeout(Duration::from_secs(2), join).await;
assert!(
outcome.is_ok(),
"dream loop did not exit within 2s of shutdown"
);
outcome.unwrap().expect("join handle clean exit");
}
// Orphan vectors are simulated by dropping two drawers from the drawer list and the
// knowledge graph without touching the vector store. The compact pass must remove exactly
// those two vectors and keep the live one. A 0.999 dedup threshold keeps merges from
// interfering.
#[tokio::test]
async fn dream_cycle_compacts_orphaned_vectors() {
let handle = open_test_handle("dream-compact").await;
let id_keep = handle
.remember(
"alpha drawer about HNSW".into(),
RoomType::Backend,
vec![],
0.7,
)
.await
.unwrap();
let id_orphan_a = handle
.remember(
"beta drawer about something else".into(),
RoomType::General,
vec![],
0.5,
)
.await
.unwrap();
let id_orphan_b = handle
.remember(
"gamma drawer about yet another topic".into(),
RoomType::General,
vec![],
0.5,
)
.await
.unwrap();
assert_eq!(handle.drawers.read().len(), 3);
let before_idx = handle.vector_store.index_size();
let before_ids = handle.vector_store.all_ids().len();
assert_eq!(before_ids, 3, "key_map should track all three upserts");
{
let mut drawers = handle.drawers.write();
drawers.retain(|d| d.id == id_keep);
}
let _ = handle.kg.delete_drawer(id_orphan_a);
let _ = handle.kg.delete_drawer(id_orphan_b);
let dreamer = Dreamer::new(DreamConfig {
dedup_threshold: 0.999,
..DreamConfig::default()
});
let stats = dreamer.dream_cycle(&handle).await.unwrap();
assert_eq!(
stats.compacted, 2,
"expected exactly two orphan vectors removed; got stats={stats:?}"
);
let after_ids = handle.vector_store.all_ids().len();
assert_eq!(
after_ids, 1,
"key_map should only track the surviving drawer (before={before_ids}, before_idx={before_idx})"
);
assert!(
handle.vector_store.all_ids().contains(&id_keep),
"compaction must not remove the live drawer's vector"
);
}
// After a cycle, dream_stats.json must exist, round-trip to the same stats and carry a
// recent timestamp.
#[tokio::test]
async fn dream_stats_persisted_after_cycle() {
let handle = open_test_handle("dream-persist").await;
handle
.remember(
"non-duplicate baseline drawer".into(),
RoomType::General,
vec![],
0.5,
)
.await
.unwrap();
let dreamer = Dreamer::new(DreamConfig::default());
let stats = dreamer.dream_cycle(&handle).await.unwrap();
let data_dir = handle.data_dir.clone().expect("data_dir set");
let loaded = PersistedDreamStats::load(&data_dir)
.unwrap()
.expect("dream_stats.json should exist after a cycle");
assert_eq!(
loaded.stats, stats,
"persisted stats must match cycle output"
);
let age = chrono::Utc::now().signed_duration_since(loaded.last_run_at);
assert!(
age.num_seconds().abs() < 5,
"last_run_at must be within a few seconds of now; got {age}"
);
}
// The closet index rebuilt at the end of a cycle maps lowercased keywords back to their
// source drawer.
#[tokio::test]
async fn closet_refresh_builds_index() {
let handle = open_test_handle("dream-closets").await;
let id = handle
.remember(
"Quokkas are the happiest marsupials in Australia".into(),
RoomType::General,
vec![],
0.5,
)
.await
.unwrap();
let dreamer = Dreamer::new(DreamConfig::default());
let stats = dreamer.dream_cycle(&handle).await.unwrap();
assert!(
stats.closets_updated > 0,
"closet index should be non-empty"
);
let closets = handle.closets.read();
let entry = closets.get("quokkas").expect("expected `quokkas` keyword");
assert!(
entry.contains(&id),
"closet entry must reference the source drawer"
);
}
}