1use chrono::{DateTime, Utc};
2use serde::ser::SerializeStruct;
3use serde::{Deserialize, Serialize, Serializer};
4use shlex::try_join;
5use std::collections::HashMap;
6use std::io::{self, Read, Write};
7use std::path::{Path, PathBuf};
8use std::process::{Command, ExitStatus, Stdio};
9use std::thread;
10use std::time::{Duration, Instant};
11use ulid::Ulid;
12
13#[derive(Debug, Clone, Deserialize)]
14pub struct Run<Dir = PathBuf> {
15 pub id: Ulid,
16 pub name: String,
17 pub command: Vec<String>,
18 pub run_dir: Dir,
19}
20
21pub type UnpreparedRun = Run<()>;
22pub type PreparedRun = Run<PathBuf>;
23
24impl<Dir> Run<Dir> {
25 pub fn timestamp(&self) -> DateTime<Utc> {
26 let dt: DateTime<Utc> = self.id.datetime().into();
28 dt
29 }
30
31 pub fn run_dir(&self, vault_dir: impl AsRef<Path>) -> PathBuf {
32 let timestamp = self.timestamp();
33 let date_str = timestamp.format("%Y-%m-%d").to_string();
34 let time_str = timestamp.format("%H%M%S").to_string();
35
36 let run_dir_name = format!("{}-{}--{}", time_str, self.name, self.id.to_string());
44 vault_dir.as_ref().join(date_str).join(&run_dir_name)
45 }
46}
47
48impl Run<()> {
49 pub fn setup_run_dir(
50 &self,
51 vault_dir: impl AsRef<std::path::Path>,
52 ) -> io::Result<Run<PathBuf>> {
53 setup_vault(&vault_dir)?;
54 let run_dir = self.run_dir(&vault_dir);
55 std::fs::create_dir_all(&run_dir)?;
56 Ok(Run {
57 id: self.id,
58 name: self.name.clone(),
59 command: self.command.clone(),
60 run_dir,
61 })
62 }
63}
64
65impl Serialize for Run<()> {
66 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
67 where
68 S: Serializer,
69 {
70 let mut state = serializer.serialize_struct("Run", 4)?;
71 state.serialize_field("id", &self.id)?;
72 state.serialize_field("name", &self.name)?;
73 state.serialize_field("command", &self.command)?;
74 state.serialize_field("timestamp", &self.timestamp().to_rfc3339())?;
75 state.end()
76 }
77}
78
79impl Serialize for Run<PathBuf> {
80 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
81 where
82 S: Serializer,
83 {
84 let mut state = serializer.serialize_struct("Run", 5)?;
85 state.serialize_field("id", &self.id)?;
86 state.serialize_field("name", &self.name)?;
87 state.serialize_field("command", &self.command)?;
88 state.serialize_field("timestamp", &self.timestamp().to_rfc3339())?;
89 state.serialize_field("run_dir", &self.run_dir.to_string_lossy())?;
90 state.end()
91 }
92}
93
94#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct RunOutput {
96 pub exit_code: i32,
97 pub stdout: String,
98 pub stderr: String,
99 pub duration: Duration,
100}
101
/// Maps an [`ExitStatus`] to a single integer exit code.
///
/// Returns the process's own exit code when it has one. Otherwise, on Unix,
/// a process terminated by signal `N` yields `128 + N`; in every remaining
/// case (including all non-Unix platforms) the result is `1`.
fn exit_code_from_status(status: ExitStatus) -> i32 {
    status.code().unwrap_or_else(|| {
        #[cfg(unix)]
        {
            use std::os::unix::process::ExitStatusExt;
            match status.signal() {
                Some(signal) => 128 + signal,
                None => 1,
            }
        }
        #[cfg(not(unix))]
        {
            1
        }
    })
}
119
120impl Run<PathBuf> {
121 pub fn exec(&self) -> std::io::Result<RunOutput> {
122 if self.command.is_empty() {
123 return Err(io::Error::new(io::ErrorKind::InvalidInput, "empty command"));
124 }
125 let program = &self.command[0];
126 let args: Vec<&str> = self.command[1..].iter().map(|s| s.as_str()).collect();
127
128 let mut env_vars = HashMap::new();
129 env_vars.insert("CAPSULA_RUN_ID", self.id.to_string());
130 env_vars.insert("CAPSULA_RUN_NAME", self.name.clone());
131 env_vars.insert(
132 "CAPSULA_RUN_DIRECTORY",
133 self.run_dir.to_string_lossy().to_string(),
134 );
135 env_vars.insert("CAPSULA_RUN_TIMESTAMP", self.timestamp().to_rfc3339());
136 if let Ok(cmd_str) = try_join(self.command.iter().map(|s| s.as_str())) {
137 env_vars.insert("CAPSULA_RUN_COMMAND", cmd_str);
138 }
139
140 let start = Instant::now();
141
142 let mut child = Command::new(program)
143 .args(&args)
144 .envs(&env_vars)
145 .stdout(Stdio::piped())
146 .stderr(Stdio::piped())
147 .spawn()?;
148
149 let mut child_stdout = child.stdout.take().expect("piped stdout");
150 let mut child_stderr = child.stderr.take().expect("piped stderr");
151
152 let t_out = thread::spawn(move || -> io::Result<Vec<u8>> {
153 let mut cap = Vec::with_capacity(8 * 1024);
154 let mut buf = [0u8; 8192];
155 let mut console = io::stdout().lock();
156
157 loop {
158 let n = child_stdout.read(&mut buf)?;
159 if n == 0 {
160 break;
161 }
162 console.write_all(&buf[..n])?;
163 cap.extend_from_slice(&buf[..n]);
164 }
165 console.flush()?;
166 Ok(cap)
167 });
168
169 let t_err = thread::spawn(move || -> io::Result<Vec<u8>> {
170 let mut cap = Vec::with_capacity(8 * 1024);
171 let mut buf = [0u8; 8192];
172 let mut console = io::stderr().lock();
173
174 loop {
175 let n = child_stderr.read(&mut buf)?;
176 if n == 0 {
177 break;
178 }
179 console.write_all(&buf[..n])?;
180 cap.extend_from_slice(&buf[..n]);
181 }
182 console.flush()?;
183 Ok(cap)
184 });
185
186 let status = child.wait()?;
187 let duration = start.elapsed();
188 let cap_out = t_out.join().unwrap()?;
189 let cap_err = t_err.join().unwrap()?;
190
191 let exit_code = exit_code_from_status(status);
192
193 Ok(RunOutput {
194 exit_code,
195 stdout: String::from_utf8_lossy(&cap_out).to_string(),
196 stderr: String::from_utf8_lossy(&cap_err).to_string(),
197 duration,
198 })
199 }
200}
201
/// Creates the vault directory on first use.
///
/// If `path` already exists nothing is touched. Otherwise the directory
/// (and any missing parents) is created and a `.gitignore` that ignores
/// everything is written into it.
///
/// # Errors
///
/// Propagates any I/O error from creating the directory or writing the file.
fn setup_vault(path: impl AsRef<Path>) -> io::Result<()> {
    const GITIGNORE_CONTENTS: &str = "# Automatically generated by Capsula\n*";

    let vault = path.as_ref();
    if !vault.exists() {
        std::fs::create_dir_all(vault)?;
        std::fs::write(vault.join(".gitignore"), GITIGNORE_CONTENTS)?;
    }
    Ok(())
}