magic_bird/store/
pending.rs1use std::fs;
8use std::path::{Path, PathBuf};
9
10use chrono::{DateTime, Utc};
11use serde::{Deserialize, Serialize};
12use uuid::Uuid;
13
14use crate::error::Result;
15use crate::schema::InvocationRecord;
16
17#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct PendingInvocation {
23 pub id: Uuid,
25
26 pub session_id: String,
28
29 pub timestamp: DateTime<Utc>,
31
32 pub cwd: String,
34
35 pub cmd: String,
37
38 pub runner_id: String,
40
41 pub client_id: String,
43}
44
45impl PendingInvocation {
46 pub fn from_record(record: &InvocationRecord) -> Option<Self> {
48 let runner_id = record.runner_id.clone()?;
49 Some(Self {
50 id: record.id,
51 session_id: record.session_id.clone(),
52 timestamp: record.timestamp,
53 cwd: record.cwd.clone(),
54 cmd: record.cmd.clone(),
55 runner_id,
56 client_id: record.client_id.clone(),
57 })
58 }
59
60 pub fn filename(&self) -> String {
62 format!("{}--{}.pending", self.session_id, self.id)
63 }
64
65 pub fn path(&self, pending_dir: &Path) -> PathBuf {
67 pending_dir.join(self.filename())
68 }
69}
70
71pub fn write_pending_file(pending_dir: &Path, pending: &PendingInvocation) -> Result<PathBuf> {
73 fs::create_dir_all(pending_dir)?;
74 let path = pending.path(pending_dir);
75 let content = serde_json::to_string_pretty(pending)?;
76 fs::write(&path, content)?;
77 Ok(path)
78}
79
80pub fn delete_pending_file(pending_dir: &Path, id: Uuid, session_id: &str) -> Result<bool> {
82 let filename = format!("{}--{}.pending", session_id, id);
83 let path = pending_dir.join(filename);
84 if path.exists() {
85 fs::remove_file(&path)?;
86 Ok(true)
87 } else {
88 Ok(false)
89 }
90}
91
92pub fn list_pending_files(pending_dir: &Path) -> Result<Vec<PendingInvocation>> {
94 if !pending_dir.exists() {
95 return Ok(Vec::new());
96 }
97
98 let mut pending = Vec::new();
99 for entry in fs::read_dir(pending_dir)? {
100 let entry = entry?;
101 let path = entry.path();
102 if path.extension().map(|e| e == "pending").unwrap_or(false) {
103 match fs::read_to_string(&path) {
104 Ok(content) => match serde_json::from_str::<PendingInvocation>(&content) {
105 Ok(p) => pending.push(p),
106 Err(e) => {
107 eprintln!("Warning: failed to parse pending file {:?}: {}", path, e);
108 }
109 },
110 Err(e) => {
111 eprintln!("Warning: failed to read pending file {:?}: {}", path, e);
112 }
113 }
114 }
115 }
116
117 Ok(pending)
118}
119
120pub fn is_runner_alive(runner_id: &str) -> bool {
127 if let Some(pid_str) = runner_id.strip_prefix("pid:") {
128 if let Ok(pid) = pid_str.parse::<i32>() {
130 return is_pid_alive(pid);
131 }
132 } else if runner_id.starts_with("gha:") {
133 return false;
136 } else if runner_id.starts_with("k8s:") {
137 return false;
140 }
141
142 false
144}
145
146#[cfg(unix)]
148fn is_pid_alive(pid: i32) -> bool {
149 unsafe { libc::kill(pid, 0) == 0 }
152}
153
154#[cfg(not(unix))]
155fn is_pid_alive(_pid: i32) -> bool {
156 true
158}
159
160#[derive(Debug, Default, Clone)]
162pub struct RecoveryStats {
163 pub pending_checked: usize,
165
166 pub still_running: usize,
168
169 pub orphaned: usize,
171
172 pub errors: usize,
174}
175
176#[cfg(test)]
177mod tests {
178 use super::*;
179 use tempfile::TempDir;
180
181 #[test]
182 fn test_pending_file_lifecycle() {
183 let tmp = TempDir::new().unwrap();
184 let pending_dir = tmp.path().join("pending");
185
186 let record = InvocationRecord::new_pending_local(
188 "test-session",
189 "echo hello",
190 "/tmp",
191 std::process::id() as i32,
192 "test@localhost",
193 );
194
195 let pending = PendingInvocation::from_record(&record).unwrap();
196
197 let path = write_pending_file(&pending_dir, &pending).unwrap();
199 assert!(path.exists());
200
201 let files = list_pending_files(&pending_dir).unwrap();
203 assert_eq!(files.len(), 1);
204 assert_eq!(files[0].id, record.id);
205 assert_eq!(files[0].cmd, "echo hello");
206
207 let deleted = delete_pending_file(&pending_dir, record.id, &record.session_id).unwrap();
209 assert!(deleted);
210 assert!(!path.exists());
211
212 let files = list_pending_files(&pending_dir).unwrap();
214 assert!(files.is_empty());
215 }
216
217 #[test]
218 fn test_is_runner_alive_current_process() {
219 let pid = std::process::id() as i32;
220 let runner_id = format!("pid:{}", pid);
221 assert!(is_runner_alive(&runner_id));
222 }
223
224 #[test]
225 fn test_is_runner_alive_dead_process() {
226 let runner_id = "pid:999999999";
228 assert!(!is_runner_alive(runner_id));
229 }
230
231 #[test]
232 fn test_is_runner_alive_unknown_format() {
233 assert!(!is_runner_alive("unknown:123"));
234 assert!(!is_runner_alive("gha:run:12345"));
235 assert!(!is_runner_alive("k8s:pod:abc123"));
236 }
237
238 #[test]
239 fn test_pending_filename() {
240 let pending = PendingInvocation {
241 id: Uuid::parse_str("01234567-89ab-cdef-0123-456789abcdef").unwrap(),
242 session_id: "test-session".to_string(),
243 timestamp: Utc::now(),
244 cwd: "/tmp".to_string(),
245 cmd: "echo hello".to_string(),
246 runner_id: "pid:12345".to_string(),
247 client_id: "test@localhost".to_string(),
248 };
249
250 assert_eq!(
251 pending.filename(),
252 "test-session--01234567-89ab-cdef-0123-456789abcdef.pending"
253 );
254 }
255}