car-scheduler 0.14.0

Task scheduling and background execution for Common Agent Runtime
Documentation
//! Automatic memory consolidation ("dream") loop.
//!
//! Runs `MemgineEngine::consolidate_with_data()` on a periodic interval.
//! Unlike agent tasks, this is purely algorithmic — no LLM needed.
//!
//! ## Usage
//!
//! ```rust,ignore
//! let memgine = Arc::new(Mutex::new(MemgineEngine::new(None)));
//! let handle = spawn_dream_loop(memgine, DreamConfig::default(), None);
//!
//! // Later...
//! handle.cancel();
//! ```

use car_memgine::{ConsolidationReport, MemgineEngine};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use tokio::sync::watch;
use tracing::info;

use crate::task::{Task, TaskStore, TaskTrigger};

/// Settings that control how the dream loop runs.
///
/// Every path is optional; a step whose path is `None` is skipped by the loop.
#[derive(Clone, Debug)]
pub struct DreamConfig {
    /// Seconds to wait between two consolidation passes (the default is one hour).
    pub interval_secs: u64,
    /// Where embeddings are persisted after each pass, if anywhere.
    pub embeddings_path: Option<PathBuf>,
    /// Directory of the trajectory store whose contents are fed into consolidation.
    /// When set, the loop's lock file is also placed in this directory.
    pub trajectory_dir: Option<PathBuf>,
    /// JSON file of outcome profiles (model performance data) fed into consolidation.
    pub outcome_profiles_path: Option<PathBuf>,
}

impl Default for DreamConfig {
    fn default() -> Self {
        Self {
            interval_secs: 3600,
            embeddings_path: None,
            trajectory_dir: None,
            outcome_profiles_path: None,
        }
    }
}

/// Owner-side handle to a dream loop started by `spawn_dream_loop`.
///
/// Use [`DreamHandle::cancel`] to request a stop, then await `join` to obtain
/// the reports gathered by the loop.
pub struct DreamHandle {
    /// Sending `true` on this channel asks the loop to stop.
    pub cancel_tx: watch::Sender<bool>,
    /// Task handle that resolves to one `ConsolidationReport` per completed pass.
    pub join: tokio::task::JoinHandle<Vec<ConsolidationReport>>,
}

impl DreamHandle {
    /// Ask the dream loop to stop by sending `true` on the cancel channel.
    ///
    /// A failed send is ignored on purpose: there is nothing useful the
    /// caller could do with that error.
    pub fn cancel(&self) {
        self.cancel_tx.send(true).ok();
    }
}

/// Start the dream loop as a background Tokio task.
///
/// The returned [`DreamHandle`] carries the cancel channel and the join
/// handle; awaiting the join handle yields every report the loop collected
/// before it stopped, either through cancellation or by reaching
/// `max_iterations` (`None` means no iteration limit).
///
/// This calls `tokio::spawn`, so it must be invoked from within a Tokio runtime.
pub fn spawn_dream_loop(
    memgine: Arc<Mutex<MemgineEngine>>,
    config: DreamConfig,
    max_iterations: Option<u32>,
) -> DreamHandle {
    let (tx, rx) = watch::channel(false);

    // Build the loop future first; it only starts running once spawned.
    let worker = dream_loop(memgine, config, max_iterations, rx);

    DreamHandle {
        cancel_tx: tx,
        join: tokio::spawn(worker),
    }
}

