use car_memgine::{ConsolidationReport, MemgineEngine};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use tokio::sync::watch;
use tracing::info;
use crate::task::{Task, TaskStore, TaskTrigger};
#[derive(Debug, Clone)]
pub struct DreamConfig {
pub interval_secs: u64,
pub embeddings_path: Option<PathBuf>,
pub trajectory_dir: Option<PathBuf>,
pub outcome_profiles_path: Option<PathBuf>,
}
impl Default for DreamConfig {
fn default() -> Self {
Self {
interval_secs: 3600,
embeddings_path: None,
trajectory_dir: None,
outcome_profiles_path: None,
}
}
}
pub struct DreamHandle {
pub cancel_tx: watch::Sender<bool>,
pub join: tokio::task::JoinHandle<Vec<ConsolidationReport>>,
}
impl DreamHandle {
pub fn cancel(&self) {
let _ = self.cancel_tx.send(true);
}
}
pub fn spawn_dream_loop(
memgine: Arc<Mutex<MemgineEngine>>,
config: DreamConfig,
max_iterations: Option<u32>,
) -> DreamHandle {
let (cancel_tx, cancel_rx) = watch::channel(false);
let join = tokio::spawn(async move {
dream_loop(memgine, config, max_iterations, cancel_rx).await
});
DreamHandle { cancel_tx, join }
}
async fn dream_loop(
memgine: Arc<Mutex<MemgineEngine>>,
config: DreamConfig,
max_iterations: Option<u32>,
mut cancel: watch::Receiver<bool>,
) -> Vec<ConsolidationReport> {
let interval = tokio::time::Duration::from_secs(config.interval_secs);
let mut reports = Vec::new();
let mut iterations: u32 = 0;
let lock_path = config.trajectory_dir.as_ref()
.map(|d| d.join(".dream.lock"))
.unwrap_or_else(|| std::path::PathBuf::from("/tmp/.tokhn-dream.lock"));
let mut trailing_requested = false;
loop {
if let Some(max) = max_iterations {
if iterations >= max {
if trailing_requested {
trailing_requested = false;
tracing::debug!("dream: trailing run after max iterations");
} else {
break;
}
}
}
if lock_path.exists() {
if let Ok(metadata) = std::fs::metadata(&lock_path) {
if let Ok(modified) = metadata.modified() {
let age = std::time::SystemTime::now().duration_since(modified).unwrap_or_default();
if age.as_secs() < config.interval_secs / 2 {
tracing::debug!("dream: skipping — lock file is recent ({:.0}s old)", age.as_secs());
tokio::time::sleep(interval).await;
continue;
}
}
}
}
std::fs::write(&lock_path, format!("{}", std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_secs())).ok();
let trajectories = config.trajectory_dir.as_ref()
.map(|dir| {
let store = car_memgine::TrajectoryStore::new(dir);
store.load_all()
})
.unwrap_or_default();
let model_profiles: Vec<(String, serde_json::Value)> = config.outcome_profiles_path.as_ref()
.and_then(|path| {
let content = std::fs::read_to_string(path).ok()?;
let arr: Vec<serde_json::Value> = serde_json::from_str(&content).ok()?;
Some(arr.into_iter().filter_map(|v| {
let id = v.get("model_id")?.as_str()?.to_string();
Some((id, v))
}).collect())
})
.unwrap_or_default();
let report = {
let mut engine = memgine.lock().unwrap();
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(
engine.consolidate_with_data(&trajectories, &model_profiles)
)
})
};
info!(
expired_pruned = report.expired_pruned,
superseded_gc = report.superseded_gc,
nodes_embedded = report.nodes_embedded,
trajectory_insights = report.trajectory_insights,
model_insights = report.model_insights,
total_nodes = report.total_nodes,
embedding_coverage = %format!("{:.1}%", report.embedding_coverage * 100.0),
"dream consolidation pass complete"
);
if let Some(ref path) = config.embeddings_path {
let engine = memgine.lock().unwrap();
if let Err(e) = engine.persist_embeddings(path) {
tracing::warn!("failed to persist embeddings after dream: {}", e);
}
}
if report.expired_pruned > 0 || report.superseded_gc > 0 || report.nodes_embedded > 0 || report.trajectory_insights > 0 {
trailing_requested = true;
}
reports.push(report);
iterations += 1;
if let Some(max) = max_iterations {
if iterations >= max {
if trailing_requested {
trailing_requested = false;
tracing::debug!("dream: trailing run triggered");
continue; }
break;
}
}
tokio::select! {
_ = tokio::time::sleep(interval) => {}
_ = cancel.changed() => {
if *cancel.borrow() {
std::fs::remove_file(&lock_path).ok();
info!("dream loop cancelled");
break;
}
}
}
}
std::fs::remove_file(&lock_path).ok();
reports
}
pub const DREAM_TASK_NAME: &str = "car-dream";
pub fn ensure_dream_task(store: &TaskStore) -> bool {
let existing = store.list();
if existing.iter().any(|t| t.name == DREAM_TASK_NAME) {
return false;
}
let task = Task::new(DREAM_TASK_NAME, "memory.consolidate")
.with_trigger(TaskTrigger::Interval, "1h");
match store.save(&task) {
Ok(path) => {
info!(?path, "created dream task (1h interval)");
true
}
Err(e) => {
tracing::warn!("failed to create dream task: {}", e);
false
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn dream_loop_runs_and_cancels() {
let memgine = Arc::new(Mutex::new(MemgineEngine::new(None)));
{
let mut engine = memgine.lock().unwrap();
let now = chrono::Utc::now();
engine.ingest_fact("f1", "test", "data", "user", "peer", now, "global", None, vec![], false);
engine.ingest_environment("stale", "old", now - chrono::Duration::hours(2),
Some(now - chrono::Duration::hours(1)));
}
let config = DreamConfig {
interval_secs: 0, ..Default::default()
};
let handle = spawn_dream_loop(memgine.clone(), config, Some(2));
let reports = handle.join.await.unwrap();
assert_eq!(reports.len(), 2);
assert_eq!(reports[0].expired_pruned, 1);
assert_eq!(reports[1].expired_pruned, 0);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn dream_loop_cancellation() {
let memgine = Arc::new(Mutex::new(MemgineEngine::new(None)));
let config = DreamConfig {
interval_secs: 3600, ..Default::default()
};
let handle = spawn_dream_loop(memgine, config, None);
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
handle.cancel();
let reports = handle.join.await.unwrap();
assert!(!reports.is_empty());
}
#[test]
fn ensure_dream_task_creates_once() {
let dir = TempDir::new().unwrap();
let store = TaskStore::new(dir.path());
assert!(ensure_dream_task(&store));
let tasks = store.list();
assert_eq!(tasks.len(), 1);
assert_eq!(tasks[0].name, DREAM_TASK_NAME);
assert_eq!(tasks[0].trigger, TaskTrigger::Interval);
assert_eq!(tasks[0].schedule, "1h");
assert!(!ensure_dream_task(&store));
assert_eq!(store.list().len(), 1);
}
}