capsula_core/
run.rs

1use chrono::{DateTime, Utc};
2use serde::ser::SerializeStruct;
3use serde::{Deserialize, Serialize, Serializer};
4use std::collections::HashMap;
5use std::io::{self, Read, Write};
6use std::path::{Path, PathBuf};
7use std::process::{Command, ExitStatus, Stdio};
8use std::thread;
9use std::time::{Duration, Instant};
10use ulid::Ulid;
11
12#[derive(Debug, Clone, Deserialize)]
13pub struct Run<Dir = PathBuf> {
14    pub id: Ulid,
15    pub name: String,
16    pub command: Vec<String>,
17    pub run_dir: Dir,
18    pub project_root: PathBuf,
19}
20
21pub type UnpreparedRun = Run<()>;
22pub type PreparedRun = Run<PathBuf>;
23
24impl<Dir> Run<Dir> {
25    pub fn timestamp(&self) -> DateTime<Utc> {
26        // Calculate start time from ULID timestamp
27        let dt: DateTime<Utc> = self.id.datetime().into();
28        dt
29    }
30
31    pub fn gen_run_dir(&self, vault_dir: impl AsRef<Path>) -> PathBuf {
32        let timestamp = self.timestamp();
33        let date_str = timestamp.format("%Y-%m-%d").to_string();
34        let time_str = timestamp.format("%H%M%S").to_string();
35
36        // Prefix the run directory with time because
37        // folders are sorted in natural order, not in lexicographical order,
38        // For example, on macOS Finder, the order is:
39        // 1. 01K5K478KNQ2ZXZG68MWM1Z9X6
40        // 2. 01K5K4571FGKBFTTRJCG1J3DCZ
41        // which is not the correct chronological order.
42        // By adding time prefix, it will be sorted correctly.
43        let run_dir_name = format!("{}-{}", time_str, self.name);
44        vault_dir.as_ref().join(date_str).join(&run_dir_name)
45    }
46}
47
48impl Run<()> {
49    pub fn setup_run_dir(
50        &self,
51        vault_dir: impl AsRef<std::path::Path>,
52        max_retries: usize,
53    ) -> io::Result<Run<PathBuf>> {
54        setup_vault(&vault_dir)?;
55
56        // TODO: Consider removing retries as it is too conservative?
57        let run_dir = {
58            let mut attempt = 0;
59            loop {
60                let candidate = self.gen_run_dir(&vault_dir);
61                if candidate.exists() {
62                    // Slight delay before retrying
63                    thread::sleep(Duration::from_millis(10 * (attempt as u64 + 1)));
64                    attempt += 1;
65                    if attempt >= max_retries {
66                        return Err(io::Error::new(
67                            io::ErrorKind::AlreadyExists,
68                            format!(
69                                "Failed to create unique run directory after {} attempts",
70                                max_retries
71                            ),
72                        ));
73                    }
74                    continue;
75                } else {
76                    break candidate;
77                }
78            }
79        };
80
81        std::fs::create_dir_all(&run_dir)?;
82        Ok(Run {
83            id: self.id,
84            name: self.name.clone(),
85            command: self.command.clone(),
86            run_dir,
87            project_root: self.project_root.clone(),
88        })
89    }
90}
91
92impl Serialize for Run<()> {
93    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
94    where
95        S: Serializer,
96    {
97        let mut state = serializer.serialize_struct("Run", 5)?;
98        state.serialize_field("id", &self.id)?;
99        state.serialize_field("name", &self.name)?;
100        state.serialize_field("command", &self.command)?;
101        state.serialize_field("timestamp", &self.timestamp().to_rfc3339())?;
102        state.serialize_field("project_root", &self.project_root.to_string_lossy())?;
103        state.end()
104    }
105}
106
107impl Serialize for Run<PathBuf> {
108    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
109    where
110        S: Serializer,
111    {
112        let mut state = serializer.serialize_struct("Run", 6)?;
113        state.serialize_field("id", &self.id)?;
114        state.serialize_field("name", &self.name)?;
115        state.serialize_field("command", &self.command)?;
116        state.serialize_field("timestamp", &self.timestamp().to_rfc3339())?;
117        state.serialize_field("run_dir", &self.run_dir.to_string_lossy())?;
118        state.serialize_field("project_root", &self.project_root.to_string_lossy())?;
119        state.end()
120    }
121}
122
123#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct RunOutput {
125    pub exit_code: i32,
126    pub stdout: String,
127    pub stderr: String,
128    pub duration: Duration,
129}
130
131fn exit_code_from_status(status: ExitStatus) -> i32 {
132    match status.code() {
133        Some(c) => c,
134        None => {
135            // On Unix, process may be terminated by a signal.
136            #[cfg(unix)]
137            {
138                use std::os::unix::process::ExitStatusExt;
139                status.signal().map(|s| 128 + s).unwrap_or(1)
140            }
141            #[cfg(not(unix))]
142            {
143                1
144            }
145        }
146    }
147}
148
149impl Run<PathBuf> {
150    pub fn exec(&self) -> std::io::Result<RunOutput> {
151        if self.command.is_empty() {
152            return Err(io::Error::new(io::ErrorKind::InvalidInput, "empty command"));
153        }
154        let program = &self.command[0];
155        let args: Vec<&str> = self.command[1..].iter().map(|s| s.as_str()).collect();
156
157        let mut env_vars = HashMap::new();
158        env_vars.insert("CAPSULA_RUN_ID", self.id.to_string());
159        env_vars.insert("CAPSULA_RUN_NAME", self.name.clone());
160        env_vars.insert(
161            "CAPSULA_RUN_DIRECTORY",
162            self.run_dir.to_string_lossy().to_string(),
163        );
164        env_vars.insert("CAPSULA_RUN_TIMESTAMP", self.timestamp().to_rfc3339());
165        let command_display = shlex::try_join(self.command.iter().map(|s| s.as_str()))
166            .unwrap_or_else(|_| self.command.join(" "));
167        env_vars.insert("CAPSULA_RUN_COMMAND", command_display);
168
169        let pre_run_output_json_path = self.run_dir.join("_capsula").join("pre-run.json");
170        env_vars.insert(
171            "CAPSULA_PRE_RUN_OUTPUT_PATH",
172            pre_run_output_json_path.to_string_lossy().to_string(),
173        );
174        env_vars.insert(
175            "CAPSULA_PROJECT_ROOT",
176            self.project_root.to_string_lossy().to_string(),
177        );
178
179        let start = Instant::now();
180
181        let mut child = Command::new(program)
182            .args(&args)
183            .envs(&env_vars)
184            .stdout(Stdio::piped())
185            .stderr(Stdio::piped())
186            .spawn()?;
187
188        let mut child_stdout = child
189            .stdout
190            .take()
191            .ok_or_else(|| io::Error::other("Failed to capture stdout"))?;
192        let mut child_stderr = child
193            .stderr
194            .take()
195            .ok_or_else(|| io::Error::other("Failed to capture stderr"))?;
196
197        let t_out = thread::spawn(move || -> io::Result<Vec<u8>> {
198            let mut cap = Vec::with_capacity(8 * 1024);
199            let mut buf = [0u8; 8192];
200            let mut console = io::stdout().lock();
201
202            loop {
203                let n = child_stdout.read(&mut buf)?;
204                if n == 0 {
205                    break;
206                }
207                console.write_all(&buf[..n])?;
208                cap.extend_from_slice(&buf[..n]);
209            }
210            console.flush()?;
211            Ok(cap)
212        });
213
214        let t_err = thread::spawn(move || -> io::Result<Vec<u8>> {
215            let mut cap = Vec::with_capacity(8 * 1024);
216            let mut buf = [0u8; 8192];
217            let mut console = io::stderr().lock();
218
219            loop {
220                let n = child_stderr.read(&mut buf)?;
221                if n == 0 {
222                    break;
223                }
224                console.write_all(&buf[..n])?;
225                cap.extend_from_slice(&buf[..n]);
226            }
227            console.flush()?;
228            Ok(cap)
229        });
230
231        let status = child.wait()?;
232        let duration = start.elapsed();
233        let cap_out = t_out
234            .join()
235            .map_err(|_| io::Error::other("stdout capture thread panicked"))??;
236        let cap_err = t_err
237            .join()
238            .map_err(|_| io::Error::other("stderr capture thread panicked"))??;
239
240        let exit_code = exit_code_from_status(status);
241
242        Ok(RunOutput {
243            exit_code,
244            stdout: String::from_utf8_lossy(&cap_out).to_string(),
245            stderr: String::from_utf8_lossy(&cap_err).to_string(),
246            duration,
247        })
248    }
249}
250
251fn setup_vault(path: impl AsRef<std::path::Path>) -> io::Result<()> {
252    let path = path.as_ref();
253    if path.exists() {
254        return Ok(());
255    }
256    std::fs::create_dir_all(path)?;
257
258    // Place a .gitignore file to ignore all contents
259    let gitignore_path = path.join(".gitignore");
260    std::fs::write(
261        gitignore_path,
262        "\
263# Automatically generated by Capsula
264*",
265    )?;
266
267    Ok(())
268}