nu_protocol/pipeline/
byte_stream.rs

1//! Module managing the streaming of raw bytes between pipeline elements
2//!
//! This module also handles conversions between [`ShellError`] and [`io::Error`](std::io::Error),
4//! so remember the usage of [`ShellErrorBridge`] where applicable.
5#[cfg(feature = "os")]
6use crate::process::{ChildPipe, ChildProcess};
7use crate::{
8    IntRange, PipelineData, ShellError, Signals, Span, Type, Value,
9    shell_error::{bridge::ShellErrorBridge, io::IoError},
10};
11use nu_utils::SplitRead as SplitReadInner;
12use serde::{Deserialize, Serialize};
13use std::ops::Bound;
14#[cfg(unix)]
15use std::os::fd::OwnedFd;
16#[cfg(windows)]
17use std::os::windows::io::OwnedHandle;
18use std::{
19    fmt::Debug,
20    fs::File,
21    io::{self, BufRead, BufReader, Cursor, ErrorKind, Read, Write},
22    process::Stdio,
23};
24
/// The source of bytes for a [`ByteStream`].
///
/// Currently, there are only three possibilities:
/// 1. `Read` (any `dyn` type that implements [`Read`])
/// 2. [`File`]
/// 3. [`ChildProcess`] (only compiled in when the `os` feature is enabled)
pub enum ByteStreamSource {
    /// An arbitrary boxed reader.
    Read(Box<dyn Read + Send + 'static>),
    /// An open file.
    File(File),
    /// A boxed child process. This variant only exists when the `os` feature is enabled.
    #[cfg(feature = "os")]
    Child(Box<ChildProcess>),
}
37
38impl ByteStreamSource {
39    fn reader(self) -> Option<SourceReader> {
40        match self {
41            ByteStreamSource::Read(read) => Some(SourceReader::Read(read)),
42            ByteStreamSource::File(file) => Some(SourceReader::File(file)),
43            #[cfg(feature = "os")]
44            ByteStreamSource::Child(mut child) => child.stdout.take().map(|stdout| match stdout {
45                ChildPipe::Pipe(pipe) => SourceReader::File(convert_file(pipe)),
46                ChildPipe::Tee(tee) => SourceReader::Read(tee),
47            }),
48        }
49    }
50
51    /// Source is a `Child` or `File`, rather than `Read`. Currently affects trimming
52    #[cfg(feature = "os")]
53    pub fn is_external(&self) -> bool {
54        matches!(self, ByteStreamSource::Child(..))
55    }
56
57    #[cfg(not(feature = "os"))]
58    pub fn is_external(&self) -> bool {
59        // without os support we never have externals
60        false
61    }
62}
63
64impl Debug for ByteStreamSource {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        match self {
67            ByteStreamSource::Read(_) => f.debug_tuple("Read").field(&"..").finish(),
68            ByteStreamSource::File(file) => f.debug_tuple("File").field(file).finish(),
69            #[cfg(feature = "os")]
70            ByteStreamSource::Child(child) => f.debug_tuple("Child").field(child).finish(),
71        }
72    }
73}
74
/// The concrete reader that a [`ByteStreamSource`] is turned into before its bytes are consumed.
enum SourceReader {
    /// An arbitrary boxed reader.
    Read(Box<dyn Read + Send + 'static>),
    /// A file; pipes that were converted with `convert_file` also end up here.
    File(File),
}
79
80impl Read for SourceReader {
81    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
82        match self {
83            SourceReader::Read(reader) => reader.read(buf),
84            SourceReader::File(file) => file.read(buf),
85        }
86    }
87}
88
89impl Debug for SourceReader {
90    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91        match self {
92            SourceReader::Read(_) => f.debug_tuple("Read").field(&"..").finish(),
93            SourceReader::File(file) => f.debug_tuple("File").field(file).finish(),
94        }
95    }
96}
97
/// Optional type color for [`ByteStream`], which determines type compatibility.
///
/// The default is [`Unknown`](ByteStreamType::Unknown).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum ByteStreamType {
    /// Compatible with [`Type::Binary`], and should only be converted to binary, even when the
    /// desired type is unknown.
    Binary,
    /// Compatible with [`Type::String`], and should only be converted to string, even when the
    /// desired type is unknown.
    ///
    /// This does not guarantee valid UTF-8 data, but it is conventionally so. Converting to
    /// `String` still requires validation of the data.
    String,
    /// Unknown whether the stream should contain binary or string data. This usually is the result
    /// of an external stream, e.g. an external command or file.
    #[default]
    Unknown,
}
115
116impl ByteStreamType {
117    /// Returns the string that describes the byte stream type - i.e., the same as what `describe`
118    /// produces. This can be used in type mismatch error messages.
119    pub fn describe(self) -> &'static str {
120        match self {
121            ByteStreamType::Binary => "binary (stream)",
122            ByteStreamType::String => "string (stream)",
123            ByteStreamType::Unknown => "byte stream",
124        }
125    }
126
127    /// Returns true if the type is `Binary` or `Unknown`
128    pub fn is_binary_coercible(self) -> bool {
129        matches!(self, ByteStreamType::Binary | ByteStreamType::Unknown)
130    }
131
132    /// Returns true if the type is `String` or `Unknown`
133    pub fn is_string_coercible(self) -> bool {
134        matches!(self, ByteStreamType::String | ByteStreamType::Unknown)
135    }
136}
137
138impl From<ByteStreamType> for Type {
139    fn from(value: ByteStreamType) -> Self {
140        match value {
141            ByteStreamType::Binary => Type::Binary,
142            ByteStreamType::String => Type::String,
143            ByteStreamType::Unknown => Type::Any,
144        }
145    }
146}
147
/// A potentially infinite, interruptible stream of bytes.
///
/// To create a [`ByteStream`], you can use any of the following methods:
/// - [`read`](ByteStream::read): takes any type that implements [`Read`].
/// - [`file`](ByteStream::file): takes a [`File`].
/// - [`from_iter`](ByteStream::from_iter): takes an [`Iterator`] whose items implement `AsRef<[u8]>`.
/// - [`from_result_iter`](ByteStream::from_result_iter): same as [`from_iter`](ByteStream::from_iter),
///   but each item is a `Result<T, ShellError>`.
/// - [`from_fn`](ByteStream::from_fn): uses a generator function to fill a buffer whenever it is
///   empty. This has high performance because it doesn't need to allocate for each chunk of data,
///   and can just reuse the same buffer.
///
/// Byte streams have a [type](ByteStream::type_) which is used to preserve type compatibility when
/// they are the result of an internal command. It is important that this be set to the correct
/// value. [`Unknown`](ByteStreamType::Unknown) is used only for external sources where the type
/// can not be inherently determined, and having it automatically act as a string or binary
/// depending on whether it parses as UTF-8 or not is desirable.
///
/// The data of a [`ByteStream`] can be accessed using one of the following methods:
/// - [`reader`](ByteStream::reader): returns a [`Read`]-able type to get the raw bytes in the stream.
/// - [`lines`](ByteStream::lines): splits the bytes on lines and returns an [`Iterator`]
///   where each item is a `Result<String, ShellError>`.
/// - [`chunks`](ByteStream::chunks): returns an [`Iterator`] of [`Value`]s where each value is
///   either a string or binary.
///   Try not to use this method if possible. Rather, please use [`reader`](ByteStream::reader)
///   (or [`lines`](ByteStream::lines) if it matches the situation).
///
/// Additionally, there are a few methods to collect a [`ByteStream`] into memory:
/// - [`into_bytes`](ByteStream::into_bytes): collects all bytes into a [`Vec<u8>`].
/// - [`into_string`](ByteStream::into_string): collects all bytes into a [`String`], erroring if
///   UTF-8 decoding fails.
/// - [`into_value`](ByteStream::into_value): collects all bytes into a value typed appropriately
///   for the [type](ByteStream::type_) of this stream. If the type is
///   [`Unknown`](ByteStreamType::Unknown), it will produce a string value if the data is valid
///   UTF-8, or a binary value otherwise.
///
/// There are also a few other methods to consume all the data of a [`ByteStream`]:
/// - [`drain`](ByteStream::drain): consumes all bytes and outputs nothing.
/// - [`write_to`](ByteStream::write_to): writes all bytes to the given [`Write`] destination.
/// - [`print`](ByteStream::print): a convenience wrapper around [`write_to`](ByteStream::write_to).
///   It prints all bytes to stdout or stderr.
///
/// Internally, [`ByteStream`]s currently come in three flavors according to [`ByteStreamSource`].
/// See its documentation for more information.
#[derive(Debug)]
pub struct ByteStream {
    // Where the bytes come from.
    stream: ByteStreamSource,
    // Span associated with the stream; used when reporting errors.
    span: Span,
    // Signals handed to readers and copy helpers so that consuming the stream can be interrupted.
    signals: Signals,
    // Whether the stream is string, binary, or unknown.
    type_: ByteStreamType,
    // Number of bytes in the stream, when known.
    known_size: Option<u64>,
    // Spans of callers, collected for building a backtrace.
    caller_spans: Vec<Span>,
}
199
200impl ByteStream {
201    /// Create a new [`ByteStream`] from a [`ByteStreamSource`].
202    pub fn new(
203        stream: ByteStreamSource,
204        span: Span,
205        signals: Signals,
206        type_: ByteStreamType,
207    ) -> Self {
208        Self {
209            stream,
210            span,
211            signals,
212            type_,
213            known_size: None,
214            caller_spans: vec![],
215        }
216    }
217
218    /// Push a caller [`Span`] to the bytestream, it's useful to construct a backtrace.
219    pub fn push_caller_span(&mut self, span: Span) {
220        if span != self.span {
221            self.caller_spans.push(span)
222        }
223    }
224
    /// Get all caller [`Span`]s recorded with
    /// [`push_caller_span`](ByteStream::push_caller_span), which is useful for constructing a
    /// backtrace.
    pub fn get_caller_spans(&self) -> &Vec<Span> {
        &self.caller_spans
    }
229
230    /// Create a [`ByteStream`] from an arbitrary reader. The type must be provided.
231    pub fn read(
232        reader: impl Read + Send + 'static,
233        span: Span,
234        signals: Signals,
235        type_: ByteStreamType,
236    ) -> Self {
237        Self::new(
238            ByteStreamSource::Read(Box::new(reader)),
239            span,
240            signals,
241            type_,
242        )
243    }
244
245    pub fn skip(self, span: Span, n: u64) -> Result<Self, ShellError> {
246        let known_size = self.known_size.map(|len| len.saturating_sub(n));
247        if let Some(mut reader) = self.reader() {
248            // Copy the number of skipped bytes into the sink before proceeding
249            io::copy(&mut (&mut reader).take(n), &mut io::sink())
250                .map_err(|err| IoError::new(err, span, None))?;
251            Ok(
252                ByteStream::read(reader, span, Signals::empty(), ByteStreamType::Binary)
253                    .with_known_size(known_size),
254            )
255        } else {
256            Err(ShellError::TypeMismatch {
257                err_message: "expected readable stream".into(),
258                span,
259            })
260        }
261    }
262
263    pub fn take(self, span: Span, n: u64) -> Result<Self, ShellError> {
264        let known_size = self.known_size.map(|s| s.min(n));
265        if let Some(reader) = self.reader() {
266            Ok(ByteStream::read(
267                reader.take(n),
268                span,
269                Signals::empty(),
270                ByteStreamType::Binary,
271            )
272            .with_known_size(known_size))
273        } else {
274            Err(ShellError::TypeMismatch {
275                err_message: "expected readable stream".into(),
276                span,
277            })
278        }
279    }
280
    /// Restrict the stream to the bytes selected by `range`.
    ///
    /// When the size of the stream is known, the bounds of the range are resolved against that
    /// size with `IntRange::absolute_start` and `IntRange::absolute_end`. When the size is not
    /// known, a relative range is rejected; otherwise the start and distance of the range are used
    /// directly.
    ///
    /// The selection is implemented as a [`skip`](ByteStream::skip) followed by a
    /// [`take`](ByteStream::take), both called with `val_span`.
    ///
    /// # Errors
    ///
    /// - [`ShellError::RelativeRangeOnInfiniteStream`] at `call_span` if the size is unknown and
    ///   the range is relative.
    /// - Any error returned by [`skip`](ByteStream::skip) or [`take`](ByteStream::take).
    pub fn slice(
        self,
        val_span: Span,
        call_span: Span,
        range: IntRange,
    ) -> Result<Self, ShellError> {
        if let Some(len) = self.known_size {
            let start = range.absolute_start(len);
            let stream = self.skip(val_span, start);

            match range.absolute_end(len) {
                // No upper bound: everything after the skipped prefix.
                Bound::Unbounded => stream,
                // An end before the start selects nothing.
                Bound::Included(end) | Bound::Excluded(end) if end < start => {
                    stream.and_then(|s| s.take(val_span, 0))
                }
                // Inclusive end: one more byte than the difference between the bounds.
                Bound::Included(end) => {
                    let distance = end - start + 1;
                    stream.and_then(|s| s.take(val_span, distance.min(len)))
                }
                Bound::Excluded(end) => {
                    let distance = end - start;
                    stream.and_then(|s| s.take(val_span, distance.min(len)))
                }
            }
        } else if range.is_relative() {
            Err(ShellError::RelativeRangeOnInfiniteStream { span: call_span })
        } else {
            // NOTE(review): presumably a non-relative range has a non-negative start, which would
            // make this cast lossless; confirm against `IntRange`.
            let start = range.start() as u64;
            let stream = self.skip(val_span, start);

            match range.distance() {
                Bound::Unbounded => stream,
                Bound::Included(distance) => stream.and_then(|s| s.take(val_span, distance + 1)),
                Bound::Excluded(distance) => stream.and_then(|s| s.take(val_span, distance)),
            }
        }
    }
318
319    /// Create a [`ByteStream`] from a string. The type of the stream is always `String`.
320    pub fn read_string(string: String, span: Span, signals: Signals) -> Self {
321        let len = string.len();
322        ByteStream::read(
323            Cursor::new(string.into_bytes()),
324            span,
325            signals,
326            ByteStreamType::String,
327        )
328        .with_known_size(Some(len as u64))
329    }
330
331    /// Create a [`ByteStream`] from a byte vector. The type of the stream is always `Binary`.
332    pub fn read_binary(bytes: Vec<u8>, span: Span, signals: Signals) -> Self {
333        let len = bytes.len();
334        ByteStream::read(Cursor::new(bytes), span, signals, ByteStreamType::Binary)
335            .with_known_size(Some(len as u64))
336    }
337
338    /// Create a [`ByteStream`] from a file.
339    ///
340    /// The type is implicitly `Unknown`, as it's not typically known whether files will
341    /// return text or binary.
342    pub fn file(file: File, span: Span, signals: Signals) -> Self {
343        Self::new(
344            ByteStreamSource::File(file),
345            span,
346            signals,
347            ByteStreamType::Unknown,
348        )
349    }
350
351    /// Create a [`ByteStream`] from a child process's stdout and stderr.
352    ///
353    /// The type is implicitly `Unknown`, as it's not typically known whether child processes will
354    /// return text or binary.
355    #[cfg(feature = "os")]
356    pub fn child(child: ChildProcess, span: Span) -> Self {
357        Self::new(
358            ByteStreamSource::Child(Box::new(child)),
359            span,
360            Signals::empty(),
361            ByteStreamType::Unknown,
362        )
363    }
364
365    /// Create a [`ByteStream`] that reads from stdin.
366    ///
367    /// The type is implicitly `Unknown`, as it's not typically known whether stdin is text or
368    /// binary.
369    #[cfg(feature = "os")]
370    pub fn stdin(span: Span) -> Result<Self, ShellError> {
371        let stdin = os_pipe::dup_stdin().map_err(|err| IoError::new(err, span, None))?;
372        let source = ByteStreamSource::File(convert_file(stdin));
373        Ok(Self::new(
374            source,
375            span,
376            Signals::empty(),
377            ByteStreamType::Unknown,
378        ))
379    }
380
381    #[cfg(not(feature = "os"))]
382    pub fn stdin(span: Span) -> Result<Self, ShellError> {
383        Err(ShellError::DisabledOsSupport {
384            msg: "Stdin is not supported".to_string(),
385            span,
386        })
387    }
388
389    /// Create a [`ByteStream`] from a generator function that writes data to the given buffer
390    /// when called, and returns `Ok(false)` on end of stream.
391    pub fn from_fn(
392        span: Span,
393        signals: Signals,
394        type_: ByteStreamType,
395        generator: impl FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
396    ) -> Self {
397        Self::read(
398            ReadGenerator {
399                buffer: Cursor::new(Vec::new()),
400                generator,
401            },
402            span,
403            signals,
404            type_,
405        )
406    }
407
408    pub fn with_type(mut self, type_: ByteStreamType) -> Self {
409        self.type_ = type_;
410        self
411    }
412
413    /// Create a new [`ByteStream`] from an [`Iterator`] of bytes slices.
414    ///
415    /// The returned [`ByteStream`] will have a [`ByteStreamSource`] of `Read`.
416    pub fn from_iter<I>(iter: I, span: Span, signals: Signals, type_: ByteStreamType) -> Self
417    where
418        I: IntoIterator,
419        I::IntoIter: Send + 'static,
420        I::Item: AsRef<[u8]> + Default + Send + 'static,
421    {
422        let iter = iter.into_iter();
423        let cursor = Some(Cursor::new(I::Item::default()));
424        Self::read(ReadIterator { iter, cursor }, span, signals, type_)
425    }
426
427    /// Create a new [`ByteStream`] from an [`Iterator`] of [`Result`] bytes slices.
428    ///
429    /// The returned [`ByteStream`] will have a [`ByteStreamSource`] of `Read`.
430    pub fn from_result_iter<I, T>(
431        iter: I,
432        span: Span,
433        signals: Signals,
434        type_: ByteStreamType,
435    ) -> Self
436    where
437        I: IntoIterator<Item = Result<T, ShellError>>,
438        I::IntoIter: Send + 'static,
439        T: AsRef<[u8]> + Default + Send + 'static,
440    {
441        let iter = iter.into_iter();
442        let cursor = Some(Cursor::new(T::default()));
443        Self::read(ReadResultIterator { iter, cursor }, span, signals, type_)
444    }
445
446    /// Set the known size, in number of bytes, of the [`ByteStream`].
447    pub fn with_known_size(mut self, size: Option<u64>) -> Self {
448        self.known_size = size;
449        self
450    }
451
    /// Get a shared reference to the inner [`ByteStreamSource`] of the [`ByteStream`].
    pub fn source(&self) -> &ByteStreamSource {
        &self.stream
    }
456
    /// Get a mutable reference to the inner [`ByteStreamSource`] of the [`ByteStream`], which
    /// allows the source to be modified in place.
    pub fn source_mut(&mut self) -> &mut ByteStreamSource {
        &mut self.stream
    }
461
    /// Returns the [`Span`] associated with the [`ByteStream`]. See
    /// [`with_span`](ByteStream::with_span) to change it.
    pub fn span(&self) -> Span {
        self.span
    }
466
467    /// Changes the [`Span`] associated with the [`ByteStream`].
468    pub fn with_span(mut self, span: Span) -> Self {
469        self.span = span;
470        self
471    }
472
    /// Returns the [`ByteStreamType`] associated with the [`ByteStream`]. See
    /// [`with_type`](ByteStream::with_type) to change it.
    pub fn type_(&self) -> ByteStreamType {
        self.type_
    }
477
    /// Returns the known size, in number of bytes, of the [`ByteStream`], or `None` if the size
    /// is not known.
    pub fn known_size(&self) -> Option<u64> {
        self.known_size
    }
482
483    /// Convert the [`ByteStream`] into its [`Reader`] which allows one to [`Read`] the raw bytes of the stream.
484    ///
485    /// [`Reader`] is buffered and also implements [`BufRead`].
486    ///
487    /// If the source of the [`ByteStream`] is [`ByteStreamSource::Child`] and the child has no stdout,
488    /// then the stream is considered empty and `None` will be returned.
489    pub fn reader(self) -> Option<Reader> {
490        let reader = self.stream.reader()?;
491        Some(Reader {
492            reader: BufReader::new(reader),
493            span: self.span,
494            signals: self.signals,
495        })
496    }
497
498    /// Convert the [`ByteStream`] into a [`Lines`] iterator where each element is a `Result<String, ShellError>`.
499    ///
500    /// There is no limit on how large each line will be. Ending new lines (`\n` or `\r\n`) are
501    /// stripped from each line. If a line fails to be decoded as utf-8, then it will become a [`ShellError`].
502    ///
503    /// If the source of the [`ByteStream`] is [`ByteStreamSource::Child`] and the child has no stdout,
504    /// then the stream is considered empty and `None` will be returned.
505    pub fn lines(self) -> Option<Lines> {
506        let reader = self.stream.reader()?;
507        Some(Lines {
508            reader: BufReader::new(reader),
509            span: self.span,
510            signals: self.signals,
511        })
512    }
513
514    /// Convert the [`ByteStream`] into a [`SplitRead`] iterator where each element is a `Result<String, ShellError>`.
515    ///
516    /// Each call to [`next`](Iterator::next) reads the currently available data from the byte
517    /// stream source, until `delimiter` or the end of the stream is encountered.
518    ///
519    /// If the source of the [`ByteStream`] is [`ByteStreamSource::Child`] and the child has no stdout,
520    /// then the stream is considered empty and `None` will be returned.
521    pub fn split(self, delimiter: Vec<u8>) -> Option<SplitRead> {
522        let reader = self.stream.reader()?;
523        Some(SplitRead::new(reader, delimiter, self.span, self.signals))
524    }
525
526    /// Convert the [`ByteStream`] into a [`Chunks`] iterator where each element is a `Result<Value, ShellError>`.
527    ///
528    /// Each call to [`next`](Iterator::next) reads the currently available data from the byte stream source,
529    /// up to a maximum size. The values are typed according to the [type](.type_()) of the
530    /// stream, and if that type is [`Unknown`](ByteStreamType::Unknown), string values will be
531    /// produced as long as the stream continues to parse as valid UTF-8, but binary values will
532    /// be produced instead of the stream fails to parse as UTF-8 instead at any point.
533    /// Any and all newlines are kept intact in each chunk.
534    ///
535    /// Where possible, prefer [`reader`](ByteStream::reader) or [`lines`](ByteStream::lines) over this method.
536    /// Those methods are more likely to be used in a semantically correct way
537    /// (and [`reader`](ByteStream::reader) is more efficient too).
538    ///
539    /// If the source of the [`ByteStream`] is [`ByteStreamSource::Child`] and the child has no stdout,
540    /// then the stream is considered empty and `None` will be returned.
541    pub fn chunks(self) -> Option<Chunks> {
542        let reader = self.stream.reader()?;
543        Some(Chunks::new(reader, self.span, self.signals, self.type_))
544    }
545
    /// Convert the [`ByteStream`] into its inner [`ByteStreamSource`], discarding the span,
    /// signals, type, known size, and caller spans.
    pub fn into_source(self) -> ByteStreamSource {
        self.stream
    }
550
    /// Attempt to convert the [`ByteStream`] into a [`Stdio`].
    ///
    /// This will succeed if the [`ByteStreamSource`] of the [`ByteStream`] is either:
    /// - [`File`](ByteStreamSource::File)
    /// - [`Child`](ByteStreamSource::Child) and the child has a stdout that is `Some(ChildPipe::Pipe(..))`.
    ///
    /// All other cases return an `Err` with the original [`ByteStream`] in it.
    pub fn into_stdio(mut self) -> Result<Stdio, Self> {
        match self.stream {
            // The `..` pattern binds nothing, so `self` is still whole and can be handed back.
            ByteStreamSource::Read(..) => Err(self),
            ByteStreamSource::File(file) => Ok(file.into()),
            #[cfg(feature = "os")]
            ByteStreamSource::Child(child) => {
                if let ChildProcess {
                    stdout: Some(ChildPipe::Pipe(stdout)),
                    stderr,
                    ..
                } = *child
                {
                    debug_assert!(stderr.is_none(), "stderr should not exist");
                    Ok(stdout.into())
                } else {
                    // The child was moved out of `self` by the match; put it back so that the
                    // caller receives the original stream.
                    self.stream = ByteStreamSource::Child(child);
                    Err(self)
                }
            }
        }
    }
579
580    /// Attempt to convert the [`ByteStream`] into a [`ChildProcess`].
581    ///
582    /// This will only succeed if the [`ByteStreamSource`] of the [`ByteStream`] is [`Child`](ByteStreamSource::Child).
583    /// All other cases return an `Err` with the original [`ByteStream`] in it.
584    #[cfg(feature = "os")]
585    pub fn into_child(self) -> Result<ChildProcess, Self> {
586        if let ByteStreamSource::Child(child) = self.stream {
587            Ok(*child)
588        } else {
589            Err(self)
590        }
591    }
592
593    /// Collect all the bytes of the [`ByteStream`] into a [`Vec<u8>`].
594    ///
595    /// Any trailing new lines are kept in the returned [`Vec`].
596    pub fn into_bytes(self) -> Result<Vec<u8>, ShellError> {
597        // todo!() ctrlc
598        let from_io_error = IoError::factory(self.span, None);
599        match self.stream {
600            ByteStreamSource::Read(mut read) => {
601                let mut buf = Vec::new();
602                read.read_to_end(&mut buf).map_err(|err| {
603                    match ShellErrorBridge::try_from(err) {
604                        Ok(ShellErrorBridge(err)) => err,
605                        Err(err) => ShellError::Io(from_io_error(err)),
606                    }
607                })?;
608                Ok(buf)
609            }
610            ByteStreamSource::File(mut file) => {
611                let mut buf = Vec::new();
612                file.read_to_end(&mut buf).map_err(&from_io_error)?;
613                Ok(buf)
614            }
615            #[cfg(feature = "os")]
616            ByteStreamSource::Child(child) => child.into_bytes(),
617        }
618    }
619
620    /// Collect the stream into a `String` in-memory. This can only succeed if the data contained is
621    /// valid UTF-8.
622    ///
623    /// The trailing new line (`\n` or `\r\n`), if any, is removed from the [`String`] prior to
624    /// being returned, if this is a stream coming from an external process or file.
625    ///
626    /// If the [type](.type_()) is specified as `Binary`, this operation always fails, even if the
627    /// data would have been valid UTF-8.
628    pub fn into_string(self) -> Result<String, ShellError> {
629        let span = self.span;
630        if self.type_.is_string_coercible() {
631            let trim = self.stream.is_external();
632            let bytes = self.into_bytes()?;
633            let mut string = String::from_utf8(bytes).map_err(|err| ShellError::NonUtf8Custom {
634                span,
635                msg: err.to_string(),
636            })?;
637            if trim {
638                trim_end_newline(&mut string);
639            }
640            Ok(string)
641        } else {
642            Err(ShellError::TypeMismatch {
643                err_message: "expected string, but got binary".into(),
644                span,
645            })
646        }
647    }
648
649    /// Collect all the bytes of the [`ByteStream`] into a [`Value`].
650    ///
651    /// If this is a `String` stream, the stream is decoded to UTF-8. If the stream came from an
652    /// external process or file, the trailing new line (`\n` or `\r\n`), if any, is removed from
653    /// the [`String`] prior to being returned.
654    ///
655    /// If this is a `Binary` stream, a [`Value::Binary`] is returned with any trailing new lines
656    /// preserved.
657    ///
658    /// If this is an `Unknown` stream, the behavior depends on whether the stream parses as valid
659    /// UTF-8 or not. If it does, this is uses the `String` behavior; if not, it uses the `Binary`
660    /// behavior.
661    pub fn into_value(self) -> Result<Value, ShellError> {
662        let span = self.span;
663        let trim = self.stream.is_external();
664        let value = match self.type_ {
665            // If the type is specified, then the stream should always become that type:
666            ByteStreamType::Binary => Value::binary(self.into_bytes()?, span),
667            ByteStreamType::String => Value::string(self.into_string()?, span),
668            // If the type is not specified, then it just depends on whether it parses or not:
669            ByteStreamType::Unknown => match String::from_utf8(self.into_bytes()?) {
670                Ok(mut str) => {
671                    if trim {
672                        trim_end_newline(&mut str);
673                    }
674                    Value::string(str, span)
675                }
676                Err(err) => Value::binary(err.into_bytes(), span),
677            },
678        };
679        Ok(value)
680    }
681
682    /// Consume and drop all bytes of the [`ByteStream`].
683    pub fn drain(self) -> Result<(), ShellError> {
684        match self.stream {
685            ByteStreamSource::Read(read) => {
686                copy_with_signals(read, io::sink(), self.span, &self.signals)?;
687                Ok(())
688            }
689            ByteStreamSource::File(_) => Ok(()),
690            #[cfg(feature = "os")]
691            ByteStreamSource::Child(child) => child.wait(),
692        }
693    }
694
695    /// Print all bytes of the [`ByteStream`] to stdout or stderr.
696    pub fn print(self, to_stderr: bool) -> Result<(), ShellError> {
697        if to_stderr {
698            self.write_to(&mut io::stderr())
699        } else {
700            self.write_to(&mut io::stdout())
701        }
702    }
703
704    /// Write all bytes of the [`ByteStream`] to `dest`.
705    pub fn write_to(self, dest: impl Write) -> Result<(), ShellError> {
706        let span = self.span;
707        let signals = &self.signals;
708        match self.stream {
709            ByteStreamSource::Read(read) => {
710                copy_with_signals(read, dest, span, signals)?;
711            }
712            ByteStreamSource::File(file) => {
713                copy_with_signals(file, dest, span, signals)?;
714            }
715            #[cfg(feature = "os")]
716            ByteStreamSource::Child(mut child) => {
717                // All `OutDest`s except `OutDest::PipeSeparate` will cause `stderr` to be `None`.
718                // Only `save`, `tee`, and `complete` set the stderr `OutDest` to `OutDest::PipeSeparate`,
719                // and those commands have proper simultaneous handling of stdout and stderr.
720                debug_assert!(child.stderr.is_none(), "stderr should not exist");
721
722                if let Some(stdout) = child.stdout.take() {
723                    match stdout {
724                        ChildPipe::Pipe(pipe) => {
725                            copy_with_signals(pipe, dest, span, signals)?;
726                        }
727                        ChildPipe::Tee(tee) => {
728                            copy_with_signals(tee, dest, span, signals)?;
729                        }
730                    }
731                }
732                child.wait()?;
733            }
734        }
735        Ok(())
736    }
737}
738
739impl From<ByteStream> for PipelineData {
740    fn from(stream: ByteStream) -> Self {
741        Self::byte_stream(stream, None)
742    }
743}
744
/// Adapts an iterator of byte chunks into a [`Read`] implementation.
///
/// `cursor` holds the chunk currently being read. Once the iterator is exhausted it becomes
/// `None`, and every later read reports end of stream without polling the iterator again.
struct ReadIterator<I>
where
    I: Iterator,
    I::Item: AsRef<[u8]>,
{
    iter: I,
    cursor: Option<Cursor<I::Item>>,
}

