Skip to main content

mfm_machine/
exec_transport.rs

1//! Live IO transport for external program execution.
2//!
3//! This transport powers the `exec` namespace. It is NOT part of the stable API contract
4//! (Appendix C.1) and may change.
5//!
6//! Security notes:
7//! - Request payloads are not persisted by the runtime, but MUST still be treated as sensitive.
8//! - Errors MUST NOT echo stdout/stderr or request payloads (avoid accidental secret leakage).
9
10use std::collections::HashMap;
11#[cfg(unix)]
12use std::os::unix::process::ExitStatusExt;
13use std::path::Path;
14use std::time::Duration;
15
16use async_trait::async_trait;
17use nix::unistd::{access, AccessFlags};
18use tokio::process::Command;
19
20use crate::errors::{ErrorCategory, ErrorInfo, IoError};
21use crate::ids::ErrorCode;
22use crate::io::IoCall;
23use crate::live_io::{LiveIoEnv, LiveIoTransport, LiveIoTransportFactory};
24use crate::process_exec::{run_command, ProcessRunError, StreamLimit};
25
/// Namespace group handled by the program-execution transport.
pub const NAMESPACE_EXEC: &str = "exec";

// Stable machine-readable error codes emitted by this transport. Each one is wrapped in an
// `ErrorCode` by the `info` / `info_with_details` helpers below.
const CODE_EXEC_REQUEST_INVALID: &str = "exec_request_invalid";
const CODE_EXEC_PROGRAM_NOT_ALLOWED: &str = "exec_program_not_allowed";
const CODE_EXEC_PROGRAM_MISSING: &str = "exec_program_missing";
const CODE_EXEC_PROGRAM_NOT_EXECUTABLE: &str = "exec_program_not_executable";
const CODE_EXEC_SPAWN_FAILED: &str = "exec_spawn_failed";
// Used for failures while writing to, and while closing, the child's stdin.
const CODE_EXEC_STDIN_WRITE_FAILED: &str = "exec_stdin_write_failed";
const CODE_EXEC_STDIN_TOO_LARGE: &str = "exec_stdin_too_large";
const CODE_EXEC_TIMEOUT: &str = "exec_timeout";
// Used both for non-zero exits and for wait/stream-read failures.
const CODE_EXEC_FAILED: &str = "exec_failed";
const CODE_EXEC_STDOUT_INVALID_JSON: &str = "exec_stdout_invalid_json";
const CODE_EXEC_STDOUT_TOO_LARGE: &str = "exec_stdout_too_large";
const CODE_EXEC_STDERR_TOO_LARGE: &str = "exec_stderr_too_large";

// Size caps of 1 MiB each: the stdin cap is compared against the serialized `stdin_json`
// payload, the stdout/stderr caps are handed to `run_command` via `StreamLimit`.
const MAX_EXEC_STDIN_BYTES: usize = 1024 * 1024;
const MAX_EXEC_STDOUT_BYTES: usize = 1024 * 1024;
const MAX_EXEC_STDERR_BYTES: usize = 1024 * 1024;
45
46fn info(code: &'static str, category: ErrorCategory, message: &'static str) -> ErrorInfo {
47    ErrorInfo {
48        code: ErrorCode(code.to_string()),
49        category,
50        retryable: false,
51        message: message.to_string(),
52        details: None,
53    }
54}
55
56fn info_with_details(
57    code: &'static str,
58    category: ErrorCategory,
59    message: &'static str,
60    details: serde_json::Value,
61) -> ErrorInfo {
62    ErrorInfo {
63        code: ErrorCode(code.to_string()),
64        category,
65        retryable: false,
66        message: message.to_string(),
67        details: Some(details),
68    }
69}
70
/// Execution policy for the `exec` transport: restricts which programs may be run.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecPolicy {
    /// Path prefixes under which the canonicalized program path must live.
    pub allow_prefixes: Vec<String>,
}

