xtask_todo_lib/devshell/command/dispatch/
mod.rs1mod builtin_impl;
4mod workspace;
5
6use std::io::Cursor;
7use std::io::{Read, Write};
8
9use super::super::parser::{Pipeline, SimpleCommand};
10use super::super::vfs::Vfs;
11use super::types::{BuiltinError, ExecContext, RunResult};
12
13use builtin_impl::run_builtin_core;
14use workspace::{workspace_read_file, workspace_write_file};
15
/// Upper bound, in bytes, on the output a non-final pipeline stage may hand to the next
/// stage (16 MiB).
pub const PIPELINE_INTER_STAGE_MAX_BYTES: usize = 16 << 20;
18
19#[inline]
20const fn check_pipeline_inter_stage_size(len: usize) -> Result<(), BuiltinError> {
21 if len > PIPELINE_INTER_STAGE_MAX_BYTES {
22 return Err(BuiltinError::PipelineInterStageBufferExceeded {
23 limit: PIPELINE_INTER_STAGE_MAX_BYTES,
24 actual: len,
25 });
26 }
27 Ok(())
28}
29
30fn run_builtin_with_streams(
32 vfs: &mut Vfs,
33 vm_session: &mut super::super::vm::SessionHolder,
34 default_stdin: &mut dyn Read,
35 default_stdout: &mut dyn Write,
36 default_stderr: &mut dyn Write,
37 cmd: &SimpleCommand,
38) -> Result<(), BuiltinError> {
39 let redirects = &cmd.redirects;
40 let argv = &cmd.argv;
41
42 let mut stdin_override: Option<Cursor<Vec<u8>>> = None;
43 let mut stdout_override: Option<Vec<u8>> = None;
44 let mut stderr_override: Option<Vec<u8>> = None;
45 let mut stdout_redirect_path: Option<String> = None;
46 let mut stderr_redirect_path: Option<String> = None;
47
48 for r in redirects {
49 match r.fd {
50 0 => {
51 let content = workspace_read_file(vfs, vm_session, &r.path)?;
52 stdin_override = Some(Cursor::new(content));
53 }
54 1 => {
55 stdout_override = Some(Vec::new());
56 stdout_redirect_path = Some(r.path.clone());
57 }
58 2 => {
59 stderr_override = Some(Vec::new());
60 stderr_redirect_path = Some(r.path.clone());
61 }
62 _ => {}
63 }
64 }
65
66 let stdin: &mut dyn Read = stdin_override
67 .as_mut()
68 .map_or(default_stdin, |c| c as &mut dyn Read);
69 let stdout: &mut dyn Write = stdout_override
70 .as_mut()
71 .map_or(default_stdout, |v| v as &mut dyn Write);
72 let stderr: &mut dyn Write = stderr_override
73 .as_mut()
74 .map_or(default_stderr, |v| v as &mut dyn Write);
75
76 let result = run_builtin_core(vfs, vm_session, stdin, stdout, stderr, argv);
77
78 if let Some(path) = stdout_redirect_path {
79 if let Some(buf) = &stdout_override {
80 workspace_write_file(vfs, vm_session, &path, buf)?;
81 }
82 }
83 if let Some(path) = stderr_redirect_path {
84 if let Some(buf) = &stderr_override {
85 workspace_write_file(vfs, vm_session, &path, buf)?;
86 }
87 }
88
89 result
90}
91
92pub fn run_builtin(ctx: &mut ExecContext<'_>, cmd: &SimpleCommand) -> Result<(), BuiltinError> {
98 run_builtin_with_streams(
99 ctx.vfs,
100 ctx.vm_session,
101 ctx.stdin,
102 ctx.stdout,
103 ctx.stderr,
104 cmd,
105 )
106}
107
108pub fn execute_pipeline(
120 ctx: &mut ExecContext<'_>,
121 pipeline: &Pipeline,
122) -> Result<RunResult, BuiltinError> {
123 let commands = &pipeline.commands;
124 if commands.is_empty() {
125 return Ok(RunResult::Continue);
126 }
127
128 let first_argv0 = commands
129 .first()
130 .and_then(|c| c.argv.first())
131 .map(String::as_str);
132 if first_argv0 == Some("exit") || first_argv0 == Some("quit") {
133 return Ok(RunResult::Exit);
134 }
135
136 let mut prev_output: Option<Vec<u8>> = None;
137
138 for (i, cmd) in commands.iter().enumerate() {
139 let is_first = i == 0;
140 let is_last = i == commands.len() - 1;
141
142 let mut pipe_stdin: Option<Cursor<Vec<u8>>> = prev_output.take().map(Cursor::new);
143 let mut next_buffer = Vec::new();
144
145 let stdin: &mut dyn Read = if is_first {
146 ctx.stdin
147 } else {
148 pipe_stdin
149 .as_mut()
150 .expect("non-first pipeline stage has pipe input") as &mut dyn Read
151 };
152 let stdout: &mut dyn Write = if is_last {
153 ctx.stdout
154 } else {
155 &mut next_buffer
156 };
157
158 run_builtin_with_streams(ctx.vfs, ctx.vm_session, stdin, stdout, ctx.stderr, cmd)?;
159
160 if !is_last {
161 check_pipeline_inter_stage_size(next_buffer.len())?;
162 prev_output = Some(next_buffer);
163 }
164 }
165
166 Ok(RunResult::Continue)
167}
168
#[cfg(test)]
mod pipeline_limit_tests {
    use super::*;
    use crate::devshell::parser::{Pipeline, SimpleCommand};
    use crate::devshell::vfs::Vfs;
    use crate::devshell::vm::SessionHolder;
    use std::io::Cursor;

