nu_protocol/pipeline/
byte_stream.rs

1//! Module managing the streaming of raw bytes between pipeline elements
2//!
//! This module also handles conversions between [`ShellError`] and [`io::Error`](std::io::Error),
//! so remember to use [`ShellErrorBridge`] where applicable.
5#[cfg(feature = "os")]
6use crate::process::{ChildPipe, ChildProcess};
7use crate::{
8    IntRange, PipelineData, ShellError, Signals, Span, Type, Value,
9    shell_error::{bridge::ShellErrorBridge, io::IoError},
10};
11use serde::{Deserialize, Serialize};
12use std::ops::Bound;
13#[cfg(unix)]
14use std::os::fd::OwnedFd;
15#[cfg(windows)]
16use std::os::windows::io::OwnedHandle;
17use std::{
18    fmt::Debug,
19    fs::File,
20    io::{self, BufRead, BufReader, Cursor, ErrorKind, Read, Write},
21    process::Stdio,
22};
23
24/// The source of bytes for a [`ByteStream`].
25///
26/// Currently, there are only three possibilities:
27/// 1. `Read` (any `dyn` type that implements [`Read`])
28/// 2. [`File`]
29/// 3. [`ChildProcess`]
30pub enum ByteStreamSource {
31    Read(Box<dyn Read + Send + 'static>),
32    File(File),
33    #[cfg(feature = "os")]
34    Child(Box<ChildProcess>),
35}
36
37impl ByteStreamSource {
38    fn reader(self) -> Option<SourceReader> {
39        match self {
40            ByteStreamSource::Read(read) => Some(SourceReader::Read(read)),
41            ByteStreamSource::File(file) => Some(SourceReader::File(file)),
42            #[cfg(feature = "os")]
43            ByteStreamSource::Child(mut child) => child.stdout.take().map(|stdout| match stdout {
44                ChildPipe::Pipe(pipe) => SourceReader::File(convert_file(pipe)),
45                ChildPipe::Tee(tee) => SourceReader::Read(tee),
46            }),
47        }
48    }
49
50    /// Source is a `Child` or `File`, rather than `Read`. Currently affects trimming
51    #[cfg(feature = "os")]
52    pub fn is_external(&self) -> bool {
53        matches!(self, ByteStreamSource::Child(..))
54    }
55
56    #[cfg(not(feature = "os"))]
57    pub fn is_external(&self) -> bool {
58        // without os support we never have externals
59        false
60    }
61}
62
63impl Debug for ByteStreamSource {
64    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65        match self {
66            ByteStreamSource::Read(_) => f.debug_tuple("Read").field(&"..").finish(),
67            ByteStreamSource::File(file) => f.debug_tuple("File").field(file).finish(),
68            #[cfg(feature = "os")]
69            ByteStreamSource::Child(child) => f.debug_tuple("Child").field(child).finish(),
70        }
71    }
72}
73
/// The readable form of a [`ByteStreamSource`]: either a boxed generic reader or a [`File`].
enum SourceReader {
    Read(Box<dyn Read + Send + 'static>),
    File(File),
}

impl Read for SourceReader {
    /// Delegates to whichever reader is wrapped.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let inner: &mut dyn Read = match self {
            SourceReader::Read(boxed) => &mut **boxed,
            SourceReader::File(file) => file,
        };
        inner.read(buf)
    }
}

