Skip to main content

magic_bird/store/
pending.rs

1//! Pending invocation file operations for crash recovery.
2//!
3//! This module handles the lightweight JSON files that track in-flight invocations.
4//! These files serve as crash-safe markers that can be checked even if DuckDB is
5//! unavailable.
6
7use std::fs;
8use std::path::{Path, PathBuf};
9
10use chrono::{DateTime, Utc};
11use serde::{Deserialize, Serialize};
12use uuid::Uuid;
13
14use crate::error::Result;
15use crate::schema::InvocationRecord;
16
17/// A pending invocation marker stored as JSON.
18///
19/// This is a lightweight representation of an in-flight invocation,
20/// stored as a JSON file for crash recovery.
21#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct PendingInvocation {
23    /// Unique identifier (matches the parquet record).
24    pub id: Uuid,
25
26    /// Session identifier.
27    pub session_id: String,
28
29    /// When the invocation started.
30    pub timestamp: DateTime<Utc>,
31
32    /// Working directory.
33    pub cwd: String,
34
35    /// Full command string.
36    pub cmd: String,
37
38    /// Runner identifier for liveness checking.
39    pub runner_id: String,
40
41    /// Client identifier.
42    pub client_id: String,
43}
44
45impl PendingInvocation {
46    /// Create a pending invocation from an InvocationRecord.
47    pub fn from_record(record: &InvocationRecord) -> Option<Self> {
48        let runner_id = record.runner_id.clone()?;
49        Some(Self {
50            id: record.id,
51            session_id: record.session_id.clone(),
52            timestamp: record.timestamp,
53            cwd: record.cwd.clone(),
54            cmd: record.cmd.clone(),
55            runner_id,
56            client_id: record.client_id.clone(),
57        })
58    }
59
60    /// Get the filename for this pending invocation.
61    pub fn filename(&self) -> String {
62        format!("{}--{}.pending", self.session_id, self.id)
63    }
64
65    /// Get the full path for this pending file in the given directory.
66    pub fn path(&self, pending_dir: &Path) -> PathBuf {
67        pending_dir.join(self.filename())
68    }
69}
70
71/// Write a pending invocation file.
72pub fn write_pending_file(pending_dir: &Path, pending: &PendingInvocation) -> Result<PathBuf> {
73    fs::create_dir_all(pending_dir)?;
74    let path = pending.path(pending_dir);
75    let content = serde_json::to_string_pretty(pending)?;
76    fs::write(&path, content)?;
77    Ok(path)
78}
79
80/// Delete a pending invocation file.
81pub fn delete_pending_file(pending_dir: &Path, id: Uuid, session_id: &str) -> Result<bool> {
82    let filename = format!("{}--{}.pending", session_id, id);
83    let path = pending_dir.join(filename);
84    if path.exists() {
85        fs::remove_file(&path)?;
86        Ok(true)
87    } else {
88        Ok(false)
89    }
90}
91
92/// List all pending invocation files.
93pub fn list_pending_files(pending_dir: &Path) -> Result<Vec<PendingInvocation>> {
94    if !pending_dir.exists() {
95        return Ok(Vec::new());
96    }
97
98    let mut pending = Vec::new();
99    for entry in fs::read_dir(pending_dir)? {
100        let entry = entry?;
101        let path = entry.path();
102        if path.extension().map(|e| e == "pending").unwrap_or(false) {
103            match fs::read_to_string(&path) {
104                Ok(content) => match serde_json::from_str::<PendingInvocation>(&content) {
105                    Ok(p) => pending.push(p),
106                    Err(e) => {
107                        eprintln!("Warning: failed to parse pending file {:?}: {}", path, e);
108                    }
109                },
110                Err(e) => {
111                    eprintln!("Warning: failed to read pending file {:?}: {}", path, e);
112                }
113            }
114        }
115    }
116
117    Ok(pending)
118}
119
120/// Check if a runner is still alive based on its runner_id.
121///
122/// Supports various runner ID formats:
123/// - `pid:12345` - Local process ID
124/// - `gha:run:12345678` - GitHub Actions run
125/// - `k8s:pod:abc123` - Kubernetes pod
126pub fn is_runner_alive(runner_id: &str) -> bool {
127    if let Some(pid_str) = runner_id.strip_prefix("pid:") {
128        // Local process - check if PID exists
129        if let Ok(pid) = pid_str.parse::<i32>() {
130            return is_pid_alive(pid);
131        }
132    } else if runner_id.starts_with("gha:") {
133        // GitHub Actions - we can't reliably check, assume dead after max_age
134        // TODO: Could use GitHub API to check run status
135        return false;
136    } else if runner_id.starts_with("k8s:") {
137        // Kubernetes - we can't reliably check, assume dead after max_age
138        // TODO: Could use kubectl to check pod status
139        return false;
140    }
141
142    // Unknown format - assume dead
143    false
144}
145
146/// Check if a local PID is still alive.
147#[cfg(unix)]
148fn is_pid_alive(pid: i32) -> bool {
149    // Send signal 0 to check if process exists
150    // This works even for processes owned by other users
151    unsafe { libc::kill(pid, 0) == 0 }
152}
153
#[cfg(not(unix))]
fn is_pid_alive(_pid: i32) -> bool {
    // No liveness probe is implemented for non-Unix targets. Every PID is reported as
    // running, so markers are never treated as orphaned on this platform.
    true
}
159
/// Counters reported by a recovery run over pending invocation markers.
#[derive(Clone, Debug, Default)]
pub struct RecoveryStats {
    /// How many pending files were examined.
    pub pending_checked: usize,

