1use nu_engine::{command_prelude::*, get_eval_block_with_early_return};
2#[cfg(feature = "os")]
3use nu_protocol::process::ChildPipe;
4#[cfg(test)]
5use nu_protocol::shell_error;
6use nu_protocol::{
7 ByteStream, ByteStreamSource, OutDest, PipelineMetadata, Signals,
8 byte_stream::copy_with_signals, engine::Closure, report_shell_error, shell_error::io::IoError,
9};
10use std::{
11 io::{self, Read, Write},
12 sync::{
13 Arc,
14 mpsc::{self, Sender},
15 },
16 thread::{self, JoinHandle},
17};
18
/// The `tee` command.
///
/// A unit struct with no state of its own; all behavior lives in its `Command` implementation.
#[derive(Clone)]
pub struct Tee;
21
22impl Command for Tee {
    /// Returns the command's name, `tee`.
    fn name(&self) -> &str {
        "tee"
    }
26
    /// Returns the one-line summary of what the command does.
    fn description(&self) -> &str {
        "Copy a stream to another command in parallel."
    }
30
31 fn extra_description(&self) -> &str {
32 r#"This is useful for doing something else with a stream while still continuing to
33use it in your pipeline."#
34 }
35
36 fn signature(&self) -> Signature {
37 Signature::build("tee")
38 .input_output_type(Type::Any, Type::Any)
39 .switch(
40 "stderr",
41 "For external commands: copy the standard error stream instead.",
42 Some('e'),
43 )
44 .required(
45 "closure",
46 SyntaxShape::Closure(None),
47 "The other command to send the stream to.",
48 )
49 .category(Category::Filters)
50 }
51
52 fn examples(&self) -> Vec<Example<'_>> {
53 vec![
54 Example {
55 example: "http get http://example.org/ | tee { save example.html }",
56 description: "Save a webpage to a file while also printing it",
57 result: None,
58 },
59 Example {
60 example: "nu -c 'print -e error; print ok' | tee --stderr { save error.log } | complete",
61 description: "Save error messages from an external command to a file without \
62 redirecting them",
63 result: None,
64 },
65 Example {
66 example: "1..100 | tee { each { print } } | math sum | wrap sum",
67 description: "Print numbers and their sum",
68 result: None,
69 },
70 Example {
71 example: "10000 | tee { 1..$in | print } | $in * 5",
72 description: "Do something with a value on another thread, while also passing through the value",
73 result: Some(Value::test_int(50000)),
74 },
75 ]
76 }
77
    /// Runs `tee`: passes `input` through while feeding a copy of it to the closure argument on
    /// another thread.
    ///
    /// Three shapes of input are handled differently:
    /// - byte streams (readers, files, child processes) are wrapped in an `IoTee`, which forwards
    ///   every chunk read to a thread running the closure;
    /// - list streams go through the iterator-based `tee` helper, and an error from the closure
    ///   shows up as an error value in the output stream;
    /// - any other value is cloned and handed to a thread spawned with `tee_once`.
    ///
    /// `--stderr` is only accepted when the input is a child process; every other input yields
    /// the `stderr_misuse` error.
    fn run(
        &self,
        engine_state: &EngineState,
        stack: &mut Stack,
        call: &Call,
        input: PipelineData,
    ) -> Result<PipelineData, ShellError> {
        let head = call.head;
        let from_io_error = IoError::factory(head, None);
        let use_stderr = call.has_flag(engine_state, stack, "stderr")?;

        let closure: Spanned<Closure> = call.req(engine_state, stack, 0)?;
        let closure_span = closure.span;
        let closure = closure.item;

        // Owned copy of the engine state that can be moved into the spawned threads
        let engine_state_arc = Arc::new(engine_state.clone());

        // Callable that evaluates the closure with its own stack on whichever thread it is moved
        // to. Only success/failure is kept: whatever the closure produces is drained.
        let mut eval_block = {
            let closure_engine_state = engine_state_arc.clone();
            let mut closure_stack = stack
                .captures_to_stack_preserve_out_dest(closure.captures)
                .reset_pipes();
            let eval_block_with_early_return = get_eval_block_with_early_return(engine_state);

            move |input| {
                let result = eval_block_with_early_return(
                    &closure_engine_state,
                    &mut closure_stack,
                    closure_engine_state.get_block(closure.block_id),
                    input,
                )
                .map(|p| p.body);
                result.and_then(|data| data.drain().map(|_| ()))
            }
        };

        // Take the span before the input is converted; fall back to the call head.
        let span = input.span().unwrap_or(head);
        // Prefer the stream form of the input when a conversion exists; if it does not, the
        // original input is kept unchanged.
        // NOTE(review): presumably streams are preferred because the stream paths below can hand
        // the closure's error back to this pipeline — confirm.
        let input = input
            .try_into_stream(engine_state)
            .unwrap_or_else(|original_input| original_input);

        if let PipelineData::ByteStream(stream, metadata) = input {
            let type_ = stream.type_();

            // Everything the helper functions need to rebuild/copy the stream
            let info = StreamInfo {
                span,
                signals: engine_state.signals().clone(),
                type_,
                metadata: metadata.clone(),
            };

            match stream.into_source() {
                ByteStreamSource::Read(read) => {
                    if use_stderr {
                        return stderr_misuse(span, head);
                    }

                    // Every chunk read from `tee` is also sent to the closure's thread
                    let tee_thread = spawn_tee(info, eval_block)?;
                    let tee = IoTee::new(read, tee_thread);

                    Ok(PipelineData::byte_stream(
                        ByteStream::read(tee, span, engine_state.signals().clone(), type_),
                        metadata,
                    ))
                }
                ByteStreamSource::File(file) => {
                    if use_stderr {
                        return stderr_misuse(span, head);
                    }

                    // Same handling as a generic reader; the result is a `read` stream wrapping
                    // the file.
                    let tee_thread = spawn_tee(info, eval_block)?;
                    let tee = IoTee::new(file, tee_thread);

                    Ok(PipelineData::byte_stream(
                        ByteStream::read(tee, span, engine_state.signals().clone(), type_),
                        metadata,
                    ))
                }
                #[cfg(feature = "os")]
                ByteStreamSource::Child(mut child) => {
                    // Exactly one of the child's pipes is tee'd (stderr with `--stderr`, stdout
                    // otherwise). For each pipe: if its destination is a pipe or value, it is put
                    // back on the child for downstream consumers; otherwise it is copied to its
                    // destination here. stderr is copied on a separate thread, stdout on the
                    // current one.
                    let stderr_thread = if use_stderr {
                        let stderr_thread = if let Some(stderr) = child.stderr.take() {
                            let tee_thread = spawn_tee(info.clone(), eval_block)?;
                            let tee = IoTee::new(stderr, tee_thread);
                            match stack.stderr() {
                                OutDest::Pipe | OutDest::PipeSeparate | OutDest::Value => {
                                    child.stderr = Some(ChildPipe::Tee(Box::new(tee)));
                                    Ok(None)
                                }
                                OutDest::Null => copy_on_thread(tee, io::sink(), &info).map(Some),
                                OutDest::Print | OutDest::Inherit => {
                                    copy_on_thread(tee, io::stderr(), &info).map(Some)
                                }
                                OutDest::File(file) => {
                                    copy_on_thread(tee, file.clone(), &info).map(Some)
                                }
                            }?
                        } else {
                            None
                        };

                        // stdout is not tee'd in this mode; it is only routed to its destination
                        if let Some(stdout) = child.stdout.take() {
                            match stack.stdout() {
                                OutDest::Pipe | OutDest::PipeSeparate | OutDest::Value => {
                                    child.stdout = Some(stdout);
                                    Ok(())
                                }
                                OutDest::Null => copy_pipe(stdout, io::sink(), &info),
                                OutDest::Print | OutDest::Inherit => {
                                    copy_pipe(stdout, io::stdout(), &info)
                                }
                                OutDest::File(file) => copy_pipe(stdout, file.as_ref(), &info),
                            }?;
                        }

                        stderr_thread
                    } else {
                        // stderr is not tee'd in this mode; it is only routed to its destination
                        let stderr_thread = if let Some(stderr) = child.stderr.take() {
                            let info = info.clone();
                            match stack.stderr() {
                                OutDest::Pipe | OutDest::PipeSeparate | OutDest::Value => {
                                    child.stderr = Some(stderr);
                                    Ok(None)
                                }
                                OutDest::Null => {
                                    copy_pipe_on_thread(stderr, io::sink(), &info).map(Some)
                                }
                                OutDest::Print | OutDest::Inherit => {
                                    copy_pipe_on_thread(stderr, io::stderr(), &info).map(Some)
                                }
                                OutDest::File(file) => {
                                    copy_pipe_on_thread(stderr, file.clone(), &info).map(Some)
                                }
                            }?
                        } else {
                            None
                        };

                        // stdout is the tee'd stream
                        if let Some(stdout) = child.stdout.take() {
                            let tee_thread = spawn_tee(info.clone(), eval_block)?;
                            let tee = IoTee::new(stdout, tee_thread);
                            match stack.stdout() {
                                OutDest::Pipe | OutDest::PipeSeparate | OutDest::Value => {
                                    child.stdout = Some(ChildPipe::Tee(Box::new(tee)));
                                    Ok(())
                                }
                                OutDest::Null => copy(tee, io::sink(), &info),
                                OutDest::Print | OutDest::Inherit => copy(tee, io::stdout(), &info),
                                OutDest::File(file) => copy(tee, file.as_ref(), &info),
                            }?;
                        }

                        stderr_thread
                    };

                    if child.stdout.is_some() || child.stderr.is_some() {
                        // Something is still left for downstream to read from the child
                        Ok(PipelineData::byte_stream(
                            ByteStream::child(*child, span),
                            metadata,
                        ))
                    } else {
                        // Both pipes were handled here: finish the stderr copy (if any), then
                        // wait for the child itself.
                        if let Some(thread) = stderr_thread {
                            thread.join().unwrap_or_else(|_| Err(panic_error()))?;
                        }
                        child.wait()?;
                        Ok(PipelineData::empty())
                    }
                }
            }
        } else {
            if use_stderr {
                return stderr_misuse(input.span().unwrap_or(head), head);
            }

            let metadata = input.metadata();
            let metadata_clone = metadata.clone();

            if let PipelineData::ListStream(..) = input {
                let signals = engine_state.signals().clone();

                // The closure receives the copied values as a stream built from the channel
                // receiver. An error coming back from `tee` is turned into an error value
                // carrying the closure's span.
                Ok(tee(input.into_iter(), move |rx| {
                    let input = rx.into_pipeline_data_with_metadata(span, signals, metadata_clone);
                    eval_block(input)
                })
                .map_err(&from_io_error)?
                .map(move |result| result.unwrap_or_else(|err| Value::error(err, closure_span)))
                .into_pipeline_data_with_metadata(
                    span,
                    engine_state.signals().clone(),
                    metadata,
                ))
            } else {
                // Plain value: run the closure on a clone in a thread that is not joined here.
                // A failure of the closure is reported by `tee_once` itself rather than returned.
                let value = input.into_value(span)?;
                let value_clone = value.clone();
                tee_once(stack.clone(), engine_state_arc, move || {
                    eval_block(value_clone.into_pipeline_data_with_metadata(metadata_clone))
                })
                .map_err(&from_io_error)?;
                Ok(value.into_pipeline_data_with_metadata(metadata))
            }
        }
    }