impl Default for ExecPolicy {
    /// Returns a policy whose only allowed prefix is `/nix/store/`.
    fn default() -> Self {
        // Conservative default: keeps execution reproducible without needing a "realize" step.
        let nix_store = String::from("/nix/store/");
        ExecPolicy {
            allow_prefixes: vec![nix_store],
        }
    }
}
86
/// Factory that produces Live IO transports for the [`NAMESPACE_EXEC`] namespace group.
///
/// The factory owns one [`ExecPolicy`]; every transport it makes receives a clone of it.
/// The derived `Default` uses `ExecPolicy::default()`.
#[derive(Clone, Default)]
pub struct ExecProgramTransportFactory {
    /// Policy copied into each transport created by this factory.
    policy: ExecPolicy,
}
92
93impl ExecProgramTransportFactory {
94    /// Creates a factory that enforces the provided execution policy for all `exec` requests.
95    pub fn new(policy: ExecPolicy) -> Self {
96        Self { policy }
97    }
98}
99
100impl LiveIoTransportFactory for ExecProgramTransportFactory {
101    fn namespace_group(&self) -> &str {
102        NAMESPACE_EXEC
103    }
104
105    fn make(&self, _env: LiveIoEnv) -> Box<dyn LiveIoTransport> {
106        Box::new(ExecProgramTransport {
107            policy: self.policy.clone(),
108        })
109    }
110}
111
/// Transport instance that executes external programs on behalf of `exec` IO calls.
struct ExecProgramTransport {
    /// Policy consulted when resolving the requested program path.
    policy: ExecPolicy,
}
115
/// Parsed form of a `run_program_v1` exec request.
#[derive(Clone, Debug, PartialEq, Eq)]
struct ExecRequestV1 {
    /// Program path exactly as supplied in the request (not yet canonicalized or policy-checked).
    program_path: String,
    /// Arguments passed to the program; empty when the request has no `argv`.
    argv: Vec<String>,
    /// JSON value serialized onto the program's stdin; `Null` when the request has no `stdin_json`.
    stdin_json: serde_json::Value,
    /// Execution timeout in milliseconds; 300_000 when the request has no `timeout_ms`.
    timeout_ms: u64,
    /// Environment variables set on the child process in addition to whatever it inherits.
    env: HashMap<String, String>,
}
124
125fn parse_request(call: &IoCall) -> Result<ExecRequestV1, IoError> {
126    let obj = call.request.as_object().ok_or_else(|| {
127        IoError::Other(info(
128            CODE_EXEC_REQUEST_INVALID,
129            ErrorCategory::ParsingInput,
130            "exec request must be a JSON object",
131        ))
132    })?;
133
134    let kind = obj.get("kind").and_then(|v| v.as_str()).ok_or_else(|| {
135        IoError::Other(info(
136            CODE_EXEC_REQUEST_INVALID,
137            ErrorCategory::ParsingInput,
138            "missing exec request kind",
139        ))
140    })?;
141
142    if kind != "run_program_v1" {
143        return Err(IoError::Other(info(
144            CODE_EXEC_REQUEST_INVALID,
145            ErrorCategory::ParsingInput,
146            "unsupported exec request kind",
147        )));
148    }
149
150    let program_path = obj
151        .get("program_path")
152        .and_then(|v| v.as_str())
153        .ok_or_else(|| {
154            IoError::Other(info(
155                CODE_EXEC_REQUEST_INVALID,
156                ErrorCategory::ParsingInput,
157                "missing program_path",
158            ))
159        })?
160        .to_string();
161
162    let argv = obj
163        .get("argv")
164        .and_then(|v| v.as_array())
165        .map(|a| {
166            a.iter()
167                .filter_map(|v| v.as_str().map(|s| s.to_string()))
168                .collect::<Vec<_>>()
169        })
170        .unwrap_or_default();
171
172    let stdin_json = obj
173        .get("stdin_json")
174        .cloned()
175        .unwrap_or(serde_json::Value::Null);
176
177    let timeout_ms = obj
178        .get("timeout_ms")
179        .and_then(|v| v.as_u64())
180        .unwrap_or(300_000);
181
182    let env = obj
183        .get("env")
184        .and_then(|v| v.as_object())
185        .map(|m| {
186            m.iter()
187                .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
188                .collect::<HashMap<_, _>>()
189        })
190        .unwrap_or_default();
191
192    Ok(ExecRequestV1 {
193        program_path,
194        argv,
195        stdin_json,
196        timeout_ms,
197        env,
198    })
199}
200
201fn program_allowed(policy: &ExecPolicy, program_path: &Path) -> bool {
202    policy
203        .allow_prefixes
204        .iter()
205        .any(|prefix| program_path.starts_with(Path::new(prefix)))
206}
207
208fn ensure_program_accessible(path: &Path) -> Result<(), IoError> {
209    access(path, AccessFlags::F_OK).map_err(|_| {
210        IoError::Other(info(
211            CODE_EXEC_PROGRAM_MISSING,
212            ErrorCategory::Unknown,
213            "program_path does not exist",
214        ))
215    })?;
216
217    access(path, AccessFlags::X_OK).map_err(|_| {
218        IoError::Other(info(
219            CODE_EXEC_PROGRAM_NOT_EXECUTABLE,
220            ErrorCategory::Unknown,
221            "program_path is not executable",
222        ))
223    })?;
224
225    Ok(())
226}
227
228fn resolve_program_path(policy: &ExecPolicy, requested_path: &str) -> Result<String, IoError> {
229    let requested = Path::new(requested_path);
230    ensure_program_accessible(requested)?;
231
232    let canonical = std::fs::canonicalize(requested).map_err(|_| {
233        IoError::Other(info(
234            CODE_EXEC_PROGRAM_MISSING,
235            ErrorCategory::Unknown,
236            "program_path does not exist",
237        ))
238    })?;
239
240    if !program_allowed(policy, &canonical) {
241        return Err(IoError::Other(info(
242            CODE_EXEC_PROGRAM_NOT_ALLOWED,
243            ErrorCategory::Unknown,
244            "program_path is not allowed by policy",
245        )));
246    }
247
248    Ok(canonical.to_string_lossy().to_string())
249}
250
251fn failure_details(program_path: &str, status: &std::process::ExitStatus) -> serde_json::Value {
252    let mut details = serde_json::Map::new();
253    details.insert(
254        "program_path".to_string(),
255        serde_json::Value::String(program_path.to_string()),
256    );
257    details.insert(
258        "exit_code".to_string(),
259        status
260            .code()
261            .map(serde_json::Value::from)
262            .unwrap_or(serde_json::Value::Null),
263    );
264    let signal_value = {
265        #[cfg(unix)]
266        {
267            status
268                .signal()
269                .map(serde_json::Value::from)
270                .unwrap_or(serde_json::Value::Null)
271        }
272        #[cfg(not(unix))]
273        {
274            serde_json::Value::Null
275        }
276    };
277    details.insert("signal".to_string(), signal_value);
278    serde_json::Value::Object(details)
279}
280
281#[async_trait]
282impl LiveIoTransport for ExecProgramTransport {
283    async fn call(&mut self, call: IoCall) -> Result<serde_json::Value, IoError> {
284        let req = parse_request(&call)?;
285        let program_path = resolve_program_path(&self.policy, &req.program_path)?;
286
287        let stdin_bytes = serde_json::to_vec(&req.stdin_json).map_err(|_| {
288            IoError::Other(info(
289                CODE_EXEC_REQUEST_INVALID,
290                ErrorCategory::ParsingInput,
291                "stdin_json must be valid JSON",
292            ))
293        })?;
294
295        if stdin_bytes.len() > MAX_EXEC_STDIN_BYTES {
296            return Err(IoError::Transport(info_with_details(
297                CODE_EXEC_STDIN_TOO_LARGE,
298                ErrorCategory::ParsingInput,
299                "stdin_json exceeded maximum size",
300                serde_json::json!({
301                    "program_path": program_path.clone(),
302                    "max_stdin_bytes": MAX_EXEC_STDIN_BYTES,
303                    "stdin_bytes": stdin_bytes.len(),
304                }),
305            )));
306        }
307
308        let mut cmd = Command::new(&program_path);
309        cmd.args(&req.argv);
310
311        for (k, v) in &req.env {
312            cmd.env(k, v);
313        }
314
315        let result = run_command(
316            cmd,
317            Some(stdin_bytes),
318            Duration::from_millis(req.timeout_ms),
319            StreamLimit {
320                max_stdout_bytes: MAX_EXEC_STDOUT_BYTES,
321                max_stderr_bytes: MAX_EXEC_STDERR_BYTES,
322            },
323        )
324        .await
325        .map_err(|err| match err {
326            ProcessRunError::SpawnFailed => IoError::Transport(info_with_details(
327                CODE_EXEC_SPAWN_FAILED,
328                ErrorCategory::Unknown,
329                "failed to spawn program",
330                serde_json::json!({
331                    "program_path": program_path.clone(),
332                }),
333            )),
334            ProcessRunError::Timeout => IoError::Transport(info_with_details(
335                CODE_EXEC_TIMEOUT,
336                ErrorCategory::Unknown,
337                "program execution timed out",
338                serde_json::json!({
339                    "program_path": program_path.clone(),
340                    "timeout_ms": req.timeout_ms,
341                }),
342            )),
343            ProcessRunError::WaitFailed
344            | ProcessRunError::StdoutReadFailed
345            | ProcessRunError::StderrReadFailed => IoError::Transport(info_with_details(
346                CODE_EXEC_FAILED,
347                ErrorCategory::Unknown,
348                "program execution failed",
349                serde_json::json!({
350                    "program_path": program_path.clone(),
351                }),
352            )),
353        })?;
354
355        if result.stdout.overflowed {
356            return Err(IoError::Transport(info_with_details(
357                CODE_EXEC_STDOUT_TOO_LARGE,
358                ErrorCategory::Unknown,
359                "program stdout exceeded maximum size",
360                serde_json::json!({
361                    "program_path": program_path.clone(),
362                    "max_stdout_bytes": MAX_EXEC_STDOUT_BYTES,
363                    "stdout_bytes": result.stdout.total_bytes,
364                }),
365            )));
366        }
367        if result.stderr.overflowed {
368            return Err(IoError::Transport(info_with_details(
369                CODE_EXEC_STDERR_TOO_LARGE,
370                ErrorCategory::Unknown,
371                "program stderr exceeded maximum size",
372                serde_json::json!({
373                    "program_path": program_path.clone(),
374                    "max_stderr_bytes": MAX_EXEC_STDERR_BYTES,
375                    "stderr_bytes": result.stderr.total_bytes,
376                }),
377            )));
378        }
379
380        if let Some(stdin_err) = result.stdin_write_error {
381            if stdin_err.kind == std::io::ErrorKind::BrokenPipe && !result.status.success() {
382                return Err(IoError::Transport(info_with_details(
383                    CODE_EXEC_FAILED,
384                    ErrorCategory::Unknown,
385                    "program exited with non-zero status",
386                    failure_details(&program_path, &result.status),
387                )));
388            }
389            if stdin_err.kind != std::io::ErrorKind::BrokenPipe {
390                return Err(IoError::Transport(info_with_details(
391                    CODE_EXEC_STDIN_WRITE_FAILED,
392                    ErrorCategory::Unknown,
393                    "failed to write program stdin",
394                    serde_json::json!({
395                        "program_path": program_path.clone(),
396                        "io_error_kind": format!("{:?}", stdin_err.kind),
397                    }),
398                )));
399            }
400        }
401
402        if let Some(stdin_err) = result.stdin_close_error {
403            if stdin_err.kind != std::io::ErrorKind::BrokenPipe {
404                return Err(IoError::Transport(info_with_details(
405                    CODE_EXEC_STDIN_WRITE_FAILED,
406                    ErrorCategory::Unknown,
407                    "failed to close program stdin",
408                    serde_json::json!({
409                        "program_path": program_path.clone(),
410                        "io_error_kind": format!("{:?}", stdin_err.kind),
411                    }),
412                )));
413            }
414        }
415
416        if !result.status.success() {
417            return Err(IoError::Transport(info_with_details(
418                CODE_EXEC_FAILED,
419                ErrorCategory::Unknown,
420                "program exited with non-zero status",
421                failure_details(&program_path, &result.status),
422            )));
423        }
424
425        serde_json::from_slice::<serde_json::Value>(&result.stdout.bytes).map_err(|_| {
426            IoError::Other(info(
427                CODE_EXEC_STDOUT_INVALID_JSON,
428                ErrorCategory::ParsingInput,
429                "program stdout was not valid JSON",
430            ))
431        })
432    }
433}
434
435#[cfg(test)]
436#[path = "tests/exec_transport_tests.rs"]
437mod exec_transport_tests;