    /// How many were found with their runner still alive.
    pub still_running: usize,

    /// How many were marked as orphaned.
    pub orphaned: usize,

    /// How many errors were hit along the way.
    pub errors: usize,
}
175
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[test]
    fn test_pending_file_lifecycle() {
        let tmp = TempDir::new().unwrap();
        let pending_dir = tmp.path().join("pending");

        // Create a pending invocation
        let record = InvocationRecord::new_pending_local(
            "test-session",
            "echo hello",
            "/tmp",
            std::process::id() as i32,
            "test@localhost",
        );

        let pending = PendingInvocation::from_record(&record).unwrap();

        // Write pending file
        let path = write_pending_file(&pending_dir, &pending).unwrap();
        assert!(path.exists());

        // List pending files
        let files = list_pending_files(&pending_dir).unwrap();
        assert_eq!(files.len(), 1);
        assert_eq!(files[0].id, record.id);
        assert_eq!(files[0].cmd, "echo hello");

        // Delete pending file
        let deleted = delete_pending_file(&pending_dir, record.id, &record.session_id).unwrap();
        assert!(deleted);
        assert!(!path.exists());

        // List should be empty now
        let files = list_pending_files(&pending_dir).unwrap();
        assert!(files.is_empty());
    }

    #[test]
    fn test_delete_missing_pending_file_returns_false() {
        let tmp = TempDir::new().unwrap();
        let deleted = delete_pending_file(tmp.path(), Uuid::nil(), "no-such-session").unwrap();
        assert!(!deleted);
    }

    #[test]
    fn test_list_pending_files_missing_dir() {
        let tmp = TempDir::new().unwrap();
        let files = list_pending_files(&tmp.path().join("does-not-exist")).unwrap();
        assert!(files.is_empty());
    }

    #[test]
    fn test_list_skips_foreign_and_corrupt_files() {
        let tmp = TempDir::new().unwrap();
        let pending_dir = tmp.path();

        // Wrong extension: must be ignored entirely.
        fs::write(pending_dir.join("notes.txt"), "not a marker").unwrap();
        // Right extension but invalid JSON: skipped with a warning, not an error.
        fs::write(pending_dir.join("broken--x.pending"), "{ not json").unwrap();

        let files = list_pending_files(pending_dir).unwrap();
        assert!(files.is_empty());
    }

    #[test]
    fn test_is_runner_alive_current_process() {
        let pid = std::process::id() as i32;
        let runner_id = format!("pid:{}", pid);
        assert!(is_runner_alive(&runner_id));
    }

    #[test]
    fn test_is_runner_alive_dead_process() {
        // 999_999_999 is far above the PID range of common Unix kernels (Linux caps
        // `pid_max` at 2^22), so no process can have this PID.
        let runner_id = "pid:999999999";
        assert!(!is_runner_alive(runner_id));
    }

    #[test]
    fn test_is_runner_alive_unknown_format() {
        assert!(!is_runner_alive("unknown:123"));
        assert!(!is_runner_alive("gha:run:12345"));
        assert!(!is_runner_alive("k8s:pod:abc123"));
    }

    #[test]
    fn test_pending_filename() {
        let pending = PendingInvocation {
            id: Uuid::parse_str("01234567-89ab-cdef-0123-456789abcdef").unwrap(),
            session_id: "test-session".to_string(),
            timestamp: Utc::now(),
            cwd: "/tmp".to_string(),
            cmd: "echo hello".to_string(),
            runner_id: "pid:12345".to_string(),
            client_id: "test@localhost".to_string(),
        };

        assert_eq!(
            pending.filename(),
            "test-session--01234567-89ab-cdef-0123-456789abcdef.pending"
        );
    }
}