opencrabs 0.3.57

The autonomous, self-improving AI agent. Single Rust binary. Every channel. Install with: cargo install opencrabs
//! Sub-agent progress streaming via JSON status files.
//!
//! Each sub-agent writes its state/progress to
//! `<opencrabs_home>/tmp/subagents/<agent_id>.json`. The main orchestrator
//! can `read_file` these at any time for real-time visibility — no
//! `session_search` needed.
//!
//! Files older than 7 days are cleaned up on startup.

use serde::{Deserialize, Serialize};
use std::fs;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::time::{Duration, SystemTime};

/// Directory under which every sub-agent status file is stored.
///
/// Resolves to `<opencrabs_home>/tmp/subagents`. In test builds a
/// thread-local override installed through `test_override::set` takes
/// precedence.
pub fn status_dir() -> PathBuf {
    // Test builds may redirect the directory for the current thread.
    #[cfg(test)]
    {
        if let Some(redirected) = test_override::get() {
            return redirected;
        }
    }
    let home = crate::config::opencrabs_home();
    home.join("tmp").join("subagents")
}

/// Test-only, per-thread override for the status directory.
#[cfg(test)]
pub(crate) mod test_override {
    use std::cell::RefCell;
    use std::path::PathBuf;

    thread_local! {
        // `None` means "no override installed on this thread".
        static DIR: RefCell<Option<PathBuf>> = const { RefCell::new(None) };
    }

    /// Install `p` as the override for the calling thread, replacing any
    /// previous value.
    pub fn set(p: PathBuf) {
        DIR.with_borrow_mut(|slot| *slot = Some(p));
    }

    /// Return a copy of the calling thread's override, if one is set.
    pub fn get() -> Option<PathBuf> {
        DIR.with_borrow(|slot| slot.clone())
    }
}

/// Create the status directory (and any missing parents) unless the path
/// already exists.
///
/// # Errors
/// Propagates the error from `fs::create_dir_all` when creation fails.
pub fn ensure_dir() -> std::io::Result<()> {
    let target = status_dir();
    if target.exists() {
        // Nothing to do — the path is already present.
        return Ok(());
    }
    fs::create_dir_all(&target)
}

/// Location of the JSON status file for `agent_id`:
/// `<status_dir>/<agent_id>.json`.
pub fn status_path(agent_id: &str) -> PathBuf {
    let file_name = format!("{agent_id}.json");
    status_dir().join(file_name)
}

// ── Status data types ────────────────────────────────────────────────

/// Lifecycle state of a sub-agent as recorded in its status file.
///
/// The derived serde impls carry no rename attributes, so each variant is
/// (de)serialized under its own name.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum AgentState {
    /// Initial state assigned by `AgentStatus::new`.
    Pending,
    /// Set by `AgentStatus::mark_running`.
    Running,
    /// Set by `AgentStatus::mark_completed`.
    Completed,
    /// Set by `AgentStatus::mark_failed`.
    Failed,
}

/// Snapshot of the latest tool-use event in a running sub-agent.
///
/// Every field falls back to its default when absent from the JSON, so a
/// partially populated snapshot still deserializes.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ProgressSnapshot {
    /// Iteration counter supplied by the caller of
    /// `AgentStatus::update_progress`; defaults to `usize::default()` (0).
    #[serde(default = "usize::default")]
    pub iteration: usize,
    /// Most recent tool, as reported by the caller (presumably the tool's
    /// name — confirm against the call sites).
    #[serde(default)]
    pub last_tool: Option<String>,
    /// Most recent event description, as reported by the caller.
    #[serde(default)]
    pub last_event: Option<String>,
    /// RFC 3339 timestamp stamped by `AgentStatus::update_progress` when the
    /// snapshot was recorded.
    #[serde(default)]
    pub updated_at: Option<String>,
}