289
290 fn pipe_redirection(&self) -> (Option<OutDest>, Option<OutDest>) {
291 (Some(OutDest::PipeSeparate), Some(OutDest::PipeSeparate))
292 }
293}
294
295fn panic_error() -> ShellError {
296 ShellError::NushellFailed {
297 msg: "A panic occurred on a thread spawned by `tee`".into(),
298 }
299}
300
301fn tee<T>(
306 input: impl Iterator<Item = T>,
307 with_cloned_stream: impl FnOnce(mpsc::Receiver<T>) -> Result<(), ShellError> + Send + 'static,
308) -> Result<impl Iterator<Item = Result<T, ShellError>>, std::io::Error>
309where
310 T: Clone + Send + 'static,
311{
312 let (tx, rx) = mpsc::channel();
314
315 let mut thread = Some(
316 thread::Builder::new()
317 .name("tee".into())
318 .spawn(move || with_cloned_stream(rx))?,
319 );
320
321 let mut iter = input.into_iter();
322 let mut tx = Some(tx);
323
324 Ok(std::iter::from_fn(move || {
325 if thread.as_ref().is_some_and(|t| t.is_finished()) {
326 let result = thread
328 .take()
329 .expect("thread was taken early")
330 .join()
331 .unwrap_or_else(|_| Err(panic_error()));
332 if let Err(err) = result {
333 return Some(Err(err));
335 }
336 }
337
338 if let Some(value) = iter.next() {
340 let _ = tx.as_ref().map(|tx| tx.send(value.clone()));
342 Some(Ok(value))
343 } else {
344 drop(tx.take());
346 thread.take().and_then(|t| {
348 t.join()
349 .unwrap_or_else(|_| Err(panic_error()))
350 .err()
351 .map(Err)
352 })
353 }
354 }))
355}
356
357fn tee_once(
359 stack: Stack,
360 engine_state: Arc<EngineState>,
361 on_thread: impl FnOnce() -> Result<(), ShellError> + Send + 'static,
362) -> Result<JoinHandle<()>, std::io::Error> {
363 thread::Builder::new().name("tee".into()).spawn(move || {
364 if let Err(err) = on_thread() {
365 report_shell_error(Some(&stack), &engine_state, &err);
366 }
367 })
368}
369
370fn stderr_misuse<T>(span: Span, head: Span) -> Result<T, ShellError> {
371 Err(ShellError::UnsupportedInput {
372 msg: "--stderr can only be used on external commands".into(),
373 input: "the input to `tee` is not an external command".into(),
374 msg_span: head,
375 input_span: span,
376 })
377}
378
/// A reader that forwards a copy of everything read from `reader` to a tee thread.
struct IoTee<R: Read> {
    /// The underlying source of bytes
    reader: R,
    /// Channel to the tee thread; set to `None` once the stream has ended or a send has failed
    sender: Option<Sender<Vec<u8>>>,
    /// Handle of the tee thread; set to `None` once it has been joined
    thread: Option<JoinHandle<Result<(), ShellError>>>,
}
384
385impl<R: Read> IoTee<R> {
386 fn new(reader: R, tee: TeeThread) -> Self {
387 Self {
388 reader,
389 sender: Some(tee.sender),
390 thread: Some(tee.thread),
391 }
392 }
393}
394
395impl<R: Read> Read for IoTee<R> {
396 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
397 if let Some(thread) = self.thread.take() {
398 if thread.is_finished() {
399 if let Err(err) = thread.join().unwrap_or_else(|_| Err(panic_error())) {
400 return Err(io::Error::other(err));
401 }
402 } else {
403 self.thread = Some(thread)
404 }
405 }
406 let len = self.reader.read(buf)?;
407 if len == 0 {
408 self.sender = None;
409 if let Some(thread) = self.thread.take()
410 && let Err(err) = thread.join().unwrap_or_else(|_| Err(panic_error()))
411 {
412 return Err(io::Error::other(err));
413 }
414 } else if let Some(sender) = self.sender.as_mut()
415 && sender.send(buf[..len].to_vec()).is_err()
416 {
417 self.sender = None;
418 }
419 Ok(len)
420 }
421}
422
/// What `spawn_tee` hands back: the two ends needed to talk to the spawned tee thread.
struct TeeThread {
    /// Sends chunks of bytes to the thread
    sender: Sender<Vec<u8>>,
    /// Handle for joining the thread and obtaining its result
    thread: JoinHandle<Result<(), ShellError>>,
}
427
428fn spawn_tee(
429 info: StreamInfo,
430 mut eval_block: impl FnMut(PipelineData) -> Result<(), ShellError> + Send + 'static,
431) -> Result<TeeThread, ShellError> {
432 let (sender, receiver) = mpsc::channel();
433
434 let thread = thread::Builder::new()
435 .name("tee".into())
436 .spawn(move || {
437 let stream = ByteStream::from_iter(
439 receiver.into_iter(),
440 info.span,
441 Signals::empty(),
442 info.type_,
443 );
444 eval_block(PipelineData::byte_stream(stream, info.metadata))
445 })
446 .map_err(|err| {
447 IoError::new_with_additional_context(err, info.span, None, "Could not spawn tee")
448 })?;
449
450 Ok(TeeThread { sender, thread })
451}
452
/// Properties of the stream being tee'd, bundled so they can be passed to the helper functions.
#[derive(Clone)]
struct StreamInfo {
    /// Span used for the stream and for errors raised while copying it
    span: Span,
    /// Signals handed to `copy_with_signals` by the copy helpers
    signals: Signals,
    /// Type given to the byte stream that the closure receives
    type_: ByteStreamType,
    /// Metadata given to the byte stream that the closure receives
    metadata: Option<PipelineMetadata>,
}
460
461fn copy(src: impl Read, dest: impl Write, info: &StreamInfo) -> Result<(), ShellError> {
462 copy_with_signals(src, dest, info.span, &info.signals)?;
463 Ok(())
464}
465
/// Copies a child process pipe into `dest` on the current thread; see `copy`.
#[cfg(feature = "os")]
fn copy_pipe(pipe: ChildPipe, dest: impl Write, info: &StreamInfo) -> Result<(), ShellError> {
    // Each variant holds its own reader, so the generic `copy` is called once per arm
    match pipe {
        ChildPipe::Pipe(raw_pipe) => copy(raw_pipe, dest, info),
        ChildPipe::Tee(teed_reader) => copy(teed_reader, dest, info),
    }
}
473
474fn copy_on_thread(
475 src: impl Read + Send + 'static,
476 dest: impl Write + Send + 'static,
477 info: &StreamInfo,
478) -> Result<JoinHandle<Result<(), ShellError>>, ShellError> {
479 let span = info.span;
480 let signals = info.signals.clone();
481 thread::Builder::new()
482 .name("stderr copier".into())
483 .spawn(move || {
484 copy_with_signals(src, dest, span, &signals)?;
485 Ok(())
486 })
487 .map_err(|err| {
488 IoError::new_with_additional_context(err, span, None, "Could not spawn stderr copier")
489 .into()
490 })
491}
492
/// Copies a child process pipe into `dest` on a separate thread; see `copy_on_thread`.
#[cfg(feature = "os")]
fn copy_pipe_on_thread(
    pipe: ChildPipe,
    dest: impl Write + Send + 'static,
    info: &StreamInfo,
) -> Result<JoinHandle<Result<(), ShellError>>, ShellError> {
    // Each variant holds its own reader, so the generic helper is called once per arm
    match pipe {
        ChildPipe::Pipe(raw_pipe) => copy_on_thread(raw_pipe, dest, info),
        ChildPipe::Tee(teed_reader) => copy_on_thread(teed_reader, dest, info),
    }
}
504
/// `tee` must yield every input value unchanged and deliver the same values, in order, to the
/// other thread.
#[test]
fn tee_copies_values_to_other_thread_and_passes_them_through() {
    let expected_values = vec![1, 2, 3, 4];

    // The other thread forwards whatever it receives back to the test through this channel
    let (tx, rx) = mpsc::channel();

    let teed = tee(expected_values.clone().into_iter(), move |copies| {
        copies.into_iter().for_each(|val| {
            let _ = tx.send(val);
        });
        Ok(())
    })
    .expect("io error");

    let my_result: Vec<i32> = teed
        .collect::<Result<_, ShellError>>()
        .expect("should not produce error");
    assert_eq!(expected_values, my_result);

    let other_threads_result: Vec<_> = rx.into_iter().collect();
    assert_eq!(expected_values, other_threads_result);
}
527
/// An error from the other thread must show up in the iterator before the (slow) input has been
/// fully consumed.
#[test]
fn tee_forwards_errors_back_immediately() {
    use std::time::Duration;

    // Slow producer, so the failing thread finishes well before the input runs out
    let slow_input = (0..100).inspect(|_| std::thread::sleep(Duration::from_millis(1)));

    let iter = tee(slow_input, |_| {
        let kind = shell_error::io::ErrorKind::from_std(std::io::ErrorKind::Other);
        let io_error = IoError::new_with_additional_context(kind, Span::test_data(), None, "test");
        Err(ShellError::Io(io_error))
    })
    .expect("io error");

    for result in iter {
        match result {
            // Should not make it to the end of the input
            Ok(val) => assert!(val < 99, "the error did not come early enough"),
            // Got the error
            Err(_) => return,
        }
    }
    panic!("never received the error");
}
552
/// When the input ends, the iterator must wait for the other thread and yield its error as the
/// last item.
#[test]
fn tee_waits_for_the_other_thread() {
    use std::sync::{
        Arc,
        atomic::{AtomicBool, Ordering},
    };
    use std::time::Duration;

    let waited = Arc::new(AtomicBool::new(false));
    let flag_for_thread = Arc::clone(&waited);

    let iter = tee(0..100, move |_| {
        // Finish noticeably later than the input, then record that we got this far
        std::thread::sleep(Duration::from_millis(10));
        flag_for_thread.store(true, Ordering::Relaxed);

        let kind = shell_error::io::ErrorKind::from_std(std::io::ErrorKind::Other);
        let io_error = IoError::new_with_additional_context(kind, Span::test_data(), None, "test");
        Err(ShellError::Io(io_error))
    })
    .expect("io error");

    let last = iter.last();

    assert!(waited.load(Ordering::Relaxed), "failed to wait");
    assert!(
        matches!(last, Some(Err(_))),
        "failed to return error from wait"
    );
}