nu_command/filters/
tee.rs

1use nu_engine::{command_prelude::*, get_eval_block_with_early_return};
2#[cfg(feature = "os")]
3use nu_protocol::process::ChildPipe;
4#[cfg(test)]
5use nu_protocol::shell_error;
6use nu_protocol::{
7    ByteStream, ByteStreamSource, OutDest, PipelineMetadata, Signals,
8    byte_stream::copy_with_signals, engine::Closure, report_shell_error, shell_error::io::IoError,
9};
10use std::{
11    io::{self, Read, Write},
12    sync::{
13        Arc,
14        mpsc::{self, Sender},
15    },
16    thread::{self, JoinHandle},
17};
18
/// The `tee` command: runs a closure on another thread with a copy of its input, while the
/// input itself continues on down the pipeline.
#[derive(Clone)]
pub struct Tee;
21
22impl Command for Tee {
23    fn name(&self) -> &str {
24        "tee"
25    }
26
27    fn description(&self) -> &str {
28        "Copy a stream to another command in parallel."
29    }
30
31    fn extra_description(&self) -> &str {
32        r#"This is useful for doing something else with a stream while still continuing to
33use it in your pipeline."#
34    }
35
36    fn signature(&self) -> Signature {
37        Signature::build("tee")
38            .input_output_type(Type::Any, Type::Any)
39            .switch(
40                "stderr",
41                "For external commands: copy the standard error stream instead.",
42                Some('e'),
43            )
44            .required(
45                "closure",
46                SyntaxShape::Closure(None),
47                "The other command to send the stream to.",
48            )
49            .category(Category::Filters)
50    }
51
52    fn examples(&self) -> Vec<Example<'_>> {
53        vec![
54            Example {
55                example: "http get http://example.org/ | tee { save example.html }",
56                description: "Save a webpage to a file while also printing it",
57                result: None,
58            },
59            Example {
60                example: "nu -c 'print -e error; print ok' | tee --stderr { save error.log } | complete",
61                description: "Save error messages from an external command to a file without \
62                    redirecting them",
63                result: None,
64            },
65            Example {
66                example: "1..100 | tee { each { print } } | math sum | wrap sum",
67                description: "Print numbers and their sum",
68                result: None,
69            },
70            Example {
71                example: "10000 | tee { 1..$in | print } | $in * 5",
72                description: "Do something with a value on another thread, while also passing through the value",
73                result: Some(Value::test_int(50000)),
74            },
75        ]
76    }
77
78    fn run(
79        &self,
80        engine_state: &EngineState,
81        stack: &mut Stack,
82        call: &Call,
83        input: PipelineData,
84    ) -> Result<PipelineData, ShellError> {
85        let head = call.head;
86        let from_io_error = IoError::factory(head, None);
87        let use_stderr = call.has_flag(engine_state, stack, "stderr")?;
88
89        let closure: Spanned<Closure> = call.req(engine_state, stack, 0)?;
90        let closure_span = closure.span;
91        let closure = closure.item;
92
93        let engine_state_arc = Arc::new(engine_state.clone());
94
95        let mut eval_block = {
96            let closure_engine_state = engine_state_arc.clone();
97            let mut closure_stack = stack
98                .captures_to_stack_preserve_out_dest(closure.captures)
99                .reset_pipes();
100            let eval_block_with_early_return = get_eval_block_with_early_return(engine_state);
101
102            move |input| {
103                let result = eval_block_with_early_return(
104                    &closure_engine_state,
105                    &mut closure_stack,
106                    closure_engine_state.get_block(closure.block_id),
107                    input,
108                )
109                .map(|p| p.body);
110                // Make sure to drain any iterator produced to avoid unexpected behavior
111                result.and_then(|data| data.drain().map(|_| ()))
112            }
113        };
114
115        // Convert values that can be represented as streams into streams. Streams can pass errors
116        // through later, so if we treat string/binary/list as a stream instead, it's likely that
117        // we can get the error back to the original thread.
118        let span = input.span().unwrap_or(head);
119        let input = input
120            .try_into_stream(engine_state)
121            .unwrap_or_else(|original_input| original_input);
122
123        if let PipelineData::ByteStream(stream, metadata) = input {
124            let type_ = stream.type_();
125
126            let info = StreamInfo {
127                span,
128                signals: engine_state.signals().clone(),
129                type_,
130                metadata: metadata.clone(),
131            };
132
133            match stream.into_source() {
134                ByteStreamSource::Read(read) => {
135                    if use_stderr {
136                        return stderr_misuse(span, head);
137                    }
138
139                    let tee_thread = spawn_tee(info, eval_block)?;
140                    let tee = IoTee::new(read, tee_thread);
141
142                    Ok(PipelineData::byte_stream(
143                        ByteStream::read(tee, span, engine_state.signals().clone(), type_),
144                        metadata,
145                    ))
146                }
147                ByteStreamSource::File(file) => {
148                    if use_stderr {
149                        return stderr_misuse(span, head);
150                    }
151
152                    let tee_thread = spawn_tee(info, eval_block)?;
153                    let tee = IoTee::new(file, tee_thread);
154
155                    Ok(PipelineData::byte_stream(
156                        ByteStream::read(tee, span, engine_state.signals().clone(), type_),
157                        metadata,
158                    ))
159                }
160                #[cfg(feature = "os")]
161                ByteStreamSource::Child(mut child) => {
162                    let stderr_thread = if use_stderr {
163                        let stderr_thread = if let Some(stderr) = child.stderr.take() {
164                            let tee_thread = spawn_tee(info.clone(), eval_block)?;
165                            let tee = IoTee::new(stderr, tee_thread);
166                            match stack.stderr() {
167                                OutDest::Pipe | OutDest::PipeSeparate | OutDest::Value => {
168                                    child.stderr = Some(ChildPipe::Tee(Box::new(tee)));
169                                    Ok(None)
170                                }
171                                OutDest::Null => copy_on_thread(tee, io::sink(), &info).map(Some),
172                                OutDest::Print | OutDest::Inherit => {
173                                    copy_on_thread(tee, io::stderr(), &info).map(Some)
174                                }
175                                OutDest::File(file) => {
176                                    copy_on_thread(tee, file.clone(), &info).map(Some)
177                                }
178                            }?
179                        } else {
180                            None
181                        };
182
183                        if let Some(stdout) = child.stdout.take() {
184                            match stack.stdout() {
185                                OutDest::Pipe | OutDest::PipeSeparate | OutDest::Value => {
186                                    child.stdout = Some(stdout);
187                                    Ok(())
188                                }
189                                OutDest::Null => copy_pipe(stdout, io::sink(), &info),
190                                OutDest::Print | OutDest::Inherit => {
191                                    copy_pipe(stdout, io::stdout(), &info)
192                                }
193                                OutDest::File(file) => copy_pipe(stdout, file.as_ref(), &info),
194                            }?;
195                        }
196
197                        stderr_thread
198                    } else {
199                        let stderr_thread = if let Some(stderr) = child.stderr.take() {
200                            let info = info.clone();
201                            match stack.stderr() {
202                                OutDest::Pipe | OutDest::PipeSeparate | OutDest::Value => {
203                                    child.stderr = Some(stderr);
204                                    Ok(None)
205                                }
206                                OutDest::Null => {
207                                    copy_pipe_on_thread(stderr, io::sink(), &info).map(Some)
208                                }
209                                OutDest::Print | OutDest::Inherit => {
210                                    copy_pipe_on_thread(stderr, io::stderr(), &info).map(Some)
211                                }
212                                OutDest::File(file) => {
213                                    copy_pipe_on_thread(stderr, file.clone(), &info).map(Some)
214                                }
215                            }?
216                        } else {
217                            None
218                        };
219
220                        if let Some(stdout) = child.stdout.take() {
221                            let tee_thread = spawn_tee(info.clone(), eval_block)?;
222                            let tee = IoTee::new(stdout, tee_thread);
223                            match stack.stdout() {
224                                OutDest::Pipe | OutDest::PipeSeparate | OutDest::Value => {
225                                    child.stdout = Some(ChildPipe::Tee(Box::new(tee)));
226                                    Ok(())
227                                }
228                                OutDest::Null => copy(tee, io::sink(), &info),
229                                OutDest::Print | OutDest::Inherit => copy(tee, io::stdout(), &info),
230                                OutDest::File(file) => copy(tee, file.as_ref(), &info),
231                            }?;
232                        }
233
234                        stderr_thread
235                    };
236
237                    if child.stdout.is_some() || child.stderr.is_some() {
238                        Ok(PipelineData::byte_stream(
239                            ByteStream::child(*child, span),
240                            metadata,
241                        ))
242                    } else {
243                        if let Some(thread) = stderr_thread {
244                            thread.join().unwrap_or_else(|_| Err(panic_error()))?;
245                        }
246                        child.wait()?;
247                        Ok(PipelineData::empty())
248                    }
249                }
250            }
251        } else {
252            if use_stderr {
253                return stderr_misuse(input.span().unwrap_or(head), head);
254            }
255
256            let metadata = input.metadata();
257            let metadata_clone = metadata.clone();
258
259            if let PipelineData::ListStream(..) = input {
260                // Only use the iterator implementation on lists / list streams. We want to be able
261                // to preserve errors as much as possible, and only the stream implementations can
262                // really do that
263                let signals = engine_state.signals().clone();
264
265                Ok(tee(input.into_iter(), move |rx| {
266                    let input = rx.into_pipeline_data_with_metadata(span, signals, metadata_clone);
267                    eval_block(input)
268                })
269                .map_err(&from_io_error)?
270                .map(move |result| result.unwrap_or_else(|err| Value::error(err, closure_span)))
271                .into_pipeline_data_with_metadata(
272                    span,
273                    engine_state.signals().clone(),
274                    metadata,
275                ))
276            } else {
277                // Otherwise, we can spawn a thread with the input value, but we have nowhere to
278                // send an error to other than just trying to print it to stderr.
279                let value = input.into_value(span)?;
280                let value_clone = value.clone();
281                tee_once(stack.clone(), engine_state_arc, move || {
282                    eval_block(value_clone.into_pipeline_data_with_metadata(metadata_clone))
283                })
284                .map_err(&from_io_error)?;
285                Ok(value.into_pipeline_data_with_metadata(metadata))
286            }
287        }
288    }
289
290    fn pipe_redirection(&self) -> (Option<OutDest>, Option<OutDest>) {
291        (Some(OutDest::PipeSeparate), Some(OutDest::PipeSeparate))
292    }
293}
294
295fn panic_error() -> ShellError {
296    ShellError::NushellFailed {
297        msg: "A panic occurred on a thread spawned by `tee`".into(),
298    }
299}
300
301/// Copies the iterator to a channel on another thread. If an error is produced on that thread,
302/// it is embedded in the resulting iterator as an `Err` as soon as possible. When the iterator
303/// finishes, it waits for the other thread to finish, also handling any error produced at that
304/// point.
305fn tee<T>(
306    input: impl Iterator<Item = T>,
307    with_cloned_stream: impl FnOnce(mpsc::Receiver<T>) -> Result<(), ShellError> + Send + 'static,
308) -> Result<impl Iterator<Item = Result<T, ShellError>>, std::io::Error>
309where
310    T: Clone + Send + 'static,
311{
312    // For sending the values to the other thread
313    let (tx, rx) = mpsc::channel();
314
315    let mut thread = Some(
316        thread::Builder::new()
317            .name("tee".into())
318            .spawn(move || with_cloned_stream(rx))?,
319    );
320
321    let mut iter = input.into_iter();
322    let mut tx = Some(tx);
323
324    Ok(std::iter::from_fn(move || {
325        if thread.as_ref().is_some_and(|t| t.is_finished()) {
326            // Check for an error from the other thread
327            let result = thread
328                .take()
329                .expect("thread was taken early")
330                .join()
331                .unwrap_or_else(|_| Err(panic_error()));
332            if let Err(err) = result {
333                // Embed the error early
334                return Some(Err(err));
335            }
336        }
337
338        // Get a value from the iterator
339        if let Some(value) = iter.next() {
340            // Send a copy, ignoring any error if the channel is closed
341            let _ = tx.as_ref().map(|tx| tx.send(value.clone()));
342            Some(Ok(value))
343        } else {
344            // Close the channel so the stream ends for the other thread
345            drop(tx.take());
346            // Wait for the other thread, and embed any error produced
347            thread.take().and_then(|t| {
348                t.join()
349                    .unwrap_or_else(|_| Err(panic_error()))
350                    .err()
351                    .map(Err)
352            })
353        }
354    }))
355}
356
357/// "tee" for a single value. No stream handling, just spawns a thread, printing any resulting error
358fn tee_once(
359    stack: Stack,
360    engine_state: Arc<EngineState>,
361    on_thread: impl FnOnce() -> Result<(), ShellError> + Send + 'static,
362) -> Result<JoinHandle<()>, std::io::Error> {
363    thread::Builder::new().name("tee".into()).spawn(move || {
364        if let Err(err) = on_thread() {
365            report_shell_error(Some(&stack), &engine_state, &err);
366        }
367    })
368}
369
370fn stderr_misuse<T>(span: Span, head: Span) -> Result<T, ShellError> {
371    Err(ShellError::UnsupportedInput {
372        msg: "--stderr can only be used on external commands".into(),
373        input: "the input to `tee` is not an external command".into(),
374        msg_span: head,
375        input_span: span,
376    })
377}
378
/// A reader that passes bytes through from `reader` while sending a copy of each chunk read
/// to a `tee` thread.
struct IoTee<R: Read> {
    /// The underlying source of bytes.
    reader: R,
    /// Sends a copy of each chunk to the tee thread. It becomes `None` at end of input, or
    /// once a send fails.
    sender: Option<Sender<Vec<u8>>>,
    /// The tee thread. It becomes `None` once the thread has been joined.
    thread: Option<JoinHandle<Result<(), ShellError>>>,
}
384
385impl<R: Read> IoTee<R> {
386    fn new(reader: R, tee: TeeThread) -> Self {
387        Self {
388            reader,
389            sender: Some(tee.sender),
390            thread: Some(tee.thread),
391        }
392    }
393}
394
395impl<R: Read> Read for IoTee<R> {
396    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
397        if let Some(thread) = self.thread.take() {
398            if thread.is_finished() {
399                if let Err(err) = thread.join().unwrap_or_else(|_| Err(panic_error())) {
400                    return Err(io::Error::other(err));
401                }
402            } else {
403                self.thread = Some(thread)
404            }
405        }
406        let len = self.reader.read(buf)?;
407        if len == 0 {
408            self.sender = None;
409            if let Some(thread) = self.thread.take()
410                && let Err(err) = thread.join().unwrap_or_else(|_| Err(panic_error()))
411            {
412                return Err(io::Error::other(err));
413            }
414        } else if let Some(sender) = self.sender.as_mut()
415            && sender.send(buf[..len].to_vec()).is_err()
416        {
417            self.sender = None;
418        }
419        Ok(len)
420    }
421}
422
/// A spawned `tee` thread together with the sending half of the channel that feeds it.
struct TeeThread {
    /// Sends chunks of bytes to the thread. Dropping it ends the thread's input stream.
    sender: Sender<Vec<u8>>,
    /// Handle to the thread, which yields the result of evaluating the closure.
    thread: JoinHandle<Result<(), ShellError>>,
}
427
428fn spawn_tee(
429    info: StreamInfo,
430    mut eval_block: impl FnMut(PipelineData) -> Result<(), ShellError> + Send + 'static,
431) -> Result<TeeThread, ShellError> {
432    let (sender, receiver) = mpsc::channel();
433
434    let thread = thread::Builder::new()
435        .name("tee".into())
436        .spawn(move || {
437            // We use Signals::empty() here because we assume there already is a Signals on the other side
438            let stream = ByteStream::from_iter(
439                receiver.into_iter(),
440                info.span,
441                Signals::empty(),
442                info.type_,
443            );
444            eval_block(PipelineData::byte_stream(stream, info.metadata))
445        })
446        .map_err(|err| {
447            IoError::new_with_additional_context(err, info.span, None, "Could not spawn tee")
448        })?;
449
450    Ok(TeeThread { sender, thread })
451}
452
/// Properties of the input stream that the `tee` helpers need.
#[derive(Clone)]
struct StreamInfo {
    /// Span of the input; used for the streams and errors that `tee` creates.
    span: Span,
    /// Signals passed to `copy_with_signals` when copying data to an output destination.
    signals: Signals,
    /// Type of the byte stream; given to the copy sent to the closure.
    type_: ByteStreamType,
    /// Metadata of the input pipeline; given to the copy sent to the closure.
    metadata: Option<PipelineMetadata>,
}
460
461fn copy(src: impl Read, dest: impl Write, info: &StreamInfo) -> Result<(), ShellError> {
462    copy_with_signals(src, dest, info.span, &info.signals)?;
463    Ok(())
464}
465
/// Copies a child process pipe into `dest`, whether it is a raw pipe or an already-teed one.
#[cfg(feature = "os")]
fn copy_pipe(pipe: ChildPipe, dest: impl Write, info: &StreamInfo) -> Result<(), ShellError> {
    match pipe {
        ChildPipe::Tee(teed) => copy(teed, dest, info),
        ChildPipe::Pipe(raw) => copy(raw, dest, info),
    }
}
473
474fn copy_on_thread(
475    src: impl Read + Send + 'static,
476    dest: impl Write + Send + 'static,
477    info: &StreamInfo,
478) -> Result<JoinHandle<Result<(), ShellError>>, ShellError> {
479    let span = info.span;
480    let signals = info.signals.clone();
481    thread::Builder::new()
482        .name("stderr copier".into())
483        .spawn(move || {
484            copy_with_signals(src, dest, span, &signals)?;
485            Ok(())
486        })
487        .map_err(|err| {
488            IoError::new_with_additional_context(err, span, None, "Could not spawn stderr copier")
489                .into()
490        })
491}
492
/// Spawns a thread that copies a child process pipe, raw or already teed, into `dest`.
#[cfg(feature = "os")]
fn copy_pipe_on_thread(
    pipe: ChildPipe,
    dest: impl Write + Send + 'static,
    info: &StreamInfo,
) -> Result<JoinHandle<Result<(), ShellError>>, ShellError> {
    match pipe {
        ChildPipe::Tee(teed) => copy_on_thread(teed, dest, info),
        ChildPipe::Pipe(raw) => copy_on_thread(raw, dest, info),
    }
}
504
#[test]
fn tee_copies_values_to_other_thread_and_passes_them_through() {
    let expected_values = vec![1, 2, 3, 4];
    let (forward_tx, forward_rx) = mpsc::channel();

    // The other thread forwards everything it receives so it can be inspected afterwards.
    let passed_through = tee(expected_values.clone().into_iter(), move |copies| {
        copies.into_iter().for_each(|val| {
            let _ = forward_tx.send(val);
        });
        Ok(())
    })
    .expect("io error")
    .collect::<Result<Vec<i32>, ShellError>>()
    .expect("should not produce error");
    assert_eq!(expected_values, passed_through);

    let received_by_other_thread: Vec<_> = forward_rx.iter().collect();
    assert_eq!(expected_values, received_by_other_thread);
}
527
#[test]
fn tee_forwards_errors_back_immediately() {
    use std::time::Duration;

    // Slow the input down so the failing thread finishes long before the input runs out.
    let slow_input = (0..100).inspect(|_| std::thread::sleep(Duration::from_millis(1)));
    let iter = tee(slow_input, |_| {
        Err(ShellError::Io(IoError::new_with_additional_context(
            shell_error::io::ErrorKind::from_std(std::io::ErrorKind::Other),
            Span::test_data(),
            None,
            "test",
        )))
    })
    .expect("io error");

    for result in iter {
        match result {
            // should not make it to the end
            Ok(val) => assert!(val < 99, "the error did not come early enough"),
            // got the error
            Err(_) => return,
        }
    }
    panic!("never received the error");
}
552
#[test]
fn tee_waits_for_the_other_thread() {
    use std::sync::{
        Arc,
        atomic::{AtomicBool, Ordering},
    };
    use std::time::Duration;

    let waited = Arc::new(AtomicBool::new(false));
    let iter = tee(0..100, {
        let waited = Arc::clone(&waited);
        move |_| {
            // Finish (with an error) only after a delay, so the iterator has to wait for it.
            std::thread::sleep(Duration::from_millis(10));
            waited.store(true, Ordering::Relaxed);
            Err(ShellError::Io(IoError::new_with_additional_context(
                shell_error::io::ErrorKind::from_std(std::io::ErrorKind::Other),
                Span::test_data(),
                None,
                "test",
            )))
        }
    })
    .expect("io error");

    let last = iter.last();
    assert!(waited.load(Ordering::Relaxed), "failed to wait");
    assert!(
        matches!(last, Some(Err(_))),
        "failed to return error from wait"
    );
}