use car_memgine::{ConsolidationReport, MemgineEngine};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use tokio::sync::watch;
use tracing::info;
use crate::task::{Task, TaskStore, TaskTrigger};
#[derive(Debug, Clone)]
pub struct DreamConfig {
pub interval_secs: u64,
pub embeddings_path: Option<PathBuf>,
pub trajectory_dir: Option<PathBuf>,
pub outcome_profiles_path: Option<PathBuf>,
}
impl Default for DreamConfig {
fn default() -> Self {
Self {
interval_secs: 3600,
embeddings_path: None,
trajectory_dir: None,
outcome_profiles_path: None,
}
}
}
pub struct DreamHandle {
pub cancel_tx: watch::Sender<bool>,
pub join: tokio::task::JoinHandle<Vec<ConsolidationReport>>,
}
impl DreamHandle {
pub fn cancel(&self) {
let _ = self.cancel_tx.send(true);
}
}
pub fn spawn_dream_loop(
memgine: Arc<Mutex<MemgineEngine>>,
config: DreamConfig,
max_iterations: Option<u32>,
) -> DreamHandle {
let (cancel_tx, cancel_rx) = watch::channel(false);
let join =
tokio::spawn(async move { dream_loop(memgine, config, max_iterations, cancel_rx).await });
DreamHandle { cancel_tx, join }
}
async fn dream_loop(
memgine: Arc<Mutex<MemgineEngine>>,
config: DreamConfig,
max_iterations: Option<u32>,
mut cancel: watch::Receiver<bool>,
) -> Vec<ConsolidationReport> {
let interval = tokio::time::Duration::from_secs(config.interval_secs);
let mut reports = Vec::new();
let mut iterations: u32 = 0;
let lock_path = config
.trajectory_dir
.as_ref()
.map(|d| d.join(".dream.lock"))
.unwrap_or_else(|| std::path::PathBuf::from("/tmp/.tokhn-dream.lock"));
let mut trailing_requested = false;
loop {
if let Some(max) = max_iterations {
if iterations >= max {
if trailing_requested {
trailing_requested = false;
tracing::debug!("dream: trailing run after max iterations");
} else {
break;
}
}
}
if lock_path.exists() {
if let Ok(metadata) = std::fs::metadata(&lock_path) {
if let Ok(modified) = metadata.modified() {
let age = std::time::SystemTime::now()
.duration_since(modified)
.unwrap_or_default();
if age.as_secs() < config.interval_secs / 2 {
tracing::debug!(
"dream: skipping — lock file is recent ({:.0}s old)",
age.as_secs()
);
tokio::time::sleep(interval).await;
continue;
}
}
}
}
std::fs::write(
&lock_path,
format!(
"{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
),
)
.ok();
let trajectories = config
.trajectory_dir
.as_ref()
.map(|dir| {
let store = car_memgine::TrajectoryStore::new(dir);
store.load_all()
})
.unwrap_or_default();
let model_profiles: Vec<(String, serde_json::Value)> = config
.outcome_profiles_path
.as_ref()
.and_then(|path| {
let content = std::fs::read_to_string(path).ok()?;
let arr: Vec<serde_json::Value> = serde_json::from_str(&content).ok()?;
Some(
arr.into_iter()
.filter_map(|v| {
let id = v.get("model_id")?.as_str()?.to_string();
Some((id, v))
})
.collect(),
)
})
.unwrap_or_default();
let report = {
let mut engine = memgine.lock().unwrap();
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current()
.block_on(engine.consolidate_with_data(&trajectories, &model_profiles))
})
};
info!(
expired_pruned = report.expired_pruned,
superseded_gc = report.superseded_gc,
nodes_embedded = report.nodes_embedded,
trajectory_insights = report.trajectory_insights,
model_insights = report.model_insights,
total_nodes = report.total_nodes,
candidates_promoted = report.candidates_promoted.len(),
candidates_rejected = report.candidates_rejected.len(),
embedding_coverage = %format!("{:.1}%", report.embedding_coverage * 100.0),
"dream consolidation pass complete"
);
if let Some(ref path) = config.embeddings_path {
let engine = memgine.lock().unwrap();
if let Err(e) = engine.persist_embeddings(path) {
tracing::warn!("failed to persist embeddings after dream: {}", e);
}
}
if report.expired_pruned > 0
|| report.superseded_gc > 0
|| report.nodes_embedded > 0
|| report.trajectory_insights > 0
{
trailing_requested = true;
}
reports.push(report);
iterations += 1;
if let Some(max) = max_iterations {
if iterations >= max {
if trailing_requested {
trailing_requested = false;
tracing::debug!("dream: trailing run triggered");
continue; }
break;
}
}
tokio::select! {
_ = tokio::time::sleep(interval) => {}
_ = cancel.changed() => {
if *cancel.borrow() {
std::fs::remove_file(&lock_path).ok();
info!("dream loop cancelled");
break;
}
}
}
}
std::fs::remove_file(&lock_path).ok();
reports
}
pub const DREAM_TASK_NAME: &str = "car-dream";
pub fn ensure_dream_task(store: &TaskStore) -> bool {
let existing = store.list();
if existing.iter().any(|t| t.name == DREAM_TASK_NAME) {
return false;
}
let task =
Task::new(DREAM_TASK_NAME, "memory.consolidate").with_trigger(TaskTrigger::Interval, "1h");
match store.save(&task) {
Ok(path) => {
info!(?path, "created dream task (1h interval)");
true
}
Err(e) => {
tracing::warn!("failed to create dream task: {}", e);
false
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn dream_loop_runs_and_cancels() {
let memgine = Arc::new(Mutex::new(MemgineEngine::new(None)));
{
let mut engine = memgine.lock().unwrap();
let now = chrono::Utc::now();
engine.ingest_fact(
"f1",
"test",
"data",
"user",
"peer",
now,
"global",
None,
vec![],
false,
);
engine.ingest_environment(
"stale",
"old",
now - chrono::Duration::hours(2),
Some(now - chrono::Duration::hours(1)),
);
}
let config = DreamConfig {
interval_secs: 0, ..Default::default()
};
let handle = spawn_dream_loop(memgine.clone(), config, Some(2));
let reports = handle.join.await.unwrap();
assert_eq!(reports.len(), 2);
assert_eq!(reports[0].expired_pruned, 1);
assert_eq!(reports[1].expired_pruned, 0);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn dream_loop_cancellation() {
let memgine = Arc::new(Mutex::new(MemgineEngine::new(None)));
let config = DreamConfig {
interval_secs: 3600, ..Default::default()
};
let handle = spawn_dream_loop(memgine, config, None);
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
handle.cancel();
let reports = handle.join.await.unwrap();
assert!(!reports.is_empty());
}
#[test]
fn ensure_dream_task_creates_once() {
let dir = TempDir::new().unwrap();
let store = TaskStore::new(dir.path());
assert!(ensure_dream_task(&store));
let tasks = store.list();
assert_eq!(tasks.len(), 1);
assert_eq!(tasks[0].name, DREAM_TASK_NAME);
assert_eq!(tasks[0].trigger, TaskTrigger::Interval);
assert_eq!(tasks[0].schedule, "1h");
assert!(!ensure_dream_task(&store));
assert_eq!(store.list().len(), 1);
}
}