capsula_core/
run.rs

1use chrono::{DateTime, Utc};
2use serde::ser::SerializeStruct;
3use serde::{Deserialize, Serialize, Serializer};
4use shlex::try_join;
5use std::collections::HashMap;
6use std::io::{self, Read, Write};
7use std::path::{Path, PathBuf};
8use std::process::{Command, ExitStatus, Stdio};
9use std::thread;
10use std::time::{Duration, Instant};
11use ulid::Ulid;
12
/// A single run of a command, identified by a ULID.
///
/// The `Dir` type parameter is the type of the `run_dir` field. This file uses
/// two instantiations: `Run<()>` (no directory yet) and `Run<PathBuf>`
/// (directory known), and it defaults to `PathBuf`.
#[derive(Debug, Clone, Deserialize)]
pub struct Run<Dir = PathBuf> {
    /// Unique identifier of the run; `timestamp()` derives the run's time from it.
    pub id: Ulid,
    /// Name of the run; it becomes part of the run directory name.
    pub name: String,
    /// Program to execute followed by its arguments.
    pub command: Vec<String>,
    /// Directory associated with the run (`()` while none has been set up).
    pub run_dir: Dir,
}
20
/// A run that has no run directory yet (`run_dir` is `()`).
pub type UnpreparedRun = Run<()>;
/// A run whose `run_dir` holds a concrete filesystem path.
pub type PreparedRun = Run<PathBuf>;
23
24impl<Dir> Run<Dir> {
25    pub fn timestamp(&self) -> DateTime<Utc> {
26        // Calculate start time from ULID timestamp
27        let dt: DateTime<Utc> = self.id.datetime().into();
28        dt
29    }
30
31    pub fn run_dir(&self, vault_dir: impl AsRef<Path>) -> PathBuf {
32        let timestamp = self.timestamp();
33        let date_str = timestamp.format("%Y-%m-%d").to_string();
34        let time_str = timestamp.format("%H%M%S").to_string();
35
36        // Prefix the run directory with time because
37        // folders are sorted in natural order, not in lexicographical order,
38        // For example, on macOS Finder, the order is:
39        // 1. 01K5K478KNQ2ZXZG68MWM1Z9X6
40        // 2. 01K5K4571FGKBFTTRJCG1J3DCZ
41        // which is not the correct chronological order.
42        // By adding time prefix, it will be sorted correctly.
43        let run_dir_name = format!("{}-{}--{}", time_str, self.name, self.id.to_string());
44        vault_dir.as_ref().join(date_str).join(&run_dir_name)
45    }
46}
47
48impl Run<()> {
49    pub fn setup_run_dir(
50        &self,
51        vault_dir: impl AsRef<std::path::Path>,
52    ) -> io::Result<Run<PathBuf>> {
53        setup_vault(&vault_dir)?;
54        let run_dir = self.run_dir(&vault_dir);
55        std::fs::create_dir_all(&run_dir)?;
56        Ok(Run {
57            id: self.id,
58            name: self.name.clone(),
59            command: self.command.clone(),
60            run_dir,
61        })
62    }
63}
64
65impl Serialize for Run<()> {
66    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
67    where
68        S: Serializer,
69    {
70        let mut state = serializer.serialize_struct("Run", 4)?;
71        state.serialize_field("id", &self.id)?;
72        state.serialize_field("name", &self.name)?;
73        state.serialize_field("command", &self.command)?;
74        state.serialize_field("timestamp", &self.timestamp().to_rfc3339())?;
75        state.end()
76    }
77}
78
79impl Serialize for Run<PathBuf> {
80    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
81    where
82        S: Serializer,
83    {
84        let mut state = serializer.serialize_struct("Run", 5)?;
85        state.serialize_field("id", &self.id)?;
86        state.serialize_field("name", &self.name)?;
87        state.serialize_field("command", &self.command)?;
88        state.serialize_field("timestamp", &self.timestamp().to_rfc3339())?;
89        state.serialize_field("run_dir", &self.run_dir.to_string_lossy())?;
90        state.end()
91    }
92}
93
/// Result of executing a run's command, as produced by `Run::<PathBuf>::exec`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RunOutput {
    /// Exit code of the process as computed by `exit_code_from_status`
    /// (on Unix, `128 + signal` when the process was killed by a signal).
    pub exit_code: i32,
    /// Everything the process wrote to stdout, converted lossily to UTF-8.
    pub stdout: String,
    /// Everything the process wrote to stderr, converted lossily to UTF-8.
    pub stderr: String,
    /// Time measured from just before the process was spawned until it was waited on.
    pub duration: Duration,
}
101
/// Maps an [`ExitStatus`] to a single integer exit code.
///
/// A regular exit yields the process's own code. When no code is available,
/// Unix builds report `128 + signal` if the process was terminated by a
/// signal and `1` otherwise; non-Unix builds always report `1`.
fn exit_code_from_status(status: ExitStatus) -> i32 {
    if let Some(code) = status.code() {
        return code;
    }

    // No exit code: on Unix this usually means death by signal.
    #[cfg(unix)]
    {
        use std::os::unix::process::ExitStatusExt;
        match status.signal() {
            Some(signal) => 128 + signal,
            None => 1,
        }
    }
    #[cfg(not(unix))]
    {
        1
    }
}
119
120impl Run<PathBuf> {
121    pub fn exec(&self) -> std::io::Result<RunOutput> {
122        if self.command.is_empty() {
123            return Err(io::Error::new(io::ErrorKind::InvalidInput, "empty command"));
124        }
125        let program = &self.command[0];
126        let args: Vec<&str> = self.command[1..].iter().map(|s| s.as_str()).collect();
127
128        let mut env_vars = HashMap::new();
129        env_vars.insert("CAPSULA_RUN_ID", self.id.to_string());
130        env_vars.insert("CAPSULA_RUN_NAME", self.name.clone());
131        env_vars.insert(
132            "CAPSULA_RUN_DIRECTORY",
133            self.run_dir.to_string_lossy().to_string(),
134        );
135        env_vars.insert("CAPSULA_RUN_TIMESTAMP", self.timestamp().to_rfc3339());
136        if let Ok(cmd_str) = try_join(self.command.iter().map(|s| s.as_str())) {
137            env_vars.insert("CAPSULA_RUN_COMMAND", cmd_str);
138        }
139
140        let start = Instant::now();
141
142        let mut child = Command::new(program)
143            .args(&args)
144            .envs(&env_vars)
145            .stdout(Stdio::piped())
146            .stderr(Stdio::piped())
147            .spawn()?;
148
149        let mut child_stdout = child.stdout.take().expect("piped stdout");
150        let mut child_stderr = child.stderr.take().expect("piped stderr");
151
152        let t_out = thread::spawn(move || -> io::Result<Vec<u8>> {
153            let mut cap = Vec::with_capacity(8 * 1024);
154            let mut buf = [0u8; 8192];
155            let mut console = io::stdout().lock();
156
157            loop {
158                let n = child_stdout.read(&mut buf)?;
159                if n == 0 {
160                    break;
161                }
162                console.write_all(&buf[..n])?;
163                cap.extend_from_slice(&buf[..n]);
164            }
165            console.flush()?;
166            Ok(cap)
167        });
168
169        let t_err = thread::spawn(move || -> io::Result<Vec<u8>> {
170            let mut cap = Vec::with_capacity(8 * 1024);
171            let mut buf = [0u8; 8192];
172            let mut console = io::stderr().lock();
173
174            loop {
175                let n = child_stderr.read(&mut buf)?;
176                if n == 0 {
177                    break;
178                }
179                console.write_all(&buf[..n])?;
180                cap.extend_from_slice(&buf[..n]);
181            }
182            console.flush()?;
183            Ok(cap)
184        });
185
186        let status = child.wait()?;
187        let duration = start.elapsed();
188        let cap_out = t_out.join().unwrap()?;
189        let cap_err = t_err.join().unwrap()?;
190
191        let exit_code = exit_code_from_status(status);
192
193        Ok(RunOutput {
194            exit_code,
195            stdout: String::from_utf8_lossy(&cap_out).to_string(),
196            stderr: String::from_utf8_lossy(&cap_err).to_string(),
197            duration,
198        })
199    }
200}
201
/// Creates the vault directory at `path` if it does not exist yet.
///
/// A freshly created vault receives a `.gitignore` that ignores all of its
/// contents. A path that already exists is left completely untouched, so an
/// existing vault never gets its `.gitignore` written or rewritten.
///
/// # Errors
///
/// Returns any I/O error raised while creating the directory or writing the
/// `.gitignore` file.
fn setup_vault(path: impl AsRef<std::path::Path>) -> io::Result<()> {
    let vault_root = path.as_ref();

    if !vault_root.exists() {
        std::fs::create_dir_all(vault_root)?;

        // Keep everything stored in the vault out of version control.
        let ignore_rules = "\
# Automatically generated by Capsula
*";
        std::fs::write(vault_root.join(".gitignore"), ignore_rules)?;
    }

    Ok(())
}