impl<I> Read for ReadIterator<I>
where
    I: Iterator,
    I::Item: AsRef<[u8]>,
{
    /// Reads from the current chunk, moving on to the next chunk of the iterator whenever the
    /// current one is exhausted. Returns `Ok(0)` once the iterator has no more chunks.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // A zero-length read must be a no-op. Without this guard the loop below would see
        // `read == 0`, take it for "chunk exhausted", and discard every remaining chunk.
        if buf.is_empty() {
            return Ok(0);
        }

        while let Some(cursor) = self.cursor.as_mut() {
            let read = cursor.read(buf)?;
            if read != 0 {
                return Ok(read);
            }
            // The current chunk is exhausted (or was empty); advance to the next one.
            self.cursor = self.iter.next().map(Cursor::new);
        }
        Ok(0)
    }
}
771
772struct ReadResultIterator<I, T>
773where
774    I: Iterator<Item = Result<T, ShellError>>,
775    T: AsRef<[u8]>,
776{
777    iter: I,
778    cursor: Option<Cursor<T>>,
779}
780
781impl<I, T> Read for ReadResultIterator<I, T>
782where
783    I: Iterator<Item = Result<T, ShellError>>,
784    T: AsRef<[u8]>,
785{
786    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
787        while let Some(cursor) = self.cursor.as_mut() {
788            let read = cursor.read(buf)?;
789            if read == 0 {
790                self.cursor = self
791                    .iter
792                    .next()
793                    .transpose()
794                    .map_err(ShellErrorBridge)?
795                    .map(Cursor::new);
796            } else {
797                return Ok(read);
798            }
799        }
800        Ok(0)
801    }
802}
803
804pub struct Reader {
805    reader: BufReader<SourceReader>,
806    span: Span,
807    signals: Signals,
808}
809
810impl Reader {
811    pub fn span(&self) -> Span {
812        self.span
813    }
814}
815
816impl Read for Reader {
817    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
818        self.signals.check(&self.span).map_err(ShellErrorBridge)?;
819        self.reader.read(buf)
820    }
821}
822
823impl BufRead for Reader {
824    fn fill_buf(&mut self) -> io::Result<&[u8]> {
825        self.reader.fill_buf()
826    }
827
828    fn consume(&mut self, amt: usize) {
829        self.reader.consume(amt)
830    }
831}
832
833pub struct Lines {
834    reader: BufReader<SourceReader>,
835    span: Span,
836    signals: Signals,
837}
838
839impl Lines {
840    pub fn span(&self) -> Span {
841        self.span
842    }
843}
844
845impl Iterator for Lines {
846    type Item = Result<String, ShellError>;
847
848    fn next(&mut self) -> Option<Self::Item> {
849        if self.signals.interrupted() {
850            None
851        } else {
852            let mut buf = Vec::new();
853            match self.reader.read_until(b'\n', &mut buf) {
854                Ok(0) => None,
855                Ok(_) => {
856                    let Ok(mut string) = String::from_utf8(buf) else {
857                        return Some(Err(ShellError::NonUtf8 { span: self.span }));
858                    };
859                    trim_end_newline(&mut string);
860                    Some(Ok(string))
861                }
862                Err(err) => Some(Err(IoError::new(err, self.span, None).into())),
863            }
864        }
865    }
866}
867
868pub struct SplitRead {
869    internal: SplitReadInner<BufReader<SourceReader>>,
870    span: Span,
871    signals: Signals,
872}
873
874impl SplitRead {
875    fn new(
876        reader: SourceReader,
877        delimiter: impl AsRef<[u8]>,
878        span: Span,
879        signals: Signals,
880    ) -> Self {
881        Self {
882            internal: SplitReadInner::new(BufReader::new(reader), delimiter),
883            span,
884            signals,
885        }
886    }
887
888    pub fn span(&self) -> Span {
889        self.span
890    }
891}
892
893impl Iterator for SplitRead {
894    type Item = Result<Vec<u8>, ShellError>;
895
896    fn next(&mut self) -> Option<Self::Item> {
897        if self.signals.interrupted() {
898            return None;
899        }
900        self.internal.next().map(|r| {
901            r.map_err(|err| {
902                ShellError::Io(IoError::new_internal(
903                    err,
904                    "Could not get next value for SplitRead",
905                    crate::location!(),
906                ))
907            })
908        })
909    }
910}
911
912/// Turn a readable stream into [`Value`]s.
913///
914/// The `Value` type depends on the type of the stream ([`ByteStreamType`]). If `Unknown`, the
915/// stream will return strings as long as UTF-8 parsing succeeds, but will start returning binary
916/// if it fails.
917pub struct Chunks {
918    reader: BufReader<SourceReader>,
919    pos: u64,
920    error: bool,
921    span: Span,
922    signals: Signals,
923    type_: ByteStreamType,
924}
925
926impl Chunks {
927    fn new(reader: SourceReader, span: Span, signals: Signals, type_: ByteStreamType) -> Self {
928        Self {
929            reader: BufReader::new(reader),
930            pos: 0,
931            error: false,
932            span,
933            signals,
934            type_,
935        }
936    }
937
938    pub fn span(&self) -> Span {
939        self.span
940    }
941
942    fn next_string(&mut self) -> Result<Option<String>, (Vec<u8>, ShellError)> {
943        let from_io_error = |err: std::io::Error| match ShellErrorBridge::try_from(err) {
944            Ok(err) => err.0,
945            Err(err) => IoError::new(err, self.span, None).into(),
946        };
947
948        // Get some data from the reader
949        let buf = self
950            .reader
951            .fill_buf()
952            .map_err(from_io_error)
953            .map_err(|err| (vec![], err))?;
954
955        // If empty, this is EOF
956        if buf.is_empty() {
957            return Ok(None);
958        }
959
960        let mut buf = buf.to_vec();
961        let mut consumed = 0;
962
963        // If the buf length is under 4 bytes, it could be invalid, so try to get more
964        if buf.len() < 4 {
965            consumed += buf.len();
966            self.reader.consume(buf.len());
967            match self.reader.fill_buf() {
968                Ok(more_bytes) => buf.extend_from_slice(more_bytes),
969                Err(err) => return Err((buf, from_io_error(err))),
970            }
971        }
972
973        // Try to parse utf-8 and decide what to do
974        match String::from_utf8(buf) {
975            Ok(string) => {
976                self.reader.consume(string.len() - consumed);
977                self.pos += string.len() as u64;
978                Ok(Some(string))
979            }
980            Err(err) if err.utf8_error().error_len().is_none() => {
981                // There is some valid data at the beginning, and this is just incomplete, so just
982                // consume that and return it
983                let valid_up_to = err.utf8_error().valid_up_to();
984                if valid_up_to > consumed {
985                    self.reader.consume(valid_up_to - consumed);
986                }
987                let mut buf = err.into_bytes();
988                buf.truncate(valid_up_to);
989                buf.shrink_to_fit();
990                let string = String::from_utf8(buf)
991                    .expect("failed to parse utf-8 even after correcting error");
992                self.pos += string.len() as u64;
993                Ok(Some(string))
994            }
995            Err(err) => {
996                // There is an error at the beginning and we have no hope of parsing further.
997                let shell_error = ShellError::NonUtf8Custom {
998                    msg: format!("invalid utf-8 sequence starting at index {}", self.pos),
999                    span: self.span,
1000                };
1001                let buf = err.into_bytes();
1002                // We are consuming the entire buf though, because we're returning it in case it
1003                // will be cast to binary
1004                if buf.len() > consumed {
1005                    self.reader.consume(buf.len() - consumed);
1006                }
1007                self.pos += buf.len() as u64;
1008                Err((buf, shell_error))
1009            }
1010        }
1011    }
1012}
1013
1014impl Iterator for Chunks {
1015    type Item = Result<Value, ShellError>;
1016
1017    fn next(&mut self) -> Option<Self::Item> {
1018        if self.error || self.signals.interrupted() {
1019            None
1020        } else {
1021            match self.type_ {
1022                // Binary should always be binary
1023                ByteStreamType::Binary => {
1024                    let buf = match self.reader.fill_buf() {
1025                        Ok(buf) => buf,
1026                        Err(err) => {
1027                            self.error = true;
1028                            return Some(Err(ShellError::Io(IoError::new(err, self.span, None))));
1029                        }
1030                    };
1031                    if !buf.is_empty() {
1032                        let len = buf.len();
1033                        let value = Value::binary(buf, self.span);
1034                        self.reader.consume(len);
1035                        self.pos += len as u64;
1036                        Some(Ok(value))
1037                    } else {
1038                        None
1039                    }
1040                }
1041                // String produces an error if UTF-8 can't be parsed
1042                ByteStreamType::String => match self.next_string().transpose()? {
1043                    Ok(string) => Some(Ok(Value::string(string, self.span))),
1044                    Err((_, err)) => {
1045                        self.error = true;
1046                        Some(Err(err))
1047                    }
1048                },
1049                // For Unknown, we try to create strings, but we switch to binary mode if we
1050                // fail
1051                ByteStreamType::Unknown => {
1052                    match self.next_string().transpose()? {
1053                        Ok(string) => Some(Ok(Value::string(string, self.span))),
1054                        Err((buf, _)) if !buf.is_empty() => {
1055                            // Switch to binary mode
1056                            self.type_ = ByteStreamType::Binary;
1057                            Some(Ok(Value::binary(buf, self.span)))
1058                        }
1059                        Err((_, err)) => {
1060                            self.error = true;
1061                            Some(Err(err))
1062                        }
1063                    }
1064                }
1065            }
1066        }
1067    }
1068}
1069
/// Removes a single trailing line terminator (`\n` or `\r\n`) from `string`, if there is one.
///
/// A trailing `\r` that is not followed by `\n` is left untouched.
fn trim_end_newline(string: &mut String) {
    let trimmed_len = string
        .strip_suffix('\n')
        .map(|line| line.strip_suffix('\r').unwrap_or(line).len());
    if let Some(len) = trimmed_len {
        string.truncate(len);
    }
}
1078
/// Converts between file-like types by going through an [`OwnedFd`].
#[cfg(unix)]
pub(crate) fn convert_file<T: From<OwnedFd>>(file: impl Into<OwnedFd>) -> T {
    let fd: OwnedFd = file.into();
    T::from(fd)
}
1083
/// Converts between file-like types by going through an [`OwnedHandle`].
#[cfg(windows)]
pub(crate) fn convert_file<T: From<OwnedHandle>>(file: impl Into<OwnedHandle>) -> T {
    let handle: OwnedHandle = file.into();
    T::from(handle)
}
1088
/// Size in bytes (8 KiB) of the scratch buffer used by `generic_copy`.
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
1090
1091pub fn copy_with_signals(
1092    mut reader: impl Read,
1093    mut writer: impl Write,
1094    span: Span,
1095    signals: &Signals,
1096) -> Result<u64, ShellError> {
1097    let from_io_error = IoError::factory(span, None);
1098    if signals.is_empty() {
1099        match io::copy(&mut reader, &mut writer) {
1100            Ok(n) => {
1101                writer.flush().map_err(&from_io_error)?;
1102                Ok(n)
1103            }
1104            Err(err) => {
1105                let _ = writer.flush();
1106                match ShellErrorBridge::try_from(err) {
1107                    Ok(ShellErrorBridge(shell_error)) => Err(shell_error),
1108                    Err(err) => Err(from_io_error(err).into()),
1109                }
1110            }
1111        }
1112    } else {
1113        // #[cfg(any(target_os = "linux", target_os = "android"))]
1114        // {
1115        //     return crate::sys::kernel_copy::copy_spec(reader, writer);
1116        // }
1117        match generic_copy(&mut reader, &mut writer, span, signals) {
1118            Ok(len) => {
1119                writer.flush().map_err(&from_io_error)?;
1120                Ok(len)
1121            }
1122            Err(err) => {
1123                let _ = writer.flush();
1124                Err(err)
1125            }
1126        }
1127    }
1128}
1129
1130// Copied from [`std::io::copy`]
1131fn generic_copy(
1132    mut reader: impl Read,
1133    mut writer: impl Write,
1134    span: Span,
1135    signals: &Signals,
1136) -> Result<u64, ShellError> {
1137    let from_io_error = IoError::factory(span, None);
1138    let buf = &mut [0; DEFAULT_BUF_SIZE];
1139    let mut len = 0;
1140    loop {
1141        signals.check(&span)?;
1142        let n = match reader.read(buf) {
1143            Ok(0) => break,
1144            Ok(n) => n,
1145            Err(e) if e.kind() == ErrorKind::Interrupted => continue,
1146            Err(e) => match ShellErrorBridge::try_from(e) {
1147                Ok(ShellErrorBridge(e)) => return Err(e),
1148                Err(e) => return Err(from_io_error(e).into()),
1149            },
1150        };
1151        len += n;
1152        writer.write_all(&buf[..n]).map_err(&from_io_error)?;
1153    }
1154    Ok(len as u64)
1155}
1156
1157struct ReadGenerator<F>
1158where
1159    F: FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
1160{
1161    buffer: Cursor<Vec<u8>>,
1162    generator: F,
1163}
1164
1165impl<F> BufRead for ReadGenerator<F>
1166where
1167    F: FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
1168{
1169    fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
1170        // We have to loop, because it's important that we don't leave the buffer empty unless we're
1171        // truly at the end of the stream.
1172        while self.buffer.fill_buf()?.is_empty() {
1173            // Reset the cursor to the beginning and truncate
1174            self.buffer.set_position(0);
1175            self.buffer.get_mut().clear();
1176            // Ask the generator to generate data
1177            if !(self.generator)(self.buffer.get_mut()).map_err(ShellErrorBridge)? {
1178                // End of stream
1179                break;
1180            }
1181        }
1182        self.buffer.fill_buf()
1183    }
1184
1185    fn consume(&mut self, amt: usize) {
1186        self.buffer.consume(amt);
1187    }
1188}
1189
1190impl<F> Read for ReadGenerator<F>
1191where
1192    F: FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
1193{
1194    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
1195        // Straightforward implementation on top of BufRead
1196        let slice = self.fill_buf()?;
1197        let len = buf.len().min(slice.len());
1198        buf[..len].copy_from_slice(&slice[..len]);
1199        self.consume(len);
1200        Ok(len)
1201    }
1202}
1203
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a [`Chunks`] iterator of the given `type_` that reads `data` one item at a time.
    fn test_chunks<T>(data: Vec<T>, type_: ByteStreamType) -> Chunks
    where
        T: AsRef<[u8]> + Default + Send + 'static,
    {
        // The cursor is seeded with an empty item so the first read moves straight on to `data`.
        let source = ReadIterator {
            cursor: Some(Cursor::new(T::default())),
            iter: data.into_iter(),
        };
        let reader = SourceReader::Read(Box::new(source));
        Chunks::new(reader, Span::test_data(), Signals::empty(), type_)
    }

    /// Binary streams yield each input chunk unchanged as a binary value.
    #[test]
    fn chunks_read_binary_passthrough() {
        let bins = vec![&[0, 1][..], &[2, 3][..]];
        let chunks = test_chunks(bins.clone(), ByteStreamType::Binary);

        let expected: Vec<Value> = bins
            .into_iter()
            .map(|bin| Value::binary(bin, Span::test_data()))
            .collect();
        let actual = chunks.collect::<Result<Vec<Value>, _>>().expect("error");
        assert_eq!(expected, actual);
    }

    /// String streams yield each well-formed input chunk unchanged as a string value.
    #[test]
    fn chunks_read_string_clean() {
        let strs = vec!["Nushell", "が好きです"];
        let chunks = test_chunks(strs.clone(), ByteStreamType::String);

        let expected: Vec<Value> = strs
            .into_iter()
            .map(|string| Value::string(string, Span::test_data()))
            .collect();
        let actual = chunks.collect::<Result<Vec<Value>, _>>().expect("error");
        assert_eq!(expected, actual);
    }

    /// Multi-byte characters split across input chunks are reassembled correctly.
    #[test]
    fn chunks_read_string_split_boundary() {
        let real = "Nushell最高!";
        let chunks = vec![&b"Nushell\xe6"[..], &b"\x9c\x80\xe9"[..], &b"\xab\x98!"[..]];

        let assembled: String = test_chunks(chunks, ByteStreamType::String)
            .map(|value| value.expect("error").into_string().expect("not a string"))
            .collect();
        assert_eq!(real, assembled);
    }

    /// A string stream that ends in malformed UTF-8 yields the valid part, then an error.
    #[test]
    fn chunks_read_string_utf8_error() {
        let chunks = vec![&b"Nushell\xe6"[..], &b"\x9c\x80\xe9"[..], &b"\xab"[..]];

        let mut string = String::new();
        for value in test_chunks(chunks, ByteStreamType::String) {
            let err = match value {
                Ok(value) => {
                    string.push_str(&value.into_string().expect("not a string"));
                    continue;
                }
                Err(err) => err,
            };
            println!("string so far: {string:?}");
            println!("got error: {err:?}");
            assert!(!string.is_empty());
            assert!(matches!(err, ShellError::NonUtf8Custom { .. }));
            return;
        }
        panic!("no error");
    }

    /// An unknown stream starts out as strings and falls back to binary on invalid UTF-8.
    #[test]
    fn chunks_read_unknown_fallback() {
        let chunks = vec![&b"Nushell"[..], &b"\x9c\x80\xe9abcd"[..], &b"efgh"[..]];
        let mut iter = test_chunks(chunks, ByteStreamType::Unknown);

        let mut next_value = || iter.next().expect("end of iter").expect("error");

        assert_eq!(Value::test_string("Nushell"), next_value());
        assert_eq!(Value::test_binary(b"\x9c\x80\xe9abcd"), next_value());
        // Once it's in binary mode it won't go back
        assert_eq!(Value::test_binary(b"efgh"), next_value());
    }
}