Skip to main content

// nu_cmd_lang/core_commands/do_.rs

1use nu_engine::{command_prelude::*, get_eval_block_with_early_return, redirect_env};
2#[cfg(feature = "os")]
3use nu_protocol::process::{ChildPipe, ChildProcess};
4use nu_protocol::{
5    ByteStream, ByteStreamSource, OutDest, engine::Closure, shell_error::io::IoError,
6};
7
8use std::{
9    io::{Cursor, Read},
10    thread,
11};
12
/// The `do` core command: runs a closure, providing it with the pipeline input.
#[derive(Clone)]
pub struct Do;
15
16impl Command for Do {
17    fn name(&self) -> &str {
18        "do"
19    }
20
21    fn description(&self) -> &str {
22        "Run a closure, providing it with the pipeline input."
23    }
24
25    fn signature(&self) -> Signature {
26        Signature::build("do")
27            .required("closure", SyntaxShape::Closure(None), "The closure to run.")
28            .input_output_types(vec![(Type::Any, Type::Any)])
29            .switch(
30                "ignore-errors",
31                "Ignore errors as the closure runs.",
32                Some('i'),
33            )
34            .switch(
35                "capture-errors",
36                "Catch errors as the closure runs, and return them.",
37                Some('c'),
38            )
39            .switch(
40                "env",
41                "Keep the environment defined inside the command.",
42                None,
43            )
44            .rest(
45                "rest",
46                SyntaxShape::Any,
47                "The parameter(s) for the closure.",
48            )
49            .category(Category::Core)
50    }
51
52    fn run(
53        &self,
54        engine_state: &EngineState,
55        caller_stack: &mut Stack,
56        call: &Call,
57        input: PipelineData,
58    ) -> Result<PipelineData, ShellError> {
59        let head = call.head;
60        let block: Closure = call.req(engine_state, caller_stack, 0)?;
61        let rest: Vec<Value> = call.rest(engine_state, caller_stack, 1)?;
62        let ignore_all_errors = call.has_flag(engine_state, caller_stack, "ignore-errors")?;
63
64        let capture_errors = call.has_flag(engine_state, caller_stack, "capture-errors")?;
65        let has_env = call.has_flag(engine_state, caller_stack, "env")?;
66
67        let mut callee_stack = caller_stack.captures_to_stack_preserve_out_dest(block.captures);
68        let block = engine_state.get_block(block.block_id);
69
70        bind_args_to(&mut callee_stack, &block.signature, rest, head)?;
71        let eval_block_with_early_return = get_eval_block_with_early_return(engine_state);
72
73        let result = eval_block_with_early_return(engine_state, &mut callee_stack, block, input)
74            .map(|p| p.body);
75
76        if has_env {
77            // Merge the block's environment to the current stack
78            redirect_env(engine_state, caller_stack, &callee_stack);
79        }
80
81        match result {
82            Ok(PipelineData::ByteStream(stream, metadata)) if capture_errors => {
83                let span = stream.span();
84                #[cfg(not(feature = "os"))]
85                return Err(ShellError::DisabledOsSupport {
86                    msg: "Cannot create a thread to receive stdout message.".to_string(),
87                    span,
88                });
89
90                #[cfg(feature = "os")]
91                match stream.into_child() {
92                    Ok(mut child) => {
93                        // Use a thread to receive stdout message.
94                        // Or we may get a deadlock if child process sends out too much bytes to stderr.
95                        //
96                        // For example: in normal linux system, stderr pipe's limit is 65535 bytes.
97                        // if child process sends out 65536 bytes, the process will be hanged because no consumer
98                        // consumes the first 65535 bytes
99                        // So we need a thread to receive stdout message, then the current thread can continue to consume
100                        // stderr messages.
101                        let stdout_handler = child
102                            .stdout
103                            .take()
104                            .map(|mut stdout| {
105                                thread::Builder::new()
106                                    .name("stdout consumer".to_string())
107                                    .spawn(move || {
108                                        let mut buf = Vec::new();
109                                        stdout.read_to_end(&mut buf).map_err(|err| {
110                                            IoError::new_internal(
111                                                err,
112                                                "Could not read stdout to end",
113                                            )
114                                        })?;
115                                        Ok::<_, ShellError>(buf)
116                                    })
117                                    .map_err(|err| IoError::new(err, head, None))
118                            })
119                            .transpose()?;
120
121                        // Intercept stderr so we can return it in the error if the exit code is non-zero.
122                        // The threading issues mentioned above dictate why we also need to intercept stdout.
123                        let stderr_msg = match child.stderr.take() {
124                            None => String::new(),
125                            Some(mut stderr) => {
126                                let mut buf = String::new();
127                                stderr
128                                    .read_to_string(&mut buf)
129                                    .map_err(|err| IoError::new(err, span, None))?;
130                                buf
131                            }
132                        };
133
134                        let stdout = if let Some(handle) = stdout_handler {
135                            match handle.join() {
136                                Err(err) => {
137                                    return Err(ShellError::ExternalCommand {
138                                        label: "Fail to receive external commands stdout message"
139                                            .to_string(),
140                                        help: format!("{err:?}"),
141                                        span,
142                                    });
143                                }
144                                Ok(res) => Some(res?),
145                            }
146                        } else {
147                            None
148                        };
149
150                        child.ignore_error(false);
151                        child.wait()?;
152
153                        let mut child = ChildProcess::from_raw(None, None, None, span);
154                        if let Some(stdout) = stdout {
155                            child.stdout = Some(ChildPipe::Tee(Box::new(Cursor::new(stdout))));
156                        }
157                        if !stderr_msg.is_empty() {
158                            child.stderr = Some(ChildPipe::Tee(Box::new(Cursor::new(stderr_msg))));
159                        }
160                        Ok(PipelineData::byte_stream(
161                            ByteStream::child(child, span),
162                            metadata,
163                        ))
164                    }
165                    Err(stream) => Ok(PipelineData::byte_stream(stream, metadata)),
166                }
167            }
168            Ok(PipelineData::ByteStream(mut stream, metadata))
169                if ignore_all_errors
170                    && !matches!(
171                        caller_stack.stdout(),
172                        OutDest::Pipe | OutDest::PipeSeparate | OutDest::Value
173                    ) =>
174            {
175                #[cfg(feature = "os")]
176                if let ByteStreamSource::Child(child) = stream.source_mut() {
177                    child.ignore_error(true);
178                }
179                Ok(PipelineData::byte_stream(stream, metadata))
180            }
181            Ok(PipelineData::Value(Value::Error { .. }, ..)) | Err(_) if ignore_all_errors => {
182                Ok(PipelineData::empty())
183            }
184            Ok(PipelineData::ListStream(stream, metadata)) if ignore_all_errors => {
185                let stream = stream.map(move |value| {
186                    if let Value::Error { .. } = value {
187                        Value::nothing(head)
188                    } else {
189                        value
190                    }
191                });
192                Ok(PipelineData::list_stream(stream, metadata))
193            }
194            r => r,
195        }
196    }
197
198    fn examples(&self) -> Vec<Example<'_>> {
199        vec![
200            Example {
201                description: "Run the closure.",
202                example: "do { echo hello }",
203                result: Some(Value::test_string("hello")),
204            },
205            Example {
206                description: "Run a stored first-class closure.",
207                example: r#"let text = "I am enclosed"; let hello = {|| echo $text}; do $hello"#,
208                result: Some(Value::test_string("I am enclosed")),
209            },
210            Example {
211                description: "Run the closure and ignore both shell and external program errors.",
212                example: "do --ignore-errors { thisisnotarealcommand }",
213                result: None,
214            },
215            Example {
216                description: "Abort the pipeline if a program returns a non-zero exit code.",
217                example: "do --capture-errors { nu --commands 'exit 1' } | myscarycommand",
218                result: None,
219            },
220            Example {
221                description: "Run the closure with a positional, type-checked parameter.",
222                example: "do {|x:int| 100 + $x } 77",
223                result: Some(Value::test_int(177)),
224            },
225            Example {
226                description: "Run the closure with pipeline input.",
227                example: "77 | do { 100 + $in }",
228                result: Some(Value::test_int(177)),
229            },
230            Example {
231                description: "Run the closure with a default parameter value.",
232                example: "77 | do {|x=100| $x + $in }",
233                result: Some(Value::test_int(177)),
234            },
235            Example {
236                description: "Run the closure with two positional parameters.",
237                example: "do {|x,y| $x + $y } 77 100",
238                result: Some(Value::test_int(177)),
239            },
240            Example {
241                description: "Run the closure and keep changes to the environment.",
242                example: "do --env { $env.foo = 'bar' }; $env.foo",
243                result: Some(Value::test_string("bar")),
244            },
245        ]
246    }
247}
248
249fn bind_args_to(
250    stack: &mut Stack,
251    signature: &Signature,
252    args: Vec<Value>,
253    head_span: Span,
254) -> Result<(), ShellError> {
255    let mut val_iter = args.into_iter();
256    for (param, required) in signature
257        .required_positional
258        .iter()
259        .map(|p| (p, true))
260        .chain(signature.optional_positional.iter().map(|p| (p, false)))
261    {
262        let var_id = param
263            .var_id
264            .expect("internal error: all custom parameters must have var_ids");
265        if let Some(result) = val_iter.next() {
266            let param_type = param.shape.to_type();
267            if !result.is_subtype_of(&param_type) {
268                return Err(ShellError::CantConvert {
269                    to_type: param.shape.to_type().to_string(),
270                    from_type: result.get_type().to_string(),
271                    span: result.span(),
272                    help: None,
273                });
274            }
275            stack.add_var(var_id, result);
276        } else if let Some(value) = &param.default_value {
277            stack.add_var(var_id, value.to_owned())
278        } else if !required {
279            stack.add_var(var_id, Value::nothing(head_span))
280        } else {
281            return Err(ShellError::MissingParameter {
282                param_name: param.name.to_string(),
283                span: head_span,
284            });
285        }
286    }
287
288    if let Some(rest_positional) = &signature.rest_positional {
289        let mut rest_items = vec![];
290
291        for result in val_iter {
292            rest_items.push(result);
293        }
294
295        let span = if let Some(rest_item) = rest_items.first() {
296            rest_item.span()
297        } else {
298            head_span
299        };
300
301        stack.add_var(
302            rest_positional
303                .var_id
304                .expect("Internal error: rest positional parameter lacks var_id"),
305            Value::list(rest_items, span),
306        )
307    }
308    Ok(())
309}
310
// Only compiled for test builds; the module contains nothing but a test.
#[cfg(test)]
mod test {
    /// Checks the examples declared by `Do` using the `nu_test_support` example harness.
    #[test]
    fn test_examples() -> nu_test_support::Result {
        use super::Do;
        nu_test_support::test().examples(Do)
    }
}
317}