1use std::collections::HashMap;
11#[cfg(unix)]
12use std::os::unix::process::ExitStatusExt;
13use std::path::Path;
14use std::time::Duration;
15
16use async_trait::async_trait;
17use nix::unistd::{access, AccessFlags};
18use tokio::process::Command;
19
20use crate::errors::{ErrorCategory, ErrorInfo, IoError};
21use crate::ids::ErrorCode;
22use crate::io::IoCall;
23use crate::live_io::{LiveIoEnv, LiveIoTransport, LiveIoTransportFactory};
24use crate::process_exec::{run_command, ProcessRunError, StreamLimit};
25
/// Namespace group name reported by `ExecProgramTransportFactory::namespace_group`.
pub const NAMESPACE_EXEC: &str = "exec";

// Error codes placed in `ErrorInfo::code` by the functions in this file.
// Request parsing / validation.
const CODE_EXEC_REQUEST_INVALID: &str = "exec_request_invalid";
// Program path resolution and policy checks.
const CODE_EXEC_PROGRAM_NOT_ALLOWED: &str = "exec_program_not_allowed";
const CODE_EXEC_PROGRAM_MISSING: &str = "exec_program_missing";
const CODE_EXEC_PROGRAM_NOT_EXECUTABLE: &str = "exec_program_not_executable";
// Failures while spawning or running the program.
const CODE_EXEC_SPAWN_FAILED: &str = "exec_spawn_failed";
const CODE_EXEC_STDIN_WRITE_FAILED: &str = "exec_stdin_write_failed";
const CODE_EXEC_STDIN_TOO_LARGE: &str = "exec_stdin_too_large";
const CODE_EXEC_TIMEOUT: &str = "exec_timeout";
const CODE_EXEC_FAILED: &str = "exec_failed";
// Problems with the program's output streams.
const CODE_EXEC_STDOUT_INVALID_JSON: &str = "exec_stdout_invalid_json";
const CODE_EXEC_STDOUT_TOO_LARGE: &str = "exec_stdout_too_large";
const CODE_EXEC_STDERR_TOO_LARGE: &str = "exec_stderr_too_large";

// Size limits in bytes (1 MiB each). The stdin limit is compared against the serialized
// `stdin_json`; the stdout/stderr limits are handed to `run_command` via `StreamLimit`.
const MAX_EXEC_STDIN_BYTES: usize = 1024 * 1024;
const MAX_EXEC_STDOUT_BYTES: usize = 1024 * 1024;
const MAX_EXEC_STDERR_BYTES: usize = 1024 * 1024;
45
46fn info(code: &'static str, category: ErrorCategory, message: &'static str) -> ErrorInfo {
47 ErrorInfo {
48 code: ErrorCode(code.to_string()),
49 category,
50 retryable: false,
51 message: message.to_string(),
52 details: None,
53 }
54}
55
56fn info_with_details(
57 code: &'static str,
58 category: ErrorCategory,
59 message: &'static str,
60 details: serde_json::Value,
61) -> ErrorInfo {
62 ErrorInfo {
63 code: ErrorCode(code.to_string()),
64 category,
65 retryable: false,
66 message: message.to_string(),
67 details: Some(details),
68 }
69}
70
/// Allow-list policy deciding which programs the exec transport may run.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecPolicy {
    /// Path prefixes under which a program may live. `program_allowed` accepts a program
    /// when its path starts with at least one of these.
    pub allow_prefixes: Vec<String>,
}

