capsula_core/
run.rs

1use chrono::{DateTime, Utc};
2use serde::ser::SerializeStruct;
3use serde::{Deserialize, Serialize, Serializer};
4use std::collections::HashMap;
5use std::io::{self, Read, Write};
6use std::path::{Path, PathBuf};
7use std::process::{Command, ExitStatus, Stdio};
8use std::thread;
9use std::time::{Duration, Instant};
10use ulid::Ulid;
11
12#[derive(Debug, Clone, Deserialize)]
13pub struct Run<Dir = PathBuf> {
14    pub id: Ulid,
15    pub name: String,
16    pub command: Vec<String>,
17    pub run_dir: Dir,
18    pub project_root: PathBuf,
19}
20
21pub type UnpreparedRun = Run<()>;
22pub type PreparedRun = Run<PathBuf>;
23
24impl<Dir> Run<Dir> {
25    pub fn timestamp(&self) -> DateTime<Utc> {
26        // Calculate start time from ULID timestamp
27        let dt: DateTime<Utc> = self.id.datetime().into();
28        dt
29    }
30
31    pub fn gen_run_dir(&self, vault_dir: impl AsRef<Path>) -> PathBuf {
32        let timestamp = self.timestamp();
33        let date_str = timestamp.format("%Y-%m-%d").to_string();
34        let time_str = timestamp.format("%H%M%S").to_string();
35
36        // Prefix the run directory with time because
37        // folders are sorted in natural order, not in lexicographical order,
38        // For example, on macOS Finder, the order is:
39        // 1. 01K5K478KNQ2ZXZG68MWM1Z9X6
40        // 2. 01K5K4571FGKBFTTRJCG1J3DCZ
41        // which is not the correct chronological order.
42        // By adding time prefix, it will be sorted correctly.
43        let run_dir_name = format!("{}-{}", time_str, self.name);
44        vault_dir.as_ref().join(date_str).join(&run_dir_name)
45    }
46}
47
48impl Run<()> {
49    pub fn setup_run_dir(
50        &self,
51        vault_dir: impl AsRef<std::path::Path>,
52        max_retries: usize,
53    ) -> io::Result<Run<PathBuf>> {
54        setup_vault(&vault_dir)?;
55
56        // TODO: Consider removing retries as it is too conservative?
57        let run_dir = {
58            let mut attempt = 0;
59            loop {
60                let candidate = self.gen_run_dir(&vault_dir);
61                if candidate.exists() {
62                    // Slight delay before retrying
63                    thread::sleep(Duration::from_millis(10 * (attempt as u64 + 1)));
64                    attempt += 1;
65                    if attempt >= max_retries {
66                        return Err(io::Error::new(
67                            io::ErrorKind::AlreadyExists,
68                            format!(
69                                "Failed to create unique run directory after {max_retries} attempts",
70                            ),
71                        ));
72                    }
73                } else {
74                    break candidate;
75                }
76            }
77        };
78
79        std::fs::create_dir_all(&run_dir)?;
80        Ok(Run {
81            id: self.id,
82            name: self.name.clone(),
83            command: self.command.clone(),
84            run_dir,
85            project_root: self.project_root.clone(),
86        })
87    }
88}
89
90impl Serialize for Run<()> {
91    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
92    where
93        S: Serializer,
94    {
95        let mut state = serializer.serialize_struct("Run", 5)?;
96        state.serialize_field("id", &self.id)?;
97        state.serialize_field("name", &self.name)?;
98        state.serialize_field("command", &self.command)?;
99        state.serialize_field("timestamp", &self.timestamp().to_rfc3339())?;
100        state.serialize_field("project_root", &self.project_root.to_string_lossy())?;
101        state.end()
102    }
103}
104
105impl Serialize for Run<PathBuf> {
106    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
107    where
108        S: Serializer,
109    {
110        let mut state = serializer.serialize_struct("Run", 6)?;
111        state.serialize_field("id", &self.id)?;
112        state.serialize_field("name", &self.name)?;
113        state.serialize_field("command", &self.command)?;
114        state.serialize_field("timestamp", &self.timestamp().to_rfc3339())?;
115        state.serialize_field("run_dir", &self.run_dir.to_string_lossy())?;
116        state.serialize_field("project_root", &self.project_root.to_string_lossy())?;
117        state.end()
118    }
119}
120
121#[derive(Debug, Clone, Serialize, Deserialize)]
122pub struct RunOutput {
123    pub exit_code: i32,
124    pub stdout: String,
125    pub stderr: String,
126    pub duration: Duration,
127}
128
/// Converts an [`ExitStatus`] into a numeric exit code.
///
/// * If the process exited normally, its exit code is returned unchanged.
/// * On Unix, if the process was terminated by signal `s`, `128 + s` is
///   returned.
/// * In every other case the result is `1`.
fn exit_code_from_status(status: ExitStatus) -> i32 {
    if let Some(code) = status.code() {
        return code;
    }

    // No exit code: on Unix the process may have been terminated by a signal.
    #[cfg(unix)]
    {
        use std::os::unix::process::ExitStatusExt;
        match status.signal() {
            Some(signal) => 128 + signal,
            None => 1,
        }
    }
    #[cfg(not(unix))]
    {
        1
    }
}
143
144impl Run<PathBuf> {
145    pub fn exec(&self) -> std::io::Result<RunOutput> {
146        if self.command.is_empty() {
147            return Err(io::Error::new(io::ErrorKind::InvalidInput, "empty command"));
148        }
149        let program = &self.command[0];
150        let args: Vec<&str> = self.command[1..].iter().map(String::as_str).collect();
151
152        let mut env_vars = HashMap::new();
153        env_vars.insert("CAPSULA_RUN_ID", self.id.to_string());
154        env_vars.insert("CAPSULA_RUN_NAME", self.name.clone());
155        env_vars.insert(
156            "CAPSULA_RUN_DIRECTORY",
157            self.run_dir.to_string_lossy().to_string(),
158        );
159        env_vars.insert("CAPSULA_RUN_TIMESTAMP", self.timestamp().to_rfc3339());
160        let command_display = shlex::try_join(self.command.iter().map(String::as_str))
161            .unwrap_or_else(|_| self.command.join(" "));
162        env_vars.insert("CAPSULA_RUN_COMMAND", command_display);
163
164        let pre_run_output_json_path = self.run_dir.join("_capsula").join("pre-run.json");
165        env_vars.insert(
166            "CAPSULA_PRE_RUN_OUTPUT_PATH",
167            pre_run_output_json_path.to_string_lossy().to_string(),
168        );
169        env_vars.insert(
170            "CAPSULA_PROJECT_ROOT",
171            self.project_root.to_string_lossy().to_string(),
172        );
173
174        let start = Instant::now();
175
176        let mut child = Command::new(program)
177            .args(&args)
178            .envs(&env_vars)
179            .stdout(Stdio::piped())
180            .stderr(Stdio::piped())
181            .spawn()?;
182
183        let mut child_stdout = child
184            .stdout
185            .take()
186            .ok_or_else(|| io::Error::other("Failed to capture stdout"))?;
187        let mut child_stderr = child
188            .stderr
189            .take()
190            .ok_or_else(|| io::Error::other("Failed to capture stderr"))?;
191
192        let t_out = thread::spawn(move || -> io::Result<Vec<u8>> {
193            let mut cap = Vec::with_capacity(8 * 1024);
194            let mut buf = [0u8; 8192];
195            let mut console = io::stdout().lock();
196
197            loop {
198                let n = child_stdout.read(&mut buf)?;
199                if n == 0 {
200                    break;
201                }
202                console.write_all(&buf[..n])?;
203                cap.extend_from_slice(&buf[..n]);
204            }
205            console.flush()?;
206            Ok(cap)
207        });
208
209        let t_err = thread::spawn(move || -> io::Result<Vec<u8>> {
210            let mut cap = Vec::with_capacity(8 * 1024);
211            let mut buf = [0u8; 8192];
212            let mut console = io::stderr().lock();
213
214            loop {
215                let n = child_stderr.read(&mut buf)?;
216                if n == 0 {
217                    break;
218                }
219                console.write_all(&buf[..n])?;
220                cap.extend_from_slice(&buf[..n]);
221            }
222            console.flush()?;
223            Ok(cap)
224        });
225
226        let status = child.wait()?;
227        let duration = start.elapsed();
228        let cap_out = t_out
229            .join()
230            .map_err(|_| io::Error::other("stdout capture thread panicked"))??;
231        let cap_err = t_err
232            .join()
233            .map_err(|_| io::Error::other("stderr capture thread panicked"))??;
234
235        let exit_code = exit_code_from_status(status);
236
237        Ok(RunOutput {
238            exit_code,
239            stdout: String::from_utf8_lossy(&cap_out).to_string(),
240            stderr: String::from_utf8_lossy(&cap_err).to_string(),
241            duration,
242        })
243    }
244}
245
/// Ensures the vault directory at `path` exists.
///
/// If `path` already exists, nothing is touched. Otherwise the directory
/// (including missing parents) is created and a `.gitignore` that ignores
/// all of its contents is written into it.
///
/// # Errors
///
/// Returns any I/O error raised while creating the directory or writing the
/// `.gitignore` file.
fn setup_vault(path: impl AsRef<std::path::Path>) -> io::Result<()> {
    // Ignore everything inside the vault.
    const GITIGNORE_CONTENTS: &str = "\
# Automatically generated by Capsula
*";

    let vault = path.as_ref();
    if !vault.exists() {
        std::fs::create_dir_all(vault)?;
        std::fs::write(vault.join(".gitignore"), GITIGNORE_CONTENTS)?;
    }
    Ok(())
}
263}