/// Persisted status of a single sub-agent.
///
/// Serialized as JSON to the path returned by `status_path(&self.id)`. The
/// optional fields default to `None` when missing from the file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentStatus {
    /// Agent identifier; also determines the status file name.
    pub id: String,
    /// Label supplied when the status was created.
    pub label: String,
    /// Identifier of the parent session, supplied when the status was created.
    pub parent_session_id: String,
    /// Current lifecycle state.
    pub state: AgentState,
    /// Prompt supplied when the status was created.
    pub prompt: String,
    /// RFC 3339 timestamp taken when `AgentStatus::new` ran.
    pub started_at: String,
    /// Latest progress snapshot; `None` until `update_progress` is called.
    #[serde(default)]
    pub progress: Option<ProgressSnapshot>,
    /// RFC 3339 timestamp set by `mark_completed` or `mark_failed`.
    #[serde(default)]
    pub completed_at: Option<String>,
    /// Error message recorded by `mark_failed`.
    #[serde(default)]
    pub error: Option<String>,
    /// Output summary recorded by `mark_completed`.
    #[serde(default)]
    pub output_summary: Option<String>,
}

impl AgentStatus {
    /// Create a new status in `Pending` state and write the JSON file.
    ///
    /// `started_at` is set to the current time; all optional fields start
    /// out as `None`.
    ///
    /// # Errors
    /// Returns an error when the status directory cannot be created or the
    /// status file cannot be written.
    pub fn new(
        agent_id: &str,
        label: &str,
        parent_session_id: &str,
        prompt: &str,
    ) -> std::io::Result<Self> {
        ensure_dir()?;
        let status = Self {
            id: agent_id.to_string(),
            label: label.to_string(),
            parent_session_id: parent_session_id.to_string(),
            state: AgentState::Pending,
            prompt: prompt.to_string(),
            started_at: now_rfc3339(),
            progress: None,
            completed_at: None,
            error: None,
            output_summary: None,
        };
        status.write()?;
        Ok(status)
    }

    /// Transition to `Running` and persist the change.
    ///
    /// # Errors
    /// Returns an error when the status file cannot be written. The
    /// in-memory state is updated even if the write fails.
    pub fn mark_running(&mut self) -> std::io::Result<()> {
        self.state = AgentState::Running;
        self.write()
    }

    /// Replace the progress snapshot (stamped with the current time) and
    /// persist it. Intended to be called after each tool-loop iteration.
    ///
    /// # Errors
    /// Returns an error when the status file cannot be written.
    pub fn update_progress(
        &mut self,
        iteration: usize,
        last_tool: Option<String>,
        last_event: Option<String>,
    ) -> std::io::Result<()> {
        self.progress = Some(ProgressSnapshot {
            iteration,
            last_tool,
            last_event,
            updated_at: Some(now_rfc3339()),
        });
        self.write()
    }

    /// Mark the agent as completed with a short output summary, recording
    /// the completion time.
    ///
    /// # Errors
    /// Returns an error when the status file cannot be written.
    pub fn mark_completed(&mut self, output_summary: String) -> std::io::Result<()> {
        self.state = AgentState::Completed;
        self.completed_at = Some(now_rfc3339());
        self.output_summary = Some(output_summary);
        self.write()
    }

    /// Mark the agent as failed with an error message, recording the
    /// failure time in `completed_at`.
    ///
    /// # Errors
    /// Returns an error when the status file cannot be written.
    pub fn mark_failed(&mut self, error: String) -> std::io::Result<()> {
        self.state = AgentState::Failed;
        self.completed_at = Some(now_rfc3339());
        self.error = Some(error);
        self.write()
    }

    /// Read the persisted status for an agent.
    ///
    /// Returns `None` when the file is missing, cannot be read, or does not
    /// deserialize into an `AgentStatus`.
    pub fn read(agent_id: &str) -> Option<Self> {
        // A missing file surfaces as a read error, so no separate existence
        // probe is needed.
        let data = fs::read_to_string(status_path(agent_id)).ok()?;
        serde_json::from_str(&data).ok()
    }

    /// Persist status to disk. Uses atomic rename for crash safety.
    ///
    /// The JSON is written to a sibling `<id>.json.tmp` file, synced, and
    /// then renamed over the real status file. If any step after creating
    /// the temp file fails, the temp file is removed (best effort) so failed
    /// writes do not accumulate stray files.
    ///
    /// # Errors
    /// Returns an error when the directory cannot be created, serialization
    /// fails, or any file operation fails.
    fn write(&self) -> std::io::Result<()> {
        let path = status_path(&self.id);
        ensure_dir()?;
        let tmp = path.with_extension("json.tmp");
        let data = serde_json::to_string_pretty(self).map_err(std::io::Error::other)?;

        let outcome = fs::File::create(&tmp)
            .and_then(|mut file| {
                file.write_all(data.as_bytes())?;
                // Flush to stable storage before the rename publishes it.
                file.sync_all()
            })
            // The handle is closed by now, so the rename never races with it.
            .and_then(|()| fs::rename(&tmp, &path));

        if outcome.is_err() {
            // Best-effort cleanup; the original error is what callers need.
            let _ = fs::remove_file(&tmp);
        }
        outcome
    }

