1use chrono::{DateTime, Utc};
2use serde::ser::SerializeStruct;
3use serde::{Deserialize, Serialize, Serializer};
4use std::collections::HashMap;
5use std::io::{self, Read, Write};
6use std::path::{Path, PathBuf};
7use std::process::{Command, ExitStatus, Stdio};
8use std::thread;
9use std::time::{Duration, Instant};
10use tracing::debug;
11use ulid::Ulid;
12
13#[derive(Debug, Clone, Deserialize)]
14pub struct Run<Dir = PathBuf> {
15 pub id: Ulid,
16 pub name: String,
17 pub command: Vec<String>,
18 pub run_dir: Dir,
19 pub project_root: PathBuf,
20}
21
22pub type UnpreparedRun = Run<()>;
23pub type PreparedRun = Run<PathBuf>;
24
25impl<Dir> Run<Dir> {
26 pub fn timestamp(&self) -> DateTime<Utc> {
27 let dt: DateTime<Utc> = self.id.datetime().into();
29 dt
30 }
31
32 pub fn gen_run_dir(&self, vault_dir: impl AsRef<Path>) -> PathBuf {
33 let timestamp = self.timestamp();
34 let date_str = timestamp.format("%Y-%m-%d").to_string();
35 let time_str = timestamp.format("%H%M%S").to_string();
36
37 let run_dir_name = format!("{}-{}", time_str, self.name);
45 vault_dir.as_ref().join(date_str).join(&run_dir_name)
46 }
47}
48
49impl Run<()> {
50 pub fn setup_run_dir(
51 &self,
52 vault_dir: impl AsRef<std::path::Path>,
53 max_retries: usize,
54 ) -> io::Result<Run<PathBuf>> {
55 debug!(
56 "Setting up vault directory: {}",
57 vault_dir.as_ref().display()
58 );
59 setup_vault(&vault_dir)?;
60
61 let run_dir = {
63 let mut attempt = 0;
64 loop {
65 let candidate = self.gen_run_dir(&vault_dir);
66 debug!("Candidate run directory: {}", candidate.display());
67 if candidate.exists() {
68 debug!(
69 "Directory already exists, retrying (attempt {})",
70 attempt + 1
71 );
72 thread::sleep(Duration::from_millis(10 * (attempt as u64 + 1)));
74 attempt += 1;
75 if attempt >= max_retries {
76 return Err(io::Error::new(
77 io::ErrorKind::AlreadyExists,
78 format!(
79 "Failed to create unique run directory after {max_retries} attempts",
80 ),
81 ));
82 }
83 } else {
84 break candidate;
85 }
86 }
87 };
88
89 debug!("Creating run directory: {}", run_dir.display());
90 std::fs::create_dir_all(&run_dir)?;
91 debug!("Run directory created successfully");
92
93 Ok(Run {
94 id: self.id,
95 name: self.name.clone(),
96 command: self.command.clone(),
97 run_dir,
98 project_root: self.project_root.clone(),
99 })
100 }
101}
102
103impl Serialize for Run<()> {
104 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
105 where
106 S: Serializer,
107 {
108 let mut state = serializer.serialize_struct("Run", 5)?;
109 state.serialize_field("id", &self.id)?;
110 state.serialize_field("name", &self.name)?;
111 state.serialize_field("command", &self.command)?;
112 state.serialize_field("timestamp", &self.timestamp().to_rfc3339())?;
113 state.serialize_field("project_root", &self.project_root.to_string_lossy())?;
114 state.end()
115 }
116}
117
118impl Serialize for Run<PathBuf> {
119 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
120 where
121 S: Serializer,
122 {
123 let mut state = serializer.serialize_struct("Run", 6)?;
124 state.serialize_field("id", &self.id)?;
125 state.serialize_field("name", &self.name)?;
126 state.serialize_field("command", &self.command)?;
127 state.serialize_field("timestamp", &self.timestamp().to_rfc3339())?;
128 state.serialize_field("run_dir", &self.run_dir.to_string_lossy())?;
129 state.serialize_field("project_root", &self.project_root.to_string_lossy())?;
130 state.end()
131 }
132}
133
134#[derive(Debug, Clone, Serialize, Deserialize)]
135pub struct RunOutput {
136 pub exit_code: i32,
137 pub stdout: String,
138 pub stderr: String,
139 pub duration: Duration,
140}
141
/// Converts an [`ExitStatus`] into a single integer exit code.
///
/// If the process reported an exit code, that code is returned unchanged.
/// Otherwise, on Unix, a process terminated by signal `s` yields `128 + s`.
/// In every remaining case the result is `1`.
fn exit_code_from_status(status: ExitStatus) -> i32 {
    if let Some(code) = status.code() {
        return code;
    }

    #[cfg(unix)]
    {
        use std::os::unix::process::ExitStatusExt;
        match status.signal() {
            Some(signal) => 128 + signal,
            None => 1,
        }
    }
    #[cfg(not(unix))]
    {
        1
    }
}
156
157impl Run<PathBuf> {
158 pub fn exec(&self) -> std::io::Result<RunOutput> {
159 if self.command.is_empty() {
160 return Err(io::Error::new(io::ErrorKind::InvalidInput, "empty command"));
161 }
162 let program = &self.command[0];
163 let args: Vec<&str> = self.command[1..].iter().map(String::as_str).collect();
164
165 debug!("Executing command: {} with {} args", program, args.len());
166
167 let mut env_vars = HashMap::new();
168 env_vars.insert("CAPSULA_RUN_ID", self.id.to_string());
169 env_vars.insert("CAPSULA_RUN_NAME", self.name.clone());
170 env_vars.insert(
171 "CAPSULA_RUN_DIRECTORY",
172 self.run_dir.to_string_lossy().to_string(),
173 );
174 env_vars.insert("CAPSULA_RUN_TIMESTAMP", self.timestamp().to_rfc3339());
175 let command_display = shlex::try_join(self.command.iter().map(String::as_str))
176 .unwrap_or_else(|_| self.command.join(" "));
177 env_vars.insert("CAPSULA_RUN_COMMAND", command_display);
178
179 let pre_run_output_json_path = self.run_dir.join("_capsula").join("pre-run.json");
180 env_vars.insert(
181 "CAPSULA_PRE_RUN_OUTPUT_PATH",
182 pre_run_output_json_path.to_string_lossy().to_string(),
183 );
184 env_vars.insert(
185 "CAPSULA_PROJECT_ROOT",
186 self.project_root.to_string_lossy().to_string(),
187 );
188
189 debug!(
190 "Setting {} environment variables for command execution",
191 env_vars.len()
192 );
193
194 let start = Instant::now();
195
196 debug!("Spawning child process");
197 let mut child = Command::new(program)
198 .args(&args)
199 .envs(&env_vars)
200 .stdout(Stdio::piped())
201 .stderr(Stdio::piped())
202 .spawn()?;
203
204 let mut child_stdout = child
205 .stdout
206 .take()
207 .ok_or_else(|| io::Error::other("Failed to capture stdout"))?;
208 let mut child_stderr = child
209 .stderr
210 .take()
211 .ok_or_else(|| io::Error::other("Failed to capture stderr"))?;
212
213 let t_out = thread::spawn(move || -> io::Result<Vec<u8>> {
214 let mut cap = Vec::with_capacity(8 * 1024);
215 let mut buf = [0u8; 8192];
216 let mut console = io::stdout().lock();
217
218 loop {
219 let n = child_stdout.read(&mut buf)?;
220 if n == 0 {
221 break;
222 }
223 console.write_all(&buf[..n])?;
224 cap.extend_from_slice(&buf[..n]);
225 }
226 console.flush()?;
227 Ok(cap)
228 });
229
230 let t_err = thread::spawn(move || -> io::Result<Vec<u8>> {
231 let mut cap = Vec::with_capacity(8 * 1024);
232 let mut buf = [0u8; 8192];
233 let mut console = io::stderr().lock();
234
235 loop {
236 let n = child_stderr.read(&mut buf)?;
237 if n == 0 {
238 break;
239 }
240 console.write_all(&buf[..n])?;
241 cap.extend_from_slice(&buf[..n]);
242 }
243 console.flush()?;
244 Ok(cap)
245 });
246
247 debug!("Waiting for child process to complete");
248 let status = child.wait()?;
249 let duration = start.elapsed();
250 debug!("Child process completed in {:?}", duration);
251
252 let cap_out = t_out
253 .join()
254 .map_err(|_| io::Error::other("stdout capture thread panicked"))??;
255 let cap_err = t_err
256 .join()
257 .map_err(|_| io::Error::other("stderr capture thread panicked"))??;
258
259 let exit_code = exit_code_from_status(status);
260 debug!(
261 "Command finished with exit code {}, captured {} bytes stdout, {} bytes stderr",
262 exit_code,
263 cap_out.len(),
264 cap_err.len()
265 );
266
267 Ok(RunOutput {
268 exit_code,
269 stdout: String::from_utf8_lossy(&cap_out).to_string(),
270 stderr: String::from_utf8_lossy(&cap_err).to_string(),
271 duration,
272 })
273 }
274}
275
276fn setup_vault(path: impl AsRef<std::path::Path>) -> io::Result<()> {
277 let path = path.as_ref();
278 if path.exists() {
279 debug!("Vault directory already exists: {}", path.display());
280 return Ok(());
281 }
282
283 debug!("Creating vault directory: {}", path.display());
284 std::fs::create_dir_all(path)?;
285
286 let gitignore_path = path.join(".gitignore");
288 debug!("Creating .gitignore in vault directory");
289 std::fs::write(
290 gitignore_path,
291 "\
292# Automatically generated by Capsula
293*",
294 )?;
295
296 Ok(())
297}