Skip to main content

xtask_todo_lib/devshell/command/dispatch/
mod.rs

1//! Redirect handling, pipeline execution, and builtin dispatch.
2
3mod builtin_impl;
4mod workspace;
5
6use std::io::Cursor;
7use std::io::{Read, Write};
8
9use super::super::parser::{Pipeline, SimpleCommand};
10use super::super::vfs::Vfs;
11use super::types::{BuiltinError, ExecContext, RunResult};
12
13use builtin_impl::run_builtin_core;
14use workspace::{workspace_read_file, workspace_write_file};
15
/// Upper bound, in bytes (16 MiB), on the stdout that a **non-terminal** pipeline stage may
/// hand to the next stage. The data is buffered in host memory (design §8.2).
pub const PIPELINE_INTER_STAGE_MAX_BYTES: usize = 16 << 20;
18
19#[inline]
20const fn check_pipeline_inter_stage_size(len: usize) -> Result<(), BuiltinError> {
21    if len > PIPELINE_INTER_STAGE_MAX_BYTES {
22        return Err(BuiltinError::PipelineInterStageBufferExceeded {
23            limit: PIPELINE_INTER_STAGE_MAX_BYTES,
24            actual: len,
25        });
26    }
27    Ok(())
28}
29
30/// Run a single command with given streams. Redirects override the provided stdin/stdout/stderr.
31fn run_builtin_with_streams(
32    vfs: &mut Vfs,
33    vm_session: &mut super::super::vm::SessionHolder,
34    default_stdin: &mut dyn Read,
35    default_stdout: &mut dyn Write,
36    default_stderr: &mut dyn Write,
37    cmd: &SimpleCommand,
38) -> Result<(), BuiltinError> {
39    let redirects = &cmd.redirects;
40    let argv = &cmd.argv;
41
42    let mut stdin_override: Option<Cursor<Vec<u8>>> = None;
43    let mut stdout_override: Option<Vec<u8>> = None;
44    let mut stderr_override: Option<Vec<u8>> = None;
45    let mut stdout_redirect_path: Option<String> = None;
46    let mut stderr_redirect_path: Option<String> = None;
47
48    for r in redirects {
49        match r.fd {
50            0 => {
51                let content = workspace_read_file(vfs, vm_session, &r.path)?;
52                stdin_override = Some(Cursor::new(content));
53            }
54            1 => {
55                stdout_override = Some(Vec::new());
56                stdout_redirect_path = Some(r.path.clone());
57            }
58            2 => {
59                stderr_override = Some(Vec::new());
60                stderr_redirect_path = Some(r.path.clone());
61            }
62            _ => {}
63        }
64    }
65
66    let stdin: &mut dyn Read = stdin_override
67        .as_mut()
68        .map_or(default_stdin, |c| c as &mut dyn Read);
69    let stdout: &mut dyn Write = stdout_override
70        .as_mut()
71        .map_or(default_stdout, |v| v as &mut dyn Write);
72    let stderr: &mut dyn Write = stderr_override
73        .as_mut()
74        .map_or(default_stderr, |v| v as &mut dyn Write);
75
76    let result = run_builtin_core(vfs, vm_session, stdin, stdout, stderr, argv);
77
78    if let Some(path) = stdout_redirect_path {
79        if let Some(buf) = &stdout_override {
80            workspace_write_file(vfs, vm_session, &path, buf)?;
81        }
82    }
83    if let Some(path) = stderr_redirect_path {
84        if let Some(buf) = &stderr_override {
85            workspace_write_file(vfs, vm_session, &path, buf)?;
86        }
87    }
88
89    result
90}
91
92/// Apply redirects: build optional stdin (Cursor over file content), stdout/stderr buffers.
93/// Then run the builtin with the effective streams; after that write stdout/stderr buffers to vfs if redirected.
94///
95/// # Errors
96/// Returns `BuiltinError` on redirect or builtin execution failure.
97pub fn run_builtin(ctx: &mut ExecContext<'_>, cmd: &SimpleCommand) -> Result<(), BuiltinError> {
98    run_builtin_with_streams(
99        ctx.vfs,
100        ctx.vm_session,
101        ctx.stdin,
102        ctx.stdout,
103        ctx.stderr,
104        cmd,
105    )
106}
107
108/// Execute a pipeline: run each command with stdin from previous stage (or `ctx.stdin` for first),
109/// stdout to a buffer; last command's stdout is written to `ctx.stdout`. Redirects override pipe.
110///
111/// Non-final stages buffer **all** stdout in host memory; size is capped at
112/// [`PIPELINE_INTER_STAGE_MAX_BYTES`] (design §8.2).
113///
114/// # Errors
115/// Returns `BuiltinError` if any command or redirect fails.
116///
117/// # Panics
118/// Panics if the pipeline state is inconsistent (non-first stage without pipe input); this is a programming error.
119pub fn execute_pipeline(
120    ctx: &mut ExecContext<'_>,
121    pipeline: &Pipeline,
122) -> Result<RunResult, BuiltinError> {
123    let commands = &pipeline.commands;
124    if commands.is_empty() {
125        return Ok(RunResult::Continue);
126    }
127
128    let first_argv0 = commands
129        .first()
130        .and_then(|c| c.argv.first())
131        .map(String::as_str);
132    if first_argv0 == Some("exit") || first_argv0 == Some("quit") {
133        return Ok(RunResult::Exit);
134    }
135
136    let mut prev_output: Option<Vec<u8>> = None;
137
138    for (i, cmd) in commands.iter().enumerate() {
139        let is_first = i == 0;
140        let is_last = i == commands.len() - 1;
141
142        let mut pipe_stdin: Option<Cursor<Vec<u8>>> = prev_output.take().map(Cursor::new);
143        let mut next_buffer = Vec::new();
144
145        let stdin: &mut dyn Read = if is_first {
146            ctx.stdin
147        } else {
148            pipe_stdin
149                .as_mut()
150                .expect("non-first pipeline stage has pipe input") as &mut dyn Read
151        };
152        let stdout: &mut dyn Write = if is_last {
153            ctx.stdout
154        } else {
155            &mut next_buffer
156        };
157
158        run_builtin_with_streams(ctx.vfs, ctx.vm_session, stdin, stdout, ctx.stderr, cmd)?;
159
160        if !is_last {
161            check_pipeline_inter_stage_size(next_buffer.len())?;
162            prev_output = Some(next_buffer);
163        }
164    }
165
166    Ok(RunResult::Continue)
167}
168
#[cfg(test)]
mod pipeline_limit_tests {
    use super::*;

    /// The limit itself is accepted; one byte more is rejected and reported with both the
    /// limit and the offending size.
    #[test]
    fn pipeline_inter_stage_limit_boundary() {
        let at_limit = PIPELINE_INTER_STAGE_MAX_BYTES;
        let over_limit = PIPELINE_INTER_STAGE_MAX_BYTES + 1;

        assert!(check_pipeline_inter_stage_size(at_limit).is_ok());

        let err = check_pipeline_inter_stage_size(over_limit).unwrap_err();
        if let BuiltinError::PipelineInterStageBufferExceeded { limit, actual } = err {
            assert_eq!(limit, PIPELINE_INTER_STAGE_MAX_BYTES);
            assert_eq!(actual, PIPELINE_INTER_STAGE_MAX_BYTES + 1);
        } else {
            panic!("expected PipelineInterStageBufferExceeded");
        }
    }
}