Skip to main content

imp_core/
mana_run_state.rs

1use serde::{Deserialize, Serialize};
2
/// How long (in milliseconds, 24 hours) a finished run stays in the store;
/// runs whose `finished_at_ms` is older than this are dropped when the store
/// is loaded.
const FINISHED_RUN_TTL_MS: u128 = 24 * 60 * 60 * 1000;
/// How long (in milliseconds, 6 hours) a `starting`/`running` run may go
/// without an event before it is reclassified as `interrupted` on load.
const INTERRUPTED_RUN_STALE_MS: u128 = 6 * 60 * 60 * 1000;
5
/// Caller-facing snapshot of one agent entry read from the agents file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ManaRunAgentSummary {
    /// Key under which the agent entry is stored in the agents file.
    pub unit_id: String,
    /// Title copied verbatim from the persisted agent entry.
    pub title: String,
    /// Action copied verbatim from the persisted agent entry.
    pub action: String,
    /// Derived label: `running`, `done`, or `failed(<exit code>)`.
    pub status: String,
}
13
/// Caller-facing view of a persisted run, built by
/// `PersistedRunState::into_summary`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ManaRunSummary {
    /// Identifier used to look the run up in the store.
    pub run_id: String,
    /// Scope string copied verbatim from the persisted run.
    pub scope: String,
    /// Status string copied from the persisted run; this module itself
    /// checks for `starting`/`running` and writes `interrupted`.
    pub status: String,
    /// Counter copied from the persisted run summary.
    pub total_units: usize,
    /// Counter copied from the persisted run summary.
    pub total_closed: usize,
    /// Counter copied from the persisted run summary.
    pub total_failed: usize,
    /// Counter copied from the persisted run summary.
    pub total_awaiting_verify: usize,
    /// Last entry of `logs`, or `None` when there are no non-blank log lines.
    pub latest: Option<String>,
    /// Persisted log lines with blank (whitespace-only) lines removed.
    pub logs: Vec<String>,
    /// Agent summaries loaded from the agents file; empty when that file is
    /// missing, blank, or cannot be read or parsed.
    pub agents: Vec<ManaRunAgentSummary>,
}
27
28#[derive(Debug, Clone, Serialize, Deserialize, Default)]
29struct PersistedRunStore {
30    #[allow(dead_code)]
31    next_id: u64,
32    #[serde(default)]
33    runs: Vec<PersistedRunState>,
34}
35
/// One run as stored in the run state file.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PersistedRunState {
    /// Identifier used to look the run up.
    run_id: String,
    /// Scope string; surfaced unchanged in the summary.
    scope: String,
    /// Status string. This module checks for `starting`/`running` and writes
    /// `interrupted`; other values are passed through untouched.
    status: String,
    /// Optional explanation, set when this module marks the run interrupted.
    error: Option<String>,
    /// Millisecond Unix timestamp at which the run finished; `None` while the
    /// run is still considered unfinished.
    finished_at_ms: Option<u128>,
    /// Millisecond Unix timestamp of the last recorded event. Defaults to 0
    /// when absent, and 0 is never treated as stale.
    #[serde(default)]
    last_event_at_ms: u128,
    /// Aggregate counters for the run.
    summary: PersistedRunSummary,
    /// Raw log lines; blank ones are filtered out when building a summary.
    #[serde(default)]
    log_lines: Vec<String>,
}
49
/// Aggregate counters persisted with each run. Every counter defaults to 0
/// when missing from the stored JSON.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct PersistedRunSummary {
    /// Surfaced as `ManaRunSummary::total_units`.
    #[serde(default)]
    total_units: usize,
    /// Surfaced as `ManaRunSummary::total_closed`.
    #[serde(default)]
    total_closed: usize,
    /// Surfaced as `ManaRunSummary::total_failed`.
    #[serde(default)]
    total_failed: usize,
    /// Surfaced as `ManaRunSummary::total_awaiting_verify`.
    #[serde(default)]
    total_awaiting_verify: usize,
}
61
62pub fn mana_run_summary(run_id: &str) -> Result<Option<ManaRunSummary>, String> {
63    let store = load_run_store()?;
64    Ok(store
65        .runs
66        .into_iter()
67        .find(|run| run.run_id == run_id)
68        .map(PersistedRunState::into_summary))
69}
70
71pub fn stop_mana_run(run_id: &str) -> Result<Option<ManaRunSummary>, String> {
72    let mut store = load_run_store()?;
73    let Some(run) = store.runs.iter_mut().find(|run| run.run_id == run_id) else {
74        return Ok(None);
75    };
76
77    let now = unix_time_ms();
78    if run.finished_at_ms.is_none() {
79        run.status = "interrupted".to_string();
80        run.error =
81            Some("Stopped from imp /stop; external workers may need manual cleanup".to_string());
82        run.finished_at_ms = Some(now);
83        run.last_event_at_ms = now;
84        run.log_lines.push("Run stopped from imp /stop".to_string());
85        save_run_store(&store)?;
86    }
87
88    Ok(load_run_store()?
89        .runs
90        .into_iter()
91        .find(|run| run.run_id == run_id)
92        .map(PersistedRunState::into_summary))
93}
94
95fn load_run_store() -> Result<PersistedRunStore, String> {
96    let path = run_state_file();
97    if !path.exists() {
98        return Ok(PersistedRunStore::default());
99    }
100    let contents =
101        std::fs::read_to_string(&path).map_err(|err| format!("read {}: {err}", path.display()))?;
102    if contents.trim().is_empty() {
103        return Ok(PersistedRunStore::default());
104    }
105    let mut store: PersistedRunStore = serde_json::from_str(&contents)
106        .map_err(|err| format!("parse {}: {err}", path.display()))?;
107    classify_stale_unfinished_runs(&mut store);
108    Ok(store)
109}
110
111fn save_run_store(store: &PersistedRunStore) -> Result<(), String> {
112    let path = run_state_file();
113    if let Some(parent) = path.parent() {
114        std::fs::create_dir_all(parent)
115            .map_err(|err| format!("create {}: {err}", parent.display()))?;
116    }
117    let json = serde_json::to_string_pretty(store)
118        .map_err(|err| format!("serialize {}: {err}", path.display()))?;
119    std::fs::write(&path, json).map_err(|err| format!("write {}: {err}", path.display()))
120}
121
122fn classify_stale_unfinished_runs(store: &mut PersistedRunStore) {
123    let cutoff = unix_time_ms().saturating_sub(INTERRUPTED_RUN_STALE_MS);
124    for run in &mut store.runs {
125        if (run.status == "starting" || run.status == "running")
126            && run.finished_at_ms.is_none()
127            && run.last_event_at_ms > 0
128            && run.last_event_at_ms < cutoff
129        {
130            run.status = "interrupted".to_string();
131            run.error = Some(
132                "Run state is stale after process restart or lost background worker; inspect logs before rerun"
133                    .to_string(),
134            );
135            run.finished_at_ms = Some(run.last_event_at_ms);
136        }
137    }
138
139    let cutoff = unix_time_ms().saturating_sub(FINISHED_RUN_TTL_MS);
140    store.runs.retain(|run| match run.finished_at_ms {
141        Some(finished_at_ms) => finished_at_ms >= cutoff,
142        None => true,
143    });
144}
145
146impl PersistedRunState {
147    fn into_summary(self) -> ManaRunSummary {
148        let logs = self
149            .log_lines
150            .into_iter()
151            .filter(|line| !line.trim().is_empty())
152            .collect::<Vec<_>>();
153        let latest = logs.last().cloned();
154        let agents = load_agent_summaries().unwrap_or_default();
155        ManaRunSummary {
156            run_id: self.run_id,
157            scope: self.scope,
158            status: self.status,
159            total_units: self.summary.total_units,
160            total_closed: self.summary.total_closed,
161            total_failed: self.summary.total_failed,
162            total_awaiting_verify: self.summary.total_awaiting_verify,
163            latest,
164            logs,
165            agents,
166        }
167    }
168}
169
/// One agent record as stored in the agents file, keyed there by unit id.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PersistedAgentEntry {
    /// Title surfaced unchanged in the agent summary.
    title: String,
    /// Action surfaced unchanged in the agent summary.
    action: String,
    /// Finish marker; `None` (also the default when absent) means the agent
    /// is reported as still running.
    /// NOTE(review): presumably a Unix timestamp — unit not visible here.
    #[serde(default)]
    finished_at: Option<i64>,
    /// Exit code once finished; 0 or a missing code is reported as `done`,
    /// anything else as `failed(<code>)`.
    #[serde(default)]
    exit_code: Option<i32>,
}
179
180fn load_agent_summaries() -> Result<Vec<ManaRunAgentSummary>, String> {
181    let path = agents_file();
182    if !path.exists() {
183        return Ok(Vec::new());
184    }
185    let contents =
186        std::fs::read_to_string(&path).map_err(|err| format!("read {}: {err}", path.display()))?;
187    if contents.trim().is_empty() {
188        return Ok(Vec::new());
189    }
190    let agents: std::collections::HashMap<String, PersistedAgentEntry> =
191        serde_json::from_str(&contents)
192            .map_err(|err| format!("parse {}: {err}", path.display()))?;
193    let mut summaries = agents
194        .into_iter()
195        .map(|(unit_id, entry)| {
196            let status = agent_status(&entry);
197            ManaRunAgentSummary {
198                unit_id,
199                title: entry.title,
200                action: entry.action,
201                status,
202            }
203        })
204        .collect::<Vec<_>>();
205    summaries.sort_by(|a, b| a.unit_id.cmp(&b.unit_id));
206    Ok(summaries)
207}
208
209fn agent_status(entry: &PersistedAgentEntry) -> String {
210    match (entry.finished_at, entry.exit_code) {
211        (None, _) => "running".to_string(),
212        (Some(_), Some(0)) => "done".to_string(),
213        (Some(_), Some(code)) => format!("failed({code})"),
214        (Some(_), None) => "done".to_string(),
215    }
216}
217
218fn agents_file() -> std::path::PathBuf {
219    if let Ok(path) = mana::commands::agents::agents_file_path() {
220        return path;
221    }
222    let dir = std::env::var("HOME")
223        .map(|home| {
224            std::path::PathBuf::from(home)
225                .join(".local")
226                .join("share")
227                .join("units")
228        })
229        .unwrap_or_else(|_| std::path::PathBuf::from("/tmp").join("mana"));
230    std::fs::create_dir_all(&dir).ok();
231    dir.join("agents.json")
232}
233
234fn run_state_file() -> std::path::PathBuf {
235    if let Ok(path) = mana::commands::agents::agents_file_path() {
236        if let Some(dir) = path.parent() {
237            std::fs::create_dir_all(dir).ok();
238            return dir.join("run_state.json");
239        }
240    }
241
242    let dir = std::env::var("HOME")
243        .map(|home| {
244            std::path::PathBuf::from(home)
245                .join(".local")
246                .join("share")
247                .join("units")
248        })
249        .unwrap_or_else(|_| std::path::PathBuf::from("/tmp").join("mana"));
250    std::fs::create_dir_all(&dir).ok();
251    dir.join("run_state.json")
252}
253
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields 0 when the system clock reports a time before the epoch.
fn unix_time_ms() -> u128 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    }
}