nu_protocol/pipeline/
pipeline_data.rs

1#[cfg(feature = "os")]
2use crate::process::ExitStatusFuture;
3use crate::{
4    ByteStream, ByteStreamSource, ByteStreamType, Config, ListStream, OutDest, PipelineMetadata,
5    Range, ShellError, Signals, Span, Type, Value,
6    ast::{Call, PathMember},
7    engine::{EngineState, Stack},
8    location,
9    shell_error::{io::IoError, location::Location},
10};
11use std::{
12    borrow::Cow,
13    io::Write,
14    ops::Deref,
15    sync::{Arc, Mutex},
16};
17
/// Trailing characters (`\r` and `\n`) trimmed from byte-stream text that decoded as UTF-8
/// before it is handed to the closure in `flat_map` and `filter`.
const LINE_ENDING_PATTERN: &[char] = &['\r', '\n'];
19
20/// The foundational abstraction for input and output to commands
21///
22/// This represents either a single Value or a stream of values coming into the command or leaving a command.
23///
24/// A note on implementation:
25///
26/// We've tried a few variations of this structure. Listing these below so we have a record.
27///
28/// * We tried always assuming a stream in Nushell. This was a great 80% solution, but it had some rough edges.
29///   Namely, how do you know the difference between a single string and a list of one string. How do you know
30///   when to flatten the data given to you from a data source into the stream or to keep it as an unflattened
31///   list?
32///
33/// * We tried putting the stream into Value. This had some interesting properties as now commands "just worked
34///   on values", but lead to a few unfortunate issues.
35///
36/// The first is that you can't easily clone Values in a way that felt largely immutable. For example, if
37/// you cloned a Value which contained a stream, and in one variable drained some part of it, then the second
38/// variable would see different values based on what you did to the first.
39///
40/// To make this kind of mutation thread-safe, we would have had to produce a lock for the stream, which in
41/// practice would have meant always locking the stream before reading from it. But more fundamentally, it
42/// felt wrong in practice that observation of a value at runtime could affect other values which happen to
43/// alias the same stream. By separating these, we don't have this effect. Instead, variables could get
44/// concrete list values rather than streams, and be able to view them without non-local effects.
45///
46/// * A balance of the two approaches is what we've landed on: Values are thread-safe to pass, and we can stream
47///   them into any sources. Streams are still available to model the infinite streams approach of original
48///   Nushell.
49#[derive(Debug)]
50pub enum PipelineData {
51    Empty,
52    Value(Value, Option<PipelineMetadata>),
53    ListStream(ListStream, Option<PipelineMetadata>),
54    ByteStream(ByteStream, Option<PipelineMetadata>),
55}
56
57impl PipelineData {
58    pub const fn empty() -> PipelineData {
59        PipelineData::Empty
60    }
61
62    pub fn value(val: Value, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
63        PipelineData::Value(val, metadata.into())
64    }
65
66    pub fn list_stream(stream: ListStream, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
67        PipelineData::ListStream(stream, metadata.into())
68    }
69
70    pub fn byte_stream(stream: ByteStream, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
71        PipelineData::ByteStream(stream, metadata.into())
72    }
73
74    pub fn metadata(&self) -> Option<PipelineMetadata> {
75        match self {
76            PipelineData::Empty => None,
77            PipelineData::Value(_, meta)
78            | PipelineData::ListStream(_, meta)
79            | PipelineData::ByteStream(_, meta) => meta.clone(),
80        }
81    }
82
83    pub fn set_metadata(mut self, metadata: Option<PipelineMetadata>) -> Self {
84        match &mut self {
85            PipelineData::Empty => {}
86            PipelineData::Value(_, meta)
87            | PipelineData::ListStream(_, meta)
88            | PipelineData::ByteStream(_, meta) => *meta = metadata,
89        }
90        self
91    }
92
93    pub fn is_nothing(&self) -> bool {
94        matches!(self, PipelineData::Value(Value::Nothing { .. }, ..))
95            || matches!(self, PipelineData::Empty)
96    }
97
98    /// PipelineData doesn't always have a Span, but we can try!
99    pub fn span(&self) -> Option<Span> {
100        match self {
101            PipelineData::Empty => None,
102            PipelineData::Value(value, ..) => Some(value.span()),
103            PipelineData::ListStream(stream, ..) => Some(stream.span()),
104            PipelineData::ByteStream(stream, ..) => Some(stream.span()),
105        }
106    }
107
108    /// Change the span of the [`PipelineData`].
109    ///
110    /// Returns `Value(Nothing)` with the given span if it was [`PipelineData::empty()`].
111    pub fn with_span(self, span: Span) -> Self {
112        match self {
113            PipelineData::Empty => PipelineData::value(Value::nothing(span), None),
114            PipelineData::Value(value, metadata) => {
115                PipelineData::value(value.with_span(span), metadata)
116            }
117            PipelineData::ListStream(stream, metadata) => {
118                PipelineData::list_stream(stream.with_span(span), metadata)
119            }
120            PipelineData::ByteStream(stream, metadata) => {
121                PipelineData::byte_stream(stream.with_span(span), metadata)
122            }
123        }
124    }
125
126    /// Get a type that is representative of the `PipelineData`.
127    ///
128    /// The type returned here makes no effort to collect a stream, so it may be a different type
129    /// than would be returned by [`Value::get_type()`] on the result of
130    /// [`.into_value()`](Self::into_value).
131    ///
132    /// Specifically, a `ListStream` results in `list<any>` rather than
133    /// the fully complete [`list`](Type::List) type (which would require knowing the contents),
134    /// and a `ByteStream` with [unknown](crate::ByteStreamType::Unknown) type results in
135    /// [`any`](Type::Any) rather than [`string`](Type::String) or [`binary`](Type::Binary).
136    pub fn get_type(&self) -> Type {
137        match self {
138            PipelineData::Empty => Type::Nothing,
139            PipelineData::Value(value, _) => value.get_type(),
140            PipelineData::ListStream(_, _) => Type::list(Type::Any),
141            PipelineData::ByteStream(stream, _) => stream.type_().into(),
142        }
143    }
144
145    /// Determine if the `PipelineData` is a [subtype](https://en.wikipedia.org/wiki/Subtyping) of `other`.
146    ///
147    /// This check makes no effort to collect a stream, so it may be a different result
148    /// than would be returned by calling [`Value::is_subtype_of()`] on the result of
149    /// [`.into_value()`](Self::into_value).
150    ///
151    /// A `ListStream` acts the same as an empty list type: it is a subtype of any [`list`](Type::List)
152    /// or [`table`](Type::Table) type. After converting to a value, it may become a more specific type.
153    /// For example, a `ListStream` is a subtype of `list<int>` and `list<string>`.
154    /// If calling [`.into_value()`](Self::into_value) results in a `list<int>`,
155    /// then the value would not be a subtype of `list<string>`, in contrast to the original `ListStream`.
156    ///
157    /// A `ByteStream` is a subtype of [`string`](Type::String) if it is coercible into a string.
158    /// Likewise, a `ByteStream` is a subtype of [`binary`](Type::Binary) if it is coercible into a binary value.
159    pub fn is_subtype_of(&self, other: &Type) -> bool {
160        match (self, other) {
161            (_, Type::Any) => true,
162            (PipelineData::Empty, Type::Nothing) => true,
163            (PipelineData::Value(val, ..), ty) => val.is_subtype_of(ty),
164
165            // a list stream could be a list with any type, including a table
166            (PipelineData::ListStream(..), Type::List(..) | Type::Table(..)) => true,
167
168            (PipelineData::ByteStream(stream, ..), Type::String)
169                if stream.type_().is_string_coercible() =>
170            {
171                true
172            }
173            (PipelineData::ByteStream(stream, ..), Type::Binary)
174                if stream.type_().is_binary_coercible() =>
175            {
176                true
177            }
178
179            (PipelineData::Empty, _) => false,
180            (PipelineData::ListStream(..), _) => false,
181            (PipelineData::ByteStream(..), _) => false,
182        }
183    }
184
185    pub fn into_value(self, span: Span) -> Result<Value, ShellError> {
186        match self {
187            PipelineData::Empty => Ok(Value::nothing(span)),
188            PipelineData::Value(value, ..) => {
189                if value.span() == Span::unknown() {
190                    Ok(value.with_span(span))
191                } else {
192                    Ok(value)
193                }
194            }
195            PipelineData::ListStream(stream, ..) => stream.into_value(),
196            PipelineData::ByteStream(stream, ..) => stream.into_value(),
197        }
198    }
199
200    /// Converts any `Value` variant that can be represented as a stream into its stream variant.
201    ///
202    /// This means that lists and ranges are converted into list streams, and strings and binary are
203    /// converted into byte streams.
204    ///
205    /// Returns an `Err` with the original stream if the variant couldn't be converted to a stream
206    /// variant. If the variant is already a stream variant, it is returned as-is.
207    pub fn try_into_stream(self, engine_state: &EngineState) -> Result<PipelineData, PipelineData> {
208        let span = self.span().unwrap_or(Span::unknown());
209        match self {
210            PipelineData::ListStream(..) | PipelineData::ByteStream(..) => Ok(self),
211            PipelineData::Value(Value::List { .. } | Value::Range { .. }, ref metadata) => {
212                let metadata = metadata.clone();
213                Ok(PipelineData::list_stream(
214                    ListStream::new(self.into_iter(), span, engine_state.signals().clone()),
215                    metadata,
216                ))
217            }
218            PipelineData::Value(Value::String { val, .. }, metadata) => {
219                Ok(PipelineData::byte_stream(
220                    ByteStream::read_string(val, span, engine_state.signals().clone()),
221                    metadata,
222                ))
223            }
224            PipelineData::Value(Value::Binary { val, .. }, metadata) => {
225                Ok(PipelineData::byte_stream(
226                    ByteStream::read_binary(val, span, engine_state.signals().clone()),
227                    metadata,
228                ))
229            }
230            _ => Err(self),
231        }
232    }
233
234    /// Drain and write this [`PipelineData`] to `dest`.
235    ///
236    /// Values are converted to bytes and separated by newlines if this is a `ListStream`.
237    pub fn write_to(self, mut dest: impl Write) -> Result<(), ShellError> {
238        match self {
239            PipelineData::Empty => Ok(()),
240            PipelineData::Value(value, ..) => {
241                let bytes = value_to_bytes(value)?;
242                dest.write_all(&bytes).map_err(|err| {
243                    IoError::new_internal(
244                        err,
245                        "Could not write PipelineData to dest",
246                        crate::location!(),
247                    )
248                })?;
249                dest.flush().map_err(|err| {
250                    IoError::new_internal(
251                        err,
252                        "Could not flush PipelineData to dest",
253                        crate::location!(),
254                    )
255                })?;
256                Ok(())
257            }
258            PipelineData::ListStream(stream, ..) => {
259                for value in stream {
260                    let bytes = value_to_bytes(value)?;
261                    dest.write_all(&bytes).map_err(|err| {
262                        IoError::new_internal(
263                            err,
264                            "Could not write PipelineData to dest",
265                            crate::location!(),
266                        )
267                    })?;
268                    dest.write_all(b"\n").map_err(|err| {
269                        IoError::new_internal(
270                            err,
271                            "Could not write linebreak after PipelineData to dest",
272                            crate::location!(),
273                        )
274                    })?;
275                }
276                dest.flush().map_err(|err| {
277                    IoError::new_internal(
278                        err,
279                        "Could not flush PipelineData to dest",
280                        crate::location!(),
281                    )
282                })?;
283                Ok(())
284            }
285            PipelineData::ByteStream(stream, ..) => stream.write_to(dest),
286        }
287    }
288
289    /// Drain this [`PipelineData`] according to the current stdout [`OutDest`]s in `stack`.
290    ///
291    /// For [`OutDest::Pipe`] and [`OutDest::PipeSeparate`], this will return the [`PipelineData`]
292    /// as is. For [`OutDest::Value`], this will collect into a value and return it. For
293    /// [`OutDest::Print`], the [`PipelineData`] is drained and printed. Otherwise, the
294    /// [`PipelineData`] is drained, but only printed if it is the output of an external command.
295    pub fn drain_to_out_dests(
296        self,
297        engine_state: &EngineState,
298        stack: &mut Stack,
299    ) -> Result<Self, ShellError> {
300        match stack.pipe_stdout().unwrap_or(&OutDest::Inherit) {
301            OutDest::Print => {
302                self.print_table(engine_state, stack, false, false)?;
303                Ok(Self::Empty)
304            }
305            OutDest::Pipe | OutDest::PipeSeparate => Ok(self),
306            OutDest::Value => {
307                let metadata = self.metadata();
308                let span = self.span().unwrap_or(Span::unknown());
309                self.into_value(span).map(|val| Self::Value(val, metadata))
310            }
311            OutDest::File(file) => {
312                self.write_to(file.as_ref())?;
313                Ok(Self::Empty)
314            }
315            OutDest::Null | OutDest::Inherit => {
316                self.drain()?;
317                Ok(Self::Empty)
318            }
319        }
320    }
321
322    pub fn drain(self) -> Result<(), ShellError> {
323        match self {
324            Self::Empty => Ok(()),
325            Self::Value(Value::Error { error, .. }, ..) => Err(*error),
326            Self::Value(..) => Ok(()),
327            Self::ListStream(stream, ..) => stream.drain(),
328            Self::ByteStream(stream, ..) => stream.drain(),
329        }
330    }
331
332    /// Try convert from self into iterator
333    ///
334    /// It returns Err if the `self` cannot be converted to an iterator.
335    ///
336    /// The `span` should be the span of the command or operation that would raise an error.
337    pub fn into_iter_strict(self, span: Span) -> Result<PipelineIterator, ShellError> {
338        Ok(PipelineIterator(match self {
339            PipelineData::Value(value, ..) => {
340                let val_span = value.span();
341                match value {
342                    Value::List { vals, .. } => PipelineIteratorInner::ListStream(
343                        ListStream::new(vals.into_iter(), val_span, Signals::empty()).into_iter(),
344                    ),
345                    Value::Binary { val, .. } => PipelineIteratorInner::ListStream(
346                        ListStream::new(
347                            val.into_iter().map(move |x| Value::int(x as i64, val_span)),
348                            val_span,
349                            Signals::empty(),
350                        )
351                        .into_iter(),
352                    ),
353                    Value::Range { val, .. } => PipelineIteratorInner::ListStream(
354                        ListStream::new(
355                            val.into_range_iter(val_span, Signals::empty()),
356                            val_span,
357                            Signals::empty(),
358                        )
359                        .into_iter(),
360                    ),
361                    // Propagate errors by explicitly matching them before the final case.
362                    Value::Error { error, .. } => return Err(*error),
363                    other => {
364                        return Err(ShellError::OnlySupportsThisInputType {
365                            exp_input_type: "list, binary, range, or byte stream".into(),
366                            wrong_type: other.get_type().to_string(),
367                            dst_span: span,
368                            src_span: val_span,
369                        });
370                    }
371                }
372            }
373            PipelineData::ListStream(stream, ..) => {
374                PipelineIteratorInner::ListStream(stream.into_iter())
375            }
376            PipelineData::Empty => {
377                return Err(ShellError::OnlySupportsThisInputType {
378                    exp_input_type: "list, binary, range, or byte stream".into(),
379                    wrong_type: "null".into(),
380                    dst_span: span,
381                    src_span: span,
382                });
383            }
384            PipelineData::ByteStream(stream, ..) => {
385                if let Some(chunks) = stream.chunks() {
386                    PipelineIteratorInner::ByteStream(chunks)
387                } else {
388                    PipelineIteratorInner::Empty
389                }
390            }
391        }))
392    }
393
394    pub fn collect_string(self, separator: &str, config: &Config) -> Result<String, ShellError> {
395        match self {
396            PipelineData::Empty => Ok(String::new()),
397            PipelineData::Value(value, ..) => Ok(value.to_expanded_string(separator, config)),
398            PipelineData::ListStream(stream, ..) => Ok(stream.into_string(separator, config)),
399            PipelineData::ByteStream(stream, ..) => stream.into_string(),
400        }
401    }
402
403    /// Retrieves string from pipeline data.
404    ///
405    /// As opposed to `collect_string` this raises error rather than converting non-string values.
406    /// The `span` will be used if `ListStream` is encountered since it doesn't carry a span.
407    pub fn collect_string_strict(
408        self,
409        span: Span,
410    ) -> Result<(String, Span, Option<PipelineMetadata>), ShellError> {
411        match self {
412            PipelineData::Empty => Ok((String::new(), span, None)),
413            PipelineData::Value(Value::String { val, .. }, metadata) => Ok((val, span, metadata)),
414            PipelineData::Value(val, ..) => Err(ShellError::TypeMismatch {
415                err_message: "string".into(),
416                span: val.span(),
417            }),
418            PipelineData::ListStream(..) => Err(ShellError::TypeMismatch {
419                err_message: "string".into(),
420                span,
421            }),
422            PipelineData::ByteStream(stream, metadata) => {
423                let span = stream.span();
424                Ok((stream.into_string()?, span, metadata))
425            }
426        }
427    }
428
429    pub fn follow_cell_path(
430        self,
431        cell_path: &[PathMember],
432        head: Span,
433    ) -> Result<Value, ShellError> {
434        match self {
435            // FIXME: there are probably better ways of doing this
436            PipelineData::ListStream(stream, ..) => Value::list(stream.into_iter().collect(), head)
437                .follow_cell_path(cell_path)
438                .map(Cow::into_owned),
439            PipelineData::Value(v, ..) => v.follow_cell_path(cell_path).map(Cow::into_owned),
440            PipelineData::Empty => Err(ShellError::IncompatiblePathAccess {
441                type_name: "empty pipeline".to_string(),
442                span: head,
443            }),
444            PipelineData::ByteStream(stream, ..) => Err(ShellError::IncompatiblePathAccess {
445                type_name: stream.type_().describe().to_owned(),
446                span: stream.span(),
447            }),
448        }
449    }
450
451    /// Simplified mapper to help with simple values also. For full iterator support use `.into_iter()` instead
452    pub fn map<F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
453    where
454        Self: Sized,
455        F: FnMut(Value) -> Value + 'static + Send,
456    {
457        match self {
458            PipelineData::Value(value, metadata) => {
459                let span = value.span();
460                let pipeline = match value {
461                    Value::List { vals, .. } => vals
462                        .into_iter()
463                        .map(f)
464                        .into_pipeline_data(span, signals.clone()),
465                    Value::Range { val, .. } => val
466                        .into_range_iter(span, Signals::empty())
467                        .map(f)
468                        .into_pipeline_data(span, signals.clone()),
469                    value => match f(value) {
470                        Value::Error { error, .. } => return Err(*error),
471                        v => v.into_pipeline_data(),
472                    },
473                };
474                Ok(pipeline.set_metadata(metadata))
475            }
476            PipelineData::Empty => Ok(PipelineData::empty()),
477            PipelineData::ListStream(stream, metadata) => {
478                Ok(PipelineData::list_stream(stream.map(f), metadata))
479            }
480            PipelineData::ByteStream(stream, metadata) => {
481                Ok(f(stream.into_value()?).into_pipeline_data_with_metadata(metadata))
482            }
483        }
484    }
485
486    /// Simplified flatmapper. For full iterator support use `.into_iter()` instead
487    pub fn flat_map<U, F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
488    where
489        Self: Sized,
490        U: IntoIterator<Item = Value> + 'static,
491        <U as IntoIterator>::IntoIter: 'static + Send,
492        F: FnMut(Value) -> U + 'static + Send,
493    {
494        match self {
495            PipelineData::Empty => Ok(PipelineData::empty()),
496            PipelineData::Value(value, metadata) => {
497                let span = value.span();
498                let pipeline = match value {
499                    Value::List { vals, .. } => vals
500                        .into_iter()
501                        .flat_map(f)
502                        .into_pipeline_data(span, signals.clone()),
503                    Value::Range { val, .. } => val
504                        .into_range_iter(span, Signals::empty())
505                        .flat_map(f)
506                        .into_pipeline_data(span, signals.clone()),
507                    value => f(value)
508                        .into_iter()
509                        .into_pipeline_data(span, signals.clone()),
510                };
511                Ok(pipeline.set_metadata(metadata))
512            }
513            PipelineData::ListStream(stream, metadata) => Ok(PipelineData::list_stream(
514                stream.modify(|iter| iter.flat_map(f)),
515                metadata,
516            )),
517            PipelineData::ByteStream(stream, metadata) => {
518                // TODO: is this behavior desired / correct ?
519                let span = stream.span();
520                let iter = match String::from_utf8(stream.into_bytes()?) {
521                    Ok(mut str) => {
522                        str.truncate(str.trim_end_matches(LINE_ENDING_PATTERN).len());
523                        f(Value::string(str, span))
524                    }
525                    Err(err) => f(Value::binary(err.into_bytes(), span)),
526                };
527                Ok(iter.into_iter().into_pipeline_data_with_metadata(
528                    span,
529                    signals.clone(),
530                    metadata,
531                ))
532            }
533        }
534    }
535
536    pub fn filter<F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
537    where
538        Self: Sized,
539        F: FnMut(&Value) -> bool + 'static + Send,
540    {
541        match self {
542            PipelineData::Empty => Ok(PipelineData::empty()),
543            PipelineData::Value(value, metadata) => {
544                let span = value.span();
545                let pipeline = match value {
546                    Value::List { vals, .. } => vals
547                        .into_iter()
548                        .filter(f)
549                        .into_pipeline_data(span, signals.clone()),
550                    Value::Range { val, .. } => val
551                        .into_range_iter(span, Signals::empty())
552                        .filter(f)
553                        .into_pipeline_data(span, signals.clone()),
554                    value => {
555                        if f(&value) {
556                            value.into_pipeline_data()
557                        } else {
558                            Value::nothing(span).into_pipeline_data()
559                        }
560                    }
561                };
562                Ok(pipeline.set_metadata(metadata))
563            }
564            PipelineData::ListStream(stream, metadata) => Ok(PipelineData::list_stream(
565                stream.modify(|iter| iter.filter(f)),
566                metadata,
567            )),
568            PipelineData::ByteStream(stream, metadata) => {
569                // TODO: is this behavior desired / correct ?
570                let span = stream.span();
571                let value = match String::from_utf8(stream.into_bytes()?) {
572                    Ok(mut str) => {
573                        str.truncate(str.trim_end_matches(LINE_ENDING_PATTERN).len());
574                        Value::string(str, span)
575                    }
576                    Err(err) => Value::binary(err.into_bytes(), span),
577                };
578                let value = if f(&value) {
579                    value
580                } else {
581                    Value::nothing(span)
582                };
583                Ok(value.into_pipeline_data_with_metadata(metadata))
584            }
585        }
586    }
587
588    /// Try to convert Value from Value::Range to Value::List.
589    /// This is useful to expand Value::Range into array notation, specifically when
590    /// converting `to json` or `to nuon`.
591    /// `1..3 | to XX -> [1,2,3]`
592    pub fn try_expand_range(self) -> Result<PipelineData, ShellError> {
593        match self {
594            PipelineData::Value(v, metadata) => {
595                let span = v.span();
596                match v {
597                    Value::Range { val, .. } => {
598                        match *val {
599                            Range::IntRange(range) => {
600                                if range.is_unbounded() {
601                                    return Err(ShellError::GenericError {
602                                        error: "Cannot create range".into(),
603                                        msg: "Unbounded ranges are not allowed when converting to this format".into(),
604                                        span: Some(span),
605                                        help: Some("Consider using ranges with valid start and end point.".into()),
606                                        inner: vec![],
607                                    });
608                                }
609                            }
610                            Range::FloatRange(range) => {
611                                if range.is_unbounded() {
612                                    return Err(ShellError::GenericError {
613                                        error: "Cannot create range".into(),
614                                        msg: "Unbounded ranges are not allowed when converting to this format".into(),
615                                        span: Some(span),
616                                        help: Some("Consider using ranges with valid start and end point.".into()),
617                                        inner: vec![],
618                                    });
619                                }
620                            }
621                        }
622                        let range_values: Vec<Value> =
623                            val.into_range_iter(span, Signals::empty()).collect();
624                        Ok(PipelineData::value(Value::list(range_values, span), None))
625                    }
626                    x => Ok(PipelineData::value(x, metadata)),
627                }
628            }
629            _ => Ok(self),
630        }
631    }
632
633    /// Consume and print self data immediately, formatted using table command.
634    ///
635    /// This does not respect the display_output hook. If a value is being printed out by a command,
636    /// this function should be used. Otherwise, `nu_cli::util::print_pipeline` should be preferred.
637    ///
638    /// `no_newline` controls if we need to attach newline character to output.
639    /// `to_stderr` controls if data is output to stderr, when the value is false, the data is output to stdout.
640    pub fn print_table(
641        self,
642        engine_state: &EngineState,
643        stack: &mut Stack,
644        no_newline: bool,
645        to_stderr: bool,
646    ) -> Result<(), ShellError> {
647        match self {
648            // Print byte streams directly as long as they aren't binary.
649            PipelineData::ByteStream(stream, ..) if stream.type_() != ByteStreamType::Binary => {
650                stream.print(to_stderr)
651            }
652            _ => {
653                // If the table function is in the declarations, then we can use it
654                // to create the table value that will be printed in the terminal
655                if let Some(decl_id) = engine_state.table_decl_id {
656                    let command = engine_state.get_decl(decl_id);
657                    if command.block_id().is_some() {
658                        self.write_all_and_flush(engine_state, no_newline, to_stderr)
659                    } else {
660                        let call = Call::new(Span::new(0, 0));
661                        let table = command.run(engine_state, stack, &(&call).into(), self)?;
662                        table.write_all_and_flush(engine_state, no_newline, to_stderr)
663                    }
664                } else {
665                    self.write_all_and_flush(engine_state, no_newline, to_stderr)
666                }
667            }
668        }
669    }
670
671    /// Consume and print self data without any extra formatting.
672    ///
673    /// This does not use the `table` command to format data, and also prints binary values and
674    /// streams in their raw format without generating a hexdump first.
675    ///
676    /// `no_newline` controls if we need to attach newline character to output.
677    /// `to_stderr` controls if data is output to stderr, when the value is false, the data is output to stdout.
678    pub fn print_raw(
679        self,
680        engine_state: &EngineState,
681        no_newline: bool,
682        to_stderr: bool,
683    ) -> Result<(), ShellError> {
684        let span = self.span();
685        if let PipelineData::Value(Value::Binary { val: bytes, .. }, _) = self {
686            if to_stderr {
687                write_all_and_flush(
688                    bytes,
689                    &mut std::io::stderr().lock(),
690                    "stderr",
691                    span,
692                    engine_state.signals(),
693                )?;
694            } else {
695                write_all_and_flush(
696                    bytes,
697                    &mut std::io::stdout().lock(),
698                    "stdout",
699                    span,
700                    engine_state.signals(),
701                )?;
702            }
703            Ok(())
704        } else {
705            self.write_all_and_flush(engine_state, no_newline, to_stderr)
706        }
707    }
708
709    fn write_all_and_flush(
710        self,
711        engine_state: &EngineState,
712        no_newline: bool,
713        to_stderr: bool,
714    ) -> Result<(), ShellError> {
715        let span = self.span();
716        if let PipelineData::ByteStream(stream, ..) = self {
717            // Copy ByteStreams directly
718            stream.print(to_stderr)
719        } else {
720            let config = engine_state.get_config();
721            for item in self {
722                let mut out = if let Value::Error { error, .. } = item {
723                    return Err(*error);
724                } else {
725                    item.to_expanded_string("\n", config)
726                };
727
728                if !no_newline {
729                    out.push('\n');
730                }
731
732                if to_stderr {
733                    write_all_and_flush(
734                        out,
735                        &mut std::io::stderr().lock(),
736                        "stderr",
737                        span,
738                        engine_state.signals(),
739                    )?;
740                } else {
741                    write_all_and_flush(
742                        out,
743                        &mut std::io::stdout().lock(),
744                        "stdout",
745                        span,
746                        engine_state.signals(),
747                    )?;
748                }
749            }
750
751            Ok(())
752        }
753    }
754
755    pub fn unsupported_input_error(
756        self,
757        expected_type: impl Into<String>,
758        span: Span,
759    ) -> ShellError {
760        match self {
761            PipelineData::Empty => ShellError::PipelineEmpty { dst_span: span },
762            PipelineData::Value(value, ..) => ShellError::OnlySupportsThisInputType {
763                exp_input_type: expected_type.into(),
764                wrong_type: value.get_type().get_non_specified_string(),
765                dst_span: span,
766                src_span: value.span(),
767            },
768            PipelineData::ListStream(stream, ..) => ShellError::OnlySupportsThisInputType {
769                exp_input_type: expected_type.into(),
770                wrong_type: "list (stream)".into(),
771                dst_span: span,
772                src_span: stream.span(),
773            },
774            PipelineData::ByteStream(stream, ..) => ShellError::OnlySupportsThisInputType {
775                exp_input_type: expected_type.into(),
776                wrong_type: stream.type_().describe().into(),
777                dst_span: span,
778                src_span: stream.span(),
779            },
780        }
781    }
782
783    // PipelineData might connect to a running process which has an exit status future
784    // Use this method to retrieve that future, it's useful for implementing `pipefail` feature.
785    #[cfg(feature = "os")]
786    pub fn clone_exit_status_future(&self) -> Option<Arc<Mutex<ExitStatusFuture>>> {
787        match self {
788            PipelineData::Empty | PipelineData::Value(..) | PipelineData::ListStream(..) => None,
789            PipelineData::ByteStream(stream, ..) => match stream.source() {
790                ByteStreamSource::Read(..) | ByteStreamSource::File(..) => None,
791                ByteStreamSource::Child(c) => Some(c.clone_exit_status_future()),
792            },
793        }
794    }
795}
796
797pub fn write_all_and_flush<T>(
798    data: T,
799    destination: &mut impl Write,
800    destination_name: &str,
801    span: Option<Span>,
802    signals: &Signals,
803) -> Result<(), ShellError>
804where
805    T: AsRef<[u8]>,
806{
807    let io_error_map = |err: std::io::Error, location: Location| {
808        let context = format!("Writing to {destination_name} failed");
809        match span {
810            None => IoError::new_internal(err, context, location),
811            Some(span) if span == Span::unknown() => IoError::new_internal(err, context, location),
812            Some(span) => IoError::new_with_additional_context(err, span, None, context),
813        }
814    };
815
816    let span = span.unwrap_or(Span::unknown());
817    const OUTPUT_CHUNK_SIZE: usize = 8192;
818    for chunk in data.as_ref().chunks(OUTPUT_CHUNK_SIZE) {
819        signals.check(&span)?;
820        destination
821            .write_all(chunk)
822            .map_err(|err| io_error_map(err, location!()))?;
823    }
824    destination
825        .flush()
826        .map_err(|err| io_error_map(err, location!()))?;
827    Ok(())
828}
829
830enum PipelineIteratorInner {
831    Empty,
832    Value(Value),
833    ListStream(crate::list_stream::IntoIter),
834    ByteStream(crate::byte_stream::Chunks),
835}
836
837pub struct PipelineIterator(PipelineIteratorInner);
838
839impl IntoIterator for PipelineData {
840    type Item = Value;
841
842    type IntoIter = PipelineIterator;
843
844    fn into_iter(self) -> Self::IntoIter {
845        PipelineIterator(match self {
846            PipelineData::Empty => PipelineIteratorInner::Empty,
847            PipelineData::Value(value, ..) => {
848                let span = value.span();
849                match value {
850                    Value::List { vals, .. } => PipelineIteratorInner::ListStream(
851                        ListStream::new(vals.into_iter(), span, Signals::empty()).into_iter(),
852                    ),
853                    Value::Range { val, .. } => PipelineIteratorInner::ListStream(
854                        ListStream::new(
855                            val.into_range_iter(span, Signals::empty()),
856                            span,
857                            Signals::empty(),
858                        )
859                        .into_iter(),
860                    ),
861                    x => PipelineIteratorInner::Value(x),
862                }
863            }
864            PipelineData::ListStream(stream, ..) => {
865                PipelineIteratorInner::ListStream(stream.into_iter())
866            }
867            PipelineData::ByteStream(stream, ..) => stream.chunks().map_or(
868                PipelineIteratorInner::Empty,
869                PipelineIteratorInner::ByteStream,
870            ),
871        })
872    }
873}
874
875impl Iterator for PipelineIterator {
876    type Item = Value;
877
878    fn next(&mut self) -> Option<Self::Item> {
879        match &mut self.0 {
880            PipelineIteratorInner::Empty => None,
881            PipelineIteratorInner::Value(Value::Nothing { .. }, ..) => None,
882            PipelineIteratorInner::Value(v, ..) => Some(std::mem::take(v)),
883            PipelineIteratorInner::ListStream(stream, ..) => stream.next(),
884            PipelineIteratorInner::ByteStream(stream) => stream.next().map(|x| match x {
885                Ok(x) => x,
886                Err(err) => Value::error(
887                    err,
888                    Span::unknown(), //FIXME: unclear where this span should come from
889                ),
890            }),
891        }
892    }
893}
894
895pub trait IntoPipelineData {
896    fn into_pipeline_data(self) -> PipelineData;
897
898    fn into_pipeline_data_with_metadata(
899        self,
900        metadata: impl Into<Option<PipelineMetadata>>,
901    ) -> PipelineData;
902}
903
904impl<V> IntoPipelineData for V
905where
906    V: Into<Value>,
907{
908    fn into_pipeline_data(self) -> PipelineData {
909        PipelineData::value(self.into(), None)
910    }
911
912    fn into_pipeline_data_with_metadata(
913        self,
914        metadata: impl Into<Option<PipelineMetadata>>,
915    ) -> PipelineData {
916        PipelineData::value(self.into(), metadata.into())
917    }
918}
919
920pub trait IntoInterruptiblePipelineData {
921    fn into_pipeline_data(self, span: Span, signals: Signals) -> PipelineData;
922    fn into_pipeline_data_with_metadata(
923        self,
924        span: Span,
925        signals: Signals,
926        metadata: impl Into<Option<PipelineMetadata>>,
927    ) -> PipelineData;
928}
929
930impl<I> IntoInterruptiblePipelineData for I
931where
932    I: IntoIterator + Send + 'static,
933    I::IntoIter: Send + 'static,
934    <I::IntoIter as Iterator>::Item: Into<Value>,
935{
936    fn into_pipeline_data(self, span: Span, signals: Signals) -> PipelineData {
937        ListStream::new(self.into_iter().map(Into::into), span, signals).into()
938    }
939
940    fn into_pipeline_data_with_metadata(
941        self,
942        span: Span,
943        signals: Signals,
944        metadata: impl Into<Option<PipelineMetadata>>,
945    ) -> PipelineData {
946        PipelineData::list_stream(
947            ListStream::new(self.into_iter().map(Into::into), span, signals),
948            metadata.into(),
949        )
950    }
951}
952
953fn value_to_bytes(value: Value) -> Result<Vec<u8>, ShellError> {
954    let bytes = match value {
955        Value::String { val, .. } => val.into_bytes(),
956        Value::Binary { val, .. } => val,
957        Value::List { vals, .. } => {
958            let val = vals
959                .into_iter()
960                .map(Value::coerce_into_string)
961                .collect::<Result<Vec<String>, ShellError>>()?
962                .join("\n")
963                + "\n";
964
965            val.into_bytes()
966        }
967        // Propagate errors by explicitly matching them before the final case.
968        Value::Error { error, .. } => return Err(*error),
969        value => value.coerce_into_string()?.into_bytes(),
970    };
971    Ok(bytes)
972}
973
974/// A wrapper to [`PipelineData`] which can also track exit status.
975///
976/// We use exit status tracking to implement the `pipefail` feature.
977pub struct PipelineExecutionData {
978    pub body: PipelineData,
979    #[cfg(feature = "os")]
980    pub exit: Vec<Option<(Arc<Mutex<ExitStatusFuture>>, Span)>>,
981}
982
983impl Deref for PipelineExecutionData {
984    type Target = PipelineData;
985
986    fn deref(&self) -> &Self::Target {
987        &self.body
988    }
989}
990
991impl PipelineExecutionData {
992    pub fn empty() -> Self {
993        Self {
994            body: PipelineData::empty(),
995            #[cfg(feature = "os")]
996            exit: vec![],
997        }
998    }
999}
1000
1001impl From<PipelineData> for PipelineExecutionData {
1002    #[cfg(feature = "os")]
1003    fn from(value: PipelineData) -> Self {
1004        let value_span = value.span().unwrap_or_else(Span::unknown);
1005        let exit_status_future = value.clone_exit_status_future().map(|f| (f, value_span));
1006        Self {
1007            body: value,
1008            exit: vec![exit_status_future],
1009        }
1010    }
1011
1012    #[cfg(not(feature = "os"))]
1013    fn from(value: PipelineData) -> Self {
1014        Self { body: value }
1015    }
1016}