use super::config::{DreamConfig, DreamStats};
use super::cycle::{
compact_pass, content_prune_pass, dedup_pass, prune_pass, refresh_closets,
semantic_consolidation_pass,
};
use super::guard::CompactionGuard;
use super::recall_benchmark::run_benchmark;
use crate::memory_core::retrieval::PalaceHandle;
use crate::memory_core::semantic_consolidation::SemanticConsolidator;
use anyhow::{Context, Result};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use super::helpers::now_secs;
/// Background worker that runs maintenance "dream cycles" over a palace once no
/// activity has been recorded for `config.idle_secs`.
pub struct Dreamer {
/// Settings read by the scheduling loops and by every pass of a dream cycle.
pub config: DreamConfig,
/// Value of `now_secs()` captured at construction and on every `touch()`;
/// compared against the current time to decide whether the dreamer is idle.
pub(super) last_activity: Arc<AtomicU64>,
/// Optional consolidator handed to the semantic consolidation pass; `None`
/// when the dreamer was built with `new`.
pub(super) consolidator: Option<Arc<SemanticConsolidator>>,
}
impl Dreamer {
    /// Creates a dreamer without a semantic consolidator.
    ///
    /// The activity clock starts at the current time, so a fresh dreamer only
    /// reports idle once `config.idle_secs` have elapsed.
    pub fn new(config: DreamConfig) -> Self {
        Self {
            config,
            last_activity: Arc::new(AtomicU64::new(now_secs())),
            consolidator: None,
        }
    }

    /// Creates a dreamer that forwards `consolidator` to the semantic
    /// consolidation pass of every dream cycle.
    pub fn with_consolidator(
        config: DreamConfig,
        consolidator: Arc<SemanticConsolidator>,
    ) -> Self {
        // Delegate so the two constructors cannot drift apart.
        let mut dreamer = Self::new(config);
        dreamer.consolidator = Some(consolidator);
        dreamer
    }

    /// Records activity "now", postponing the next dream cycle.
    pub fn touch(&self) {
        self.last_activity.store(now_secs(), Ordering::Relaxed);
    }

    /// Returns `true` when at least `config.idle_secs` have passed since the
    /// last recorded activity.
    pub fn is_idle(&self) -> bool {
        let last = self.last_activity.load(Ordering::Relaxed);
        // Saturating: a clock that appears to go backwards counts as "no time elapsed".
        now_secs().saturating_sub(last) >= self.config.idle_secs
    }

