1use nu_engine::{command_prelude::*, get_eval_block_with_early_return};
2#[cfg(feature = "os")]
3use nu_protocol::process::ChildPipe;
4#[cfg(test)]
5use nu_protocol::shell_error;
6use nu_protocol::{
7 ByteStream, ByteStreamSource, OutDest, PipelineMetadata, Signals,
8 byte_stream::copy_with_signals, engine::Closure, report_shell_error, shell_error::io::IoError,
9};
10use std::{
11 io::{self, Read, Write},
12 sync::{
13 Arc,
14 mpsc::{self, Sender},
15 },
16 thread::{self, JoinHandle},
17};
18
19#[derive(Clone)]
20pub struct Tee;
21
22impl Command for Tee {
23 fn name(&self) -> &str {
24 "tee"
25 }
26
27 fn description(&self) -> &str {
28 "Copy a stream to another command in parallel."
29 }
30
31 fn extra_description(&self) -> &str {
32 "This is useful for doing something else with a stream while still continuing to
33use it in your pipeline."
34 }
35
36 fn signature(&self) -> Signature {
37 Signature::build("tee")
38 .input_output_type(Type::Any, Type::Any)
39 .switch(
40 "stderr",
41 "For external commands: copy the standard error stream instead.",
42 Some('e'),
43 )
44 .required(
45 "closure",
46 SyntaxShape::Closure(None),
47 "The other command to send the stream to.",
48 )
49 .category(Category::Filters)
50 }
51
52 fn examples(&self) -> Vec<Example<'_>> {
53 vec![
54 Example {
55 example: "http get http://example.org/ | tee { save example.html }",
56 description: "Save a webpage to a file while also printing it.",
57 result: None,
58 },
59 Example {
60 example: "nu -c 'print -e error; print ok' | tee --stderr { save error.log } | complete",
61 description: "Save error messages from an external command to a file without redirecting them.",
62 result: None,
63 },
64 Example {
65 example: "1..100 | tee { each { print } } | math sum | wrap sum",
66 description: "Print numbers and their sum.",
67 result: None,
68 },
69 Example {
70 example: "10000 | tee { 1..$in | print } | $in * 5",
71 description: "Do something with a value on another thread, while also passing through the value.",
72 result: Some(Value::test_int(50000)),
73 },
74 ]
75 }
76
77 fn run(
78 &self,
79 engine_state: &EngineState,
80 stack: &mut Stack,
81 call: &Call,
82 input: PipelineData,
83 ) -> Result<PipelineData, ShellError> {
84 let head = call.head;
85 let from_io_error = IoError::factory(head, None);
86 let use_stderr = call.has_flag(engine_state, stack, "stderr")?;
87
88 let closure: Spanned<Closure> = call.req(engine_state, stack, 0)?;
89 let closure_span = closure.span;
90 let closure = closure.item;
91
92 let engine_state_arc = Arc::new(engine_state.clone());
93
94 let mut eval_block = {
95 let closure_engine_state = engine_state_arc.clone();
96 let mut closure_stack = stack
97 .captures_to_stack_preserve_out_dest(closure.captures)
98 .reset_pipes();
99 let eval_block_with_early_return = get_eval_block_with_early_return(engine_state);
100
101 move |input| {
102 let result = eval_block_with_early_return(
103 &closure_engine_state,
104 &mut closure_stack,
105 closure_engine_state.get_block(closure.block_id),
106 input,
107 )
108 .map(|p| p.body);
109 result.and_then(|data| data.drain().map(|_| ()))
111 }
112 };
113
114 let span = input.span().unwrap_or(head);
118 let input = input.into_stream_or_original(engine_state);
119
120 if let PipelineData::ByteStream(stream, metadata) = input {
121 let type_ = stream.type_();
122
123 let info = StreamInfo {
124 span,
125 signals: engine_state.signals().clone(),
126 type_,
127 metadata: metadata.clone(),
128 };
129
130 match stream.into_source() {
131 ByteStreamSource::Read(read) => {
132 if use_stderr {
133 return stderr_misuse(span, head);
134 }
135
136 let tee_thread = spawn_tee(info, eval_block)?;
137 let tee = IoTee::new(read, tee_thread);
138
139 Ok(PipelineData::byte_stream(
140 ByteStream::read(tee, span, engine_state.signals().clone(), type_),
141 metadata,
142 ))
143 }
144 ByteStreamSource::File(file) => {
145 if use_stderr {
146 return stderr_misuse(span, head);
147 }
148
149 let tee_thread = spawn_tee(info, eval_block)?;
150 let tee = IoTee::new(file, tee_thread);
151
152 Ok(PipelineData::byte_stream(
153 ByteStream::read(tee, span, engine_state.signals().clone(), type_),
154 metadata,
155 ))
156 }
157 #[cfg(feature = "os")]
158 ByteStreamSource::Child(mut child) => {
159 let stderr_thread = if use_stderr {
160 let stderr_thread = if let Some(stderr) = child.stderr.take() {
161 let tee_thread = spawn_tee(info.clone(), eval_block)?;
162 let tee = IoTee::new(stderr, tee_thread);
163 match stack.stderr() {
164 OutDest::Pipe | OutDest::PipeSeparate | OutDest::Value => {
165 child.stderr = Some(ChildPipe::Tee(Box::new(tee)));
166 Ok(None)
167 }
168 OutDest::Null => copy_on_thread(tee, io::sink(), &info).map(Some),
169 OutDest::Print | OutDest::Inherit => {
170 copy_on_thread(tee, io::stderr(), &info).map(Some)
171 }
172 OutDest::File(file) => {
173 copy_on_thread(tee, file.clone(), &info).map(Some)
174 }
175 }?
176 } else {
177 None
178 };
179
180 if let Some(stdout) = child.stdout.take() {
181 match stack.stdout() {
182 OutDest::Pipe | OutDest::PipeSeparate | OutDest::Value => {
183 child.stdout = Some(stdout);
184 Ok(())
185 }
186 OutDest::Null => copy_pipe(stdout, io::sink(), &info),
187 OutDest::Print | OutDest::Inherit => {
188 copy_pipe(stdout, io::stdout(), &info)
189 }
190 OutDest::File(file) => copy_pipe(stdout, file.as_ref(), &info),
191 }?;
192 }
193
194 stderr_thread
195 } else {
196 let stderr_thread = if let Some(stderr) = child.stderr.take() {
197 let info = info.clone();
198 match stack.stderr() {
199 OutDest::Pipe | OutDest::PipeSeparate | OutDest::Value => {
200 child.stderr = Some(stderr);
201 Ok(None)
202 }
203 OutDest::Null => {
204 copy_pipe_on_thread(stderr, io::sink(), &info).map(Some)
205 }
206 OutDest::Print | OutDest::Inherit => {
207 copy_pipe_on_thread(stderr, io::stderr(), &info).map(Some)
208 }
209 OutDest::File(file) => {
210 copy_pipe_on_thread(stderr, file.clone(), &info).map(Some)
211 }
212 }?
213 } else {
214 None
215 };
216
217 if let Some(stdout) = child.stdout.take() {
218 let tee_thread = spawn_tee(info.clone(), eval_block)?;
219 let tee = IoTee::new(stdout, tee_thread);
220 match stack.stdout() {
221 OutDest::Pipe | OutDest::PipeSeparate | OutDest::Value => {
222 child.stdout = Some(ChildPipe::Tee(Box::new(tee)));
223 Ok(())
224 }
225 OutDest::Null => copy(tee, io::sink(), &info),
226 OutDest::Print | OutDest::Inherit => copy(tee, io::stdout(), &info),
227 OutDest::File(file) => copy(tee, file.as_ref(), &info),
228 }?;
229 }
230
231 stderr_thread
232 };
233
234 if child.stdout.is_some() || child.stderr.is_some() {
235 Ok(PipelineData::byte_stream(
236 ByteStream::child(*child, span),
237 metadata,
238 ))
239 } else {
240 if let Some(thread) = stderr_thread {
241 thread.join().unwrap_or_else(|_| Err(panic_error()))?;
242 }
243 child.wait()?;
244 Ok(PipelineData::empty())
245 }
246 }
247 }
248 } else {
249 if use_stderr {
250 return stderr_misuse(input.span().unwrap_or(head), head);
251 }
252
253 let mut input = input;
254 let metadata = input.take_metadata();
255 let metadata_clone = metadata.clone();
256
257 if let PipelineData::ListStream(..) = input {
258 let signals = engine_state.signals().clone();
262
263 Ok(tee(input.into_iter(), move |rx| {
264 let input = rx.into_pipeline_data_with_metadata(span, signals, metadata_clone);
265 eval_block(input)
266 })
267 .map_err(&from_io_error)?
268 .map(move |result| result.unwrap_or_else(|err| Value::error(err, closure_span)))
269 .into_pipeline_data_with_metadata(
270 span,
271 engine_state.signals().clone(),
272 metadata,
273 ))
274 } else {
275 let value = input.into_value(span)?;
278 let value_clone = value.clone();
279 tee_once(stack.clone(), engine_state_arc, move || {
280 eval_block(value_clone.into_pipeline_data_with_metadata(metadata_clone))
281 })
282 .map_err(&from_io_error)?;
283 Ok(value.into_pipeline_data_with_metadata(metadata))
284 }
285 }
286 }
287
288 fn pipe_redirection(&self) -> (Option<OutDest>, Option<OutDest>) {
289 (Some(OutDest::PipeSeparate), Some(OutDest::PipeSeparate))
290 }
291}
292
293fn panic_error() -> ShellError {
294 ShellError::NushellFailed {
295 msg: "A panic occurred on a thread spawned by `tee`".into(),
296 }
297}
298
299fn tee<T, I: Iterator<Item = T>>(
304 input: I,
305 with_cloned_stream: impl FnOnce(mpsc::Receiver<T>) -> Result<(), ShellError> + Send + 'static,
306) -> Result<TeeIterator<I>, std::io::Error>
307where
308 T: Clone + Send + 'static,
309{
310 let (tx, rx) = mpsc::channel();
312
313 Ok(TeeIterator {
314 thread: Some(
315 thread::Builder::new()
316 .name("tee".into())
317 .spawn(move || with_cloned_stream(rx))?,
318 ),
319 iter: input.into_iter(),
320 tx: Some(tx),
321 })
322}
323
324struct TeeIterator<I: Iterator> {
325 thread: Option<JoinHandle<Result<(), ShellError>>>,
326 iter: I,
327 tx: Option<Sender<I::Item>>,
328}
329
330impl<I> Iterator for TeeIterator<I>
331where
332 I: Iterator,
333 I::Item: Clone,
334{
335 type Item = Result<I::Item, ShellError>;
336
337 fn next(&mut self) -> Option<Self::Item> {
338 if self.thread.as_ref().is_some_and(|t| t.is_finished()) {
339 let result = self
341 .thread
342 .take()
343 .expect("thread was taken early")
344 .join()
345 .unwrap_or_else(|_| Err(panic_error()));
346 if let Err(err) = result {
347 return Some(Err(err));
349 }
350 }
351
352 if let Some(value) = self.iter.next() {
354 let _ = self.tx.as_ref().map(|tx| tx.send(value.clone()));
356 Some(Ok(value))
357 } else {
358 drop(self.tx.take());
360 self.thread.take().and_then(|t| {
362 t.join()
363 .unwrap_or_else(|_| Err(panic_error()))
364 .err()
365 .map(Err)
366 })
367 }
368 }
369}
370
371impl<I: Iterator> Drop for TeeIterator<I> {
372 fn drop(&mut self) {
373 if let Some(tx) = &mut self.tx {
377 for value in &mut self.iter {
378 if tx.send(value).is_err() {
380 break;
381 }
382 }
383 }
384 }
385}
386
387fn tee_once(
389 stack: Stack,
390 engine_state: Arc<EngineState>,
391 on_thread: impl FnOnce() -> Result<(), ShellError> + Send + 'static,
392) -> Result<JoinHandle<()>, std::io::Error> {
393 thread::Builder::new().name("tee".into()).spawn(move || {
394 if let Err(err) = on_thread() {
395 report_shell_error(Some(&stack), &engine_state, &err);
396 }
397 })
398}
399
400fn stderr_misuse<T>(span: Span, head: Span) -> Result<T, ShellError> {
401 Err(ShellError::UnsupportedInput {
402 msg: "--stderr can only be used on external commands".into(),
403 input: "the input to `tee` is not an external command".into(),
404 msg_span: head,
405 input_span: span,
406 })
407}
408
409struct IoTee<R: Read> {
410 reader: R,
411 sender: Option<Sender<Vec<u8>>>,
412 thread: Option<JoinHandle<Result<(), ShellError>>>,
413}
414
415impl<R: Read> IoTee<R> {
416 fn new(reader: R, tee: TeeThread) -> Self {
417 Self {
418 reader,
419 sender: Some(tee.sender),
420 thread: Some(tee.thread),
421 }
422 }
423}
424
425impl<R: Read> Read for IoTee<R> {
426 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
427 if let Some(thread) = self.thread.take() {
428 if thread.is_finished() {
429 if let Err(err) = thread.join().unwrap_or_else(|_| Err(panic_error())) {
430 return Err(io::Error::other(err));
431 }
432 } else {
433 self.thread = Some(thread)
434 }
435 }
436 let len = self.reader.read(buf)?;
437 if len == 0 {
438 self.sender = None;
439 if let Some(thread) = self.thread.take()
440 && let Err(err) = thread.join().unwrap_or_else(|_| Err(panic_error()))
441 {
442 return Err(io::Error::other(err));
443 }
444 } else if let Some(sender) = self.sender.as_mut()
445 && sender.send(buf[..len].to_vec()).is_err()
446 {
447 self.sender = None;
448 }
449 Ok(len)
450 }
451}
452
453struct TeeThread {
454 sender: Sender<Vec<u8>>,
455 thread: JoinHandle<Result<(), ShellError>>,
456}
457
458fn spawn_tee(
459 info: StreamInfo,
460 mut eval_block: impl FnMut(PipelineData) -> Result<(), ShellError> + Send + 'static,
461) -> Result<TeeThread, ShellError> {
462 let (sender, receiver) = mpsc::channel();
463
464 let thread = thread::Builder::new()
465 .name("tee".into())
466 .spawn(move || {
467 let stream = ByteStream::from_iter(
469 receiver.into_iter(),
470 info.span,
471 Signals::empty(),
472 info.type_,
473 );
474 eval_block(PipelineData::byte_stream(stream, info.metadata))
475 })
476 .map_err(|err| {
477 IoError::new_with_additional_context(err, info.span, None, "Could not spawn tee")
478 })?;
479
480 Ok(TeeThread { sender, thread })
481}
482
483#[derive(Clone)]
484struct StreamInfo {
485 span: Span,
486 signals: Signals,
487 type_: ByteStreamType,
488 metadata: Option<PipelineMetadata>,
489}
490
491fn copy(src: impl Read, dest: impl Write, info: &StreamInfo) -> Result<(), ShellError> {
492 copy_with_signals(src, dest, info.span, &info.signals)?;
493 Ok(())
494}
495
496#[cfg(feature = "os")]
497fn copy_pipe(pipe: ChildPipe, dest: impl Write, info: &StreamInfo) -> Result<(), ShellError> {
498 match pipe {
499 ChildPipe::Pipe(pipe) => copy(pipe, dest, info),
500 ChildPipe::Tee(tee) => copy(tee, dest, info),
501 }
502}
503
504fn copy_on_thread(
505 src: impl Read + Send + 'static,
506 dest: impl Write + Send + 'static,
507 info: &StreamInfo,
508) -> Result<JoinHandle<Result<(), ShellError>>, ShellError> {
509 let span = info.span;
510 let signals = info.signals.clone();
511 thread::Builder::new()
512 .name("stderr copier".into())
513 .spawn(move || {
514 copy_with_signals(src, dest, span, &signals)?;
515 Ok(())
516 })
517 .map_err(|err| {
518 IoError::new_with_additional_context(err, span, None, "Could not spawn stderr copier")
519 .into()
520 })
521}
522
523#[cfg(feature = "os")]
524fn copy_pipe_on_thread(
525 pipe: ChildPipe,
526 dest: impl Write + Send + 'static,
527 info: &StreamInfo,
528) -> Result<JoinHandle<Result<(), ShellError>>, ShellError> {
529 match pipe {
530 ChildPipe::Pipe(pipe) => copy_on_thread(pipe, dest, info),
531 ChildPipe::Tee(tee) => copy_on_thread(tee, dest, info),
532 }
533}
534
535#[test]
536fn tee_copies_values_to_other_thread_and_passes_them_through() {
537 let (tx, rx) = mpsc::channel();
538
539 let expected_values = vec![1, 2, 3, 4];
540
541 let my_result = tee(expected_values.clone().into_iter(), move |rx| {
542 for val in rx {
543 let _ = tx.send(val);
544 }
545 Ok(())
546 })
547 .expect("io error")
548 .collect::<Result<Vec<i32>, ShellError>>()
549 .expect("should not produce error");
550
551 assert_eq!(expected_values, my_result);
552
553 let other_threads_result = rx.into_iter().collect::<Vec<_>>();
554
555 assert_eq!(expected_values, other_threads_result);
556}
557
558#[test]
559fn tee_forwards_errors_back_immediately() {
560 use std::time::Duration;
561 let slow_input = (0..100).inspect(|_| std::thread::sleep(Duration::from_millis(1)));
562 let iter = tee(slow_input, |_| {
563 Err(ShellError::Io(IoError::new_with_additional_context(
564 shell_error::io::ErrorKind::from_std(std::io::ErrorKind::Other),
565 Span::test_data(),
566 None,
567 "test",
568 )))
569 })
570 .expect("io error");
571 for result in iter {
572 if let Ok(val) = result {
573 assert!(val < 99, "the error did not come early enough");
575 } else {
576 return;
578 }
579 }
580 panic!("never received the error");
581}
582
583#[test]
584fn tee_waits_for_the_other_thread() {
585 use std::sync::{
586 Arc,
587 atomic::{AtomicBool, Ordering},
588 };
589 use std::time::Duration;
590 let waited = Arc::new(AtomicBool::new(false));
591 let waited_clone = waited.clone();
592 let iter = tee(0..100, move |_| {
593 std::thread::sleep(Duration::from_millis(10));
594 waited_clone.store(true, Ordering::Relaxed);
595 Err(ShellError::Io(IoError::new_with_additional_context(
596 shell_error::io::ErrorKind::from_std(std::io::ErrorKind::Other),
597 Span::test_data(),
598 None,
599 "test",
600 )))
601 })
602 .expect("io error");
603 let last = iter.last();
604 assert!(waited.load(Ordering::Relaxed), "failed to wait");
605 assert!(
606 last.is_some_and(|res| res.is_err()),
607 "failed to return error from wait"
608 );
609}
610
611#[test]
613fn tee_output_is_ignored_but_other_thread_get_values() {
614 let (tx, rx) = mpsc::channel();
615
616 let input = 0u32..100u32;
617 let expected_values: Vec<_> = input.clone().collect();
618
619 let my_result = tee(input, move |rx| {
620 for val in rx {
621 let _ = tx.send(val);
622 }
623 Ok(())
624 })
625 .expect("io error")
626 .take(2)
628 .collect::<Result<Vec<_>, ShellError>>()
629 .expect("should not produce error");
630
631 assert_eq!(expected_values[0..my_result.len()], my_result);
633
634 let other_threads_result = rx.into_iter().collect::<Vec<_>>();
635
636 assert_eq!(expected_values, other_threads_result);
639}
640
641#[test]
642fn tee_other_thread_ignore_values_but_output_all_others() {
643 let (tx, rx) = mpsc::channel();
644
645 let input = 0u32..100u32;
646 let expected_values: Vec<_> = input.clone().collect();
647
648 let my_result = tee(input, move |rx| {
649 for val in rx {
650 let _ = tx.send(val);
651 }
652 Ok(())
653 })
654 .expect("io error")
655 .collect::<Result<Vec<_>, ShellError>>()
656 .expect("should not produce error");
657
658 assert_eq!(expected_values, my_result);
660
661 let other_threads_result = rx.into_iter().take(2).collect::<Vec<_>>();
663 assert_eq!(expected_values[0..2], other_threads_result);
664}