impl Default for ExecPolicy {
    /// Returns a policy whose only allowed prefix is `/nix/store/`.
    fn default() -> Self {
        let allow_prefixes = vec![String::from("/nix/store/")];
        Self { allow_prefixes }
    }
}
86
87#[derive(Clone, Default)]
89pub struct ExecProgramTransportFactory {
90 policy: ExecPolicy,
91}
92
93impl ExecProgramTransportFactory {
94 pub fn new(policy: ExecPolicy) -> Self {
96 Self { policy }
97 }
98}
99
100impl LiveIoTransportFactory for ExecProgramTransportFactory {
101 fn namespace_group(&self) -> &str {
102 NAMESPACE_EXEC
103 }
104
105 fn make(&self, _env: LiveIoEnv) -> Box<dyn LiveIoTransport> {
106 Box::new(ExecProgramTransport {
107 policy: self.policy.clone(),
108 })
109 }
110}
111
/// Transport that runs an external program per call, subject to an [`ExecPolicy`].
struct ExecProgramTransport {
    /// Policy consulted when resolving the requested program path.
    policy: ExecPolicy,
}
115
/// Parsed form of a `run_program_v1` exec request, as produced by `parse_request`.
#[derive(Clone, Debug, PartialEq, Eq)]
struct ExecRequestV1 {
    /// Program path as requested; resolved and policy-checked before execution.
    program_path: String,
    /// Arguments passed to the program.
    argv: Vec<String>,
    /// JSON value serialized and written to the program's stdin.
    stdin_json: serde_json::Value,
    /// Execution timeout in milliseconds.
    timeout_ms: u64,
    /// Environment variables set on the child command.
    env: HashMap<String, String>,
}
124
125fn parse_request(call: &IoCall) -> Result<ExecRequestV1, IoError> {
126 let obj = call.request.as_object().ok_or_else(|| {
127 IoError::Other(info(
128 CODE_EXEC_REQUEST_INVALID,
129 ErrorCategory::ParsingInput,
130 "exec request must be a JSON object",
131 ))
132 })?;
133
134 let kind = obj.get("kind").and_then(|v| v.as_str()).ok_or_else(|| {
135 IoError::Other(info(
136 CODE_EXEC_REQUEST_INVALID,
137 ErrorCategory::ParsingInput,
138 "missing exec request kind",
139 ))
140 })?;
141
142 if kind != "run_program_v1" {
143 return Err(IoError::Other(info(
144 CODE_EXEC_REQUEST_INVALID,
145 ErrorCategory::ParsingInput,
146 "unsupported exec request kind",
147 )));
148 }
149
150 let program_path = obj
151 .get("program_path")
152 .and_then(|v| v.as_str())
153 .ok_or_else(|| {
154 IoError::Other(info(
155 CODE_EXEC_REQUEST_INVALID,
156 ErrorCategory::ParsingInput,
157 "missing program_path",
158 ))
159 })?
160 .to_string();
161
162 let argv = obj
163 .get("argv")
164 .and_then(|v| v.as_array())
165 .map(|a| {
166 a.iter()
167 .filter_map(|v| v.as_str().map(|s| s.to_string()))
168 .collect::<Vec<_>>()
169 })
170 .unwrap_or_default();
171
172 let stdin_json = obj
173 .get("stdin_json")
174 .cloned()
175 .unwrap_or(serde_json::Value::Null);
176
177 let timeout_ms = obj
178 .get("timeout_ms")
179 .and_then(|v| v.as_u64())
180 .unwrap_or(300_000);
181
182 let env = obj
183 .get("env")
184 .and_then(|v| v.as_object())
185 .map(|m| {
186 m.iter()
187 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
188 .collect::<HashMap<_, _>>()
189 })
190 .unwrap_or_default();
191
192 Ok(ExecRequestV1 {
193 program_path,
194 argv,
195 stdin_json,
196 timeout_ms,
197 env,
198 })
199}
200
201fn program_allowed(policy: &ExecPolicy, program_path: &Path) -> bool {
202 policy
203 .allow_prefixes
204 .iter()
205 .any(|prefix| program_path.starts_with(Path::new(prefix)))
206}
207
208fn ensure_program_accessible(path: &Path) -> Result<(), IoError> {
209 access(path, AccessFlags::F_OK).map_err(|_| {
210 IoError::Other(info(
211 CODE_EXEC_PROGRAM_MISSING,
212 ErrorCategory::Unknown,
213 "program_path does not exist",
214 ))
215 })?;
216
217 access(path, AccessFlags::X_OK).map_err(|_| {
218 IoError::Other(info(
219 CODE_EXEC_PROGRAM_NOT_EXECUTABLE,
220 ErrorCategory::Unknown,
221 "program_path is not executable",
222 ))
223 })?;
224
225 Ok(())
226}
227
228fn resolve_program_path(policy: &ExecPolicy, requested_path: &str) -> Result<String, IoError> {
229 let requested = Path::new(requested_path);
230 ensure_program_accessible(requested)?;
231
232 let canonical = std::fs::canonicalize(requested).map_err(|_| {
233 IoError::Other(info(
234 CODE_EXEC_PROGRAM_MISSING,
235 ErrorCategory::Unknown,
236 "program_path does not exist",
237 ))
238 })?;
239
240 if !program_allowed(policy, &canonical) {
241 return Err(IoError::Other(info(
242 CODE_EXEC_PROGRAM_NOT_ALLOWED,
243 ErrorCategory::Unknown,
244 "program_path is not allowed by policy",
245 )));
246 }
247
248 Ok(canonical.to_string_lossy().to_string())
249}
250
251fn failure_details(program_path: &str, status: &std::process::ExitStatus) -> serde_json::Value {
252 let mut details = serde_json::Map::new();
253 details.insert(
254 "program_path".to_string(),
255 serde_json::Value::String(program_path.to_string()),
256 );
257 details.insert(
258 "exit_code".to_string(),
259 status
260 .code()
261 .map(serde_json::Value::from)
262 .unwrap_or(serde_json::Value::Null),
263 );
264 let signal_value = {
265 #[cfg(unix)]
266 {
267 status
268 .signal()
269 .map(serde_json::Value::from)
270 .unwrap_or(serde_json::Value::Null)
271 }
272 #[cfg(not(unix))]
273 {
274 serde_json::Value::Null
275 }
276 };
277 details.insert("signal".to_string(), signal_value);
278 serde_json::Value::Object(details)
279}
280
281#[async_trait]
282impl LiveIoTransport for ExecProgramTransport {
283 async fn call(&mut self, call: IoCall) -> Result<serde_json::Value, IoError> {
284 let req = parse_request(&call)?;
285 let program_path = resolve_program_path(&self.policy, &req.program_path)?;
286
287 let stdin_bytes = serde_json::to_vec(&req.stdin_json).map_err(|_| {
288 IoError::Other(info(
289 CODE_EXEC_REQUEST_INVALID,
290 ErrorCategory::ParsingInput,
291 "stdin_json must be valid JSON",
292 ))
293 })?;
294
295 if stdin_bytes.len() > MAX_EXEC_STDIN_BYTES {
296 return Err(IoError::Transport(info_with_details(
297 CODE_EXEC_STDIN_TOO_LARGE,
298 ErrorCategory::ParsingInput,
299 "stdin_json exceeded maximum size",
300 serde_json::json!({
301 "program_path": program_path.clone(),
302 "max_stdin_bytes": MAX_EXEC_STDIN_BYTES,
303 "stdin_bytes": stdin_bytes.len(),
304 }),
305 )));
306 }
307
308 let mut cmd = Command::new(&program_path);
309 cmd.args(&req.argv);
310
311 for (k, v) in &req.env {
312 cmd.env(k, v);
313 }
314
315 let result = run_command(
316 cmd,
317 Some(stdin_bytes),
318 Duration::from_millis(req.timeout_ms),
319 StreamLimit {
320 max_stdout_bytes: MAX_EXEC_STDOUT_BYTES,
321 max_stderr_bytes: MAX_EXEC_STDERR_BYTES,
322 },
323 )
324 .await
325 .map_err(|err| match err {
326 ProcessRunError::SpawnFailed => IoError::Transport(info_with_details(
327 CODE_EXEC_SPAWN_FAILED,
328 ErrorCategory::Unknown,
329 "failed to spawn program",
330 serde_json::json!({
331 "program_path": program_path.clone(),
332 }),
333 )),
334 ProcessRunError::Timeout => IoError::Transport(info_with_details(
335 CODE_EXEC_TIMEOUT,
336 ErrorCategory::Unknown,
337 "program execution timed out",
338 serde_json::json!({
339 "program_path": program_path.clone(),
340 "timeout_ms": req.timeout_ms,
341 }),
342 )),
343 ProcessRunError::WaitFailed
344 | ProcessRunError::StdoutReadFailed
345 | ProcessRunError::StderrReadFailed => IoError::Transport(info_with_details(
346 CODE_EXEC_FAILED,
347 ErrorCategory::Unknown,
348 "program execution failed",
349 serde_json::json!({
350 "program_path": program_path.clone(),
351 }),
352 )),
353 })?;
354
355 if result.stdout.overflowed {
356 return Err(IoError::Transport(info_with_details(
357 CODE_EXEC_STDOUT_TOO_LARGE,
358 ErrorCategory::Unknown,
359 "program stdout exceeded maximum size",
360 serde_json::json!({
361 "program_path": program_path.clone(),
362 "max_stdout_bytes": MAX_EXEC_STDOUT_BYTES,
363 "stdout_bytes": result.stdout.total_bytes,
364 }),
365 )));
366 }
367 if result.stderr.overflowed {
368 return Err(IoError::Transport(info_with_details(
369 CODE_EXEC_STDERR_TOO_LARGE,
370 ErrorCategory::Unknown,
371 "program stderr exceeded maximum size",
372 serde_json::json!({
373 "program_path": program_path.clone(),
374 "max_stderr_bytes": MAX_EXEC_STDERR_BYTES,
375 "stderr_bytes": result.stderr.total_bytes,
376 }),
377 )));
378 }
379
380 if let Some(stdin_err) = result.stdin_write_error {
381 if stdin_err.kind == std::io::ErrorKind::BrokenPipe && !result.status.success() {
382 return Err(IoError::Transport(info_with_details(
383 CODE_EXEC_FAILED,
384 ErrorCategory::Unknown,
385 "program exited with non-zero status",
386 failure_details(&program_path, &result.status),
387 )));
388 }
389 if stdin_err.kind != std::io::ErrorKind::BrokenPipe {
390 return Err(IoError::Transport(info_with_details(
391 CODE_EXEC_STDIN_WRITE_FAILED,
392 ErrorCategory::Unknown,
393 "failed to write program stdin",
394 serde_json::json!({
395 "program_path": program_path.clone(),
396 "io_error_kind": format!("{:?}", stdin_err.kind),
397 }),
398 )));
399 }
400 }
401
402 if let Some(stdin_err) = result.stdin_close_error {
403 if stdin_err.kind != std::io::ErrorKind::BrokenPipe {
404 return Err(IoError::Transport(info_with_details(
405 CODE_EXEC_STDIN_WRITE_FAILED,
406 ErrorCategory::Unknown,
407 "failed to close program stdin",
408 serde_json::json!({
409 "program_path": program_path.clone(),
410 "io_error_kind": format!("{:?}", stdin_err.kind),
411 }),
412 )));
413 }
414 }
415
416 if !result.status.success() {
417 return Err(IoError::Transport(info_with_details(
418 CODE_EXEC_FAILED,
419 ErrorCategory::Unknown,
420 "program exited with non-zero status",
421 failure_details(&program_path, &result.status),
422 )));
423 }
424
425 serde_json::from_slice::<serde_json::Value>(&result.stdout.bytes).map_err(|_| {
426 IoError::Other(info(
427 CODE_EXEC_STDOUT_INVALID_JSON,
428 ErrorCategory::ParsingInput,
429 "program stdout was not valid JSON",
430 ))
431 })
432 }
433}
434
// Unit tests for this module; compiled only in test builds and loaded from
// `tests/exec_transport_tests.rs` relative to this file's directory.
#[cfg(test)]
#[path = "tests/exec_transport_tests.rs"]
mod exec_transport_tests;