/// Body of the background dream task: consolidate, wait one interval, repeat.
///
/// Each cycle:
/// 1. If the lock file was modified less than `interval_secs / 2` seconds ago,
///    another loop sharing the same lock path consolidated recently, so this
///    cycle is skipped and the loop waits one interval. A skipped cycle does
///    not count as an iteration.
/// 2. Otherwise the lock file is rewritten to claim the window; trajectories
///    and model profiles are loaded when configured; `consolidate_with_data`
///    runs under the engine mutex; the report is logged; embeddings are
///    persisted when `embeddings_path` is set.
///
/// The lock file is `<trajectory_dir>/.dream.lock` when a trajectory
/// directory is configured and `/tmp/.tokhn-dream.lock` otherwise.
///
/// The loop stops after exactly `max_iterations` completed passes (`None`
/// means unlimited), or as soon as `true` is observed on `cancel` while it is
/// waiting — including while waiting out a skipped cycle. The cap is strict:
/// no extra "trailing" pass is run after the last iteration.
///
/// Returns one report per completed pass, in order. On exit the lock file is
/// removed only if this loop wrote it.
///
/// # Panics
///
/// Panics if the engine mutex is poisoned. The consolidation step uses
/// `tokio::task::block_in_place`, which Tokio documents as panicking on a
/// current-thread runtime.
async fn dream_loop(
    memgine: Arc<Mutex<MemgineEngine>>,
    config: DreamConfig,
    max_iterations: Option<u32>,
    mut cancel: watch::Receiver<bool>,
) -> Vec<ConsolidationReport> {
    let interval = tokio::time::Duration::from_secs(config.interval_secs);
    let mut reports = Vec::new();
    let mut iterations: u32 = 0;

    // Lock file for coalescing: prevents concurrent dream loops from
    // consolidating in the same window.
    let lock_path = config
        .trajectory_dir
        .as_ref()
        .map(|d| d.join(".dream.lock"))
        .unwrap_or_else(|| std::path::PathBuf::from("/tmp/.tokhn-dream.lock"));
    // Set once this loop has written the lock file, so that exit cleanup
    // never deletes a lock that belongs to a different loop.
    let mut lock_claimed = false;

    loop {
        if max_iterations.map_or(false, |max| iterations >= max) {
            break;
        }

        // The lock file's modification time doubles as the "last consolidated"
        // timestamp: a fresh lock means someone else just did the work.
        if let Some(age) = fresh_lock_age(&lock_path, config.interval_secs) {
            tracing::debug!(
                "dream: skipping — lock file is recent ({:.0}s old)",
                age.as_secs()
            );
            // Stay responsive to cancellation while waiting out the skip.
            if wait_or_cancel(interval, &mut cancel).await {
                info!("dream loop cancelled");
                break;
            }
            continue;
        }

        // Touch the lock file to claim this consolidation window. This is
        // best-effort: a failure only disables coalescing, it does not stop
        // the pass.
        let claimed_at = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        match std::fs::write(&lock_path, claimed_at.to_string()) {
            Ok(()) => lock_claimed = true,
            Err(e) => {
                tracing::debug!(
                    "dream: could not write lock file {}: {}",
                    lock_path.display(),
                    e
                );
            }
        }

        // Load external data for enhanced consolidation.
        let trajectories = config
            .trajectory_dir
            .as_ref()
            .map(|dir| {
                let store = car_memgine::TrajectoryStore::new(dir);
                store.load_all()
            })
            .unwrap_or_default();

        let model_profiles: Vec<(String, serde_json::Value)> = config
            .outcome_profiles_path
            .as_deref()
            .and_then(load_model_profiles)
            .unwrap_or_default();

        // Run enhanced consolidation with trajectory + model data. The guard
        // lives only inside this block, so it is never held across an await
        // point of this task.
        let report = {
            let mut engine = memgine.lock().expect("memgine mutex poisoned");
            tokio::task::block_in_place(|| {
                tokio::runtime::Handle::current()
                    .block_on(engine.consolidate_with_data(&trajectories, &model_profiles))
            })
        };

        info!(
            expired_pruned = report.expired_pruned,
            superseded_gc = report.superseded_gc,
            nodes_embedded = report.nodes_embedded,
            trajectory_insights = report.trajectory_insights,
            model_insights = report.model_insights,
            total_nodes = report.total_nodes,
            embedding_coverage = %format!("{:.1}%", report.embedding_coverage * 100.0),
            "dream consolidation pass complete"
        );

        // Persist embeddings if configured.
        if let Some(ref path) = config.embeddings_path {
            let engine = memgine.lock().expect("memgine mutex poisoned");
            if let Err(e) = engine.persist_embeddings(path) {
                tracing::warn!("failed to persist embeddings after dream: {}", e);
            }
        }

        reports.push(report);
        iterations += 1;

        // Do not wait a whole interval just to discover the cap was reached.
        if max_iterations.map_or(false, |max| iterations >= max) {
            break;
        }

        // Wait for the next interval or for cancellation.
        if wait_or_cancel(interval, &mut cancel).await {
            info!("dream loop cancelled");
            break;
        }
    }

    // Clean up the lock file on exit, but only if this loop created it.
    if lock_claimed {
        std::fs::remove_file(&lock_path).ok();
    }

    reports
}

/// Returns the lock file's age when it was modified less than
/// `interval_secs / 2` seconds ago, and `None` when the file is missing,
/// unreadable, or older than that.
fn fresh_lock_age(lock_path: &std::path::Path, interval_secs: u64) -> Option<std::time::Duration> {
    let modified = std::fs::metadata(lock_path).ok()?.modified().ok()?;
    // A modification time in the future counts as age zero.
    let age = std::time::SystemTime::now()
        .duration_since(modified)
        .unwrap_or_default();
    if age.as_secs() < interval_secs / 2 {
        Some(age)
    } else {
        None
    }
}

/// Reads the outcome-profiles JSON array at `path` and pairs each entry with
/// its `model_id` string; entries without one are dropped. Returns `None`
/// when the file cannot be read or is not a JSON array.
fn load_model_profiles(path: &std::path::Path) -> Option<Vec<(String, serde_json::Value)>> {
    let content = std::fs::read_to_string(path).ok()?;
    let entries: Vec<serde_json::Value> = serde_json::from_str(&content).ok()?;
    Some(
        entries
            .into_iter()
            .filter_map(|v| {
                let id = v.get("model_id")?.as_str()?.to_string();
                Some((id, v))
            })
            .collect(),
    )
}

