nu_cmd_lang/core_commands/
do_.rs

1use nu_engine::{command_prelude::*, get_eval_block_with_early_return, redirect_env};
2#[cfg(feature = "os")]
3use nu_protocol::process::{ChildPipe, ChildProcess};
4use nu_protocol::{
5    ByteStream, ByteStreamSource, OutDest, engine::Closure, shell_error::io::IoError,
6};
7
8use std::{
9    io::{Cursor, Read},
10    thread,
11};
12
13#[derive(Clone)]
14pub struct Do;
15
16impl Command for Do {
17    fn name(&self) -> &str {
18        "do"
19    }
20
21    fn description(&self) -> &str {
22        "Run a closure, providing it with the pipeline input."
23    }
24
25    fn signature(&self) -> Signature {
26        Signature::build("do")
27            .required("closure", SyntaxShape::Closure(None), "The closure to run.")
28            .input_output_types(vec![(Type::Any, Type::Any)])
29            .switch(
30                "ignore-errors",
31                "ignore errors as the closure runs",
32                Some('i'),
33            )
34            .switch(
35                "capture-errors",
36                "catch errors as the closure runs, and return them",
37                Some('c'),
38            )
39            .switch(
40                "env",
41                "keep the environment defined inside the command",
42                None,
43            )
44            .rest(
45                "rest",
46                SyntaxShape::Any,
47                "The parameter(s) for the closure.",
48            )
49            .category(Category::Core)
50    }
51
52    fn run(
53        &self,
54        engine_state: &EngineState,
55        caller_stack: &mut Stack,
56        call: &Call,
57        input: PipelineData,
58    ) -> Result<PipelineData, ShellError> {
59        let head = call.head;
60        let block: Closure = call.req(engine_state, caller_stack, 0)?;
61        let rest: Vec<Value> = call.rest(engine_state, caller_stack, 1)?;
62        let ignore_all_errors = call.has_flag(engine_state, caller_stack, "ignore-errors")?;
63
64        let capture_errors = call.has_flag(engine_state, caller_stack, "capture-errors")?;
65        let has_env = call.has_flag(engine_state, caller_stack, "env")?;
66
67        let mut callee_stack = caller_stack.captures_to_stack_preserve_out_dest(block.captures);
68        let block = engine_state.get_block(block.block_id);
69
70        bind_args_to(&mut callee_stack, &block.signature, rest, head)?;
71        let eval_block_with_early_return = get_eval_block_with_early_return(engine_state);
72
73        let result = eval_block_with_early_return(engine_state, &mut callee_stack, block, input)
74            .map(|p| p.body);
75
76        if has_env {
77            // Merge the block's environment to the current stack
78            redirect_env(engine_state, caller_stack, &callee_stack);
79        }
80
81        match result {
82            Ok(PipelineData::ByteStream(stream, metadata)) if capture_errors => {
83                let span = stream.span();
84                #[cfg(not(feature = "os"))]
85                return Err(ShellError::DisabledOsSupport {
86                    msg: "Cannot create a thread to receive stdout message.".to_string(),
87                    span,
88                });
89
90                #[cfg(feature = "os")]
91                match stream.into_child() {
92                    Ok(mut child) => {
93                        // Use a thread to receive stdout message.
94                        // Or we may get a deadlock if child process sends out too much bytes to stderr.
95                        //
96                        // For example: in normal linux system, stderr pipe's limit is 65535 bytes.
97                        // if child process sends out 65536 bytes, the process will be hanged because no consumer
98                        // consumes the first 65535 bytes
99                        // So we need a thread to receive stdout message, then the current thread can continue to consume
100                        // stderr messages.
101                        let stdout_handler = child
102                            .stdout
103                            .take()
104                            .map(|mut stdout| {
105                                thread::Builder::new()
106                                    .name("stdout consumer".to_string())
107                                    .spawn(move || {
108                                        let mut buf = Vec::new();
109                                        stdout.read_to_end(&mut buf).map_err(|err| {
110                                            IoError::new_internal(
111                                                err,
112                                                "Could not read stdout to end",
113                                                nu_protocol::location!(),
114                                            )
115                                        })?;
116                                        Ok::<_, ShellError>(buf)
117                                    })
118                                    .map_err(|err| IoError::new(err, head, None))
119                            })
120                            .transpose()?;
121
122                        // Intercept stderr so we can return it in the error if the exit code is non-zero.
123                        // The threading issues mentioned above dictate why we also need to intercept stdout.
124                        let stderr_msg = match child.stderr.take() {
125                            None => String::new(),
126                            Some(mut stderr) => {
127                                let mut buf = String::new();
128                                stderr
129                                    .read_to_string(&mut buf)
130                                    .map_err(|err| IoError::new(err, span, None))?;
131                                buf
132                            }
133                        };
134
135                        let stdout = if let Some(handle) = stdout_handler {
136                            match handle.join() {
137                                Err(err) => {
138                                    return Err(ShellError::ExternalCommand {
139                                        label: "Fail to receive external commands stdout message"
140                                            .to_string(),
141                                        help: format!("{err:?}"),
142                                        span,
143                                    });
144                                }
145                                Ok(res) => Some(res?),
146                            }
147                        } else {
148                            None
149                        };
150
151                        child.ignore_error(false);
152                        child.wait()?;
153
154                        let mut child = ChildProcess::from_raw(None, None, None, span);
155                        if let Some(stdout) = stdout {
156                            child.stdout = Some(ChildPipe::Tee(Box::new(Cursor::new(stdout))));
157                        }
158                        if !stderr_msg.is_empty() {
159                            child.stderr = Some(ChildPipe::Tee(Box::new(Cursor::new(stderr_msg))));
160                        }
161                        Ok(PipelineData::byte_stream(
162                            ByteStream::child(child, span),
163                            metadata,
164                        ))
165                    }
166                    Err(stream) => Ok(PipelineData::byte_stream(stream, metadata)),
167                }
168            }
169            Ok(PipelineData::ByteStream(mut stream, metadata))
170                if ignore_all_errors
171                    && !matches!(
172                        caller_stack.stdout(),
173                        OutDest::Pipe | OutDest::PipeSeparate | OutDest::Value
174                    ) =>
175            {
176                #[cfg(feature = "os")]
177                if let ByteStreamSource::Child(child) = stream.source_mut() {
178                    child.ignore_error(true);
179                }
180                Ok(PipelineData::byte_stream(stream, metadata))
181            }
182            Ok(PipelineData::Value(Value::Error { .. }, ..)) | Err(_) if ignore_all_errors => {
183                Ok(PipelineData::empty())
184            }
185            Ok(PipelineData::ListStream(stream, metadata)) if ignore_all_errors => {
186                let stream = stream.map(move |value| {
187                    if let Value::Error { .. } = value {
188                        Value::nothing(head)
189                    } else {
190                        value
191                    }
192                });
193                Ok(PipelineData::list_stream(stream, metadata))
194            }
195            r => r,
196        }
197    }
198
199    fn examples(&self) -> Vec<Example<'_>> {
200        vec![
201            Example {
202                description: "Run the closure",
203                example: r#"do { echo hello }"#,
204                result: Some(Value::test_string("hello")),
205            },
206            Example {
207                description: "Run a stored first-class closure",
208                example: r#"let text = "I am enclosed"; let hello = {|| echo $text}; do $hello"#,
209                result: Some(Value::test_string("I am enclosed")),
210            },
211            Example {
212                description: "Run the closure and ignore both shell and external program errors",
213                example: r#"do --ignore-errors { thisisnotarealcommand }"#,
214                result: None,
215            },
216            Example {
217                description: "Abort the pipeline if a program returns a non-zero exit code",
218                example: r#"do --capture-errors { nu --commands 'exit 1' } | myscarycommand"#,
219                result: None,
220            },
221            Example {
222                description: "Run the closure with a positional, type-checked parameter",
223                example: r#"do {|x:int| 100 + $x } 77"#,
224                result: Some(Value::test_int(177)),
225            },
226            Example {
227                description: "Run the closure with pipeline input",
228                example: r#"77 | do { 100 + $in }"#,
229                result: Some(Value::test_int(177)),
230            },
231            Example {
232                description: "Run the closure with a default parameter value",
233                example: r#"77 | do {|x=100| $x + $in }"#,
234                result: Some(Value::test_int(177)),
235            },
236            Example {
237                description: "Run the closure with two positional parameters",
238                example: r#"do {|x,y| $x + $y } 77 100"#,
239                result: Some(Value::test_int(177)),
240            },
241            Example {
242                description: "Run the closure and keep changes to the environment",
243                example: r#"do --env { $env.foo = 'bar' }; $env.foo"#,
244                result: Some(Value::test_string("bar")),
245            },
246        ]
247    }
248}
249
250fn bind_args_to(
251    stack: &mut Stack,
252    signature: &Signature,
253    args: Vec<Value>,
254    head_span: Span,
255) -> Result<(), ShellError> {
256    let mut val_iter = args.into_iter();
257    for (param, required) in signature
258        .required_positional
259        .iter()
260        .map(|p| (p, true))
261        .chain(signature.optional_positional.iter().map(|p| (p, false)))
262    {
263        let var_id = param
264            .var_id
265            .expect("internal error: all custom parameters must have var_ids");
266        if let Some(result) = val_iter.next() {
267            let param_type = param.shape.to_type();
268            if !result.is_subtype_of(&param_type) {
269                return Err(ShellError::CantConvert {
270                    to_type: param.shape.to_type().to_string(),
271                    from_type: result.get_type().to_string(),
272                    span: result.span(),
273                    help: None,
274                });
275            }
276            stack.add_var(var_id, result);
277        } else if let Some(value) = &param.default_value {
278            stack.add_var(var_id, value.to_owned())
279        } else if !required {
280            stack.add_var(var_id, Value::nothing(head_span))
281        } else {
282            return Err(ShellError::MissingParameter {
283                param_name: param.name.to_string(),
284                span: head_span,
285            });
286        }
287    }
288
289    if let Some(rest_positional) = &signature.rest_positional {
290        let mut rest_items = vec![];
291
292        for result in val_iter {
293            rest_items.push(result);
294        }
295
296        let span = if let Some(rest_item) = rest_items.first() {
297            rest_item.span()
298        } else {
299            head_span
300        };
301
302        stack.add_var(
303            rest_positional
304                .var_id
305                .expect("Internal error: rest positional parameter lacks var_id"),
306            Value::list(rest_items, span),
307        )
308    }
309    Ok(())
310}
311
// Only compiled for test builds; checks that every example declared by `Do`
// evaluates to its stated result.
#[cfg(test)]
mod test {
    #[test]
    fn test_examples() {
        use super::Do;
        use crate::test_examples;
        test_examples(Do {})
    }
}
319}