impl Debug for SourceReader {
    /// Formats as a single-field tuple named after the variant. The opaque `Read` payload is
    /// rendered as the placeholder string `".."`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (variant, payload): (&str, &dyn Debug) = match self {
            SourceReader::Read(_) => ("Read", &".."),
            SourceReader::File(file) => ("File", file),
        };
        f.debug_tuple(variant).field(payload).finish()
    }
}
96
97/// Optional type color for [`ByteStream`], which determines type compatibility.
98#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
99pub enum ByteStreamType {
100    /// Compatible with [`Type::Binary`], and should only be converted to binary, even when the
101    /// desired type is unknown.
102    Binary,
103    /// Compatible with [`Type::String`], and should only be converted to string, even when the
104    /// desired type is unknown.
105    ///
106    /// This does not guarantee valid UTF-8 data, but it is conventionally so. Converting to
107    /// `String` still requires validation of the data.
108    String,
109    /// Unknown whether the stream should contain binary or string data. This usually is the result
110    /// of an external stream, e.g. an external command or file.
111    #[default]
112    Unknown,
113}
114
115impl ByteStreamType {
116    /// Returns the string that describes the byte stream type - i.e., the same as what `describe`
117    /// produces. This can be used in type mismatch error messages.
118    pub fn describe(self) -> &'static str {
119        match self {
120            ByteStreamType::Binary => "binary (stream)",
121            ByteStreamType::String => "string (stream)",
122            ByteStreamType::Unknown => "byte stream",
123        }
124    }
125
126    /// Returns true if the type is `Binary` or `Unknown`
127    pub fn is_binary_coercible(self) -> bool {
128        matches!(self, ByteStreamType::Binary | ByteStreamType::Unknown)
129    }
130
131    /// Returns true if the type is `String` or `Unknown`
132    pub fn is_string_coercible(self) -> bool {
133        matches!(self, ByteStreamType::String | ByteStreamType::Unknown)
134    }
135}
136
137impl From<ByteStreamType> for Type {
138    fn from(value: ByteStreamType) -> Self {
139        match value {
140            ByteStreamType::Binary => Type::Binary,
141            ByteStreamType::String => Type::String,
142            ByteStreamType::Unknown => Type::Any,
143        }
144    }
145}
146
147/// A potentially infinite, interruptible stream of bytes.
148///
149/// To create a [`ByteStream`], you can use any of the following methods:
150/// - [`read`](ByteStream::read): takes any type that implements [`Read`].
151/// - [`file`](ByteStream::file): takes a [`File`].
152/// - [`from_iter`](ByteStream::from_iter): takes an [`Iterator`] whose items implement `AsRef<[u8]>`.
153/// - [`from_result_iter`](ByteStream::from_result_iter): same as [`from_iter`](ByteStream::from_iter),
154///   but each item is a `Result<T, ShellError>`.
155/// - [`from_fn`](ByteStream::from_fn): uses a generator function to fill a buffer whenever it is
156///   empty. This has high performance because it doesn't need to allocate for each chunk of data,
157///   and can just reuse the same buffer.
158///
159/// Byte streams have a [type](.type_()) which is used to preserve type compatibility when they
160/// are the result of an internal command. It is important that this be set to the correct value.
161/// [`Unknown`](ByteStreamType::Unknown) is used only for external sources where the type can not
162/// be inherently determined, and having it automatically act as a string or binary depending on
163/// whether it parses as UTF-8 or not is desirable.
164///
165/// The data of a [`ByteStream`] can be accessed using one of the following methods:
166/// - [`reader`](ByteStream::reader): returns a [`Read`]-able type to get the raw bytes in the stream.
167/// - [`lines`](ByteStream::lines): splits the bytes on lines and returns an [`Iterator`]
168///   where each item is a `Result<String, ShellError>`.
169/// - [`chunks`](ByteStream::chunks): returns an [`Iterator`] of [`Value`]s where each value is
170///   either a string or binary.
171///   Try not to use this method if possible. Rather, please use [`reader`](ByteStream::reader)
172///   (or [`lines`](ByteStream::lines) if it matches the situation).
173///
174/// Additionally, there are few methods to collect a [`ByteStream`] into memory:
175/// - [`into_bytes`](ByteStream::into_bytes): collects all bytes into a [`Vec<u8>`].
176/// - [`into_string`](ByteStream::into_string): collects all bytes into a [`String`], erroring if utf-8 decoding failed.
177/// - [`into_value`](ByteStream::into_value): collects all bytes into a value typed appropriately
178///   for the [type](.type_()) of this stream. If the type is [`Unknown`](ByteStreamType::Unknown),
179///   it will produce a string value if the data is valid UTF-8, or a binary value otherwise.
180///
181/// There are also a few other methods to consume all the data of a [`ByteStream`]:
182/// - [`drain`](ByteStream::drain): consumes all bytes and outputs nothing.
183/// - [`write_to`](ByteStream::write_to): writes all bytes to the given [`Write`] destination.
184/// - [`print`](ByteStream::print): a convenience wrapper around [`write_to`](ByteStream::write_to).
185///   It prints all bytes to stdout or stderr.
186///
187/// Internally, [`ByteStream`]s currently come in three flavors according to [`ByteStreamSource`].
188/// See its documentation for more information.
189#[derive(Debug)]
190pub struct ByteStream {
191    stream: ByteStreamSource,
192    span: Span,
193    signals: Signals,
194    type_: ByteStreamType,
195    known_size: Option<u64>,
196    caller_spans: Vec<Span>,
197}
198
199impl ByteStream {
200    /// Create a new [`ByteStream`] from a [`ByteStreamSource`].
201    pub fn new(
202        stream: ByteStreamSource,
203        span: Span,
204        signals: Signals,
205        type_: ByteStreamType,
206    ) -> Self {
207        Self {
208            stream,
209            span,
210            signals,
211            type_,
212            known_size: None,
213            caller_spans: vec![],
214        }
215    }
216
217    /// Push a caller [`Span`] to the bytestream, it's useful to construct a backtrace.
218    pub fn push_caller_span(&mut self, span: Span) {
219        if span != self.span {
220            self.caller_spans.push(span)
221        }
222    }
223
224    /// Get all caller [`Span`], it's useful to construct a backtrace.
225    pub fn get_caller_spans(&self) -> &Vec<Span> {
226        &self.caller_spans
227    }
228
229    /// Create a [`ByteStream`] from an arbitrary reader. The type must be provided.
230    pub fn read(
231        reader: impl Read + Send + 'static,
232        span: Span,
233        signals: Signals,
234        type_: ByteStreamType,
235    ) -> Self {
236        Self::new(
237            ByteStreamSource::Read(Box::new(reader)),
238            span,
239            signals,
240            type_,
241        )
242    }
243
244    pub fn skip(self, span: Span, n: u64) -> Result<Self, ShellError> {
245        let known_size = self.known_size.map(|len| len.saturating_sub(n));
246        if let Some(mut reader) = self.reader() {
247            // Copy the number of skipped bytes into the sink before proceeding
248            io::copy(&mut (&mut reader).take(n), &mut io::sink())
249                .map_err(|err| IoError::new(err, span, None))?;
250            Ok(
251                ByteStream::read(reader, span, Signals::empty(), ByteStreamType::Binary)
252                    .with_known_size(known_size),
253            )
254        } else {
255            Err(ShellError::TypeMismatch {
256                err_message: "expected readable stream".into(),
257                span,
258            })
259        }
260    }
261
262    pub fn take(self, span: Span, n: u64) -> Result<Self, ShellError> {
263        let known_size = self.known_size.map(|s| s.min(n));
264        if let Some(reader) = self.reader() {
265            Ok(ByteStream::read(
266                reader.take(n),
267                span,
268                Signals::empty(),
269                ByteStreamType::Binary,
270            )
271            .with_known_size(known_size))
272        } else {
273            Err(ShellError::TypeMismatch {
274                err_message: "expected readable stream".into(),
275                span,
276            })
277        }
278    }
279
280    pub fn slice(
281        self,
282        val_span: Span,
283        call_span: Span,
284        range: IntRange,
285    ) -> Result<Self, ShellError> {
286        if let Some(len) = self.known_size {
287            let start = range.absolute_start(len);
288            let stream = self.skip(val_span, start);
289
290            match range.absolute_end(len) {
291                Bound::Unbounded => stream,
292                Bound::Included(end) | Bound::Excluded(end) if end < start => {
293                    stream.and_then(|s| s.take(val_span, 0))
294                }
295                Bound::Included(end) => {
296                    let distance = end - start + 1;
297                    stream.and_then(|s| s.take(val_span, distance.min(len)))
298                }
299                Bound::Excluded(end) => {
300                    let distance = end - start;
301                    stream.and_then(|s| s.take(val_span, distance.min(len)))
302                }
303            }
304        } else if range.is_relative() {
305            Err(ShellError::RelativeRangeOnInfiniteStream { span: call_span })
306        } else {
307            let start = range.start() as u64;
308            let stream = self.skip(val_span, start);
309
310            match range.distance() {
311                Bound::Unbounded => stream,
312                Bound::Included(distance) => stream.and_then(|s| s.take(val_span, distance + 1)),
313                Bound::Excluded(distance) => stream.and_then(|s| s.take(val_span, distance)),
314            }
315        }
316    }
317
318    /// Create a [`ByteStream`] from a string. The type of the stream is always `String`.
319    pub fn read_string(string: String, span: Span, signals: Signals) -> Self {
320        let len = string.len();
321        ByteStream::read(
322            Cursor::new(string.into_bytes()),
323            span,
324            signals,
325            ByteStreamType::String,
326        )
327        .with_known_size(Some(len as u64))
328    }
329
330    /// Create a [`ByteStream`] from a byte vector. The type of the stream is always `Binary`.
331    pub fn read_binary(bytes: Vec<u8>, span: Span, signals: Signals) -> Self {
332        let len = bytes.len();
333        ByteStream::read(Cursor::new(bytes), span, signals, ByteStreamType::Binary)
334            .with_known_size(Some(len as u64))
335    }
336
337    /// Create a [`ByteStream`] from a file.
338    ///
339    /// The type is implicitly `Unknown`, as it's not typically known whether files will
340    /// return text or binary.
341    pub fn file(file: File, span: Span, signals: Signals) -> Self {
342        Self::new(
343            ByteStreamSource::File(file),
344            span,
345            signals,
346            ByteStreamType::Unknown,
347        )
348    }
349
350    /// Create a [`ByteStream`] from a child process's stdout and stderr.
351    ///
352    /// The type is implicitly `Unknown`, as it's not typically known whether child processes will
353    /// return text or binary.
354    #[cfg(feature = "os")]
355    pub fn child(child: ChildProcess, span: Span) -> Self {
356        Self::new(
357            ByteStreamSource::Child(Box::new(child)),
358            span,
359            Signals::empty(),
360            ByteStreamType::Unknown,
361        )
362    }
363
364    /// Create a [`ByteStream`] that reads from stdin.
365    ///
366    /// The type is implicitly `Unknown`, as it's not typically known whether stdin is text or
367    /// binary.
368    #[cfg(feature = "os")]
369    pub fn stdin(span: Span) -> Result<Self, ShellError> {
370        let stdin = os_pipe::dup_stdin().map_err(|err| IoError::new(err, span, None))?;
371        let source = ByteStreamSource::File(convert_file(stdin));
372        Ok(Self::new(
373            source,
374            span,
375            Signals::empty(),
376            ByteStreamType::Unknown,
377        ))
378    }
379
380    #[cfg(not(feature = "os"))]
381    pub fn stdin(span: Span) -> Result<Self, ShellError> {
382        Err(ShellError::DisabledOsSupport {
383            msg: "Stdin is not supported".to_string(),
384            span: Some(span),
385        })
386    }
387
388    /// Create a [`ByteStream`] from a generator function that writes data to the given buffer
389    /// when called, and returns `Ok(false)` on end of stream.
390    pub fn from_fn(
391        span: Span,
392        signals: Signals,
393        type_: ByteStreamType,
394        generator: impl FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
395    ) -> Self {
396        Self::read(
397            ReadGenerator {
398                buffer: Cursor::new(Vec::new()),
399                generator,
400            },
401            span,
402            signals,
403            type_,
404        )
405    }
406
407    pub fn with_type(mut self, type_: ByteStreamType) -> Self {
408        self.type_ = type_;
409        self
410    }
411
412    /// Create a new [`ByteStream`] from an [`Iterator`] of bytes slices.
413    ///
414    /// The returned [`ByteStream`] will have a [`ByteStreamSource`] of `Read`.
415    pub fn from_iter<I>(iter: I, span: Span, signals: Signals, type_: ByteStreamType) -> Self
416    where
417        I: IntoIterator,
418        I::IntoIter: Send + 'static,
419        I::Item: AsRef<[u8]> + Default + Send + 'static,
420    {
421        let iter = iter.into_iter();
422        let cursor = Some(Cursor::new(I::Item::default()));
423        Self::read(ReadIterator { iter, cursor }, span, signals, type_)
424    }
425
426    /// Create a new [`ByteStream`] from an [`Iterator`] of [`Result`] bytes slices.
427    ///
428    /// The returned [`ByteStream`] will have a [`ByteStreamSource`] of `Read`.
429    pub fn from_result_iter<I, T>(
430        iter: I,
431        span: Span,
432        signals: Signals,
433        type_: ByteStreamType,
434    ) -> Self
435    where
436        I: IntoIterator<Item = Result<T, ShellError>>,
437        I::IntoIter: Send + 'static,
438        T: AsRef<[u8]> + Default + Send + 'static,
439    {
440        let iter = iter.into_iter();
441        let cursor = Some(Cursor::new(T::default()));
442        Self::read(ReadResultIterator { iter, cursor }, span, signals, type_)
443    }
444
445    /// Set the known size, in number of bytes, of the [`ByteStream`].
446    pub fn with_known_size(mut self, size: Option<u64>) -> Self {
447        self.known_size = size;
448        self
449    }
450
451    /// Get a reference to the inner [`ByteStreamSource`] of the [`ByteStream`].
452    pub fn source(&self) -> &ByteStreamSource {
453        &self.stream
454    }
455
456    /// Get a mutable reference to the inner [`ByteStreamSource`] of the [`ByteStream`].
457    pub fn source_mut(&mut self) -> &mut ByteStreamSource {
458        &mut self.stream
459    }
460
461    /// Returns the [`Span`] associated with the [`ByteStream`].
462    pub fn span(&self) -> Span {
463        self.span
464    }
465
466    /// Changes the [`Span`] associated with the [`ByteStream`].
467    pub fn with_span(mut self, span: Span) -> Self {
468        self.span = span;
469        self
470    }
471
472    /// Returns the [`ByteStreamType`] associated with the [`ByteStream`].
473    pub fn type_(&self) -> ByteStreamType {
474        self.type_
475    }
476
477    /// Returns the known size, in number of bytes, of the [`ByteStream`].
478    pub fn known_size(&self) -> Option<u64> {
479        self.known_size
480    }
481
482    /// Convert the [`ByteStream`] into its [`Reader`] which allows one to [`Read`] the raw bytes of the stream.
483    ///
484    /// [`Reader`] is buffered and also implements [`BufRead`].
485    ///
486    /// If the source of the [`ByteStream`] is [`ByteStreamSource::Child`] and the child has no stdout,
487    /// then the stream is considered empty and `None` will be returned.
488    pub fn reader(self) -> Option<Reader> {
489        let reader = self.stream.reader()?;
490        Some(Reader {
491            reader: BufReader::new(reader),
492            span: self.span,
493            signals: self.signals,
494        })
495    }
496
497    /// Convert the [`ByteStream`] into a [`Lines`] iterator where each element is a `Result<String, ShellError>`.
498    ///
499    /// There is no limit on how large each line will be. Ending new lines (`\n` or `\r\n`) are
500    /// stripped from each line. If a line fails to be decoded as utf-8, then it will become a [`ShellError`].
501    ///
502    /// If the source of the [`ByteStream`] is [`ByteStreamSource::Child`] and the child has no stdout,
503    /// then the stream is considered empty and `None` will be returned.
504    pub fn lines(self) -> Option<Lines> {
505        let reader = self.stream.reader()?;
506        Some(Lines {
507            reader: BufReader::new(reader),
508            span: self.span,
509            signals: self.signals,
510        })
511    }
512
513    /// Convert the [`ByteStream`] into a [`SplitRead`] iterator where each element is a `Result<String, ShellError>`.
514    ///
515    /// Each call to [`next`](Iterator::next) reads the currently available data from the byte
516    /// stream source, until `delimiter` or the end of the stream is encountered.
517    ///
518    /// If the source of the [`ByteStream`] is [`ByteStreamSource::Child`] and the child has no stdout,
519    /// then the stream is considered empty and `None` will be returned.
520    pub fn split(self, delimiter: Vec<u8>) -> Option<SplitRead> {
521        let reader = self.stream.reader()?;
522        Some(SplitRead::new(reader, delimiter, self.span, self.signals))
523    }
524
525    /// Convert the [`ByteStream`] into a [`Chunks`] iterator where each element is a `Result<Value, ShellError>`.
526    ///
527    /// Each call to [`next`](Iterator::next) reads the currently available data from the byte stream source,
528    /// up to a maximum size. The values are typed according to the [type](.type_()) of the
529    /// stream, and if that type is [`Unknown`](ByteStreamType::Unknown), string values will be
530    /// produced as long as the stream continues to parse as valid UTF-8, but binary values will
531    /// be produced instead of the stream fails to parse as UTF-8 instead at any point.
532    /// Any and all newlines are kept intact in each chunk.
533    ///
534    /// Where possible, prefer [`reader`](ByteStream::reader) or [`lines`](ByteStream::lines) over this method.
535    /// Those methods are more likely to be used in a semantically correct way
536    /// (and [`reader`](ByteStream::reader) is more efficient too).
537    ///
538    /// If the source of the [`ByteStream`] is [`ByteStreamSource::Child`] and the child has no stdout,
539    /// then the stream is considered empty and `None` will be returned.
540    pub fn chunks(self) -> Option<Chunks> {
541        let reader = self.stream.reader()?;
542        Some(Chunks::new(reader, self.span, self.signals, self.type_))
543    }
544
545    /// Convert the [`ByteStream`] into its inner [`ByteStreamSource`].
546    pub fn into_source(self) -> ByteStreamSource {
547        self.stream
548    }
549
550    /// Attempt to convert the [`ByteStream`] into a [`Stdio`].
551    ///
552    /// This will succeed if the [`ByteStreamSource`] of the [`ByteStream`] is either:
553    /// - [`File`](ByteStreamSource::File)
554    /// - [`Child`](ByteStreamSource::Child) and the child has a stdout that is `Some(ChildPipe::Pipe(..))`.
555    ///
556    /// All other cases return an `Err` with the original [`ByteStream`] in it.
557    pub fn into_stdio(mut self) -> Result<Stdio, Self> {
558        match self.stream {
559            ByteStreamSource::Read(..) => Err(self),
560            ByteStreamSource::File(file) => Ok(file.into()),
561            #[cfg(feature = "os")]
562            ByteStreamSource::Child(child) => {
563                if let ChildProcess {
564                    stdout: Some(ChildPipe::Pipe(stdout)),
565                    stderr,
566                    ..
567                } = *child
568                {
569                    debug_assert!(stderr.is_none(), "stderr should not exist");
570                    Ok(stdout.into())
571                } else {
572                    self.stream = ByteStreamSource::Child(child);
573                    Err(self)
574                }
575            }
576        }
577    }
578
579    /// Attempt to convert the [`ByteStream`] into a [`ChildProcess`].
580    ///
581    /// This will only succeed if the [`ByteStreamSource`] of the [`ByteStream`] is [`Child`](ByteStreamSource::Child).
582    /// All other cases return an `Err` with the original [`ByteStream`] in it.
583    #[cfg(feature = "os")]
584    pub fn into_child(self) -> Result<ChildProcess, Self> {
585        if let ByteStreamSource::Child(child) = self.stream {
586            Ok(*child)
587        } else {
588            Err(self)
589        }
590    }
591
592    /// Collect all the bytes of the [`ByteStream`] into a [`Vec<u8>`].
593    ///
594    /// Any trailing new lines are kept in the returned [`Vec`].
595    pub fn into_bytes(self) -> Result<Vec<u8>, ShellError> {
596        // todo!() ctrlc
597        let from_io_error = IoError::factory(self.span, None);
598        match self.stream {
599            ByteStreamSource::Read(mut read) => {
600                let mut buf = Vec::new();
601                read.read_to_end(&mut buf).map_err(|err| {
602                    match ShellErrorBridge::try_from(err) {
603                        Ok(ShellErrorBridge(err)) => err,
604                        Err(err) => ShellError::Io(from_io_error(err)),
605                    }
606                })?;
607                Ok(buf)
608            }
609            ByteStreamSource::File(mut file) => {
610                let mut buf = Vec::new();
611                file.read_to_end(&mut buf).map_err(&from_io_error)?;
612                Ok(buf)
613            }
614            #[cfg(feature = "os")]
615            ByteStreamSource::Child(child) => child.into_bytes(),
616        }
617    }
618
619    /// Collect the stream into a `String` in-memory. This can only succeed if the data contained is
620    /// valid UTF-8.
621    ///
622    /// The trailing new line (`\n` or `\r\n`), if any, is removed from the [`String`] prior to
623    /// being returned, if this is a stream coming from an external process or file.
624    ///
625    /// If the [type](.type_()) is specified as `Binary`, this operation always fails, even if the
626    /// data would have been valid UTF-8.
627    pub fn into_string(self) -> Result<String, ShellError> {
628        let span = self.span;
629        if self.type_.is_string_coercible() {
630            let trim = self.stream.is_external();
631            let bytes = self.into_bytes()?;
632            let mut string = String::from_utf8(bytes).map_err(|err| ShellError::NonUtf8Custom {
633                span,
634                msg: err.to_string(),
635            })?;
636            if trim {
637                trim_end_newline(&mut string);
638            }
639            Ok(string)
640        } else {
641            Err(ShellError::TypeMismatch {
642                err_message: "expected string, but got binary".into(),
643                span,
644            })
645        }
646    }
647
648    /// Collect all the bytes of the [`ByteStream`] into a [`Value`].
649    ///
650    /// If this is a `String` stream, the stream is decoded to UTF-8. If the stream came from an
651    /// external process or file, the trailing new line (`\n` or `\r\n`), if any, is removed from
652    /// the [`String`] prior to being returned.
653    ///
654    /// If this is a `Binary` stream, a [`Value::Binary`] is returned with any trailing new lines
655    /// preserved.
656    ///
657    /// If this is an `Unknown` stream, the behavior depends on whether the stream parses as valid
658    /// UTF-8 or not. If it does, this is uses the `String` behavior; if not, it uses the `Binary`
659    /// behavior.
660    pub fn into_value(self) -> Result<Value, ShellError> {
661        let span = self.span;
662        let trim = self.stream.is_external();
663        let value = match self.type_ {
664            // If the type is specified, then the stream should always become that type:
665            ByteStreamType::Binary => Value::binary(self.into_bytes()?, span),
666            ByteStreamType::String => Value::string(self.into_string()?, span),
667            // If the type is not specified, then it just depends on whether it parses or not:
668            ByteStreamType::Unknown => match String::from_utf8(self.into_bytes()?) {
669                Ok(mut str) => {
670                    if trim {
671                        trim_end_newline(&mut str);
672                    }
673                    Value::string(str, span)
674                }
675                Err(err) => Value::binary(err.into_bytes(), span),
676            },
677        };
678        Ok(value)
679    }
680
681    /// Consume and drop all bytes of the [`ByteStream`].
682    pub fn drain(self) -> Result<(), ShellError> {
683        match self.stream {
684            ByteStreamSource::Read(read) => {
685                copy_with_signals(read, io::sink(), self.span, &self.signals)?;
686                Ok(())
687            }
688            ByteStreamSource::File(_) => Ok(()),
689            #[cfg(feature = "os")]
690            ByteStreamSource::Child(child) => child.wait(),
691        }
692    }
693
694    /// Print all bytes of the [`ByteStream`] to stdout or stderr.
695    pub fn print(self, to_stderr: bool) -> Result<(), ShellError> {
696        if to_stderr {
697            self.write_to(&mut io::stderr())
698        } else {
699            self.write_to(&mut io::stdout())
700        }
701    }
702
703    /// Write all bytes of the [`ByteStream`] to `dest`.
704    pub fn write_to(self, dest: impl Write) -> Result<(), ShellError> {
705        let span = self.span;
706        let signals = &self.signals;
707        match self.stream {
708            ByteStreamSource::Read(read) => {
709                copy_with_signals(read, dest, span, signals)?;
710            }
711            ByteStreamSource::File(file) => {
712                copy_with_signals(file, dest, span, signals)?;
713            }
714            #[cfg(feature = "os")]
715            ByteStreamSource::Child(mut child) => {
716                // All `OutDest`s except `OutDest::PipeSeparate` will cause `stderr` to be `None`.
717                // Only `save`, `tee`, and `complete` set the stderr `OutDest` to `OutDest::PipeSeparate`,
718                // and those commands have proper simultaneous handling of stdout and stderr.
719                debug_assert!(child.stderr.is_none(), "stderr should not exist");
720
721                if let Some(stdout) = child.stdout.take() {
722                    match stdout {
723                        ChildPipe::Pipe(pipe) => {
724                            copy_with_signals(pipe, dest, span, signals)?;
725                        }
726                        ChildPipe::Tee(tee) => {
727                            copy_with_signals(tee, dest, span, signals)?;
728                        }
729                    }
730                }
731                child.wait()?;
732            }
733        }
734        Ok(())
735    }
736}
737
738impl From<ByteStream> for PipelineData {
739    fn from(stream: ByteStream) -> Self {
740        Self::ByteStream(stream, None)
741    }
742}
743
/// Adapts an [`Iterator`] of byte chunks into a [`Read`] implementation.
///
/// `cursor` holds the chunk currently being read. It becomes `None` once `iter` is exhausted,
/// after which every read returns `Ok(0)`.
struct ReadIterator<I>
where
    I: Iterator,
    I::Item: AsRef<[u8]>,
{
    iter: I,
    cursor: Option<Cursor<I::Item>>,
}

