Skip to main content

nu_command/filters/
tee.rs

1use nu_engine::{command_prelude::*, get_eval_block_with_early_return};
2#[cfg(feature = "os")]
3use nu_protocol::process::ChildPipe;
4#[cfg(test)]
5use nu_protocol::shell_error;
6use nu_protocol::{
7    ByteStream, ByteStreamSource, OutDest, PipelineMetadata, Signals,
8    byte_stream::copy_with_signals, engine::Closure, report_shell_error, shell_error::io::IoError,
9};
10use std::{
11    io::{self, Read, Write},
12    sync::{
13        Arc,
14        mpsc::{self, Sender},
15    },
16    thread::{self, JoinHandle},
17};
18
/// The `tee` filter command: hands a copy of its input to a closure while passing the input on.
#[derive(Clone)]
pub struct Tee;
21
// `Command` implementation for `tee`: the input is passed through unchanged while a copy of it
// is handed to the given closure, which is evaluated on a separate thread.
impl Command for Tee {
    /// Name used to invoke the command.
    fn name(&self) -> &str {
        "tee"
    }

    /// One-line summary of the command.
    fn description(&self) -> &str {
        "Copy a stream to another command in parallel."
    }

    /// Longer help text. The string literal intentionally spans two source lines, so the line
    /// break is part of the returned text.
    fn extra_description(&self) -> &str {
        "This is useful for doing something else with a stream while still continuing to
use it in your pipeline."
    }

    /// Declares the command's shape: any input and output type, an optional `--stderr` / `-e`
    /// switch, and one required closure argument.
    fn signature(&self) -> Signature {
        Signature::build("tee")
            .input_output_type(Type::Any, Type::Any)
            .switch(
                "stderr",
                "For external commands: copy the standard error stream instead.",
                Some('e'),
            )
            .required(
                "closure",
                SyntaxShape::Closure(None),
                "The other command to send the stream to.",
            )
            .category(Category::Filters)
    }