    /// Spawns a task that wakes every `idle_secs` (at least one second) and
    /// runs a dream cycle whenever the dreamer is idle.
    ///
    /// The loop never exits on its own; abort the returned handle to stop it,
    /// or use [`Dreamer::start_with_shutdown`] for a cooperative stop.
    pub fn start(self: Arc<Self>, handle: Arc<PalaceHandle>) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            let interval = Duration::from_secs(self.config.idle_secs.max(1));
            loop {
                tokio::time::sleep(interval).await;
                self.dream_if_idle(&handle).await;
            }
        })
    }

    /// Same schedule as [`Dreamer::start`], but the task ends once `shutdown`
    /// holds `true` or its sender has been dropped.
    ///
    /// The flag is inspected before every wait, so a receiver that already
    /// holds `true` stops the task immediately instead of after one interval.
    /// A dream cycle that is already running is not interrupted; shutdown is
    /// observed once it returns.
    pub fn start_with_shutdown(
        self: Arc<Self>,
        handle: Arc<PalaceHandle>,
        mut shutdown: tokio::sync::watch::Receiver<bool>,
    ) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            let interval = Duration::from_secs(self.config.idle_secs.max(1));
            loop {
                // `changed()` only fires for values not yet seen by this receiver, so a
                // flag set before the task started must be checked explicitly.
                if *shutdown.borrow() {
                    break;
                }
                let sender_gone = tokio::select! {
                    _ = tokio::time::sleep(interval) => false,
                    changed = shutdown.changed() => changed.is_err(),
                };
                if sender_gone || *shutdown.borrow() {
                    break;
                }
                self.dream_if_idle(&handle).await;
            }
            tracing::info!(palace = %handle.id, "dreamer shutting down");
        })
    }

    /// Runs one dream cycle and logs its outcome, unless activity was recorded
    /// within the last `idle_secs`.
    async fn dream_if_idle(&self, handle: &Arc<PalaceHandle>) {
        if !self.is_idle() {
            return;
        }
        let outcome = self.dream_cycle(handle).await;
        Self::log_cycle_outcome(handle, outcome);
    }

    /// Emits an info event carrying the cycle stats on success, or a warning
    /// with the full error chain on failure.
    fn log_cycle_outcome(handle: &PalaceHandle, outcome: Result<DreamStats>) {
        match outcome {
            Ok(stats) => tracing::info!(
                palace = %handle.id,
                merged = stats.merged,
                pruned = stats.pruned,
                content_pruned = stats.content_pruned,
                compacted = stats.compacted,
                closets_updated = stats.closets_updated,
                semantically_consolidated = stats.semantically_consolidated,
                semantic_llm_calls = stats.semantic_llm_calls,
                duration_ms = stats.duration_ms,
                drawers_before = stats.drawers_before,
                drawers_after = stats.drawers_after,
                compression_ratio = stats.compression_ratio,
                "dream cycle complete"
            ),
            Err(e) => tracing::warn!(palace = %handle.id, "dream cycle failed: {e:#}"),
        }
    }

    /// Runs one full maintenance cycle over `handle` and returns its statistics.
    ///
    /// Order of work: optional recall benchmark, optional content prune, dedup,
    /// prune, compact, closet refresh, semantic consolidation, flush, optional
    /// recall benchmark, WAL checkpoint, stats persistence. The content-prune,
    /// dedup, prune and compact passes receive the cycle start time and the
    /// budget derived from `config.max_cycle_ms`; the semantic consolidation
    /// pass is not given that budget.
    ///
    /// # Errors
    ///
    /// Returns the error (with context) of the first failing content-prune,
    /// dedup, prune or compact pass. Flush, WAL checkpoint and stats
    /// persistence failures are logged as warnings and do not fail the cycle.
    pub async fn dream_cycle(&self, handle: &Arc<PalaceHandle>) -> Result<DreamStats> {
        let started = std::time::Instant::now();
        let budget = Duration::from_millis(self.config.max_cycle_ms);
        // Held until the function returns.
        // NOTE(review): presumably flags the palace as compacting for that span — confirm in `guard`.
        let _compaction_guard = CompactionGuard::new(handle.is_compacting.clone());

        let drawers_before = handle.drawers.read().len() as u64;
        let recall_score_before = if self.config.recall_benchmark_enabled {
            run_benchmark(handle).await
        } else {
            None
        };

        let content_pruned = if self.config.content_prune_enabled {
            content_prune_pass(handle, started, budget, self.config.content_prune_min_words)
                .await
                .context("dream content prune pass")?
        } else {
            0
        };
        let merged = dedup_pass(handle, started, budget, self.config.dedup_threshold)
            .await
            .context("dream dedup pass")?;
        let pruned = prune_pass(handle, started, budget, self.config.prune_importance)
            .await
            .context("dream prune pass")?;
        let compacted = compact_pass(handle, started, budget)
            .await
            .context("dream compact pass")?;
        let closets_updated = refresh_closets(handle);
        let (semantically_consolidated, semantic_llm_calls, semantic_cache_hits) =
            semantic_consolidation_pass(handle, &self.config, self.consolidator.clone()).await;

        // Best effort: a failed flush must not discard the stats gathered so far.
        if let Err(e) = handle.flush() {
            tracing::warn!("dream flush failed: {e:#}");
        }

        let drawers_after = handle.drawers.read().len() as u64;
        let recall_score_after = if self.config.recall_benchmark_enabled {
            run_benchmark(handle).await
        } else {
            None
        };

        let mut stats = DreamStats {
            merged,
            pruned,
            closets_updated,
            compacted,
            content_pruned,
            semantically_consolidated,
            semantic_llm_calls,
            semantic_cache_hits,
            // Captured before the checkpoint and persistence steps below.
            duration_ms: started.elapsed().as_millis() as u64,
            drawers_before,
            drawers_after,
            // Placeholder; filled in by `update_compression_ratio` right after.
            compression_ratio: 0.0,
            recall_score_before,
            recall_score_after,
        };
        stats.update_compression_ratio();

        Self::checkpoint_wal(handle);
        Self::persist_stats(handle, &stats);
        Ok(stats)
    }

    /// Checkpoints the knowledge-graph WAL and logs the result; failure is non-fatal.
    fn checkpoint_wal(handle: &PalaceHandle) {
        match handle.kg.checkpoint() {
            Ok((wal, done)) => {
                tracing::debug!(
                    palace = %handle.id,
                    wal_pages = wal,
                    checkpointed = done,
                    "WAL checkpoint complete"
                );
            }
            Err(e) => {
                tracing::warn!(
                    palace = %handle.id,
                    error = %e,
                    "WAL checkpoint failed (non-fatal)"
                );
            }
        }
    }

    /// Saves `stats` with the current UTC time into the palace data directory,
    /// if one is configured; failure is logged and otherwise ignored.
    fn persist_stats(handle: &PalaceHandle, stats: &DreamStats) {
        use super::config::PersistedDreamStats;

        if let Some(data_dir) = handle.data_dir.as_ref() {
            let persisted = PersistedDreamStats {
                last_run_at: chrono::Utc::now(),
                stats: stats.clone(),
            };
            if let Err(e) = persisted.save(data_dir) {
                tracing::warn!(palace = %handle.id, "persist dream_stats.json failed: {e:#}");
            }
        }
    }
}