impl<I> Read for ReadIterator<I>
where
    I: Iterator,
    I::Item: AsRef<[u8]>,
{
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // An empty destination can never receive data. Without this guard, every chunk would
        // read as 0 bytes and be skipped, draining the iterator and discarding its data.
        if buf.is_empty() {
            return Ok(0);
        }
        while let Some(cursor) = self.cursor.as_mut() {
            let read = cursor.read(buf)?;
            if read == 0 {
                // Current chunk is exhausted (or was empty): advance to the next one.
                self.cursor = self.iter.next().map(Cursor::new);
            } else {
                return Ok(read);
            }
        }
        Ok(0)
    }
}
770
771struct ReadResultIterator<I, T>
772where
773    I: Iterator<Item = Result<T, ShellError>>,
774    T: AsRef<[u8]>,
775{
776    iter: I,
777    cursor: Option<Cursor<T>>,
778}
779
780impl<I, T> Read for ReadResultIterator<I, T>
781where
782    I: Iterator<Item = Result<T, ShellError>>,
783    T: AsRef<[u8]>,
784{
785    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
786        while let Some(cursor) = self.cursor.as_mut() {
787            let read = cursor.read(buf)?;
788            if read == 0 {
789                self.cursor = self
790                    .iter
791                    .next()
792                    .transpose()
793                    .map_err(ShellErrorBridge)?
794                    .map(Cursor::new);
795            } else {
796                return Ok(read);
797            }
798        }
799        Ok(0)
800    }
801}
802
803pub struct Reader {
804    reader: BufReader<SourceReader>,
805    span: Span,
806    signals: Signals,
807}
808
809impl Reader {
810    pub fn span(&self) -> Span {
811        self.span
812    }
813}
814
815impl Read for Reader {
816    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
817        self.signals.check(&self.span).map_err(ShellErrorBridge)?;
818        self.reader.read(buf)
819    }
820}
821
822impl BufRead for Reader {
823    fn fill_buf(&mut self) -> io::Result<&[u8]> {
824        self.reader.fill_buf()
825    }
826
827    fn consume(&mut self, amt: usize) {
828        self.reader.consume(amt)
829    }
830}
831
832pub struct Lines {
833    reader: BufReader<SourceReader>,
834    span: Span,
835    signals: Signals,
836}
837
838impl Lines {
839    pub fn span(&self) -> Span {
840        self.span
841    }
842}
843
844impl Iterator for Lines {
845    type Item = Result<String, ShellError>;
846
847    fn next(&mut self) -> Option<Self::Item> {
848        if self.signals.interrupted() {
849            None
850        } else {
851            let mut buf = Vec::new();
852            match self.reader.read_until(b'\n', &mut buf) {
853                Ok(0) => None,
854                Ok(_) => {
855                    let Ok(mut string) = String::from_utf8(buf) else {
856                        return Some(Err(ShellError::NonUtf8 { span: self.span }));
857                    };
858                    trim_end_newline(&mut string);
859                    Some(Ok(string))
860                }
861                Err(err) => Some(Err(IoError::new(err, self.span, None).into())),
862            }
863        }
864    }
865}
866
867mod split_read {
868    use std::io::{BufRead, ErrorKind};
869
870    use memchr::memmem::Finder;
871
872    pub struct SplitRead<R> {
873        reader: Option<R>,
874        buf: Option<Vec<u8>>,
875        finder: Finder<'static>,
876    }
877
878    impl<R: BufRead> SplitRead<R> {
879        pub fn new(reader: R, delim: impl AsRef<[u8]>) -> Self {
880            // empty delimiter results in an infinite stream of empty items
881            debug_assert!(!delim.as_ref().is_empty(), "delimiter can't be empty");
882            Self {
883                reader: Some(reader),
884                buf: Some(Vec::new()),
885                finder: Finder::new(delim.as_ref()).into_owned(),
886            }
887        }
888    }
889
890    impl<R: BufRead> Iterator for SplitRead<R> {
891        type Item = Result<Vec<u8>, std::io::Error>;
892
893        fn next(&mut self) -> Option<Self::Item> {
894            let buf = self.buf.as_mut()?;
895            let mut search_start = 0usize;
896
897            loop {
898                if let Some(i) = self.finder.find(&buf[search_start..]) {
899                    let needle_idx = search_start + i;
900                    let right = buf.split_off(needle_idx + self.finder.needle().len());
901                    buf.truncate(needle_idx);
902                    let left = std::mem::replace(buf, right);
903                    return Some(Ok(left));
904                }
905
906                if let Some(mut r) = self.reader.take() {
907                    search_start = buf.len().saturating_sub(self.finder.needle().len() + 1);
908                    let available = match r.fill_buf() {
909                        Ok(n) => n,
910                        Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
911                        Err(e) => return Some(Err(e)),
912                    };
913
914                    buf.extend_from_slice(available);
915                    let used = available.len();
916                    r.consume(used);
917                    if used != 0 {
918                        self.reader = Some(r);
919                    }
920                    continue;
921                } else {
922                    return self.buf.take().map(Ok);
923                }
924            }
925        }
926    }
927
928    #[cfg(test)]
929    mod tests {
930        use super::*;
931        use std::io::{self, Cursor, Read};
932
933        #[test]
934        fn simple() {
935            let s = "foo-bar-baz";
936            let cursor = Cursor::new(String::from(s));
937            let mut split =
938                SplitRead::new(cursor, "-").map(|r| String::from_utf8(r.unwrap()).unwrap());
939
940            assert_eq!(split.next().as_deref(), Some("foo"));
941            assert_eq!(split.next().as_deref(), Some("bar"));
942            assert_eq!(split.next().as_deref(), Some("baz"));
943            assert_eq!(split.next(), None);
944        }
945
946        #[test]
947        fn with_empty_fields() -> Result<(), io::Error> {
948            let s = "\0\0foo\0\0bar\0\0\0\0baz\0\0";
949            let cursor = Cursor::new(String::from(s));
950            let mut split =
951                SplitRead::new(cursor, "\0\0").map(|r| String::from_utf8(r.unwrap()).unwrap());
952
953            assert_eq!(split.next().as_deref(), Some(""));
954            assert_eq!(split.next().as_deref(), Some("foo"));
955            assert_eq!(split.next().as_deref(), Some("bar"));
956            assert_eq!(split.next().as_deref(), Some(""));
957            assert_eq!(split.next().as_deref(), Some("baz"));
958            assert_eq!(split.next().as_deref(), Some(""));
959            assert_eq!(split.next().as_deref(), None);
960
961            Ok(())
962        }
963
964        #[test]
965        fn complex_delimiter() -> Result<(), io::Error> {
966            let s = "<|>foo<|>bar<|><|>baz<|>";
967            let cursor = Cursor::new(String::from(s));
968            let mut split =
969                SplitRead::new(cursor, "<|>").map(|r| String::from_utf8(r.unwrap()).unwrap());
970
971            assert_eq!(split.next().as_deref(), Some(""));
972            assert_eq!(split.next().as_deref(), Some("foo"));
973            assert_eq!(split.next().as_deref(), Some("bar"));
974            assert_eq!(split.next().as_deref(), Some(""));
975            assert_eq!(split.next().as_deref(), Some("baz"));
976            assert_eq!(split.next().as_deref(), Some(""));
977            assert_eq!(split.next().as_deref(), None);
978
979            Ok(())
980        }
981
982        #[test]
983        fn all_empty() -> Result<(), io::Error> {
984            let s = "<><>";
985            let cursor = Cursor::new(String::from(s));
986            let mut split =
987                SplitRead::new(cursor, "<>").map(|r| String::from_utf8(r.unwrap()).unwrap());
988
989            assert_eq!(split.next().as_deref(), Some(""));
990            assert_eq!(split.next().as_deref(), Some(""));
991            assert_eq!(split.next().as_deref(), Some(""));
992            assert_eq!(split.next(), None);
993
994            Ok(())
995        }
996
997        #[should_panic = "delimiter can't be empty"]
998        #[test]
999        fn empty_delimiter() {
1000            let s = "abc";
1001            let cursor = Cursor::new(String::from(s));
1002            let _split = SplitRead::new(cursor, "").map(|e| e.unwrap());
1003        }
1004
1005        #[test]
1006        fn delimiter_spread_across_reads() {
1007            let reader = Cursor::new("<|>foo<|")
1008                .chain(Cursor::new(">bar<|><"))
1009                .chain(Cursor::new("|>baz<|>"));
1010
1011            let mut split =
1012                SplitRead::new(reader, "<|>").map(|r| String::from_utf8(r.unwrap()).unwrap());
1013
1014            assert_eq!(split.next().unwrap(), "");
1015            assert_eq!(split.next().unwrap(), "foo");
1016            assert_eq!(split.next().unwrap(), "bar");
1017            assert_eq!(split.next().unwrap(), "");
1018            assert_eq!(split.next().unwrap(), "baz");
1019            assert_eq!(split.next().unwrap(), "");
1020            assert_eq!(split.next(), None);
1021        }
1022    }
1023}
1024
1025pub struct SplitRead {
1026    internal: split_read::SplitRead<BufReader<SourceReader>>,
1027    span: Span,
1028    signals: Signals,
1029}
1030
1031impl SplitRead {
1032    fn new(
1033        reader: SourceReader,
1034        delimiter: impl AsRef<[u8]>,
1035        span: Span,
1036        signals: Signals,
1037    ) -> Self {
1038        Self {
1039            internal: split_read::SplitRead::new(BufReader::new(reader), delimiter),
1040            span,
1041            signals,
1042        }
1043    }
1044
1045    pub fn span(&self) -> Span {
1046        self.span
1047    }
1048}
1049
1050impl Iterator for SplitRead {
1051    type Item = Result<Vec<u8>, ShellError>;
1052
1053    fn next(&mut self) -> Option<Self::Item> {
1054        if self.signals.interrupted() {
1055            return None;
1056        }
1057        self.internal.next().map(|r| {
1058            r.map_err(|err| {
1059                ShellError::Io(IoError::new_internal(
1060                    err,
1061                    "Could not get next value for SplitRead",
1062                    crate::location!(),
1063                ))
1064            })
1065        })
1066    }
1067}
1068
1069/// Turn a readable stream into [`Value`]s.
1070///
1071/// The `Value` type depends on the type of the stream ([`ByteStreamType`]). If `Unknown`, the
1072/// stream will return strings as long as UTF-8 parsing succeeds, but will start returning binary
1073/// if it fails.
1074pub struct Chunks {
1075    reader: BufReader<SourceReader>,
1076    pos: u64,
1077    error: bool,
1078    span: Span,
1079    signals: Signals,
1080    type_: ByteStreamType,
1081}
1082
1083impl Chunks {
1084    fn new(reader: SourceReader, span: Span, signals: Signals, type_: ByteStreamType) -> Self {
1085        Self {
1086            reader: BufReader::new(reader),
1087            pos: 0,
1088            error: false,
1089            span,
1090            signals,
1091            type_,
1092        }
1093    }
1094
1095    pub fn span(&self) -> Span {
1096        self.span
1097    }
1098
1099    fn next_string(&mut self) -> Result<Option<String>, (Vec<u8>, ShellError)> {
1100        let from_io_error = |err: std::io::Error| match ShellErrorBridge::try_from(err) {
1101            Ok(err) => err.0,
1102            Err(err) => IoError::new(err, self.span, None).into(),
1103        };
1104
1105        // Get some data from the reader
1106        let buf = self
1107            .reader
1108            .fill_buf()
1109            .map_err(from_io_error)
1110            .map_err(|err| (vec![], err))?;
1111
1112        // If empty, this is EOF
1113        if buf.is_empty() {
1114            return Ok(None);
1115        }
1116
1117        let mut buf = buf.to_vec();
1118        let mut consumed = 0;
1119
1120        // If the buf length is under 4 bytes, it could be invalid, so try to get more
1121        if buf.len() < 4 {
1122            consumed += buf.len();
1123            self.reader.consume(buf.len());
1124            match self.reader.fill_buf() {
1125                Ok(more_bytes) => buf.extend_from_slice(more_bytes),
1126                Err(err) => return Err((buf, from_io_error(err))),
1127            }
1128        }
1129
1130        // Try to parse utf-8 and decide what to do
1131        match String::from_utf8(buf) {
1132            Ok(string) => {
1133                self.reader.consume(string.len() - consumed);
1134                self.pos += string.len() as u64;
1135                Ok(Some(string))
1136            }
1137            Err(err) if err.utf8_error().error_len().is_none() => {
1138                // There is some valid data at the beginning, and this is just incomplete, so just
1139                // consume that and return it
1140                let valid_up_to = err.utf8_error().valid_up_to();
1141                if valid_up_to > consumed {
1142                    self.reader.consume(valid_up_to - consumed);
1143                }
1144                let mut buf = err.into_bytes();
1145                buf.truncate(valid_up_to);
1146                buf.shrink_to_fit();
1147                let string = String::from_utf8(buf)
1148                    .expect("failed to parse utf-8 even after correcting error");
1149                self.pos += string.len() as u64;
1150                Ok(Some(string))
1151            }
1152            Err(err) => {
1153                // There is an error at the beginning and we have no hope of parsing further.
1154                let shell_error = ShellError::NonUtf8Custom {
1155                    msg: format!("invalid utf-8 sequence starting at index {}", self.pos),
1156                    span: self.span,
1157                };
1158                let buf = err.into_bytes();
1159                // We are consuming the entire buf though, because we're returning it in case it
1160                // will be cast to binary
1161                if buf.len() > consumed {
1162                    self.reader.consume(buf.len() - consumed);
1163                }
1164                self.pos += buf.len() as u64;
1165                Err((buf, shell_error))
1166            }
1167        }
1168    }
1169}
1170
1171impl Iterator for Chunks {
1172    type Item = Result<Value, ShellError>;
1173
1174    fn next(&mut self) -> Option<Self::Item> {
1175        if self.error || self.signals.interrupted() {
1176            None
1177        } else {
1178            match self.type_ {
1179                // Binary should always be binary
1180                ByteStreamType::Binary => {
1181                    let buf = match self.reader.fill_buf() {
1182                        Ok(buf) => buf,
1183                        Err(err) => {
1184                            self.error = true;
1185                            return Some(Err(ShellError::Io(IoError::new(err, self.span, None))));
1186                        }
1187                    };
1188                    if !buf.is_empty() {
1189                        let len = buf.len();
1190                        let value = Value::binary(buf, self.span);
1191                        self.reader.consume(len);
1192                        self.pos += len as u64;
1193                        Some(Ok(value))
1194                    } else {
1195                        None
1196                    }
1197                }
1198                // String produces an error if UTF-8 can't be parsed
1199                ByteStreamType::String => match self.next_string().transpose()? {
1200                    Ok(string) => Some(Ok(Value::string(string, self.span))),
1201                    Err((_, err)) => {
1202                        self.error = true;
1203                        Some(Err(err))
1204                    }
1205                },
1206                // For Unknown, we try to create strings, but we switch to binary mode if we
1207                // fail
1208                ByteStreamType::Unknown => {
1209                    match self.next_string().transpose()? {
1210                        Ok(string) => Some(Ok(Value::string(string, self.span))),
1211                        Err((buf, _)) if !buf.is_empty() => {
1212                            // Switch to binary mode
1213                            self.type_ = ByteStreamType::Binary;
1214                            Some(Ok(Value::binary(buf, self.span)))
1215                        }
1216                        Err((_, err)) => {
1217                            self.error = true;
1218                            Some(Err(err))
1219                        }
1220                    }
1221                }
1222            }
1223        }
1224    }
1225}
1226
/// Removes one trailing line ending (`\n` or `\r\n`) from `string`, in place.
///
/// A trailing `\r` that is not followed by `\n` is left untouched.
fn trim_end_newline(string: &mut String) {
    if let Some(without_lf) = string.strip_suffix('\n') {
        let new_len = without_lf.strip_suffix('\r').unwrap_or(without_lf).len();
        string.truncate(new_len);
    }
}
1235
/// Converts a value that can become an [`OwnedFd`] into any type constructible from one,
/// moving ownership of the descriptor through the conversion.
#[cfg(unix)]
pub(crate) fn convert_file<T: From<OwnedFd>>(file: impl Into<OwnedFd>) -> T {
    let fd: OwnedFd = file.into();
    T::from(fd)
}

