Skip to main content

nu_protocol/pipeline/
byte_stream.rs

1//! Module managing the streaming of raw bytes between pipeline elements
2//!
//! This module also handles conversions between [`ShellError`] and [`io::Error`](std::io::Error),
//! so remember to use [`ShellErrorBridge`] where applicable.
5#[cfg(feature = "os")]
6use crate::process::{ChildPipe, ChildProcess};
7use crate::{
8    IntRange, PipelineData, ShellError, Signals, Span, Type, Value,
9    shell_error::{bridge::ShellErrorBridge, io::IoError},
10};
11use nu_utils::SplitRead as SplitReadInner;
12use serde::{Deserialize, Serialize};
13use std::ops::Bound;
14#[cfg(unix)]
15use std::os::fd::OwnedFd;
16#[cfg(windows)]
17use std::os::windows::io::OwnedHandle;
18use std::{
19    fmt::Debug,
20    fs::File,
21    io::{self, BufRead, BufReader, Cursor, ErrorKind, Read, Write},
22    process::Stdio,
23};
24
/// The source of bytes for a [`ByteStream`].
///
/// Currently, there are only three possibilities:
/// 1. `Read` (any `dyn` type that implements [`Read`])
/// 2. [`File`]
/// 3. [`ChildProcess`] (only available with the `os` feature)
pub enum ByteStreamSource {
    /// Any boxed reader.
    Read(Box<dyn Read + Send + 'static>),
    /// A file handle (e.g. stdin, as created by [`ByteStream::stdin`]).
    File(File),
    /// A child process; the bytes of the stream are read from its stdout, if it has one.
    #[cfg(feature = "os")]
    Child(Box<ChildProcess>),
}
37
38impl ByteStreamSource {
39    fn reader(self) -> Option<SourceReader> {
40        match self {
41            ByteStreamSource::Read(read) => Some(SourceReader::Read(read)),
42            ByteStreamSource::File(file) => Some(SourceReader::File(file)),
43            #[cfg(feature = "os")]
44            ByteStreamSource::Child(mut child) => child.stdout.take().map(|stdout| match stdout {
45                ChildPipe::Pipe(pipe) => SourceReader::File(convert_file(pipe)),
46                ChildPipe::Tee(tee) => SourceReader::Read(tee),
47            }),
48        }
49    }
50
51    /// Source is a `Child` or `File`, rather than `Read`. Currently affects trimming
52    #[cfg(feature = "os")]
53    pub fn is_external(&self) -> bool {
54        matches!(self, ByteStreamSource::Child(..))
55    }
56
57    #[cfg(not(feature = "os"))]
58    pub fn is_external(&self) -> bool {
59        // without os support we never have externals
60        false
61    }
62}
63
64impl Debug for ByteStreamSource {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        match self {
67            ByteStreamSource::Read(_) => f.debug_tuple("Read").field(&"..").finish(),
68            ByteStreamSource::File(file) => f.debug_tuple("File").field(file).finish(),
69            #[cfg(feature = "os")]
70            ByteStreamSource::Child(child) => f.debug_tuple("Child").field(child).finish(),
71        }
72    }
73}
74
/// The readable part of a [`ByteStreamSource`], after any child process has been reduced to its
/// stdout.
enum SourceReader {
    /// Any boxed reader.
    Read(Box<dyn Read + Send + 'static>),
    /// A file (or a pipe converted into one).
    File(File),
}

impl Read for SourceReader {
    /// Forwards the read to whichever reader this variant wraps.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let inner: &mut dyn Read = match self {
            SourceReader::Read(boxed) => boxed,
            SourceReader::File(file) => file,
        };
        inner.read(buf)
    }
}

