use std::sync::Arc;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use crate::error::Result;
use crate::intelligence::auto_consolidate::{
run_consolidation, ConsolidationCounts, ConsolidationPolicy,
};
use crate::intelligence::consolidation_offline::{
ConsolidationConfig, ConsolidationReport, OfflineConsolidator,
};
use crate::storage::Storage;
pub mod candidates;
pub mod eval;
#[derive(Debug, Clone)]
pub struct DreamConfig {
pub interval: std::time::Duration,
pub consolidation: ConsolidationConfig,
pub owner_id: String,
pub lock_ttl_seconds: u64,
pub auto_consolidate: ConsolidationPolicy,
}
impl Default for DreamConfig {
fn default() -> Self {
Self {
interval: std::time::Duration::from_secs(6 * 60 * 60),
consolidation: ConsolidationConfig::default(),
owner_id: uuid::Uuid::new_v4().to_string(),
lock_ttl_seconds: 300,
auto_consolidate: ConsolidationPolicy::default(),
}
}
}
const DREAM_LOCK_ID: &str = "dream_phase_all";
struct DreamLockGuard<'a> {
storage: &'a Storage,
owner_id: String,
}
impl<'a> Drop for DreamLockGuard<'a> {
fn drop(&mut self) {
let res = self.storage.with_transaction(|conn| {
crate::storage::queries::release_dream_lock(conn, DREAM_LOCK_ID, &self.owner_id)
});
if let Err(e) = res {
tracing::error!(
target = "engram::dream",
error = %e,
owner_id = %self.owner_id,
"Failed to release dream-phase advisory lock — lock may persist until TTL expires"
);
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DreamWorkspaceReport {
pub workspace: String,
pub groups_found: usize,
pub memories_merged: usize,
pub memories_archived: usize,
pub tokens_before: usize,
pub tokens_after: usize,
pub tokens_saved: usize,
#[serde(skip_serializing_if = "Option::is_none", default)]
pub auto_consolidate: Option<ConsolidationCounts>,
}
impl From<(&str, ConsolidationReport)> for DreamWorkspaceReport {
fn from((workspace, r): (&str, ConsolidationReport)) -> Self {
Self {
workspace: workspace.to_string(),
groups_found: r.groups_found,
memories_merged: r.memories_merged,
memories_archived: r.memories_archived,
tokens_before: r.tokens_before,
tokens_after: r.tokens_after,
tokens_saved: r.tokens_saved,
auto_consolidate: None,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DreamReport {
pub started_at: DateTime<Utc>,
pub finished_at: DateTime<Utc>,
pub workspaces: Vec<DreamWorkspaceReport>,
pub errors: Vec<String>,
}
pub fn run_once_workspace(
storage: &Storage,
workspace: &str,
config: &DreamConfig,
) -> Result<DreamWorkspaceReport> {
let consolidator = OfflineConsolidator::new(config.consolidation.clone());
let report = storage.with_transaction(|conn| {
let r = consolidator.consolidate(conn, workspace)?;
if r.groups_found > 0 {
let digest_content = format!(
"Dream Phase Digest: Consolidated {} memories into {} groups, saving {} tokens.",
r.memories_merged, r.groups_found, r.tokens_saved
);
let digest_input = crate::types::CreateMemoryInput {
content: digest_content,
memory_type: crate::types::MemoryType::Summary,
workspace: Some(workspace.to_string()),
tags: vec!["dream-digest".to_string(), "distillate".to_string()],
importance: Some(0.8),
..Default::default()
};
if let Err(e) = crate::storage::queries::create_memory(conn, &digest_input) {
tracing::warn!(
target = "engram::dream",
workspace = %workspace,
error = %e,
"Failed to emit dream-phase digest memory"
);
}
}
Ok(r)
})?;
let mut workspace_report: DreamWorkspaceReport = (workspace, report).into();
match run_consolidation(storage, workspace, &config.auto_consolidate) {
Ok(report) => {
let counts = report.counts();
tracing::debug!(
target = "engram::dream",
workspace = %workspace,
dry_run = report.dry_run,
duplicates_merged = counts.duplicates_merged,
conflicts_resolved = counts.conflicts_resolved,
summarized = counts.summarized,
skipped = counts.skipped,
"auto-consolidate phase finished"
);
workspace_report.auto_consolidate = Some(counts);
}
Err(e) => {
tracing::warn!(
target = "engram::dream",
workspace = %workspace,
error = %e,
"auto-consolidate phase failed; continuing"
);
}
}
Ok(workspace_report)
}
pub fn run_once_all(storage: &Storage, config: &DreamConfig) -> DreamReport {
let started_at = Utc::now();
let mut workspaces = Vec::new();
let mut errors = Vec::new();
let lock_attempt = storage.with_transaction(|conn| {
crate::storage::queries::acquire_dream_lock(
conn,
DREAM_LOCK_ID,
&config.owner_id,
config.lock_ttl_seconds,
)
});
let lock_acquired = match lock_attempt {
Ok(acquired) => acquired,
Err(e) => {
tracing::error!(
target = "engram::dream",
error = %e,
"acquire_dream_lock failed"
);
return DreamReport {
started_at,
finished_at: Utc::now(),
workspaces: Vec::new(),
errors: vec![format!("acquire_dream_lock: {}", e)],
};
}
};
if !lock_acquired {
return DreamReport {
started_at,
finished_at: Utc::now(),
workspaces: Vec::new(),
errors: vec![
"Could not acquire advisory lock (another instance may be running)".to_string(),
],
};
}
let _guard = DreamLockGuard {
storage,
owner_id: config.owner_id.clone(),
};
match storage.with_transaction(crate::storage::queries::list_workspaces) {
Ok(list) => {
for stats in list {
match run_once_workspace(storage, &stats.workspace, config) {
Ok(report) => workspaces.push(report),
Err(e) => errors.push(format!("{}: {}", stats.workspace, e)),
}
}
}
Err(e) => errors.push(format!("list_workspaces: {}", e)),
}
let report = DreamReport {
started_at,
finished_at: Utc::now(),
workspaces,
errors,
};
if let Err(e) =
storage.with_transaction(|conn| crate::storage::queries::insert_dream_run(conn, &report))
{
tracing::error!(
target = "engram::dream",
error = %e,
"Failed to persist dream_runs history row"
);
}
report
}
pub fn spawn_scheduler(storage: Arc<Storage>, config: DreamConfig) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut ticker = tokio::time::interval(config.interval);
ticker.tick().await;
loop {
ticker.tick().await;
let storage_for_pass = Arc::clone(&storage);
let config_for_pass = config.clone();
let report = match tokio::task::spawn_blocking(move || {
run_once_all(&storage_for_pass, &config_for_pass)
})
.await
{
Ok(r) => r,
Err(e) => {
tracing::error!(
target = "engram::dream",
error = %e,
"Dream Phase pass panicked"
);
continue;
}
};
tracing::info!(
target = "engram::dream",
workspaces = report.workspaces.len(),
errors = report.errors.len(),
duration_ms = (report.finished_at - report.started_at).num_milliseconds(),
"Dream Phase pass complete"
);
for err in &report.errors {
tracing::warn!(target = "engram::dream", error = %err, "Dream Phase workspace failed");
}
}
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_run_once_workspace_on_empty_storage() {
let storage = Storage::open_in_memory().unwrap();
let cfg = DreamConfig::default();
let report = run_once_workspace(&storage, "default", &cfg).unwrap();
assert_eq!(report.workspace, "default");
assert_eq!(report.groups_found, 0);
assert_eq!(report.memories_merged, 0);
}
#[test]
fn test_run_once_all_on_empty_storage() {
let storage = Storage::open_in_memory().unwrap();
let cfg = DreamConfig::default();
let report = run_once_all(&storage, &cfg);
assert!(report.errors.is_empty(), "errors: {:?}", report.errors);
assert!(report.finished_at >= report.started_at);
}
}