Skip to main content

capsula_core/
run.rs

1use chrono::{DateTime, Utc};
2use serde::ser::SerializeStruct;
3use serde::{Deserialize, Serialize, Serializer};
4use std::collections::HashMap;
5use std::io::{self, Read, Write};
6use std::path::{Path, PathBuf};
7use std::process::{Command, ExitStatus, Stdio};
8use std::thread;
9use std::time::{Duration, Instant};
10use tracing::debug;
11use ulid::Ulid;
12
13#[derive(Debug, Clone, Deserialize)]
14pub struct Run<Dir = PathBuf> {
15    pub id: Ulid,
16    pub name: String,
17    pub command: Vec<String>,
18    pub run_dir: Dir,
19    pub project_root: PathBuf,
20}
21
22pub type UnpreparedRun = Run<()>;
23pub type PreparedRun = Run<PathBuf>;
24
25impl<Dir> Run<Dir> {
26    pub fn timestamp(&self) -> DateTime<Utc> {
27        // Calculate start time from ULID timestamp
28        let dt: DateTime<Utc> = self.id.datetime().into();
29        dt
30    }
31
32    pub fn gen_run_dir(&self, vault_dir: impl AsRef<Path>) -> PathBuf {
33        let timestamp = self.timestamp();
34        let date_str = timestamp.format("%Y-%m-%d").to_string();
35        let time_str = timestamp.format("%H%M%S").to_string();
36
37        // Prefix the run directory with time because
38        // folders are sorted in natural order, not in lexicographical order,
39        // For example, on macOS Finder, the order is:
40        // 1. 01K5K478KNQ2ZXZG68MWM1Z9X6
41        // 2. 01K5K4571FGKBFTTRJCG1J3DCZ
42        // which is not the correct chronological order.
43        // By adding time prefix, it will be sorted correctly.
44        let run_dir_name = format!("{}-{}", time_str, self.name);
45        vault_dir.as_ref().join(date_str).join(&run_dir_name)
46    }
47}
48
49impl Run<()> {
50    pub fn setup_run_dir(
51        &self,
52        vault_dir: impl AsRef<std::path::Path>,
53        max_retries: usize,
54    ) -> io::Result<Run<PathBuf>> {
55        debug!(
56            "Setting up vault directory: {}",
57            vault_dir.as_ref().display()
58        );
59        setup_vault(&vault_dir)?;
60
61        // TODO: Consider removing retries as it is too conservative?
62        let run_dir = {
63            let mut attempt = 0;
64            loop {
65                let candidate = self.gen_run_dir(&vault_dir);
66                debug!("Candidate run directory: {}", candidate.display());
67                if candidate.exists() {
68                    debug!(
69                        "Directory already exists, retrying (attempt {})",
70                        attempt + 1
71                    );
72                    // Slight delay before retrying
73                    thread::sleep(Duration::from_millis(10 * (attempt as u64 + 1)));
74                    attempt += 1;
75                    if attempt >= max_retries {
76                        return Err(io::Error::new(
77                            io::ErrorKind::AlreadyExists,
78                            format!(
79                                "Failed to create unique run directory after {max_retries} attempts",
80                            ),
81                        ));
82                    }
83                } else {
84                    break candidate;
85                }
86            }
87        };
88
89        debug!("Creating run directory: {}", run_dir.display());
90        std::fs::create_dir_all(&run_dir)?;
91        debug!("Run directory created successfully");
92
93        Ok(Run {
94            id: self.id,
95            name: self.name.clone(),
96            command: self.command.clone(),
97            run_dir,
98            project_root: self.project_root.clone(),
99        })
100    }
101}
102
103impl Serialize for Run<()> {
104    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
105    where
106        S: Serializer,
107    {
108        let mut state = serializer.serialize_struct("Run", 5)?;
109        state.serialize_field("id", &self.id)?;
110        state.serialize_field("name", &self.name)?;
111        state.serialize_field("command", &self.command)?;
112        state.serialize_field("timestamp", &self.timestamp().to_rfc3339())?;
113        state.serialize_field("project_root", &self.project_root.to_string_lossy())?;
114        state.end()
115    }
116}
117
118impl Serialize for Run<PathBuf> {
119    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
120    where
121        S: Serializer,
122    {
123        let mut state = serializer.serialize_struct("Run", 6)?;
124        state.serialize_field("id", &self.id)?;
125        state.serialize_field("name", &self.name)?;
126        state.serialize_field("command", &self.command)?;
127        state.serialize_field("timestamp", &self.timestamp().to_rfc3339())?;
128        state.serialize_field("run_dir", &self.run_dir.to_string_lossy())?;
129        state.serialize_field("project_root", &self.project_root.to_string_lossy())?;
130        state.end()
131    }
132}
133
134#[derive(Debug, Clone, Serialize, Deserialize)]
135pub struct RunOutput {
136    pub exit_code: i32,
137    pub stdout: String,
138    pub stderr: String,
139    pub duration: Duration,
140}
141
142fn exit_code_from_status(status: ExitStatus) -> i32 {
143    status.code().unwrap_or_else(|| {
144        // On Unix, process may be terminated by a signal.
145        #[cfg(unix)]
146        {
147            use std::os::unix::process::ExitStatusExt;
148            status.signal().map_or(1, |s| 128 + s)
149        }
150        #[cfg(not(unix))]
151        {
152            1
153        }
154    })
155}
156
157impl Run<PathBuf> {
158    pub fn exec(&self) -> std::io::Result<RunOutput> {
159        if self.command.is_empty() {
160            return Err(io::Error::new(io::ErrorKind::InvalidInput, "empty command"));
161        }
162        let program = &self.command[0];
163        let args: Vec<&str> = self.command[1..].iter().map(String::as_str).collect();
164
165        debug!("Executing command: {} with {} args", program, args.len());
166
167        let mut env_vars = HashMap::new();
168        env_vars.insert("CAPSULA_RUN_ID", self.id.to_string());
169        env_vars.insert("CAPSULA_RUN_NAME", self.name.clone());
170        env_vars.insert(
171            "CAPSULA_RUN_DIRECTORY",
172            self.run_dir.to_string_lossy().to_string(),
173        );
174        env_vars.insert("CAPSULA_RUN_TIMESTAMP", self.timestamp().to_rfc3339());
175        let command_display = shlex::try_join(self.command.iter().map(String::as_str))
176            .unwrap_or_else(|_| self.command.join(" "));
177        env_vars.insert("CAPSULA_RUN_COMMAND", command_display);
178
179        let pre_run_output_json_path = self.run_dir.join("_capsula").join("pre-run.json");
180        env_vars.insert(
181            "CAPSULA_PRE_RUN_OUTPUT_PATH",
182            pre_run_output_json_path.to_string_lossy().to_string(),
183        );
184        env_vars.insert(
185            "CAPSULA_PROJECT_ROOT",
186            self.project_root.to_string_lossy().to_string(),
187        );
188
189        debug!(
190            "Setting {} environment variables for command execution",
191            env_vars.len()
192        );
193
194        let start = Instant::now();
195
196        debug!("Spawning child process");
197        let mut child = Command::new(program)
198            .args(&args)
199            .envs(&env_vars)
200            .stdout(Stdio::piped())
201            .stderr(Stdio::piped())
202            .spawn()?;
203
204        let mut child_stdout = child
205            .stdout
206            .take()
207            .ok_or_else(|| io::Error::other("Failed to capture stdout"))?;
208        let mut child_stderr = child
209            .stderr
210            .take()
211            .ok_or_else(|| io::Error::other("Failed to capture stderr"))?;
212
213        let t_out = thread::spawn(move || -> io::Result<Vec<u8>> {
214            let mut cap = Vec::with_capacity(8 * 1024);
215            let mut buf = [0u8; 8192];
216            let mut console = io::stdout().lock();
217
218            loop {
219                let n = child_stdout.read(&mut buf)?;
220                if n == 0 {
221                    break;
222                }
223                console.write_all(&buf[..n])?;
224                cap.extend_from_slice(&buf[..n]);
225            }
226            console.flush()?;
227            Ok(cap)
228        });
229
230        let t_err = thread::spawn(move || -> io::Result<Vec<u8>> {
231            let mut cap = Vec::with_capacity(8 * 1024);
232            let mut buf = [0u8; 8192];
233            let mut console = io::stderr().lock();
234
235            loop {
236                let n = child_stderr.read(&mut buf)?;
237                if n == 0 {
238                    break;
239                }
240                console.write_all(&buf[..n])?;
241                cap.extend_from_slice(&buf[..n]);
242            }
243            console.flush()?;
244            Ok(cap)
245        });
246
247        debug!("Waiting for child process to complete");
248        let status = child.wait()?;
249        let duration = start.elapsed();
250        debug!("Child process completed in {:?}", duration);
251
252        let cap_out = t_out
253            .join()
254            .map_err(|_| io::Error::other("stdout capture thread panicked"))??;
255        let cap_err = t_err
256            .join()
257            .map_err(|_| io::Error::other("stderr capture thread panicked"))??;
258
259        let exit_code = exit_code_from_status(status);
260        debug!(
261            "Command finished with exit code {}, captured {} bytes stdout, {} bytes stderr",
262            exit_code,
263            cap_out.len(),
264            cap_err.len()
265        );
266
267        Ok(RunOutput {
268            exit_code,
269            stdout: String::from_utf8_lossy(&cap_out).to_string(),
270            stderr: String::from_utf8_lossy(&cap_err).to_string(),
271            duration,
272        })
273    }
274}
275
276fn setup_vault(path: impl AsRef<std::path::Path>) -> io::Result<()> {
277    let path = path.as_ref();
278    if path.exists() {
279        debug!("Vault directory already exists: {}", path.display());
280        return Ok(());
281    }
282
283    debug!("Creating vault directory: {}", path.display());
284    std::fs::create_dir_all(path)?;
285
286    // Place a .gitignore file to ignore all contents
287    let gitignore_path = path.join(".gitignore");
288    debug!("Creating .gitignore in vault directory");
289    std::fs::write(
290        gitignore_path,
291        "\
292# Automatically generated by Capsula
293*",
294    )?;
295
296    Ok(())
297}