impl Debug for SourceReader {
    /// Shows the wrapped file for `File`, and a `".."` placeholder for opaque `Read` readers.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            SourceReader::Read(_) => {
                let mut tuple = f.debug_tuple("Read");
                tuple.field(&"..");
                tuple.finish()
            }
            SourceReader::File(file) => {
                let mut tuple = f.debug_tuple("File");
                tuple.field(file);
                tuple.finish()
            }
        }
    }
}
97
/// Optional type color for [`ByteStream`], which determines type compatibility.
///
/// Converts into a [`Type`]: `Binary` becomes [`Type::Binary`], `String` becomes
/// [`Type::String`], and `Unknown` becomes [`Type::Any`]. The default is `Unknown`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum ByteStreamType {
    /// Compatible with [`Type::Binary`], and should only be converted to binary, even when the
    /// desired type is unknown.
    Binary,
    /// Compatible with [`Type::String`], and should only be converted to string, even when the
    /// desired type is unknown.
    ///
    /// This does not guarantee valid UTF-8 data, but it is conventionally so. Converting to
    /// `String` still requires validation of the data.
    String,
    /// Unknown whether the stream should contain binary or string data. This usually is the result
    /// of an external stream, e.g. an external command or file.
    #[default]
    Unknown,
}
115
116impl ByteStreamType {
117    /// Returns the string that describes the byte stream type - i.e., the same as what `describe`
118    /// produces. This can be used in type mismatch error messages.
119    pub fn describe(self) -> &'static str {
120        match self {
121            ByteStreamType::Binary => "binary (stream)",
122            ByteStreamType::String => "string (stream)",
123            ByteStreamType::Unknown => "byte stream",
124        }
125    }
126
127    /// Returns true if the type is `Binary` or `Unknown`
128    pub fn is_binary_coercible(self) -> bool {
129        matches!(self, ByteStreamType::Binary | ByteStreamType::Unknown)
130    }
131
132    /// Returns true if the type is `String` or `Unknown`
133    pub fn is_string_coercible(self) -> bool {
134        matches!(self, ByteStreamType::String | ByteStreamType::Unknown)
135    }
136}
137
138impl From<ByteStreamType> for Type {
139    fn from(value: ByteStreamType) -> Self {
140        match value {
141            ByteStreamType::Binary => Type::Binary,
142            ByteStreamType::String => Type::String,
143            ByteStreamType::Unknown => Type::Any,
144        }
145    }
146}
147
/// A potentially infinite, interruptible stream of bytes.
///
/// To create a [`ByteStream`], you can use any of the following methods:
/// - [`read`](ByteStream::read): takes any type that implements [`Read`].
/// - [`read_string`](ByteStream::read_string) / [`read_binary`](ByteStream::read_binary): take an
///   in-memory `String` / `Vec<u8>` and record its length as the known size.
/// - [`file`](ByteStream::file): takes a [`File`].
/// - [`from_iter`](ByteStream::from_iter): takes an [`Iterator`] whose items implement `AsRef<[u8]>`.
/// - [`from_result_iter`](ByteStream::from_result_iter): same as [`from_iter`](ByteStream::from_iter),
///   but each item is a `Result<T, ShellError>`.
/// - [`from_fn`](ByteStream::from_fn): uses a generator function to fill a buffer whenever it is
///   empty. This has high performance because it doesn't need to allocate for each chunk of data,
///   and can just reuse the same buffer.
///
/// Byte streams have a [type](ByteStream::type_) which is used to preserve type compatibility
/// when they are the result of an internal command. It is important that this be set to the
/// correct value. [`Unknown`](ByteStreamType::Unknown) is used only for external sources where
/// the type can not be inherently determined, and having it automatically act as a string or
/// binary depending on whether it parses as UTF-8 or not is desirable.
///
/// The data of a [`ByteStream`] can be accessed using one of the following methods:
/// - [`reader`](ByteStream::reader): returns a [`Read`]-able type to get the raw bytes in the stream.
/// - [`lines`](ByteStream::lines): splits the bytes on lines and returns an [`Iterator`]
///   where each item is a `Result<String, ShellError>`.
/// - [`chunks`](ByteStream::chunks): returns an [`Iterator`] of [`Value`]s where each value is
///   either a string or binary.
///   Try not to use this method if possible. Rather, please use [`reader`](ByteStream::reader)
///   (or [`lines`](ByteStream::lines) if it matches the situation).
///
/// Additionally, there are a few methods to collect a [`ByteStream`] into memory:
/// - [`into_bytes`](ByteStream::into_bytes): collects all bytes into a [`Vec<u8>`].
/// - [`into_string`](ByteStream::into_string): collects all bytes into a [`String`], erroring if utf-8 decoding failed.
/// - [`into_value`](ByteStream::into_value): collects all bytes into a value typed appropriately
///   for the [type](ByteStream::type_) of this stream. If the type is
///   [`Unknown`](ByteStreamType::Unknown), it will produce a string value if the data is valid
///   UTF-8, or a binary value otherwise.
///
/// There are also a few other methods to consume all the data of a [`ByteStream`]:
/// - [`drain`](ByteStream::drain): consumes all bytes and outputs nothing.
/// - [`write_to`](ByteStream::write_to): writes all bytes to the given [`Write`] destination.
/// - [`print`](ByteStream::print): a convenience wrapper around [`write_to`](ByteStream::write_to).
///   It prints all bytes to stdout or stderr.
///
/// Internally, [`ByteStream`]s currently come in three flavors according to [`ByteStreamSource`].
/// See its documentation for more information.
#[derive(Debug)]
pub struct ByteStream {
    /// Where the bytes of the stream come from.
    stream: ByteStreamSource,
    /// Span attached to the stream and to errors raised while reading it.
    span: Span,
    /// Checked while reading so that the stream can be interrupted.
    signals: Signals,
    /// Type color of the stream's contents.
    type_: ByteStreamType,
    /// Total size of the stream in bytes, if known.
    known_size: Option<u64>,
    /// Spans of callers the stream has passed through, used to construct a backtrace.
    caller_spans: Vec<Span>,
}
199
200impl ByteStream {
201    /// Create a new [`ByteStream`] from a [`ByteStreamSource`].
202    pub fn new(
203        stream: ByteStreamSource,
204        span: Span,
205        signals: Signals,
206        type_: ByteStreamType,
207    ) -> Self {
208        Self {
209            stream,
210            span,
211            signals,
212            type_,
213            known_size: None,
214            caller_spans: vec![],
215        }
216    }
217
218    /// Push a caller [`Span`] to the bytestream, it's useful to construct a backtrace.
219    pub fn push_caller_span(&mut self, span: Span) {
220        if span != self.span {
221            self.caller_spans.push(span)
222        }
223    }
224
225    /// Get all caller [`Span`], it's useful to construct a backtrace.
226    pub fn get_caller_spans(&self) -> &Vec<Span> {
227        &self.caller_spans
228    }
229
230    /// Create a [`ByteStream`] from an arbitrary reader. The type must be provided.
231    pub fn read(
232        reader: impl Read + Send + 'static,
233        span: Span,
234        signals: Signals,
235        type_: ByteStreamType,
236    ) -> Self {
237        Self::new(
238            ByteStreamSource::Read(Box::new(reader)),
239            span,
240            signals,
241            type_,
242        )
243    }
244
245    pub fn skip(self, span: Span, n: u64) -> Result<Self, ShellError> {
246        let known_size = self.known_size.map(|len| len.saturating_sub(n));
247        if let Some(mut reader) = self.reader() {
248            // Copy the number of skipped bytes into the sink before proceeding
249            io::copy(&mut (&mut reader).take(n), &mut io::sink())
250                .map_err(|err| IoError::new(err, span, None))?;
251            Ok(
252                ByteStream::read(reader, span, Signals::empty(), ByteStreamType::Binary)
253                    .with_known_size(known_size),
254            )
255        } else {
256            Err(ShellError::TypeMismatch {
257                err_message: "expected readable stream".into(),
258                span,
259            })
260        }
261    }
262
263    pub fn take(self, span: Span, n: u64) -> Result<Self, ShellError> {
264        let known_size = self.known_size.map(|s| s.min(n));
265        if let Some(reader) = self.reader() {
266            Ok(ByteStream::read(
267                reader.take(n),
268                span,
269                Signals::empty(),
270                ByteStreamType::Binary,
271            )
272            .with_known_size(known_size))
273        } else {
274            Err(ShellError::TypeMismatch {
275                err_message: "expected readable stream".into(),
276                span,
277            })
278        }
279    }
280
281    pub fn slice(
282        self,
283        val_span: Span,
284        call_span: Span,
285        range: IntRange,
286    ) -> Result<Self, ShellError> {
287        if let Some(len) = self.known_size {
288            let start = range.absolute_start(len);
289            let stream = self.skip(val_span, start);
290
291            match range.absolute_end(len) {
292                Bound::Unbounded => stream,
293                Bound::Included(end) | Bound::Excluded(end) if end < start => {
294                    stream.and_then(|s| s.take(val_span, 0))
295                }
296                Bound::Included(end) => {
297                    let distance = end - start + 1;
298                    stream.and_then(|s| s.take(val_span, distance.min(len)))
299                }
300                Bound::Excluded(end) => {
301                    let distance = end - start;
302                    stream.and_then(|s| s.take(val_span, distance.min(len)))
303                }
304            }
305        } else if range.is_relative() {
306            Err(ShellError::RelativeRangeOnInfiniteStream { span: call_span })
307        } else {
308            let start = range.start() as u64;
309            let stream = self.skip(val_span, start);
310
311            match range.distance() {
312                Bound::Unbounded => stream,
313                Bound::Included(distance) => stream.and_then(|s| s.take(val_span, distance + 1)),
314                Bound::Excluded(distance) => stream.and_then(|s| s.take(val_span, distance)),
315            }
316        }
317    }
318
319    /// Create a [`ByteStream`] from a string. The type of the stream is always `String`.
320    pub fn read_string(string: String, span: Span, signals: Signals) -> Self {
321        let len = string.len();
322        ByteStream::read(
323            Cursor::new(string.into_bytes()),
324            span,
325            signals,
326            ByteStreamType::String,
327        )
328        .with_known_size(Some(len as u64))
329    }
330
331    /// Create a [`ByteStream`] from a byte vector. The type of the stream is always `Binary`.
332    pub fn read_binary(bytes: Vec<u8>, span: Span, signals: Signals) -> Self {
333        let len = bytes.len();
334        ByteStream::read(Cursor::new(bytes), span, signals, ByteStreamType::Binary)
335            .with_known_size(Some(len as u64))
336    }
337
338    /// Create a [`ByteStream`] from a file.
339    ///
340    /// The type is implicitly `Unknown`, as it's not typically known whether files will
341    /// return text or binary.
342    pub fn file(file: File, span: Span, signals: Signals) -> Self {
343        Self::new(
344            ByteStreamSource::File(file),
345            span,
346            signals,
347            ByteStreamType::Unknown,
348        )
349    }
350
351    /// Create a [`ByteStream`] from a child process's stdout and stderr.
352    ///
353    /// The type is implicitly `Unknown`, as it's not typically known whether child processes will
354    /// return text or binary.
355    #[cfg(feature = "os")]
356    pub fn child(child: ChildProcess, span: Span) -> Self {
357        Self::new(
358            ByteStreamSource::Child(Box::new(child)),
359            span,
360            Signals::empty(),
361            ByteStreamType::Unknown,
362        )
363    }
364
365    /// Create a [`ByteStream`] that reads from stdin.
366    ///
367    /// The type is implicitly `Unknown`, as it's not typically known whether stdin is text or
368    /// binary.
369    #[cfg(feature = "os")]
370    pub fn stdin(span: Span) -> Result<Self, ShellError> {
371        let stdin = os_pipe::dup_stdin().map_err(|err| IoError::new(err, span, None))?;
372        let source = ByteStreamSource::File(convert_file(stdin));
373        Ok(Self::new(
374            source,
375            span,
376            Signals::empty(),
377            ByteStreamType::Unknown,
378        ))
379    }
380
381    #[cfg(not(feature = "os"))]
382    pub fn stdin(span: Span) -> Result<Self, ShellError> {
383        Err(ShellError::DisabledOsSupport {
384            msg: "Stdin is not supported".to_string(),
385            span,
386        })
387    }
388
389    /// Create a [`ByteStream`] from a generator function that writes data to the given buffer
390    /// when called, and returns `Ok(false)` on end of stream.
391    pub fn from_fn(
392        span: Span,
393        signals: Signals,
394        type_: ByteStreamType,
395        generator: impl FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
396    ) -> Self {
397        Self::read(
398            ReadGenerator {
399                buffer: Cursor::new(Vec::new()),
400                generator,
401            },
402            span,
403            signals,
404            type_,
405        )
406    }
407
408    pub fn with_type(mut self, type_: ByteStreamType) -> Self {
409        self.type_ = type_;
410        self
411    }
412
413    /// Create a new [`ByteStream`] from an [`Iterator`] of bytes slices.
414    ///
415    /// The returned [`ByteStream`] will have a [`ByteStreamSource`] of `Read`.
416    pub fn from_iter<I>(iter: I, span: Span, signals: Signals, type_: ByteStreamType) -> Self
417    where
418        I: IntoIterator,
419        I::IntoIter: Send + 'static,
420        I::Item: AsRef<[u8]> + Default + Send + 'static,
421    {
422        let iter = iter.into_iter();
423        let cursor = Some(Cursor::new(I::Item::default()));
424        Self::read(ReadIterator { iter, cursor }, span, signals, type_)
425    }
426
427    /// Create a new [`ByteStream`] from an [`Iterator`] of [`Result`] bytes slices.
428    ///
429    /// The returned [`ByteStream`] will have a [`ByteStreamSource`] of `Read`.
430    pub fn from_result_iter<I, T>(
431        iter: I,
432        span: Span,
433        signals: Signals,
434        type_: ByteStreamType,
435    ) -> Self
436    where
437        I: IntoIterator<Item = Result<T, ShellError>>,
438        I::IntoIter: Send + 'static,
439        T: AsRef<[u8]> + Default + Send + 'static,
440    {
441        let iter = iter.into_iter();
442        let cursor = Some(Cursor::new(T::default()));
443        Self::read(ReadResultIterator { iter, cursor }, span, signals, type_)
444    }
445
446    /// Set the known size, in number of bytes, of the [`ByteStream`].
447    pub fn with_known_size(mut self, size: Option<u64>) -> Self {
448        self.known_size = size;
449        self
450    }
451
452    /// Get a reference to the inner [`ByteStreamSource`] of the [`ByteStream`].
453    pub fn source(&self) -> &ByteStreamSource {
454        &self.stream
455    }
456
457    /// Get a mutable reference to the inner [`ByteStreamSource`] of the [`ByteStream`].
458    pub fn source_mut(&mut self) -> &mut ByteStreamSource {
459        &mut self.stream
460    }
461
462    /// Returns the [`Span`] associated with the [`ByteStream`].
463    pub fn span(&self) -> Span {
464        self.span
465    }
466
467    /// Changes the [`Span`] associated with the [`ByteStream`].
468    pub fn with_span(mut self, span: Span) -> Self {
469        self.span = span;
470        self
471    }
472
473    /// Returns the [`ByteStreamType`] associated with the [`ByteStream`].
474    pub fn type_(&self) -> ByteStreamType {
475        self.type_
476    }
477
478    /// Returns the known size, in number of bytes, of the [`ByteStream`].
479    pub fn known_size(&self) -> Option<u64> {
480        self.known_size
481    }
482
483    /// Convert the [`ByteStream`] into its [`Reader`] which allows one to [`Read`] the raw bytes of the stream.
484    ///
485    /// [`Reader`] is buffered and also implements [`BufRead`].
486    ///
487    /// If the source of the [`ByteStream`] is [`ByteStreamSource::Child`] and the child has no stdout,
488    /// then the stream is considered empty and `None` will be returned.
489    pub fn reader(self) -> Option<Reader> {
490        let reader = self.stream.reader()?;
491        Some(Reader {
492            reader: BufReader::new(reader),
493            span: self.span,
494            signals: self.signals,
495        })
496    }
497
498    /// Convert the [`ByteStream`] into a [`Lines`] iterator where each element is a `Result<String, ShellError>`.
499    ///
500    /// There is no limit on how large each line will be. Ending new lines (`\n` or `\r\n`) are
501    /// stripped from each line. If a line fails to be decoded as utf-8, then it will become a [`ShellError`].
502    ///
503    /// If the source of the [`ByteStream`] is [`ByteStreamSource::Child`] and the child has no stdout,
504    /// then the stream is considered empty and `None` will be returned.
505    pub fn lines(self) -> Option<Lines> {
506        let reader = self.stream.reader()?;
507        Some(Lines {
508            reader: BufReader::new(reader),
509            span: self.span,
510            signals: self.signals,
511            strict: false,
512        })
513    }
514
515    /// Convert the [`ByteStream`] into a [`SplitRead`] iterator where each element is a `Result<String, ShellError>`.
516    ///
517    /// Each call to [`next`](Iterator::next) reads the currently available data from the byte
518    /// stream source, until `delimiter` or the end of the stream is encountered.
519    ///
520    /// If the source of the [`ByteStream`] is [`ByteStreamSource::Child`] and the child has no stdout,
521    /// then the stream is considered empty and `None` will be returned.
522    pub fn split(self, delimiter: Vec<u8>) -> Option<SplitRead> {
523        let reader = self.stream.reader()?;
524        Some(SplitRead::new(reader, delimiter, self.span, self.signals))
525    }
526
527    /// Convert the [`ByteStream`] into a [`Chunks`] iterator where each element is a `Result<Value, ShellError>`.
528    ///
529    /// Each call to [`next`](Iterator::next) reads the currently available data from the byte stream source,
530    /// up to a maximum size. The values are typed according to the [type](.type_()) of the
531    /// stream, and if that type is [`Unknown`](ByteStreamType::Unknown), string values will be
532    /// produced as long as the stream continues to parse as valid UTF-8, but binary values will
533    /// be produced instead of the stream fails to parse as UTF-8 instead at any point.
534    /// Any and all newlines are kept intact in each chunk.
535    ///
536    /// Where possible, prefer [`reader`](ByteStream::reader) or [`lines`](ByteStream::lines) over this method.
537    /// Those methods are more likely to be used in a semantically correct way
538    /// (and [`reader`](ByteStream::reader) is more efficient too).
539    ///
540    /// If the source of the [`ByteStream`] is [`ByteStreamSource::Child`] and the child has no stdout,
541    /// then the stream is considered empty and `None` will be returned.
542    pub fn chunks(self) -> Option<Chunks> {
543        let reader = self.stream.reader()?;
544        Some(Chunks::new(reader, self.span, self.signals, self.type_))
545    }
546
547    /// Convert the [`ByteStream`] into its inner [`ByteStreamSource`].
548    pub fn into_source(self) -> ByteStreamSource {
549        self.stream
550    }
551
552    /// Attempt to convert the [`ByteStream`] into a [`Stdio`].
553    ///
554    /// This will succeed if the [`ByteStreamSource`] of the [`ByteStream`] is either:
555    /// - [`File`](ByteStreamSource::File)
556    /// - [`Child`](ByteStreamSource::Child) and the child has a stdout that is `Some(ChildPipe::Pipe(..))`.
557    ///
558    /// All other cases return an `Err` with the original [`ByteStream`] in it.
559    pub fn into_stdio(mut self) -> Result<Stdio, Self> {
560        match self.stream {
561            ByteStreamSource::Read(..) => Err(self),
562            ByteStreamSource::File(file) => Ok(file.into()),
563            #[cfg(feature = "os")]
564            ByteStreamSource::Child(child) => {
565                if let ChildProcess {
566                    stdout: Some(ChildPipe::Pipe(stdout)),
567                    stderr,
568                    ..
569                } = *child
570                {
571                    debug_assert!(stderr.is_none(), "stderr should not exist");
572                    Ok(stdout.into())
573                } else {
574                    self.stream = ByteStreamSource::Child(child);
575                    Err(self)
576                }
577            }
578        }
579    }
580
581    /// Attempt to convert the [`ByteStream`] into a [`ChildProcess`].
582    ///
583    /// This will only succeed if the [`ByteStreamSource`] of the [`ByteStream`] is [`Child`](ByteStreamSource::Child).
584    /// All other cases return an `Err` with the original [`ByteStream`] in it.
585    #[cfg(feature = "os")]
586    pub fn into_child(self) -> Result<ChildProcess, Self> {
587        if let ByteStreamSource::Child(child) = self.stream {
588            Ok(*child)
589        } else {
590            Err(self)
591        }
592    }
593
594    /// Collect all the bytes of the [`ByteStream`] into a [`Vec<u8>`].
595    ///
596    /// Any trailing new lines are kept in the returned [`Vec`].
597    pub fn into_bytes(self) -> Result<Vec<u8>, ShellError> {
598        let span = self.span;
599        let signals = self.signals;
600        let from_io_error = IoError::factory(span, None);
601        match self.stream {
602            ByteStreamSource::Read(mut read) => {
603                let mut buf = Vec::new();
604                let mut chunk = [0; DEFAULT_BUF_SIZE];
605                loop {
606                    signals.check(&span)?;
607                    match read.read(&mut chunk) {
608                        Ok(0) => break,
609                        Ok(n) => buf.extend_from_slice(&chunk[..n]),
610                        Err(e) if e.kind() == ErrorKind::Interrupted => continue,
611                        Err(e) => match ShellErrorBridge::try_from(e) {
612                            Ok(ShellErrorBridge(e)) => return Err(e),
613                            Err(e) => return Err(ShellError::Io(from_io_error(e))),
614                        },
615                    }
616                }
617                Ok(buf)
618            }
619            ByteStreamSource::File(mut file) => {
620                let mut buf = Vec::new();
621                let mut chunk = [0; DEFAULT_BUF_SIZE];
622                loop {
623                    signals.check(&span)?;
624                    match file.read(&mut chunk) {
625                        Ok(0) => break,
626                        Ok(n) => buf.extend_from_slice(&chunk[..n]),
627                        Err(e) if e.kind() == ErrorKind::Interrupted => continue,
628                        Err(e) => return Err(ShellError::Io(from_io_error(e))),
629                    }
630                }
631                Ok(buf)
632            }
633            #[cfg(feature = "os")]
634            ByteStreamSource::Child(child) => child.into_bytes(),
635        }
636    }
637
638    /// Collect the stream into a `String` in-memory. This can only succeed if the data contained is
639    /// valid UTF-8.
640    ///
641    /// The trailing new line (`\n` or `\r\n`), if any, is removed from the [`String`] prior to
642    /// being returned, if this is a stream coming from an external process or file.
643    ///
644    /// If the [type](.type_()) is specified as `Binary`, this operation always fails, even if the
645    /// data would have been valid UTF-8.
646    pub fn into_string(self) -> Result<String, ShellError> {
647        let span = self.span;
648        if self.type_.is_string_coercible() {
649            let trim = self.stream.is_external();
650            let bytes = self.into_bytes()?;
651            let mut string = String::from_utf8(bytes).map_err(|err| ShellError::NonUtf8Custom {
652                span,
653                msg: err.to_string(),
654            })?;
655            if trim {
656                trim_end_newline(&mut string);
657            }
658            Ok(string)
659        } else {
660            Err(ShellError::TypeMismatch {
661                err_message: "expected string, but got binary".into(),
662                span,
663            })
664        }
665    }
666
667    /// Collect all the bytes of the [`ByteStream`] into a [`Value`].
668    ///
669    /// If this is a `String` stream, the stream is decoded to UTF-8. If the stream came from an
670    /// external process or file, the trailing new line (`\n` or `\r\n`), if any, is removed from
671    /// the [`String`] prior to being returned.
672    ///
673    /// If this is a `Binary` stream, a [`Value::Binary`] is returned with any trailing new lines
674    /// preserved.
675    ///
676    /// If this is an `Unknown` stream, the behavior depends on whether the stream parses as valid
677    /// UTF-8 or not. If it does, this is uses the `String` behavior; if not, it uses the `Binary`
678    /// behavior.
679    pub fn into_value(self) -> Result<Value, ShellError> {
680        let span = self.span;
681        let trim = self.stream.is_external();
682        let value = match self.type_ {
683            // If the type is specified, then the stream should always become that type:
684            ByteStreamType::Binary => Value::binary(self.into_bytes()?, span),
685            ByteStreamType::String => Value::string(self.into_string()?, span),
686            // If the type is not specified, then it just depends on whether it parses or not:
687            ByteStreamType::Unknown => match String::from_utf8(self.into_bytes()?) {
688                Ok(mut str) => {
689                    if trim {
690                        trim_end_newline(&mut str);
691                    }
692                    Value::string(str, span)
693                }
694                Err(err) => Value::binary(err.into_bytes(), span),
695            },
696        };
697        Ok(value)
698    }
699
700    /// Consume and drop all bytes of the [`ByteStream`].
701    pub fn drain(self) -> Result<(), ShellError> {
702        match self.stream {
703            ByteStreamSource::Read(read) => {
704                copy_with_signals(read, io::sink(), self.span, &self.signals)?;
705                Ok(())
706            }
707            ByteStreamSource::File(_) => Ok(()),
708            #[cfg(feature = "os")]
709            ByteStreamSource::Child(child) => child.wait(),
710        }
711    }
712
713    /// Print all bytes of the [`ByteStream`] to stdout or stderr.
714    pub fn print(self, to_stderr: bool) -> Result<(), ShellError> {
715        if to_stderr {
716            self.write_to(&mut io::stderr())
717        } else {
718            self.write_to(&mut io::stdout())
719        }
720    }
721
722    /// Write all bytes of the [`ByteStream`] to `dest`.
723    pub fn write_to(self, dest: impl Write) -> Result<(), ShellError> {
724        let span = self.span;
725        let signals = &self.signals;
726        match self.stream {
727            ByteStreamSource::Read(read) => {
728                copy_with_signals(read, dest, span, signals)?;
729            }
730            ByteStreamSource::File(file) => {
731                copy_with_signals(file, dest, span, signals)?;
732            }
733            #[cfg(feature = "os")]
734            ByteStreamSource::Child(mut child) => {
735                // All `OutDest`s except `OutDest::PipeSeparate` will cause `stderr` to be `None`.
736                // Only `save`, `tee`, and `complete` set the stderr `OutDest` to `OutDest::PipeSeparate`,
737                // and those commands have proper simultaneous handling of stdout and stderr.
738                debug_assert!(child.stderr.is_none(), "stderr should not exist");
739
740                if let Some(stdout) = child.stdout.take() {
741                    match stdout {
742                        ChildPipe::Pipe(pipe) => {
743                            copy_with_signals(pipe, dest, span, signals)?;
744                        }
745                        ChildPipe::Tee(tee) => {
746                            copy_with_signals(tee, dest, span, signals)?;
747                        }
748                    }
749                }
750                child.wait()?;
751            }
752        }
753        Ok(())
754    }
755}
756
757impl From<ByteStream> for PipelineData {
758    fn from(stream: ByteStream) -> Self {
759        Self::byte_stream(stream, None)
760    }
761}
762
/// Adapts an [`Iterator`] of byte chunks into a [`Read`] implementation.
///
/// `cursor` holds the chunk currently being read; `None` means the iterator is exhausted.
struct ReadIterator<I>
where
    I: Iterator,
    I::Item: AsRef<[u8]>,
{
    iter: I,
    cursor: Option<Cursor<I::Item>>,
}