/// Sleeps for `interval` unless cancellation is requested first.
///
/// Returns `true` when `true` was observed on `cancel`, and `false` when the
/// full interval elapsed. A change that is not a cancel request, or a dropped
/// sender, does not cut the wait short.
async fn wait_or_cancel(
    interval: tokio::time::Duration,
    cancel: &mut watch::Receiver<bool>,
) -> bool {
    let sleep = tokio::time::sleep(interval);
    tokio::pin!(sleep);
    // Once the sender is gone `changed()` fails immediately on every call, so
    // that branch is disabled and only the timer is awaited.
    let mut sender_gone = false;

    loop {
        tokio::select! {
            _ = &mut sleep => return false,
            changed = cancel.changed(), if !sender_gone => {
                if *cancel.borrow() {
                    return true;
                }
                sender_gone = changed.is_err();
            }
        }
    }
}

// ---------------------------------------------------------------------------
// Auto-setup: ensure a dream task exists in the TaskStore
// ---------------------------------------------------------------------------

/// Name under which the dream task is stored in the task store; `ensure_dream_task`
/// looks the task up by this name.
pub const DREAM_TASK_NAME: &str = "car-dream";

/// Make sure the task store holds a task named [`DREAM_TASK_NAME`].
///
/// If no task with that name is listed, a `memory.consolidate` task with a
/// `1h` interval trigger is saved.
///
/// Returns `true` only when a new task was saved successfully. Returns
/// `false` when the task was already present, or when saving failed (the
/// failure is logged as a warning rather than propagated).
pub fn ensure_dream_task(store: &TaskStore) -> bool {
    // Nothing to do when a dream task is already registered.
    if store.list().iter().any(|task| task.name == DREAM_TASK_NAME) {
        return false;
    }

    let dream_task =
        Task::new(DREAM_TASK_NAME, "memory.consolidate").with_trigger(TaskTrigger::Interval, "1h");

    store
        .save(&dream_task)
        .map(|path| {
            info!(?path, "created dream task (1h interval)");
        })
        .map_err(|e| {
            tracing::warn!("failed to create dream task: {}", e);
        })
        .is_ok()
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a config whose lock file lives inside `dir`.
    ///
    /// With no `trajectory_dir` every loop falls back to the same
    /// `/tmp/.tokhn-dream.lock`, so tests running in parallel (or a stale lock
    /// left behind by an earlier run) could make a loop skip its first pass
    /// and sleep for a whole interval. A private directory keeps each test
    /// independent.
    // NOTE(review): assumes an empty trajectory directory yields no
    // trajectories from `TrajectoryStore::load_all` — confirm.
    fn isolated_config(dir: &TempDir, interval_secs: u64) -> DreamConfig {
        DreamConfig {
            interval_secs,
            trajectory_dir: Some(dir.path().to_path_buf()),
            ..Default::default()
        }
    }

    // The multi-thread flavor is used because the loop calls
    // `tokio::task::block_in_place`.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn dream_loop_runs_and_cancels() {
        let dir = TempDir::new().unwrap();
        let memgine = Arc::new(Mutex::new(MemgineEngine::new(None)));

        // Ingest some data so consolidation has work to do
        {
            let mut engine = memgine.lock().unwrap();
            let now = chrono::Utc::now();
            engine.ingest_fact(
                "f1",
                "test",
                "data",
                "user",
                "peer",
                now,
                "global",
                None,
                vec![],
                false,
            );
            engine.ingest_environment(
                "stale",
                "old",
                now - chrono::Duration::hours(2),
                Some(now - chrono::Duration::hours(1)),
            );
        }

        // Zero interval: passes run back to back.
        let config = isolated_config(&dir, 0);

        let handle = spawn_dream_loop(memgine.clone(), config, Some(2));
        let reports = handle.join.await.unwrap();

        assert_eq!(reports.len(), 2);
        // First pass should prune the expired env node
        assert_eq!(reports[0].expired_pruned, 1);
        // Second pass: nothing left to prune
        assert_eq!(reports[1].expired_pruned, 0);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn dream_loop_cancellation() {
        let dir = TempDir::new().unwrap();
        let memgine = Arc::new(Mutex::new(MemgineEngine::new(None)));
        // Long interval: the loop must be stopped by cancellation, not by time.
        let config = isolated_config(&dir, 3600);

        let handle = spawn_dream_loop(memgine, config, None);

        // Let it run one iteration then cancel
        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
        handle.cancel();
        let reports = handle.join.await.unwrap();

        // Should have completed at least 1 pass before being cancelled during sleep
        assert!(!reports.is_empty());
    }

    #[test]
    fn ensure_dream_task_creates_once() {
        let dir = TempDir::new().unwrap();
        let store = TaskStore::new(dir.path());

        // First call creates
        assert!(ensure_dream_task(&store));
        let tasks = store.list();
        assert_eq!(tasks.len(), 1);
        assert_eq!(tasks[0].name, DREAM_TASK_NAME);
        assert_eq!(tasks[0].trigger, TaskTrigger::Interval);
        assert_eq!(tasks[0].schedule, "1h");

        // Second call is idempotent
        assert!(!ensure_dream_task(&store));
        assert_eq!(store.list().len(), 1);
    }
}