Skip to main content

nu_cmd_lang/core_commands/
do_.rs

1use nu_engine::{ClosureEvalOnce, command_prelude::*};
2#[cfg(feature = "os")]
3use nu_protocol::process::{ChildPipe, ChildProcess};
4use nu_protocol::{
5    ByteStream, ByteStreamSource, OutDest, engine::Closure, shell_error::io::IoError,
6};
7
8use std::{
9    io::{Cursor, Read},
10    thread,
11};
12
13#[derive(Clone)]
14pub struct Do;
15
16impl Command for Do {
17    fn name(&self) -> &str {
18        "do"
19    }
20
21    fn description(&self) -> &str {
22        "Run a closure, providing it with the pipeline input."
23    }
24
25    fn signature(&self) -> Signature {
26        Signature::build("do")
27            .required("closure", SyntaxShape::Closure(None), "The closure to run.")
28            .input_output_types(vec![(Type::Any, Type::Any)])
29            .switch(
30                "ignore-errors",
31                "Ignore errors as the closure runs.",
32                Some('i'),
33            )
34            .switch(
35                "capture-errors",
36                "Catch errors as the closure runs, and return them.",
37                Some('c'),
38            )
39            .switch(
40                "env",
41                "Keep the environment defined inside the command.",
42                None,
43            )
44            .rest(
45                "rest",
46                SyntaxShape::Any,
47                "The parameter(s) for the closure.",
48            )
49            .category(Category::Core)
50    }
51
52    fn run(
53        &self,
54        engine_state: &EngineState,
55        caller_stack: &mut Stack,
56        call: &Call,
57        input: PipelineData,
58    ) -> Result<PipelineData, ShellError> {
59        let head = call.head;
60        let closure: Closure = call.req(engine_state, caller_stack, 0)?;
61        let rest: Vec<Value> = call.rest(engine_state, caller_stack, 1)?;
62        let ignore_all_errors = call.has_flag(engine_state, caller_stack, "ignore-errors")?;
63
64        let capture_errors = call.has_flag(engine_state, caller_stack, "capture-errors")?;
65        let has_env = call.has_flag(engine_state, caller_stack, "env")?;
66
67        let result = if has_env {
68            ClosureEvalOnce::new_env_preserve_out_dest(engine_state, caller_stack, closure)
69        } else {
70            ClosureEvalOnce::new_preserve_out_dest(engine_state, caller_stack, closure)
71        }
72        .add_args(rest)?
73        .run_with_input(input);
74
75        match result {
76            Ok(PipelineData::ByteStream(stream, metadata)) if capture_errors => {
77                let span = stream.span();
78                #[cfg(not(feature = "os"))]
79                return Err(ShellError::DisabledOsSupport {
80                    msg: "Cannot create a thread to receive stdout message.".to_string(),
81                    span,
82                });
83
84                #[cfg(feature = "os")]
85                match stream.into_child() {
86                    Ok(mut child) => {
87                        // Use a thread to receive stdout message.
88                        // Or we may get a deadlock if child process sends out too much bytes to stderr.
89                        //
90                        // For example: in normal linux system, stderr pipe's limit is 65535 bytes.
91                        // if child process sends out 65536 bytes, the process will be hanged because no consumer
92                        // consumes the first 65535 bytes
93                        // So we need a thread to receive stdout message, then the current thread can continue to consume
94                        // stderr messages.
95                        let stdout_handler = child
96                            .stdout
97                            .take()
98                            .map(|mut stdout| {
99                                thread::Builder::new()
100                                    .name("stdout consumer".to_string())
101                                    .spawn(move || {
102                                        let mut buf = Vec::new();
103                                        stdout.read_to_end(&mut buf).map_err(|err| {
104                                            IoError::new_internal(
105                                                err,
106                                                "Could not read stdout to end",
107                                            )
108                                        })?;
109                                        Ok::<_, ShellError>(buf)
110                                    })
111                                    .map_err(|err| IoError::new(err, head, None))
112                            })
113                            .transpose()?;
114
115                        // Intercept stderr so we can return it in the error if the exit code is non-zero.
116                        // The threading issues mentioned above dictate why we also need to intercept stdout.
117                        let stderr_msg = match child.stderr.take() {
118                            None => String::new(),
119                            Some(mut stderr) => {
120                                let mut buf = String::new();
121                                stderr
122                                    .read_to_string(&mut buf)
123                                    .map_err(|err| IoError::new(err, span, None))?;
124                                buf
125                            }
126                        };
127
128                        let stdout = if let Some(handle) = stdout_handler {
129                            match handle.join() {
130                                Err(err) => {
131                                    return Err(ShellError::ExternalCommand {
132                                        label: "Fail to receive external commands stdout message"
133                                            .to_string(),
134                                        help: format!("{err:?}"),
135                                        span,
136                                    });
137                                }
138                                Ok(res) => Some(res?),
139                            }
140                        } else {
141                            None
142                        };
143
144                        child.ignore_error(false);
145                        child.wait()?;
146
147                        let mut child = ChildProcess::from_raw(None, None, None, span);
148                        if let Some(stdout) = stdout {
149                            child.stdout = Some(ChildPipe::Tee(Box::new(Cursor::new(stdout))));
150                        }
151                        if !stderr_msg.is_empty() {
152                            child.stderr = Some(ChildPipe::Tee(Box::new(Cursor::new(stderr_msg))));
153                        }
154                        Ok(PipelineData::byte_stream(
155                            ByteStream::child(child, span),
156                            metadata,
157                        ))
158                    }
159                    Err(stream) => Ok(PipelineData::byte_stream(stream, metadata)),
160                }
161            }
162            Ok(PipelineData::ByteStream(mut stream, metadata))
163                if ignore_all_errors
164                    && !matches!(
165                        caller_stack.stdout(),
166                        OutDest::Pipe | OutDest::PipeSeparate | OutDest::Value
167                    ) =>
168            {
169                #[cfg(feature = "os")]
170                if let ByteStreamSource::Child(child) = stream.source_mut() {
171                    child.ignore_error(true);
172                }
173                Ok(PipelineData::byte_stream(stream, metadata))
174            }
175            Ok(PipelineData::Value(Value::Error { .. }, ..)) | Err(_) if ignore_all_errors => {
176                Ok(PipelineData::empty())
177            }
178            Ok(PipelineData::ListStream(stream, metadata)) if ignore_all_errors => {
179                let stream = stream.map(move |value| {
180                    if let Value::Error { .. } = value {
181                        Value::nothing(head)
182                    } else {
183                        value
184                    }
185                });
186                Ok(PipelineData::list_stream(stream, metadata))
187            }
188            r => r,
189        }
190    }
191
192    fn examples(&self) -> Vec<Example<'_>> {
193        vec![
194            Example {
195                description: "Run the closure.",
196                example: "do { echo hello }",
197                result: Some(Value::test_string("hello")),
198            },
199            Example {
200                description: "Run a stored first-class closure.",
201                example: r#"let text = "I am enclosed"; let hello = {|| echo $text}; do $hello"#,
202                result: Some(Value::test_string("I am enclosed")),
203            },
204            Example {
205                description: "Run the closure and ignore both shell and external program errors.",
206                example: "do --ignore-errors { thisisnotarealcommand }",
207                result: None,
208            },
209            Example {
210                description: "Abort the pipeline if a program returns a non-zero exit code.",
211                example: "do --capture-errors { nu --commands 'exit 1' } | myscarycommand",
212                result: None,
213            },
214            Example {
215                description: "Run the closure with a positional, type-checked parameter.",
216                example: "do {|x:int| 100 + $x } 77",
217                result: Some(Value::test_int(177)),
218            },
219            Example {
220                description: "Run the closure with pipeline input.",
221                example: "77 | do { 100 + $in }",
222                result: Some(Value::test_int(177)),
223            },
224            Example {
225                description: "Run the closure with a default parameter value.",
226                example: "77 | do {|x=100| $x + $in }",
227                result: Some(Value::test_int(177)),
228            },
229            Example {
230                description: "Run the closure with two positional parameters.",
231                example: "do {|x,y| $x + $y } 77 100",
232                result: Some(Value::test_int(177)),
233            },
234            Example {
235                description: "Run the closure and keep changes to the environment.",
236                example: "do --env { $env.foo = 'bar' }; $env.foo",
237                result: Some(Value::test_string("bar")),
238            },
239        ]
240    }
241}
242
243mod test {
244    #[test]
245    fn test_examples() -> nu_test_support::Result {
246        use super::Do;
247        nu_test_support::test().examples(Do)
248    }
249}