Skip to main content

zag_agent/
process_store.rs

1//! Process tracking store.
2//!
3//! Persists process metadata in `~/.zag/processes.json` so that
4//! `zag ps` can list, inspect, and kill running agent processes.
5
6use crate::config::Config;
7use anyhow::{Context, Result};
8use chrono::{DateTime, FixedOffset};
9use log::debug;
10use serde::{Deserialize, Serialize};
11use std::path::PathBuf;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct ProcessEntry {
15    /// UUID used as the CLI reference handle.
16    pub id: String,
17    /// OS PID of the zag wrapper process.
18    pub pid: u32,
19    /// Associated session ID (links to SessionEntry), if any.
20    #[serde(default)]
21    pub session_id: Option<String>,
22    pub provider: String,
23    pub model: String,
24    /// Subcommand: "run", "exec", "review".
25    pub command: String,
26    /// First 100 characters of the prompt, if any.
27    #[serde(default)]
28    pub prompt: Option<String>,
29    pub started_at: String,
30    /// "running" | "exited" | "killed"
31    pub status: String,
32    #[serde(default)]
33    pub exit_code: Option<i32>,
34    #[serde(default)]
35    pub exited_at: Option<String>,
36    /// Project root path (for context).
37    #[serde(default)]
38    pub root: Option<String>,
39    /// Process ID of the parent zag process (if nested).
40    #[serde(default)]
41    pub parent_process_id: Option<String>,
42    /// Session ID of the parent zag process (if nested).
43    #[serde(default)]
44    pub parent_session_id: Option<String>,
45}
46
47#[derive(Debug, Clone, Default, Serialize, Deserialize)]
48pub struct ProcessStore {
49    pub processes: Vec<ProcessEntry>,
50}
51
52impl ProcessStore {
53    fn path() -> PathBuf {
54        Config::global_base_dir().join("processes.json")
55    }
56
57    /// Load process store from disk. Returns empty store if the file doesn't exist.
58    pub fn load() -> Result<Self> {
59        let path = Self::path();
60        debug!("Loading process store from {}", path.display());
61        if !path.exists() {
62            return Ok(Self::default());
63        }
64        let content = std::fs::read_to_string(&path)
65            .with_context(|| format!("Failed to read process store: {}", path.display()))?;
66        let store: ProcessStore = serde_json::from_str(&content)
67            .with_context(|| format!("Failed to parse process store: {}", path.display()))?;
68        debug!("Loaded {} process entries", store.processes.len());
69        Ok(store)
70    }
71
72    /// Save process store to disk.
73    pub fn save(&self) -> Result<()> {
74        let path = Self::path();
75        if let Some(parent) = path.parent() {
76            std::fs::create_dir_all(parent)
77                .with_context(|| format!("Failed to create directory: {}", parent.display()))?;
78        }
79        let content =
80            serde_json::to_string_pretty(self).context("Failed to serialize process store")?;
81        crate::file_util::atomic_write_str(&path, &content)
82            .with_context(|| format!("Failed to write process store: {}", path.display()))?;
83        debug!("Process store saved ({} entries)", self.processes.len());
84        Ok(())
85    }
86
87    /// Add a new process entry, replacing any existing entry with the same id.
88    pub fn add(&mut self, entry: ProcessEntry) {
89        self.processes.retain(|e| e.id != entry.id);
90        debug!(
91            "Adding process: id={}, pid={}, provider={}",
92            entry.id, entry.pid, entry.provider
93        );
94        self.processes.push(entry);
95    }
96
97    /// Update the OS pid of a process entry.
98    ///
99    /// Used when the entry was first registered with a placeholder pid
100    /// (typically the parent zag pid) and the actual agent subprocess
101    /// pid becomes available only after spawn. Keeping the registry
102    /// pointed at the agent child lets `zag ps kill self` SIGTERM the
103    /// agent without tearing down the parent orchestrator.
104    pub fn update_pid(&mut self, id: &str, pid: u32) {
105        if let Some(entry) = self.processes.iter_mut().find(|e| e.id == id) {
106            debug!(
107                "Updated process {id}: pid {} -> {} (agent subprocess)",
108                entry.pid, pid
109            );
110            entry.pid = pid;
111        }
112    }
113
114    /// Update the status and exit metadata for a process entry.
115    pub fn update_status(&mut self, id: &str, status: &str, exit_code: Option<i32>) {
116        if let Some(entry) = self.processes.iter_mut().find(|e| e.id == id) {
117            entry.status = status.to_string();
118            entry.exit_code = exit_code;
119            entry.exited_at = Some(chrono::Utc::now().to_rfc3339());
120            debug!("Updated process {id}: status={status}, exit_code={exit_code:?}");
121        }
122    }
123
124    /// Find a process entry by id.
125    pub fn find(&self, id: &str) -> Option<&ProcessEntry> {
126        self.processes.iter().find(|e| e.id == id)
127    }
128
129    /// List process entries sorted by started_at descending (newest first).
130    pub fn list_recent(&self, limit: Option<usize>) -> Vec<&ProcessEntry> {
131        let mut entries: Vec<&ProcessEntry> = self.processes.iter().collect();
132        entries.sort_by(|a, b| {
133            parse_started_at(&b.started_at)
134                .cmp(&parse_started_at(&a.started_at))
135                .then_with(|| b.id.cmp(&a.id))
136        });
137        if let Some(n) = limit {
138            entries.truncate(n);
139        }
140        entries
141    }
142}
143
144fn parse_started_at(s: &str) -> Option<DateTime<FixedOffset>> {
145    DateTime::parse_from_rfc3339(s).ok()
146}
147
148#[cfg(test)]
149#[path = "process_store_tests.rs"]
150mod tests;