    /// List all known sub-agent status files (by agent_id), sorted.
    ///
    /// Every directory entry whose name ends in `.json` contributes its name
    /// minus that suffix; entries with non-UTF-8 names are skipped. Returns
    /// an empty list when the status directory does not exist.
    ///
    /// # Errors
    /// Returns an error when the directory cannot be enumerated.
    pub fn list_all() -> std::io::Result<Vec<String>> {
        let dir = status_dir();
        if !dir.exists() {
            return Ok(Vec::new());
        }
        let mut ids = Vec::new();
        for entry in fs::read_dir(&dir)? {
            let file_name = entry?.file_name();
            if let Some(id) = file_name.to_str().and_then(|n| n.strip_suffix(".json")) {
                ids.push(id.to_string());
            }
        }
        ids.sort();
        Ok(ids)
    }
}

// ── Auto-cleanup ─────────────────────────────────────────────────────

/// Remove status files whose `completed_at` is older than `max_age`
/// or whose on-disk mtime is older than `max_age` (for files without
/// a `completed_at` field — covers old/corrupted files).
pub fn cleanup_stale(max_age: Duration) -> std::io::Result<(usize, usize)> {
    let dir = status_dir();
    if !dir.exists() {
        return Ok((0, 0));
    }

    let cutoff = SystemTime::now()
        .checked_sub(max_age)
        .unwrap_or(SystemTime::UNIX_EPOCH);

    let mut scanned = 0usize;
    let mut removed = 0usize;

    for entry in fs::read_dir(&dir)? {
        let entry = entry?;
        let path = entry.path();
        if path.extension().is_none_or(|e| e != "json") {
            continue;
        }
        scanned += 1;

        let should_delete = if let Ok(data) = fs::read_to_string(&path) {
            if let Ok(status) = serde_json::from_str::<AgentStatus>(&data) {
                status
                    .completed_at
                    .as_ref()
                    .is_some_and(|ts| parse_completed_at(&cutoff, ts))
                    || status.completed_at.is_none() && file_stale(&path, &cutoff)
            } else {
                file_stale(&path, &cutoff)
            }
        } else {
            file_stale(&path, &cutoff)
        };

        if should_delete {
            fs::remove_file(&path)?;
            removed += 1;
        }
    }

    Ok((scanned, removed))
}

/// Report whether the RFC 3339 timestamp `ts` lies strictly before `cutoff`.
///
/// Any valid RFC 3339 string is accepted (offsets and fractional seconds
/// included), but the comparison uses whole seconds only. Returns `false`
/// when `ts` cannot be parsed, so a malformed timestamp never marks a file
/// as stale by itself.
fn parse_completed_at(cutoff: &SystemTime, ts: &str) -> bool {
    let Ok(dt) = chrono::DateTime::parse_from_rfc3339(ts) else {
        return false;
    };
    // Timestamps before the Unix epoch are negative; clamp them to the epoch
    // explicitly instead of relying on a wrapping `as u64` cast.
    let secs = u64::try_from(dt.timestamp()).unwrap_or(0);
    let completed = SystemTime::UNIX_EPOCH
        .checked_add(Duration::from_secs(secs))
        .unwrap_or(SystemTime::UNIX_EPOCH);
    completed < *cutoff
}

/// Report whether the file at `path` was last modified strictly before
/// `cutoff`.
///
/// When the metadata or modification time cannot be obtained, the file is
/// reported as stale so that the caller deletes it.
fn file_stale(path: &Path, cutoff: &SystemTime) -> bool {
    match fs::metadata(path).and_then(|meta| meta.modified()) {
        Ok(modified) => modified < *cutoff,
        // can't stat → delete to be safe
        Err(_) => true,
    }
}

/// Current UTC time formatted as an RFC 3339 string.
fn now_rfc3339() -> String {
    let now = chrono::Utc::now();
    now.to_rfc3339()
}