impl<I> Read for ReadIterator<I>
where
    I: Iterator,
    I::Item: AsRef<[u8]>,
{
    /// Reads from the current chunk, moving on to the next chunk whenever the current one is
    /// empty or exhausted. Returns `Ok(0)` once every chunk has been consumed, or immediately
    /// (without consuming anything) if `buf` is empty.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // With an empty buffer every chunk would look exhausted, and the loop below would
        // silently pull and discard the entire iterator.
        if buf.is_empty() {
            return Ok(0);
        }
        while let Some(cursor) = self.cursor.as_mut() {
            let read = cursor.read(buf)?;
            if read == 0 {
                self.cursor = self.iter.next().map(Cursor::new);
            } else {
                return Ok(read);
            }
        }
        Ok(0)
    }
}
789
790struct ReadResultIterator<I, T>
791where
792    I: Iterator<Item = Result<T, ShellError>>,
793    T: AsRef<[u8]>,
794{
795    iter: I,
796    cursor: Option<Cursor<T>>,
797}
798
799impl<I, T> Read for ReadResultIterator<I, T>
800where
801    I: Iterator<Item = Result<T, ShellError>>,
802    T: AsRef<[u8]>,
803{
804    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
805        while let Some(cursor) = self.cursor.as_mut() {
806            let read = cursor.read(buf)?;
807            if read == 0 {
808                self.cursor = self
809                    .iter
810                    .next()
811                    .transpose()
812                    .map_err(ShellErrorBridge)?
813                    .map(Cursor::new);
814            } else {
815                return Ok(read);
816            }
817        }
818        Ok(0)
819    }
820}
821
822pub struct Reader {
823    reader: BufReader<SourceReader>,
824    span: Span,
825    signals: Signals,
826}
827
828impl Reader {
829    pub fn span(&self) -> Span {
830        self.span
831    }
832}
833
834impl Read for Reader {
835    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
836        self.signals.check(&self.span).map_err(ShellErrorBridge)?;
837        self.reader.read(buf)
838    }
839}
840
841impl BufRead for Reader {
842    fn fill_buf(&mut self) -> io::Result<&[u8]> {
843        self.reader.fill_buf()
844    }
845
846    fn consume(&mut self, amt: usize) {
847        self.reader.consume(amt)
848    }
849}
850
851pub struct Lines {
852    reader: BufReader<SourceReader>,
853    span: Span,
854    signals: Signals,
855    /// Controls UTF-8 decoding behavior for each line.
856    ///
857    /// When `false` (the default), invalid UTF-8 bytes are replaced with the Unicode
858    /// replacement character (U+FFFD) using lossy conversion, so processing continues
859    /// uninterrupted even if the input is not valid UTF-8.
860    ///
861    /// When `true`, any line containing invalid UTF-8 bytes will immediately produce
862    /// a [`ShellError::NonUtf8`] error instead of a string value.
863    strict: bool,
864}
865
866impl Lines {
867    pub fn span(&self) -> Span {
868        self.span
869    }
870
871    /// Sets the UTF-8 decoding mode for this iterator.
872    ///
873    /// When `strict` is `true`, any line that contains invalid UTF-8 bytes will yield
874    /// a [`ShellError::NonUtf8`] error. When `false` (the default), invalid bytes are
875    /// silently replaced with the Unicode replacement character (U+FFFD `\u{FFFD}`).
876    pub fn strict(mut self, strict: bool) -> Self {
877        self.strict = strict;
878        self
879    }
880}
881
882impl Iterator for Lines {
883    type Item = Result<String, ShellError>;
884
885    fn next(&mut self) -> Option<Self::Item> {
886        if self.signals.interrupted() {
887            None
888        } else {
889            let mut buf = Vec::new();
890            match self.reader.read_until(b'\n', &mut buf) {
891                Ok(0) => None,
892                Ok(_) => {
893                    let mut string = if self.strict {
894                        match String::from_utf8(buf) {
895                            Ok(s) => s,
896                            Err(_) => return Some(Err(ShellError::NonUtf8 { span: self.span })),
897                        }
898                    } else {
899                        String::from_utf8_lossy(&buf).into_owned()
900                    };
901                    trim_end_newline(&mut string);
902                    Some(Ok(string))
903                }
904                Err(err) => Some(Err(IoError::new(err, self.span, None).into())),
905            }
906        }
907    }
908}
909
910pub struct SplitRead {
911    internal: SplitReadInner<BufReader<SourceReader>>,
912    span: Span,
913    signals: Signals,
914}
915
916impl SplitRead {
917    fn new(
918        reader: SourceReader,
919        delimiter: impl AsRef<[u8]>,
920        span: Span,
921        signals: Signals,
922    ) -> Self {
923        Self {
924            internal: SplitReadInner::new(BufReader::new(reader), delimiter),
925            span,
926            signals,
927        }
928    }
929
930    pub fn span(&self) -> Span {
931        self.span
932    }
933}
934
935impl Iterator for SplitRead {
936    type Item = Result<Vec<u8>, ShellError>;
937
938    fn next(&mut self) -> Option<Self::Item> {
939        if self.signals.interrupted() {
940            return None;
941        }
942        self.internal.next().map(|r| {
943            r.map_err(|err| {
944                ShellError::Io(IoError::new_internal(
945                    err,
946                    "Could not get next value for SplitRead",
947                ))
948            })
949        })
950    }
951}
952
953/// Turn a readable stream into [`Value`]s.
954///
955/// The `Value` type depends on the type of the stream ([`ByteStreamType`]). If `Unknown`, the
956/// stream will return strings as long as UTF-8 parsing succeeds, but will start returning binary
957/// if it fails.
958pub struct Chunks {
959    reader: BufReader<SourceReader>,
960    pos: u64,
961    error: bool,
962    span: Span,
963    signals: Signals,
964    type_: ByteStreamType,
965}
966
967impl Chunks {
968    fn new(reader: SourceReader, span: Span, signals: Signals, type_: ByteStreamType) -> Self {
969        Self {
970            reader: BufReader::new(reader),
971            pos: 0,
972            error: false,
973            span,
974            signals,
975            type_,
976        }
977    }
978
979    pub fn span(&self) -> Span {
980        self.span
981    }
982
983    fn next_string(&mut self) -> Result<Option<String>, (Vec<u8>, ShellError)> {
984        let from_io_error = |err: std::io::Error| match ShellErrorBridge::try_from(err) {
985            Ok(err) => err.0,
986            Err(err) => IoError::new(err, self.span, None).into(),
987        };
988
989        // Get some data from the reader
990        let buf = self
991            .reader
992            .fill_buf()
993            .map_err(from_io_error)
994            .map_err(|err| (vec![], err))?;
995
996        // If empty, this is EOF
997        if buf.is_empty() {
998            return Ok(None);
999        }
1000
1001        let mut buf = buf.to_vec();
1002        let mut consumed = 0;
1003
1004        // If the buf length is under 4 bytes, it could be invalid, so try to get more
1005        if buf.len() < 4 {
1006            consumed += buf.len();
1007            self.reader.consume(buf.len());
1008            match self.reader.fill_buf() {
1009                Ok(more_bytes) => buf.extend_from_slice(more_bytes),
1010                Err(err) => return Err((buf, from_io_error(err))),
1011            }
1012        }
1013
1014        // Try to parse utf-8 and decide what to do
1015        match String::from_utf8(buf) {
1016            Ok(string) => {
1017                self.reader.consume(string.len() - consumed);
1018                self.pos += string.len() as u64;
1019                Ok(Some(string))
1020            }
1021            Err(err) if err.utf8_error().error_len().is_none() => {
1022                // There is some valid data at the beginning, and this is just incomplete, so just
1023                // consume that and return it
1024                let valid_up_to = err.utf8_error().valid_up_to();
1025                if valid_up_to > consumed {
1026                    self.reader.consume(valid_up_to - consumed);
1027                }
1028                let mut buf = err.into_bytes();
1029                buf.truncate(valid_up_to);
1030                buf.shrink_to_fit();
1031                let string = String::from_utf8(buf)
1032                    .expect("failed to parse utf-8 even after correcting error");
1033                self.pos += string.len() as u64;
1034                Ok(Some(string))
1035            }
1036            Err(err) => {
1037                // There is an error at the beginning and we have no hope of parsing further.
1038                let shell_error = ShellError::NonUtf8Custom {
1039                    msg: format!("invalid utf-8 sequence starting at index {}", self.pos),
1040                    span: self.span,
1041                };
1042                let buf = err.into_bytes();
1043                // We are consuming the entire buf though, because we're returning it in case it
1044                // will be cast to binary
1045                if buf.len() > consumed {
1046                    self.reader.consume(buf.len() - consumed);
1047                }
1048                self.pos += buf.len() as u64;
1049                Err((buf, shell_error))
1050            }
1051        }
1052    }
1053}
1054
1055impl Iterator for Chunks {
1056    type Item = Result<Value, ShellError>;
1057
1058    fn next(&mut self) -> Option<Self::Item> {
1059        if self.error || self.signals.interrupted() {
1060            None
1061        } else {
1062            match self.type_ {
1063                // Binary should always be binary
1064                ByteStreamType::Binary => {
1065                    let buf = match self.reader.fill_buf() {
1066                        Ok(buf) => buf,
1067                        Err(err) => {
1068                            self.error = true;
1069                            return Some(Err(ShellError::Io(IoError::new(err, self.span, None))));
1070                        }
1071                    };
1072                    if !buf.is_empty() {
1073                        let len = buf.len();
1074                        let value = Value::binary(buf, self.span);
1075                        self.reader.consume(len);
1076                        self.pos += len as u64;
1077                        Some(Ok(value))
1078                    } else {
1079                        None
1080                    }
1081                }
1082                // String produces an error if UTF-8 can't be parsed
1083                ByteStreamType::String => match self.next_string().transpose()? {
1084                    Ok(string) => Some(Ok(Value::string(string, self.span))),
1085                    Err((_, err)) => {
1086                        self.error = true;
1087                        Some(Err(err))
1088                    }
1089                },
1090                // For Unknown, we try to create strings, but we switch to binary mode if we
1091                // fail
1092                ByteStreamType::Unknown => {
1093                    match self.next_string().transpose()? {
1094                        Ok(string) => Some(Ok(Value::string(string, self.span))),
1095                        Err((buf, _)) if !buf.is_empty() => {
1096                            // Switch to binary mode
1097                            self.type_ = ByteStreamType::Binary;
1098                            Some(Ok(Value::binary(buf, self.span)))
1099                        }
1100                        Err((_, err)) => {
1101                            self.error = true;
1102                            Some(Err(err))
1103                        }
1104                    }
1105                }
1106            }
1107        }
1108    }
1109}
1110
/// Remove one trailing line ending (`\n` or `\r\n`) from `string`, if there is one.
///
/// A lone trailing `\r` without a following `\n` is left untouched, and at most one line ending
/// is removed.
fn trim_end_newline(string: &mut String) {
    let Some(without_lf) = string.strip_suffix('\n') else {
        return;
    };
    let without_eol = without_lf.strip_suffix('\r').unwrap_or(without_lf);
    let new_len = without_eol.len();
    string.truncate(new_len);
}
1119
/// Convert one owned file-descriptor type into another by going through [`OwnedFd`].
///
/// Ownership of the descriptor moves from `file` to the returned value; nothing is duplicated.
#[cfg(unix)]
pub(crate) fn convert_file<T: From<OwnedFd>>(file: impl Into<OwnedFd>) -> T {
    let fd: OwnedFd = file.into();
    T::from(fd)
}

