Skip to main content

lean_ctx/core/workflow/
store.rs

1use crate::core::workflow::types::WorkflowRun;
2use std::path::PathBuf;
3
/// Inactivity cutoff in minutes: a workflow whose last update is older than
/// this is treated as stale and cleared the next time it is loaded.
const STALE_MINUTES: i64 = 30;

/// Maximum age, in seconds, a workflow file may reach before
/// `cleanup_expired` deletes it (24 hours).
pub const WORKFLOW_TTL_SECS: u64 = 60 * 60 * 24;
9
10fn workflows_dir() -> Option<PathBuf> {
11    crate::core::data_dir::lean_ctx_data_dir()
12        .ok()
13        .map(|d| d.join("workflows"))
14}
15
16fn workflow_path_for_agent(agent_id: Option<&str>) -> Option<PathBuf> {
17    let dir = workflows_dir()?;
18    let filename = match agent_id {
19        Some(id) if !id.trim().is_empty() => {
20            let safe_id: String = id
21                .chars()
22                .map(|c| {
23                    if c.is_alphanumeric() || c == '-' || c == '_' {
24                        c
25                    } else {
26                        '_'
27                    }
28                })
29                .collect();
30            format!("workflow-{safe_id}.json")
31        }
32        _ => "active.json".to_string(),
33    };
34    Some(dir.join(filename))
35}
36
37pub fn load_active() -> Result<Option<WorkflowRun>, String> {
38    load_active_for_agent(None)
39}
40
41pub fn load_active_for_agent(agent_id: Option<&str>) -> Result<Option<WorkflowRun>, String> {
42    let Some(path) = workflow_path_for_agent(agent_id) else {
43        return Ok(None);
44    };
45    let content = match std::fs::read_to_string(&path) {
46        Ok(c) => c,
47        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
48            // Backward compat: if agent-scoped file missing, try legacy active.json (read-only migration)
49            if agent_id.is_some() {
50                if let Some(legacy) = workflow_path_for_agent(None) {
51                    if let Ok(lc) = std::fs::read_to_string(&legacy) {
52                        let run: WorkflowRun = serde_json::from_str(&lc)
53                            .map_err(|e| format!("Invalid legacy workflow JSON: {e}"))?;
54                        let elapsed = chrono::Utc::now()
55                            .signed_duration_since(run.updated_at)
56                            .num_minutes();
57                        if elapsed <= STALE_MINUTES && run.current != "done" {
58                            return Ok(Some(run));
59                        }
60                    }
61                }
62            }
63            return Ok(None);
64        }
65        Err(e) => return Err(format!("read {}: {e}", path.display())),
66    };
67    let run: WorkflowRun =
68        serde_json::from_str(&content).map_err(|e| format!("Invalid workflow JSON: {e}"))?;
69
70    let elapsed = chrono::Utc::now()
71        .signed_duration_since(run.updated_at)
72        .num_minutes();
73    if elapsed > STALE_MINUTES || run.current == "done" {
74        let _ = std::fs::remove_file(&path);
75        return Ok(None);
76    }
77    Ok(Some(run))
78}
79
80pub fn save_active(run: &WorkflowRun) -> Result<(), String> {
81    save_active_for_agent(run, None)
82}
83
84pub fn save_active_for_agent(run: &WorkflowRun, agent_id: Option<&str>) -> Result<(), String> {
85    let Some(path) = workflow_path_for_agent(agent_id) else {
86        return Err("No home directory available".to_string());
87    };
88    if let Some(parent) = path.parent() {
89        std::fs::create_dir_all(parent).map_err(|e| format!("mkdir failed: {e}"))?;
90    }
91    let json = serde_json::to_string_pretty(run).map_err(|e| format!("serialize failed: {e}"))?;
92    let tmp = path.with_extension("tmp");
93    std::fs::write(&tmp, json).map_err(|e| format!("write failed: {e}"))?;
94    std::fs::rename(&tmp, &path).map_err(|e| format!("rename failed: {e}"))?;
95    Ok(())
96}
97
98pub fn clear_active() -> Result<(), String> {
99    clear_active_for_agent(None)
100}
101
102pub fn clear_active_for_agent(agent_id: Option<&str>) -> Result<(), String> {
103    let Some(path) = workflow_path_for_agent(agent_id) else {
104        return Ok(());
105    };
106    match std::fs::remove_file(&path) {
107        Ok(()) => Ok(()),
108        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
109        Err(e) => Err(format!("remove {}: {e}", path.display())),
110    }
111}
112
113/// Remove workflow files older than `WORKFLOW_TTL_SECS`.
114pub fn cleanup_expired() -> (u32, u64) {
115    let Some(dir) = workflows_dir() else {
116        return (0, 0);
117    };
118    let Ok(entries) = std::fs::read_dir(&dir) else {
119        return (0, 0);
120    };
121    let now = std::time::SystemTime::now();
122    let mut removed = 0u32;
123    let mut freed = 0u64;
124
125    for entry in entries.flatten() {
126        let path = entry.path();
127        if !path.is_file() {
128            continue;
129        }
130        let ext = path.extension().and_then(|e| e.to_str());
131        if ext != Some("json") {
132            continue;
133        }
134        let Ok(meta) = std::fs::metadata(&path) else {
135            continue;
136        };
137        let age = meta
138            .modified()
139            .ok()
140            .and_then(|m| now.duration_since(m).ok())
141            .map_or(0, |d| d.as_secs());
142        if age > WORKFLOW_TTL_SECS {
143            freed += meta.len();
144            let _ = std::fs::remove_file(&path);
145            removed += 1;
146        }
147    }
148    (removed, freed)
149}