/// Converts a value that can become an [`OwnedHandle`] into any type constructible from one,
/// moving ownership of the handle through the conversion.
#[cfg(windows)]
pub(crate) fn convert_file<T: From<OwnedHandle>>(file: impl Into<OwnedHandle>) -> T {
    let handle: OwnedHandle = file.into();
    T::from(handle)
}
1245
/// Size in bytes (8 KiB) of the buffer `generic_copy` uses for each read.
const DEFAULT_BUF_SIZE: usize = 8192;
1247
1248pub fn copy_with_signals(
1249    mut reader: impl Read,
1250    mut writer: impl Write,
1251    span: Span,
1252    signals: &Signals,
1253) -> Result<u64, ShellError> {
1254    let from_io_error = IoError::factory(span, None);
1255    if signals.is_empty() {
1256        match io::copy(&mut reader, &mut writer) {
1257            Ok(n) => {
1258                writer.flush().map_err(&from_io_error)?;
1259                Ok(n)
1260            }
1261            Err(err) => {
1262                let _ = writer.flush();
1263                match ShellErrorBridge::try_from(err) {
1264                    Ok(ShellErrorBridge(shell_error)) => Err(shell_error),
1265                    Err(err) => Err(from_io_error(err).into()),
1266                }
1267            }
1268        }
1269    } else {
1270        // #[cfg(any(target_os = "linux", target_os = "android"))]
1271        // {
1272        //     return crate::sys::kernel_copy::copy_spec(reader, writer);
1273        // }
1274        match generic_copy(&mut reader, &mut writer, span, signals) {
1275            Ok(len) => {
1276                writer.flush().map_err(&from_io_error)?;
1277                Ok(len)
1278            }
1279            Err(err) => {
1280                let _ = writer.flush();
1281                Err(err)
1282            }
1283        }
1284    }
1285}
1286
1287// Copied from [`std::io::copy`]
1288fn generic_copy(
1289    mut reader: impl Read,
1290    mut writer: impl Write,
1291    span: Span,
1292    signals: &Signals,
1293) -> Result<u64, ShellError> {
1294    let from_io_error = IoError::factory(span, None);
1295    let buf = &mut [0; DEFAULT_BUF_SIZE];
1296    let mut len = 0;
1297    loop {
1298        signals.check(&span)?;
1299        let n = match reader.read(buf) {
1300            Ok(0) => break,
1301            Ok(n) => n,
1302            Err(e) if e.kind() == ErrorKind::Interrupted => continue,
1303            Err(e) => match ShellErrorBridge::try_from(e) {
1304                Ok(ShellErrorBridge(e)) => return Err(e),
1305                Err(e) => return Err(from_io_error(e).into()),
1306            },
1307        };
1308        len += n;
1309        writer.write_all(&buf[..n]).map_err(&from_io_error)?;
1310    }
1311    Ok(len as u64)
1312}
1313
1314struct ReadGenerator<F>
1315where
1316    F: FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
1317{
1318    buffer: Cursor<Vec<u8>>,
1319    generator: F,
1320}
1321
1322impl<F> BufRead for ReadGenerator<F>
1323where
1324    F: FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
1325{
1326    fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
1327        // We have to loop, because it's important that we don't leave the buffer empty unless we're
1328        // truly at the end of the stream.
1329        while self.buffer.fill_buf()?.is_empty() {
1330            // Reset the cursor to the beginning and truncate
1331            self.buffer.set_position(0);
1332            self.buffer.get_mut().clear();
1333            // Ask the generator to generate data
1334            if !(self.generator)(self.buffer.get_mut()).map_err(ShellErrorBridge)? {
1335                // End of stream
1336                break;
1337            }
1338        }
1339        self.buffer.fill_buf()
1340    }
1341
1342    fn consume(&mut self, amt: usize) {
1343        self.buffer.consume(amt);
1344    }
1345}
1346
1347impl<F> Read for ReadGenerator<F>
1348where
1349    F: FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
1350{
1351    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
1352        // Straightforward implementation on top of BufRead
1353        let slice = self.fill_buf()?;
1354        let len = buf.len().min(slice.len());
1355        buf[..len].copy_from_slice(&slice[..len]);
1356        self.consume(len);
1357        Ok(len)
1358    }
1359}
1360
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `Chunks` iterator that pulls `data` piece by piece through a `ReadIterator`.
    fn test_chunks<T>(data: Vec<T>, type_: ByteStreamType) -> Chunks
    where
        T: AsRef<[u8]> + Default + Send + 'static,
    {
        let source = SourceReader::Read(Box::new(ReadIterator {
            iter: data.into_iter(),
            cursor: Some(Cursor::new(T::default())),
        }));
        Chunks::new(source, Span::test_data(), Signals::empty(), type_)
    }

    #[test]
    fn chunks_read_binary_passthrough() {
        let bins: Vec<&[u8]> = vec![&[0, 1], &[2, 3]];
        let expected: Vec<Value> = bins
            .iter()
            .map(|bin| Value::binary(*bin, Span::test_data()))
            .collect();

        let actual = test_chunks(bins, ByteStreamType::Binary)
            .collect::<Result<Vec<Value>, _>>()
            .expect("error");
        assert_eq!(expected, actual);
    }

    #[test]
    fn chunks_read_string_clean() {
        let strs = vec!["Nushell", "が好きです"];
        let expected: Vec<Value> = strs
            .iter()
            .map(|text| Value::string(*text, Span::test_data()))
            .collect();

        let actual = test_chunks(strs, ByteStreamType::String)
            .collect::<Result<Vec<Value>, _>>()
            .expect("error");
        assert_eq!(expected, actual);
    }

    #[test]
    fn chunks_read_string_split_boundary() {
        let chunks = vec![&b"Nushell\xe6"[..], &b"\x9c\x80\xe9"[..], &b"\xab\x98!"[..]];

        let joined: String = test_chunks(chunks, ByteStreamType::String)
            .map(|value| value.expect("error").into_string().expect("not a string"))
            .collect();
        assert_eq!("Nushell最高!", joined);
    }

    #[test]
    fn chunks_read_string_utf8_error() {
        let chunks = vec![&b"Nushell\xe6"[..], &b"\x9c\x80\xe9"[..], &b"\xab"[..]];
        let mut iter = test_chunks(chunks, ByteStreamType::String);

        let mut collected = String::new();
        let err = loop {
            match iter.next() {
                Some(Ok(value)) => {
                    collected.push_str(&value.into_string().expect("not a string"))
                }
                Some(Err(err)) => break err,
                None => panic!("no error"),
            }
        };

        println!("string so far: {collected:?}");
        println!("got error: {err:?}");
        assert!(!collected.is_empty());
        assert!(matches!(err, ShellError::NonUtf8Custom { .. }));
    }

    #[test]
    fn chunks_read_unknown_fallback() {
        let chunks = vec![&b"Nushell"[..], &b"\x9c\x80\xe9abcd"[..], &b"efgh"[..]];
        let mut iter = test_chunks(chunks, ByteStreamType::Unknown);

        // The last chunk is valid UTF-8, but once in binary mode the stream never goes back.
        let expected = [
            Value::test_string("Nushell"),
            Value::test_binary(b"\x9c\x80\xe9abcd"),
            Value::test_binary(b"efgh"),
        ];
        for want in expected {
            assert_eq!(want, iter.next().expect("end of iter").expect("error"));
        }
    }
}