/// Convert one owned handle type into another by going through [`OwnedHandle`].
///
/// Ownership of the handle moves from `file` to the returned value; nothing is duplicated.
#[cfg(windows)]
pub(crate) fn convert_file<T: From<OwnedHandle>>(file: impl Into<OwnedHandle>) -> T {
    let handle: OwnedHandle = file.into();
    T::from(handle)
}
1129
1130const DEFAULT_BUF_SIZE: usize = 8192;
1131
1132pub fn copy_with_signals(
1133    mut reader: impl Read,
1134    mut writer: impl Write,
1135    span: Span,
1136    signals: &Signals,
1137) -> Result<u64, ShellError> {
1138    let from_io_error = IoError::factory(span, None);
1139    if signals.is_empty() {
1140        match io::copy(&mut reader, &mut writer) {
1141            Ok(n) => {
1142                writer.flush().map_err(&from_io_error)?;
1143                Ok(n)
1144            }
1145            Err(err) => {
1146                let _ = writer.flush();
1147                match ShellErrorBridge::try_from(err) {
1148                    Ok(ShellErrorBridge(shell_error)) => Err(shell_error),
1149                    Err(err) => Err(from_io_error(err).into()),
1150                }
1151            }
1152        }
1153    } else {
1154        // #[cfg(any(target_os = "linux", target_os = "android"))]
1155        // {
1156        //     return crate::sys::kernel_copy::copy_spec(reader, writer);
1157        // }
1158        match generic_copy(&mut reader, &mut writer, span, signals) {
1159            Ok(len) => {
1160                writer.flush().map_err(&from_io_error)?;
1161                Ok(len)
1162            }
1163            Err(err) => {
1164                let _ = writer.flush();
1165                Err(err)
1166            }
1167        }
1168    }
1169}
1170
1171// Copied from [`std::io::copy`]
1172fn generic_copy(
1173    mut reader: impl Read,
1174    mut writer: impl Write,
1175    span: Span,
1176    signals: &Signals,
1177) -> Result<u64, ShellError> {
1178    let from_io_error = IoError::factory(span, None);
1179    let buf = &mut [0; DEFAULT_BUF_SIZE];
1180    let mut len = 0;
1181    loop {
1182        signals.check(&span)?;
1183        let n = match reader.read(buf) {
1184            Ok(0) => break,
1185            Ok(n) => n,
1186            Err(e) if e.kind() == ErrorKind::Interrupted => continue,
1187            Err(e) => match ShellErrorBridge::try_from(e) {
1188                Ok(ShellErrorBridge(e)) => return Err(e),
1189                Err(e) => return Err(from_io_error(e).into()),
1190            },
1191        };
1192        len += n;
1193        writer.write_all(&buf[..n]).map_err(&from_io_error)?;
1194    }
1195    Ok(len as u64)
1196}
1197
1198struct ReadGenerator<F>
1199where
1200    F: FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
1201{
1202    buffer: Cursor<Vec<u8>>,
1203    generator: F,
1204}
1205
1206impl<F> BufRead for ReadGenerator<F>
1207where
1208    F: FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
1209{
1210    fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
1211        // We have to loop, because it's important that we don't leave the buffer empty unless we're
1212        // truly at the end of the stream.
1213        while self.buffer.fill_buf()?.is_empty() {
1214            // Reset the cursor to the beginning and truncate
1215            self.buffer.set_position(0);
1216            self.buffer.get_mut().clear();
1217            // Ask the generator to generate data
1218            if !(self.generator)(self.buffer.get_mut()).map_err(ShellErrorBridge)? {
1219                // End of stream
1220                break;
1221            }
1222        }
1223        self.buffer.fill_buf()
1224    }
1225
1226    fn consume(&mut self, amt: usize) {
1227        self.buffer.consume(amt);
1228    }
1229}
1230
1231impl<F> Read for ReadGenerator<F>
1232where
1233    F: FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
1234{
1235    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
1236        // Straightforward implementation on top of BufRead
1237        let slice = self.fill_buf()?;
1238        let len = buf.len().min(slice.len());
1239        buf[..len].copy_from_slice(&slice[..len]);
1240        self.consume(len);
1241        Ok(len)
1242    }
1243}
1244
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a [`Chunks`] of the given `type_` over `data`.
    ///
    /// Each element of `data` is returned by a separate read of the underlying source, so a test
    /// controls where read boundaries fall.
    fn test_chunks<T>(data: Vec<T>, type_: ByteStreamType) -> Chunks
    where
        T: AsRef<[u8]> + Default + Send + 'static,
    {
        // Start from an empty cursor so that the first read pulls in the first element.
        let source = ReadIterator {
            cursor: Some(Cursor::new(T::default())),
            iter: data.into_iter(),
        };
        Chunks::new(
            SourceReader::Read(Box::new(source)),
            Span::test_data(),
            Signals::empty(),
            type_,
        )
    }

    #[test]
    fn chunks_read_binary_passthrough() {
        let bins: Vec<&[u8]> = vec![&[0, 1], &[2, 3]];
        let expected: Vec<Value> = bins
            .iter()
            .map(|&bin| Value::binary(bin, Span::test_data()))
            .collect();

        let actual = test_chunks(bins, ByteStreamType::Binary)
            .collect::<Result<Vec<Value>, _>>()
            .expect("error");
        assert_eq!(expected, actual);
    }

    #[test]
    fn chunks_read_string_clean() {
        let strs = vec!["Nushell", "が好きです"];
        let expected: Vec<Value> = strs
            .iter()
            .map(|&s| Value::string(s, Span::test_data()))
            .collect();

        let actual = test_chunks(strs, ByteStreamType::String)
            .collect::<Result<Vec<Value>, _>>()
            .expect("error");
        assert_eq!(expected, actual);
    }

    #[test]
    fn chunks_read_string_split_boundary() {
        // Multi-byte characters are split across reads; they must be reassembled.
        let real = "Nushell最高!";
        let chunks = vec![&b"Nushell\xe6"[..], &b"\x9c\x80\xe9"[..], &b"\xab\x98!"[..]];

        let string: String = test_chunks(chunks, ByteStreamType::String)
            .map(|value| value.expect("error").into_string().expect("not a string"))
            .collect();
        assert_eq!(real, string);
    }

    #[test]
    fn chunks_read_string_utf8_error() {
        // The stream ends partway through the last character, so decoding must eventually fail.
        let chunks = vec![&b"Nushell\xe6"[..], &b"\x9c\x80\xe9"[..], &b"\xab"[..]];

        let mut string = String::new();
        for value in test_chunks(chunks, ByteStreamType::String) {
            let err = match value {
                Ok(value) => {
                    string.push_str(&value.into_string().expect("not a string"));
                    continue;
                }
                Err(err) => err,
            };
            println!("string so far: {string:?}");
            println!("got error: {err:?}");
            assert!(!string.is_empty());
            assert!(matches!(err, ShellError::NonUtf8Custom { .. }));
            return;
        }
        panic!("no error");
    }

    #[test]
    fn chunks_read_unknown_fallback() {
        let chunks = vec![&b"Nushell"[..], &b"\x9c\x80\xe9abcd"[..], &b"efgh"[..]];
        let mut chunk_iter = test_chunks(chunks, ByteStreamType::Unknown);

        let mut get = move || chunk_iter.next().expect("end of iter").expect("error");

        assert_eq!(Value::test_string("Nushell"), get());
        // Invalid UTF-8 switches the stream to binary mode...
        assert_eq!(Value::test_binary(b"\x9c\x80\xe9abcd"), get());
        // ...and once it's in binary mode it won't go back
        assert_eq!(Value::test_binary(b"efgh"), get());
    }
}