    /// Usage examples. Only the last one declares an expected result.
    fn examples(&self) -> Vec<Example<'_>> {
        vec![
            Example {
                example: "http get http://example.org/ | tee { save example.html }",
                description: "Save a webpage to a file while also printing it.",
                result: None,
            },
            Example {
                example: "nu -c 'print -e error; print ok' | tee --stderr { save error.log } | complete",
                description: "Save error messages from an external command to a file without redirecting them.",
                result: None,
            },
            Example {
                example: "1..100 | tee { each { print } } | math sum | wrap sum",
                description: "Print numbers and their sum.",
                result: None,
            },
            Example {
                example: "10000 | tee { 1..$in | print } | $in * 5",
                description: "Do something with a value on another thread, while also passing through the value.",
                result: Some(Value::test_int(50000)),
            },
        ]
    }

    /// Runs `tee`.
    ///
    /// The handling depends on the kind of input:
    /// * byte streams backed by a reader or a file are wrapped in an [`IoTee`], which forwards
    ///   every chunk read to a thread evaluating the closure;
    /// * byte streams backed by a child process get the same treatment on stdout, or on stderr
    ///   when `--stderr` is given, while the other pipe is passed through or copied to the
    ///   destination the stack asks for;
    /// * list streams are wrapped in a [`TeeIterator`];
    /// * any other input is collected into a single value, and the closure runs on a thread
    ///   with a clone of that value.
    ///
    /// # Errors
    ///
    /// Returns an error when `--stderr` is used on input that is not a child-process stream,
    /// when a thread cannot be spawned, or when copying a child pipe or waiting for the child
    /// fails.
    fn run(
        &self,
        engine_state: &EngineState,
        stack: &mut Stack,
        call: &Call,
        input: PipelineData,
    ) -> Result<PipelineData, ShellError> {
        let head = call.head;
        let from_io_error = IoError::factory(head, None);
        let use_stderr = call.has_flag(engine_state, stack, "stderr")?;

        let closure: Spanned<Closure> = call.req(engine_state, stack, 0)?;
        let closure_span = closure.span;
        let closure = closure.item;

        // The engine state is cloned into an `Arc` so that the spawned thread can own a handle
        // to it.
        let engine_state_arc = Arc::new(engine_state.clone());

        // Callback that evaluates the closure body on whatever input it is given. It owns its
        // own stack (built from the closure's captures), which is why it is `move`.
        let mut eval_block = {
            let closure_engine_state = engine_state_arc.clone();
            let mut closure_stack = stack
                .captures_to_stack_preserve_out_dest(closure.captures)
                .reset_pipes();
            let eval_block_with_early_return = get_eval_block_with_early_return(engine_state);

            move |input| {
                let result = eval_block_with_early_return(
                    &closure_engine_state,
                    &mut closure_stack,
                    closure_engine_state.get_block(closure.block_id),
                    input,
                )
                .map(|p| p.body);
                // Make sure to drain any iterator produced to avoid unexpected behavior
                result.and_then(|data| data.drain().map(|_| ()))
            }
        };

        // Convert values that can be represented as streams into streams. Streams can pass errors
        // through later, so if we treat string/binary/list as a stream instead, it's likely that
        // we can get the error back to the original thread.
        let span = input.span().unwrap_or(head);
        let input = input.into_stream_or_original(engine_state);

        if let PipelineData::ByteStream(stream, metadata) = input {
            let type_ = stream.type_();

            // Everything the helper threads need to rebuild or copy the stream.
            let info = StreamInfo {
                span,
                signals: engine_state.signals().clone(),
                type_,
                metadata: metadata.clone(),
            };

            match stream.into_source() {
                ByteStreamSource::Read(read) => {
                    // A plain reader has no stderr to copy.
                    if use_stderr {
                        return stderr_misuse(span, head);
                    }

                    let tee_thread = spawn_tee(info, eval_block)?;
                    let tee = IoTee::new(read, tee_thread);

                    Ok(PipelineData::byte_stream(
                        ByteStream::read(tee, span, engine_state.signals().clone(), type_),
                        metadata,
                    ))
                }
                ByteStreamSource::File(file) => {
                    // Same handling as a reader: a file has no stderr either.
                    if use_stderr {
                        return stderr_misuse(span, head);
                    }

                    let tee_thread = spawn_tee(info, eval_block)?;
                    let tee = IoTee::new(file, tee_thread);

                    Ok(PipelineData::byte_stream(
                        ByteStream::read(tee, span, engine_state.signals().clone(), type_),
                        metadata,
                    ))
                }
                #[cfg(feature = "os")]
                ByteStreamSource::Child(mut child) => {
                    // Exactly one of the child's pipes is teed into the closure. The other one
                    // is either left on the child or copied to the destination the stack asks
                    // for. A handle is kept only for a stderr copy that runs on its own thread.
                    let stderr_thread = if use_stderr {
                        // `--stderr`: tee stderr, pass stdout along untouched.
                        let stderr_thread = if let Some(stderr) = child.stderr.take() {
                            let tee_thread = spawn_tee(info.clone(), eval_block)?;
                            let tee = IoTee::new(stderr, tee_thread);
                            match stack.stderr() {
                                // The consumer reads stderr itself: put the tee back on the child.
                                OutDest::Pipe | OutDest::PipeSeparate | OutDest::Value => {
                                    child.stderr = Some(ChildPipe::Tee(Box::new(tee)));
                                    Ok(None)
                                }
                                // Otherwise drive the tee from a background thread that copies
                                // it to the requested destination.
                                OutDest::Null => copy_on_thread(tee, io::sink(), &info).map(Some),
                                OutDest::Print | OutDest::Inherit => {
                                    copy_on_thread(tee, io::stderr(), &info).map(Some)
                                }
                                OutDest::File(file) => {
                                    copy_on_thread(tee, file.clone(), &info).map(Some)
                                }
                            }?
                        } else {
                            None
                        };

                        // stdout is not teed here; it is copied on the current thread unless
                        // the consumer reads it from the child.
                        if let Some(stdout) = child.stdout.take() {
                            match stack.stdout() {
                                OutDest::Pipe | OutDest::PipeSeparate | OutDest::Value => {
                                    child.stdout = Some(stdout);
                                    Ok(())
                                }
                                OutDest::Null => copy_pipe(stdout, io::sink(), &info),
                                OutDest::Print | OutDest::Inherit => {
                                    copy_pipe(stdout, io::stdout(), &info)
                                }
                                OutDest::File(file) => copy_pipe(stdout, file.as_ref(), &info),
                            }?;
                        }

                        stderr_thread
                    } else {
                        // Default: tee stdout. stderr is copied on a background thread unless
                        // the consumer reads it from the child.
                        let stderr_thread = if let Some(stderr) = child.stderr.take() {
                            let info = info.clone();
                            match stack.stderr() {
                                OutDest::Pipe | OutDest::PipeSeparate | OutDest::Value => {
                                    child.stderr = Some(stderr);
                                    Ok(None)
                                }
                                OutDest::Null => {
                                    copy_pipe_on_thread(stderr, io::sink(), &info).map(Some)
                                }
                                OutDest::Print | OutDest::Inherit => {
                                    copy_pipe_on_thread(stderr, io::stderr(), &info).map(Some)
                                }
                                OutDest::File(file) => {
                                    copy_pipe_on_thread(stderr, file.clone(), &info).map(Some)
                                }
                            }?
                        } else {
                            None
                        };

                        if let Some(stdout) = child.stdout.take() {
                            let tee_thread = spawn_tee(info.clone(), eval_block)?;
                            let tee = IoTee::new(stdout, tee_thread);
                            match stack.stdout() {
                                // The consumer reads stdout itself: put the tee back on the child.
                                OutDest::Pipe | OutDest::PipeSeparate | OutDest::Value => {
                                    child.stdout = Some(ChildPipe::Tee(Box::new(tee)));
                                    Ok(())
                                }
                                // Otherwise the tee is driven right here, on the current thread.
                                OutDest::Null => copy(tee, io::sink(), &info),
                                OutDest::Print | OutDest::Inherit => copy(tee, io::stdout(), &info),
                                OutDest::File(file) => copy(tee, file.as_ref(), &info),
                            }?;
                        }

                        stderr_thread
                    };

                    if child.stdout.is_some() || child.stderr.is_some() {
                        // At least one pipe is still attached, so the child is handed on as a
                        // stream.
                        Ok(PipelineData::byte_stream(
                            ByteStream::child(*child, span),
                            metadata,
                        ))
                    } else {
                        // Both pipes were taken off the child above: join the stderr copier
                        // (if any), wait for the child, and return empty pipeline data.
                        if let Some(thread) = stderr_thread {
                            thread.join().unwrap_or_else(|_| Err(panic_error()))?;
                        }
                        child.wait()?;
                        Ok(PipelineData::empty())
                    }
                }
            }
        } else {
            // Anything that is not a byte stream cannot have a stderr.
            if use_stderr {
                return stderr_misuse(input.span().unwrap_or(head), head);
            }

            // The metadata is taken off the input; one copy goes to the closure's input and
            // the other is re-attached to the value or stream returned below.
            let mut input = input;
            let metadata = input.take_metadata();
            let metadata_clone = metadata.clone();

            if let PipelineData::ListStream(..) = input {
                // Only use the iterator implementation on lists / list streams. We want to be able
                // to preserve errors as much as possible, and only the stream implementations can
                // really do that
                let signals = engine_state.signals().clone();

                // An error coming back from the closure's thread is embedded in the output
                // stream as an error value located at the closure's span.
                Ok(tee(input.into_iter(), move |rx| {
                    let input = rx.into_pipeline_data_with_metadata(span, signals, metadata_clone);
                    eval_block(input)
                })
                .map_err(&from_io_error)?
                .map(move |result| result.unwrap_or_else(|err| Value::error(err, closure_span)))
                .into_pipeline_data_with_metadata(
                    span,
                    engine_state.signals().clone(),
                    metadata,
                ))
            } else {
                // Otherwise, we can spawn a thread with the input value, but we have nowhere to
                // send an error to other than just trying to print it to stderr.
                let value = input.into_value(span)?;
                let value_clone = value.clone();
                // The join handle returned by `tee_once` is discarded: the thread is not
                // waited for here.
                tee_once(stack.clone(), engine_state_arc, move || {
                    eval_block(value_clone.into_pipeline_data_with_metadata(metadata_clone))
                })
                .map_err(&from_io_error)?;
                Ok(value.into_pipeline_data_with_metadata(metadata))
            }
        }
    }

    /// Returns `OutDest::PipeSeparate` for both entries of the pair (presumably stdout first,
    /// then stderr — confirm against the `Command` trait).
    fn pipe_redirection(&self) -> (Option<OutDest>, Option<OutDest>) {
        (Some(OutDest::PipeSeparate), Some(OutDest::PipeSeparate))
    }
}
292
293fn panic_error() -> ShellError {
294    ShellError::NushellFailed {
295        msg: "A panic occurred on a thread spawned by `tee`".into(),
296    }
297}
298
299/// Copies the iterator to a channel on another thread. If an error is produced on that thread,
300/// it is embedded in the resulting iterator as an `Err` as soon as possible. When the iterator
301/// finishes, it waits for the other thread to finish, also handling any error produced at that
302/// point.
303fn tee<T, I: Iterator<Item = T>>(
304    input: I,
305    with_cloned_stream: impl FnOnce(mpsc::Receiver<T>) -> Result<(), ShellError> + Send + 'static,
306) -> Result<TeeIterator<I>, std::io::Error>
307where
308    T: Clone + Send + 'static,
309{
310    // For sending the values to the other thread
311    let (tx, rx) = mpsc::channel();
312
313    Ok(TeeIterator {
314        thread: Some(
315            thread::Builder::new()
316                .name("tee".into())
317                .spawn(move || with_cloned_stream(rx))?,
318        ),
319        iter: input.into_iter(),
320        tx: Some(tx),
321    })
322}
323
/// Iterator adapter returned by [`tee`]: yields the items of `iter` while sending a clone of
/// each one to another thread.
struct TeeIterator<I: Iterator> {
    /// Handle of the thread consuming the copies; `None` once it has been joined.
    thread: Option<JoinHandle<Result<(), ShellError>>>,
    /// The wrapped source iterator.
    iter: I,
    /// Sending half of the channel to the thread; `None` once the input has been exhausted.
    tx: Option<Sender<I::Item>>,
}
329
330impl<I> Iterator for TeeIterator<I>
331where
332    I: Iterator,
333    I::Item: Clone,
334{
335    type Item = Result<I::Item, ShellError>;
336
337    fn next(&mut self) -> Option<Self::Item> {
338        if self.thread.as_ref().is_some_and(|t| t.is_finished()) {
339            // Check for an error from the other thread
340            let result = self
341                .thread
342                .take()
343                .expect("thread was taken early")
344                .join()
345                .unwrap_or_else(|_| Err(panic_error()));
346            if let Err(err) = result {
347                // Embed the error early
348                return Some(Err(err));
349            }
350        }
351
352        // Get a value from the iterator
353        if let Some(value) = self.iter.next() {
354            // Send a copy, ignoring any error if the channel is closed
355            let _ = self.tx.as_ref().map(|tx| tx.send(value.clone()));
356            Some(Ok(value))
357        } else {
358            // Close the channel so the stream ends for the other thread
359            drop(self.tx.take());
360            // Wait for the other thread, and embed any error produced
361            self.thread.take().and_then(|t| {
362                t.join()
363                    .unwrap_or_else(|_| Err(panic_error()))
364                    .err()
365                    .map(Err)
366            })
367        }
368    }
369}
370
371impl<I: Iterator> Drop for TeeIterator<I> {
372    fn drop(&mut self) {
373        // in case the iterator is dropped without consuming all the input,
374        // and the channel is still alive we need to force consume all the input
375        // otherwise the data will be truncated
376        if let Some(tx) = &mut self.tx {
377            for value in &mut self.iter {
378                // Send the input, if the channel is closed, just stop
379                if tx.send(value).is_err() {
380                    break;
381                }
382            }
383        }
384    }
385}
386
387/// "tee" for a single value. No stream handling, just spawns a thread, printing any resulting error
388fn tee_once(
389    stack: Stack,
390    engine_state: Arc<EngineState>,
391    on_thread: impl FnOnce() -> Result<(), ShellError> + Send + 'static,
392) -> Result<JoinHandle<()>, std::io::Error> {
393    thread::Builder::new().name("tee".into()).spawn(move || {
394        if let Err(err) = on_thread() {
395            report_shell_error(Some(&stack), &engine_state, &err);
396        }
397    })
398}
399
400fn stderr_misuse<T>(span: Span, head: Span) -> Result<T, ShellError> {
401    Err(ShellError::UnsupportedInput {
402        msg: "--stderr can only be used on external commands".into(),
403        input: "the input to `tee` is not an external command".into(),
404        msg_span: head,
405        input_span: span,
406    })
407}
408
/// A reader that forwards a copy of every chunk it reads to a tee thread.
struct IoTee<R: Read> {
    /// The wrapped reader that data is actually read from.
    reader: R,
    /// Channel to the tee thread; set to `None` once the channel is closed or the input ends.
    sender: Option<Sender<Vec<u8>>>,
    /// Handle of the tee thread; `None` once it has been joined.
    thread: Option<JoinHandle<Result<(), ShellError>>>,
}
414
415impl<R: Read> IoTee<R> {
416    fn new(reader: R, tee: TeeThread) -> Self {
417        Self {
418            reader,
419            sender: Some(tee.sender),
420            thread: Some(tee.thread),
421        }
422    }
423}
424
425impl<R: Read> Read for IoTee<R> {
426    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
427        if let Some(thread) = self.thread.take() {
428            if thread.is_finished() {
429                if let Err(err) = thread.join().unwrap_or_else(|_| Err(panic_error())) {
430                    return Err(io::Error::other(err));
431                }
432            } else {
433                self.thread = Some(thread)
434            }
435        }
436        let len = self.reader.read(buf)?;
437        if len == 0 {
438            self.sender = None;
439            if let Some(thread) = self.thread.take()
440                && let Err(err) = thread.join().unwrap_or_else(|_| Err(panic_error()))
441            {
442                return Err(io::Error::other(err));
443            }
444        } else if let Some(sender) = self.sender.as_mut()
445            && sender.send(buf[..len].to_vec()).is_err()
446        {
447            self.sender = None;
448        }
449        Ok(len)
450    }
451}
452
/// A running tee thread together with the channel used to feed it.
struct TeeThread {
    /// Sends byte chunks to the thread.
    sender: Sender<Vec<u8>>,
    /// Join handle carrying the result the thread returns.
    thread: JoinHandle<Result<(), ShellError>>,
}
457
458fn spawn_tee(
459    info: StreamInfo,
460    mut eval_block: impl FnMut(PipelineData) -> Result<(), ShellError> + Send + 'static,
461) -> Result<TeeThread, ShellError> {
462    let (sender, receiver) = mpsc::channel();
463
464    let thread = thread::Builder::new()
465        .name("tee".into())
466        .spawn(move || {
467            // We use Signals::empty() here because we assume there already is a Signals on the other side
468            let stream = ByteStream::from_iter(
469                receiver.into_iter(),
470                info.span,
471                Signals::empty(),
472                info.type_,
473            );
474            eval_block(PipelineData::byte_stream(stream, info.metadata))
475        })
476        .map_err(|err| {
477            IoError::new_with_additional_context(err, info.span, None, "Could not spawn tee")
478        })?;
479
480    Ok(TeeThread { sender, thread })
481}
482
/// Properties of the original byte stream that the helper functions in this file need.
#[derive(Clone)]
struct StreamInfo {
    /// Span used for the streams and errors created by the helpers.
    span: Span,
    /// Signals handed to `copy_with_signals` while copying.
    signals: Signals,
    /// Type of the original byte stream, reused for the stream handed to the closure.
    type_: ByteStreamType,
    /// Metadata of the original stream, attached to the stream handed to the closure.
    metadata: Option<PipelineMetadata>,
}
490
491fn copy(src: impl Read, dest: impl Write, info: &StreamInfo) -> Result<(), ShellError> {
492    copy_with_signals(src, dest, info.span, &info.signals)?;
493    Ok(())
494}
495
/// Copies a child pipe to `dest` on the current thread, whichever `ChildPipe` variant it is.
#[cfg(feature = "os")]
fn copy_pipe(pipe: ChildPipe, dest: impl Write, info: &StreamInfo) -> Result<(), ShellError> {
    // The two variants hold different reader types, so each needs its own call.
    match pipe {
        ChildPipe::Pipe(raw) => copy(raw, dest, info),
        ChildPipe::Tee(teed) => copy(teed, dest, info),
    }
}
503
504fn copy_on_thread(
505    src: impl Read + Send + 'static,
506    dest: impl Write + Send + 'static,
507    info: &StreamInfo,
508) -> Result<JoinHandle<Result<(), ShellError>>, ShellError> {
509    let span = info.span;
510    let signals = info.signals.clone();
511    thread::Builder::new()
512        .name("stderr copier".into())
513        .spawn(move || {
514            copy_with_signals(src, dest, span, &signals)?;
515            Ok(())
516        })
517        .map_err(|err| {
518            IoError::new_with_additional_context(err, span, None, "Could not spawn stderr copier")
519                .into()
520        })
521}
522
/// Copies a child pipe to `dest` on a background thread, whichever `ChildPipe` variant it is.
#[cfg(feature = "os")]
fn copy_pipe_on_thread(
    pipe: ChildPipe,
    dest: impl Write + Send + 'static,
    info: &StreamInfo,
) -> Result<JoinHandle<Result<(), ShellError>>, ShellError> {
    // The two variants hold different reader types, so each needs its own call.
    match pipe {
        ChildPipe::Pipe(raw) => copy_on_thread(raw, dest, info),
        ChildPipe::Tee(teed) => copy_on_thread(teed, dest, info),
    }
}
534
/// Every input value must reach both the other thread and the pass-through output.
#[test]
fn tee_copies_values_to_other_thread_and_passes_them_through() {
    let expected_values = vec![1, 2, 3, 4];
    // Channel the other thread uses to report what it received.
    let (tx, rx) = mpsc::channel();

    let tee_iter = tee(expected_values.clone().into_iter(), move |received| {
        for val in received {
            let _ = tx.send(val);
        }
        Ok(())
    })
    .expect("io error");

    let my_result: Vec<i32> = tee_iter
        .collect::<Result<_, ShellError>>()
        .expect("should not produce error");
    assert_eq!(expected_values, my_result);

    let other_threads_result: Vec<_> = rx.into_iter().collect();
    assert_eq!(expected_values, other_threads_result);
}
557
/// An error from the other thread must show up in the output before the input is exhausted.
#[test]
fn tee_forwards_errors_back_immediately() {
    use std::time::Duration;

    // Slow the input down so the failing thread finishes well before the input does.
    let slow_input = (0..100).inspect(|_| std::thread::sleep(Duration::from_millis(1)));
    let failing = |_| {
        Err(ShellError::Io(IoError::new_with_additional_context(
            shell_error::io::ErrorKind::from_std(std::io::ErrorKind::Other),
            Span::test_data(),
            None,
            "test",
        )))
    };
    let iter = tee(slow_input, failing).expect("io error");

    for result in iter {
        match result {
            // should not make it to the end
            Ok(val) => assert!(val < 99, "the error did not come early enough"),
            // got the error
            Err(_) => return,
        }
    }
    panic!("never received the error");
}
582
/// Exhausting the iterator must block until the other thread is done and report its error.
#[test]
fn tee_waits_for_the_other_thread() {
    use std::sync::{
        Arc,
        atomic::{AtomicBool, Ordering},
    };
    use std::time::Duration;

    // Set by the other thread just before it returns.
    let waited = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&waited);

    let iter = tee(0..100, move |_| {
        std::thread::sleep(Duration::from_millis(10));
        flag.store(true, Ordering::Relaxed);
        Err(ShellError::Io(IoError::new_with_additional_context(
            shell_error::io::ErrorKind::from_std(std::io::ErrorKind::Other),
            Span::test_data(),
            None,
            "test",
        )))
    })
    .expect("io error");

    let last = iter.last();
    assert!(waited.load(Ordering::Relaxed), "failed to wait");

    let ended_with_error = last.is_some_and(|res| res.is_err());
    assert!(ended_with_error, "failed to return error from wait");
}
610
// Regression test for https://github.com/nushell/nushell/issues/17792: dropping the tee
// output early must not truncate what the other thread receives.
#[test]
fn tee_output_is_ignored_but_other_thread_get_values() {
    let input = 0u32..100u32;
    let expected_values: Vec<_> = input.clone().collect();
    let (tx, rx) = mpsc::channel();

    let tee_iter = tee(input, move |received| {
        for val in received {
            let _ = tx.send(val);
        }
        Ok(())
    })
    .expect("io error");

    // Take only the first 2 values from the output, not all of them.
    let my_result = tee_iter
        .take(2)
        .collect::<Result<Vec<_>, ShellError>>()
        .expect("should not produce error");

    // tee only got to output 2 entries; the others were ignored by us
    assert_eq!(expected_values[0..my_result.len()], my_result);

    let other_threads_result: Vec<_> = rx.into_iter().collect();

    // even though tee only output 2 values, the inner closure should receive every value
    // from the input
    assert_eq!(expected_values, other_threads_result);
}
640
/// The output must contain every value even when only part of what the other thread
/// received is looked at afterwards.
#[test]
fn tee_other_thread_ignore_values_but_output_all_others() {
    let input = 0u32..100u32;
    let expected_values: Vec<_> = input.clone().collect();
    let (tx, rx) = mpsc::channel();

    let tee_iter = tee(input, move |received| {
        for val in received {
            let _ = tx.send(val);
        }
        Ok(())
    })
    .expect("io error");

    let my_result = tee_iter
        .collect::<Result<Vec<_>, ShellError>>()
        .expect("should not produce error");

    // tee is expected to output all values
    assert_eq!(expected_values, my_result);

    // only the first two values forwarded by the other thread are checked
    let other_threads_result: Vec<_> = rx.into_iter().take(2).collect();
    assert_eq!(expected_values[0..2], other_threads_result);
}