    /// The size check accepts exactly the limit and rejects one byte more, reporting both the
    /// limit and the offending size.
    #[test]
    fn pipeline_inter_stage_limit_boundary() {
        let at_limit = check_pipeline_inter_stage_size(PIPELINE_INTER_STAGE_MAX_BYTES);
        assert!(at_limit.is_ok());

        let over_limit = PIPELINE_INTER_STAGE_MAX_BYTES + 1;
        match check_pipeline_inter_stage_size(over_limit).unwrap_err() {
            BuiltinError::PipelineInterStageBufferExceeded { limit, actual } => {
                assert_eq!(limit, PIPELINE_INTER_STAGE_MAX_BYTES);
                assert_eq!(actual, over_limit);
            }
            _ => panic!("expected PipelineInterStageBufferExceeded"),
        }
    }

    /// A first stage whose output exceeds the inter-stage limit aborts the pipeline before the
    /// second stage gets to write anything to stdout.
    #[test]
    fn execute_pipeline_stops_on_non_terminal_stage_overflow() {
        // Builds an `echo <arg>` stage without redirects.
        let echo_stage = |arg: String| SimpleCommand {
            argv: vec!["echo".to_string(), arg],
            redirects: vec![],
        };
        let pipeline = Pipeline {
            commands: vec![
                echo_stage("x".repeat(PIPELINE_INTER_STAGE_MAX_BYTES + 1)),
                echo_stage("stage2_should_not_run".to_string()),
            ],
        };

        let mut vfs = Vfs::new();
        let mut vm_session = SessionHolder::new_host();
        let mut stdin = Cursor::new(Vec::<u8>::new());
        let mut stdout = Vec::new();
        let mut stderr = Vec::new();
        let mut ctx = ExecContext {
            vfs: &mut vfs,
            stdin: &mut stdin,
            stdout: &mut stdout,
            stderr: &mut stderr,
            vm_session: &mut vm_session,
        };

        let outcome = execute_pipeline(&mut ctx, &pipeline);
        match outcome.unwrap_err() {
            BuiltinError::PipelineInterStageBufferExceeded { limit, actual } => {
                assert_eq!(limit, PIPELINE_INTER_STAGE_MAX_BYTES);
                assert!(actual > PIPELINE_INTER_STAGE_MAX_BYTES);
            }
            _ => panic!("expected PipelineInterStageBufferExceeded"),
        }

        let out = String::from_utf8(stdout).expect("stdout should be valid UTF-8");
        assert!(
            !out.contains("stage2_should_not_run"),
            "pipeline should stop before executing stage2: {out}"
        );
    }
}