nu_protocol/pipeline/
pipeline_data.rs

1use crate::{
2    ByteStream, ByteStreamType, Config, ListStream, OutDest, PipelineMetadata, Range, ShellError,
3    Signals, Span, Type, Value,
4    ast::{Call, PathMember},
5    engine::{EngineState, Stack},
6    location,
7    shell_error::{io::IoError, location::Location},
8};
9use std::{borrow::Cow, io::Write};
10
11const LINE_ENDING_PATTERN: &[char] = &['\r', '\n'];
12
13/// The foundational abstraction for input and output to commands
14///
15/// This represents either a single Value or a stream of values coming into the command or leaving a command.
16///
17/// A note on implementation:
18///
19/// We've tried a few variations of this structure. Listing these below so we have a record.
20///
21/// * We tried always assuming a stream in Nushell. This was a great 80% solution, but it had some rough edges.
22///   Namely, how do you know the difference between a single string and a list of one string. How do you know
23///   when to flatten the data given to you from a data source into the stream or to keep it as an unflattened
24///   list?
25///
26/// * We tried putting the stream into Value. This had some interesting properties as now commands "just worked
27///   on values", but lead to a few unfortunate issues.
28///
29/// The first is that you can't easily clone Values in a way that felt largely immutable. For example, if
30/// you cloned a Value which contained a stream, and in one variable drained some part of it, then the second
31/// variable would see different values based on what you did to the first.
32///
33/// To make this kind of mutation thread-safe, we would have had to produce a lock for the stream, which in
34/// practice would have meant always locking the stream before reading from it. But more fundamentally, it
35/// felt wrong in practice that observation of a value at runtime could affect other values which happen to
36/// alias the same stream. By separating these, we don't have this effect. Instead, variables could get
37/// concrete list values rather than streams, and be able to view them without non-local effects.
38///
39/// * A balance of the two approaches is what we've landed on: Values are thread-safe to pass, and we can stream
40///   them into any sources. Streams are still available to model the infinite streams approach of original
41///   Nushell.
42#[derive(Debug)]
43pub enum PipelineData {
44    Empty,
45    Value(Value, Option<PipelineMetadata>),
46    ListStream(ListStream, Option<PipelineMetadata>),
47    ByteStream(ByteStream, Option<PipelineMetadata>),
48}
49
50impl PipelineData {
51    pub const fn empty() -> PipelineData {
52        PipelineData::Empty
53    }
54
55    pub fn value(val: Value, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
56        PipelineData::Value(val, metadata.into())
57    }
58
59    pub fn list_stream(stream: ListStream, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
60        PipelineData::ListStream(stream, metadata.into())
61    }
62
63    pub fn byte_stream(stream: ByteStream, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
64        PipelineData::ByteStream(stream, metadata.into())
65    }
66
67    pub fn metadata(&self) -> Option<PipelineMetadata> {
68        match self {
69            PipelineData::Empty => None,
70            PipelineData::Value(_, meta)
71            | PipelineData::ListStream(_, meta)
72            | PipelineData::ByteStream(_, meta) => meta.clone(),
73        }
74    }
75
76    pub fn set_metadata(mut self, metadata: Option<PipelineMetadata>) -> Self {
77        match &mut self {
78            PipelineData::Empty => {}
79            PipelineData::Value(_, meta)
80            | PipelineData::ListStream(_, meta)
81            | PipelineData::ByteStream(_, meta) => *meta = metadata,
82        }
83        self
84    }
85
86    pub fn is_nothing(&self) -> bool {
87        matches!(self, PipelineData::Value(Value::Nothing { .. }, ..))
88            || matches!(self, PipelineData::Empty)
89    }
90
91    /// PipelineData doesn't always have a Span, but we can try!
92    pub fn span(&self) -> Option<Span> {
93        match self {
94            PipelineData::Empty => None,
95            PipelineData::Value(value, ..) => Some(value.span()),
96            PipelineData::ListStream(stream, ..) => Some(stream.span()),
97            PipelineData::ByteStream(stream, ..) => Some(stream.span()),
98        }
99    }
100
101    /// Change the span of the [`PipelineData`].
102    ///
103    /// Returns `Value(Nothing)` with the given span if it was [`PipelineData::empty()`].
104    pub fn with_span(self, span: Span) -> Self {
105        match self {
106            PipelineData::Empty => PipelineData::value(Value::nothing(span), None),
107            PipelineData::Value(value, metadata) => {
108                PipelineData::value(value.with_span(span), metadata)
109            }
110            PipelineData::ListStream(stream, metadata) => {
111                PipelineData::list_stream(stream.with_span(span), metadata)
112            }
113            PipelineData::ByteStream(stream, metadata) => {
114                PipelineData::byte_stream(stream.with_span(span), metadata)
115            }
116        }
117    }
118
119    /// Get a type that is representative of the `PipelineData`.
120    ///
121    /// The type returned here makes no effort to collect a stream, so it may be a different type
122    /// than would be returned by [`Value::get_type()`] on the result of
123    /// [`.into_value()`](Self::into_value).
124    ///
125    /// Specifically, a `ListStream` results in `list<any>` rather than
126    /// the fully complete [`list`](Type::List) type (which would require knowing the contents),
127    /// and a `ByteStream` with [unknown](crate::ByteStreamType::Unknown) type results in
128    /// [`any`](Type::Any) rather than [`string`](Type::String) or [`binary`](Type::Binary).
129    pub fn get_type(&self) -> Type {
130        match self {
131            PipelineData::Empty => Type::Nothing,
132            PipelineData::Value(value, _) => value.get_type(),
133            PipelineData::ListStream(_, _) => Type::list(Type::Any),
134            PipelineData::ByteStream(stream, _) => stream.type_().into(),
135        }
136    }
137
138    /// Determine if the `PipelineData` is a [subtype](https://en.wikipedia.org/wiki/Subtyping) of `other`.
139    ///
140    /// This check makes no effort to collect a stream, so it may be a different result
141    /// than would be returned by calling [`Value::is_subtype_of()`] on the result of
142    /// [`.into_value()`](Self::into_value).
143    ///
144    /// A `ListStream` acts the same as an empty list type: it is a subtype of any [`list`](Type::List)
145    /// or [`table`](Type::Table) type. After converting to a value, it may become a more specific type.
146    /// For example, a `ListStream` is a subtype of `list<int>` and `list<string>`.
147    /// If calling [`.into_value()`](Self::into_value) results in a `list<int>`,
148    /// then the value would not be a subtype of `list<string>`, in contrast to the original `ListStream`.
149    ///
150    /// A `ByteStream` is a subtype of [`string`](Type::String) if it is coercible into a string.
151    /// Likewise, a `ByteStream` is a subtype of [`binary`](Type::Binary) if it is coercible into a binary value.
152    pub fn is_subtype_of(&self, other: &Type) -> bool {
153        match (self, other) {
154            (_, Type::Any) => true,
155            (PipelineData::Empty, Type::Nothing) => true,
156            (PipelineData::Value(val, ..), ty) => val.is_subtype_of(ty),
157
158            // a list stream could be a list with any type, including a table
159            (PipelineData::ListStream(..), Type::List(..) | Type::Table(..)) => true,
160
161            (PipelineData::ByteStream(stream, ..), Type::String)
162                if stream.type_().is_string_coercible() =>
163            {
164                true
165            }
166            (PipelineData::ByteStream(stream, ..), Type::Binary)
167                if stream.type_().is_binary_coercible() =>
168            {
169                true
170            }
171
172            (PipelineData::Empty, _) => false,
173            (PipelineData::ListStream(..), _) => false,
174            (PipelineData::ByteStream(..), _) => false,
175        }
176    }
177
178    pub fn into_value(self, span: Span) -> Result<Value, ShellError> {
179        match self {
180            PipelineData::Empty => Ok(Value::nothing(span)),
181            PipelineData::Value(value, ..) => {
182                if value.span() == Span::unknown() {
183                    Ok(value.with_span(span))
184                } else {
185                    Ok(value)
186                }
187            }
188            PipelineData::ListStream(stream, ..) => Ok(stream.into_value()),
189            PipelineData::ByteStream(stream, ..) => stream.into_value(),
190        }
191    }
192
193    /// Converts any `Value` variant that can be represented as a stream into its stream variant.
194    ///
195    /// This means that lists and ranges are converted into list streams, and strings and binary are
196    /// converted into byte streams.
197    ///
198    /// Returns an `Err` with the original stream if the variant couldn't be converted to a stream
199    /// variant. If the variant is already a stream variant, it is returned as-is.
200    pub fn try_into_stream(self, engine_state: &EngineState) -> Result<PipelineData, PipelineData> {
201        let span = self.span().unwrap_or(Span::unknown());
202        match self {
203            PipelineData::ListStream(..) | PipelineData::ByteStream(..) => Ok(self),
204            PipelineData::Value(Value::List { .. } | Value::Range { .. }, ref metadata) => {
205                let metadata = metadata.clone();
206                Ok(PipelineData::list_stream(
207                    ListStream::new(self.into_iter(), span, engine_state.signals().clone()),
208                    metadata,
209                ))
210            }
211            PipelineData::Value(Value::String { val, .. }, metadata) => {
212                Ok(PipelineData::byte_stream(
213                    ByteStream::read_string(val, span, engine_state.signals().clone()),
214                    metadata,
215                ))
216            }
217            PipelineData::Value(Value::Binary { val, .. }, metadata) => {
218                Ok(PipelineData::byte_stream(
219                    ByteStream::read_binary(val, span, engine_state.signals().clone()),
220                    metadata,
221                ))
222            }
223            _ => Err(self),
224        }
225    }
226
227    /// Drain and write this [`PipelineData`] to `dest`.
228    ///
229    /// Values are converted to bytes and separated by newlines if this is a `ListStream`.
230    pub fn write_to(self, mut dest: impl Write) -> Result<(), ShellError> {
231        match self {
232            PipelineData::Empty => Ok(()),
233            PipelineData::Value(value, ..) => {
234                let bytes = value_to_bytes(value)?;
235                dest.write_all(&bytes).map_err(|err| {
236                    IoError::new_internal(
237                        err,
238                        "Could not write PipelineData to dest",
239                        crate::location!(),
240                    )
241                })?;
242                dest.flush().map_err(|err| {
243                    IoError::new_internal(
244                        err,
245                        "Could not flush PipelineData to dest",
246                        crate::location!(),
247                    )
248                })?;
249                Ok(())
250            }
251            PipelineData::ListStream(stream, ..) => {
252                for value in stream {
253                    let bytes = value_to_bytes(value)?;
254                    dest.write_all(&bytes).map_err(|err| {
255                        IoError::new_internal(
256                            err,
257                            "Could not write PipelineData to dest",
258                            crate::location!(),
259                        )
260                    })?;
261                    dest.write_all(b"\n").map_err(|err| {
262                        IoError::new_internal(
263                            err,
264                            "Could not write linebreak after PipelineData to dest",
265                            crate::location!(),
266                        )
267                    })?;
268                }
269                dest.flush().map_err(|err| {
270                    IoError::new_internal(
271                        err,
272                        "Could not flush PipelineData to dest",
273                        crate::location!(),
274                    )
275                })?;
276                Ok(())
277            }
278            PipelineData::ByteStream(stream, ..) => stream.write_to(dest),
279        }
280    }
281
282    /// Drain this [`PipelineData`] according to the current stdout [`OutDest`]s in `stack`.
283    ///
284    /// For [`OutDest::Pipe`] and [`OutDest::PipeSeparate`], this will return the [`PipelineData`]
285    /// as is. For [`OutDest::Value`], this will collect into a value and return it. For
286    /// [`OutDest::Print`], the [`PipelineData`] is drained and printed. Otherwise, the
287    /// [`PipelineData`] is drained, but only printed if it is the output of an external command.
288    pub fn drain_to_out_dests(
289        self,
290        engine_state: &EngineState,
291        stack: &mut Stack,
292    ) -> Result<Self, ShellError> {
293        match stack.pipe_stdout().unwrap_or(&OutDest::Inherit) {
294            OutDest::Print => {
295                self.print_table(engine_state, stack, false, false)?;
296                Ok(Self::Empty)
297            }
298            OutDest::Pipe | OutDest::PipeSeparate => Ok(self),
299            OutDest::Value => {
300                let metadata = self.metadata();
301                let span = self.span().unwrap_or(Span::unknown());
302                self.into_value(span).map(|val| Self::Value(val, metadata))
303            }
304            OutDest::File(file) => {
305                self.write_to(file.as_ref())?;
306                Ok(Self::Empty)
307            }
308            OutDest::Null | OutDest::Inherit => {
309                self.drain()?;
310                Ok(Self::Empty)
311            }
312        }
313    }
314
315    pub fn drain(self) -> Result<(), ShellError> {
316        match self {
317            Self::Empty => Ok(()),
318            Self::Value(Value::Error { error, .. }, ..) => Err(*error),
319            Self::Value(..) => Ok(()),
320            Self::ListStream(stream, ..) => stream.drain(),
321            Self::ByteStream(stream, ..) => stream.drain(),
322        }
323    }
324
325    /// Try convert from self into iterator
326    ///
327    /// It returns Err if the `self` cannot be converted to an iterator.
328    ///
329    /// The `span` should be the span of the command or operation that would raise an error.
330    pub fn into_iter_strict(self, span: Span) -> Result<PipelineIterator, ShellError> {
331        Ok(PipelineIterator(match self {
332            PipelineData::Value(value, ..) => {
333                let val_span = value.span();
334                match value {
335                    Value::List { vals, .. } => PipelineIteratorInner::ListStream(
336                        ListStream::new(vals.into_iter(), val_span, Signals::empty()).into_iter(),
337                    ),
338                    Value::Binary { val, .. } => PipelineIteratorInner::ListStream(
339                        ListStream::new(
340                            val.into_iter().map(move |x| Value::int(x as i64, val_span)),
341                            val_span,
342                            Signals::empty(),
343                        )
344                        .into_iter(),
345                    ),
346                    Value::Range { val, .. } => PipelineIteratorInner::ListStream(
347                        ListStream::new(
348                            val.into_range_iter(val_span, Signals::empty()),
349                            val_span,
350                            Signals::empty(),
351                        )
352                        .into_iter(),
353                    ),
354                    // Propagate errors by explicitly matching them before the final case.
355                    Value::Error { error, .. } => return Err(*error),
356                    other => {
357                        return Err(ShellError::OnlySupportsThisInputType {
358                            exp_input_type: "list, binary, range, or byte stream".into(),
359                            wrong_type: other.get_type().to_string(),
360                            dst_span: span,
361                            src_span: val_span,
362                        });
363                    }
364                }
365            }
366            PipelineData::ListStream(stream, ..) => {
367                PipelineIteratorInner::ListStream(stream.into_iter())
368            }
369            PipelineData::Empty => {
370                return Err(ShellError::OnlySupportsThisInputType {
371                    exp_input_type: "list, binary, range, or byte stream".into(),
372                    wrong_type: "null".into(),
373                    dst_span: span,
374                    src_span: span,
375                });
376            }
377            PipelineData::ByteStream(stream, ..) => {
378                if let Some(chunks) = stream.chunks() {
379                    PipelineIteratorInner::ByteStream(chunks)
380                } else {
381                    PipelineIteratorInner::Empty
382                }
383            }
384        }))
385    }
386
387    pub fn collect_string(self, separator: &str, config: &Config) -> Result<String, ShellError> {
388        match self {
389            PipelineData::Empty => Ok(String::new()),
390            PipelineData::Value(value, ..) => Ok(value.to_expanded_string(separator, config)),
391            PipelineData::ListStream(stream, ..) => Ok(stream.into_string(separator, config)),
392            PipelineData::ByteStream(stream, ..) => stream.into_string(),
393        }
394    }
395
396    /// Retrieves string from pipeline data.
397    ///
398    /// As opposed to `collect_string` this raises error rather than converting non-string values.
399    /// The `span` will be used if `ListStream` is encountered since it doesn't carry a span.
400    pub fn collect_string_strict(
401        self,
402        span: Span,
403    ) -> Result<(String, Span, Option<PipelineMetadata>), ShellError> {
404        match self {
405            PipelineData::Empty => Ok((String::new(), span, None)),
406            PipelineData::Value(Value::String { val, .. }, metadata) => Ok((val, span, metadata)),
407            PipelineData::Value(val, ..) => Err(ShellError::TypeMismatch {
408                err_message: "string".into(),
409                span: val.span(),
410            }),
411            PipelineData::ListStream(..) => Err(ShellError::TypeMismatch {
412                err_message: "string".into(),
413                span,
414            }),
415            PipelineData::ByteStream(stream, metadata) => {
416                let span = stream.span();
417                Ok((stream.into_string()?, span, metadata))
418            }
419        }
420    }
421
422    pub fn follow_cell_path(
423        self,
424        cell_path: &[PathMember],
425        head: Span,
426    ) -> Result<Value, ShellError> {
427        match self {
428            // FIXME: there are probably better ways of doing this
429            PipelineData::ListStream(stream, ..) => Value::list(stream.into_iter().collect(), head)
430                .follow_cell_path(cell_path)
431                .map(Cow::into_owned),
432            PipelineData::Value(v, ..) => v.follow_cell_path(cell_path).map(Cow::into_owned),
433            PipelineData::Empty => Err(ShellError::IncompatiblePathAccess {
434                type_name: "empty pipeline".to_string(),
435                span: head,
436            }),
437            PipelineData::ByteStream(stream, ..) => Err(ShellError::IncompatiblePathAccess {
438                type_name: stream.type_().describe().to_owned(),
439                span: stream.span(),
440            }),
441        }
442    }
443
444    /// Simplified mapper to help with simple values also. For full iterator support use `.into_iter()` instead
445    pub fn map<F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
446    where
447        Self: Sized,
448        F: FnMut(Value) -> Value + 'static + Send,
449    {
450        match self {
451            PipelineData::Value(value, metadata) => {
452                let span = value.span();
453                let pipeline = match value {
454                    Value::List { vals, .. } => vals
455                        .into_iter()
456                        .map(f)
457                        .into_pipeline_data(span, signals.clone()),
458                    Value::Range { val, .. } => val
459                        .into_range_iter(span, Signals::empty())
460                        .map(f)
461                        .into_pipeline_data(span, signals.clone()),
462                    value => match f(value) {
463                        Value::Error { error, .. } => return Err(*error),
464                        v => v.into_pipeline_data(),
465                    },
466                };
467                Ok(pipeline.set_metadata(metadata))
468            }
469            PipelineData::Empty => Ok(PipelineData::empty()),
470            PipelineData::ListStream(stream, metadata) => {
471                Ok(PipelineData::list_stream(stream.map(f), metadata))
472            }
473            PipelineData::ByteStream(stream, metadata) => {
474                Ok(f(stream.into_value()?).into_pipeline_data_with_metadata(metadata))
475            }
476        }
477    }
478
479    /// Simplified flatmapper. For full iterator support use `.into_iter()` instead
480    pub fn flat_map<U, F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
481    where
482        Self: Sized,
483        U: IntoIterator<Item = Value> + 'static,
484        <U as IntoIterator>::IntoIter: 'static + Send,
485        F: FnMut(Value) -> U + 'static + Send,
486    {
487        match self {
488            PipelineData::Empty => Ok(PipelineData::empty()),
489            PipelineData::Value(value, metadata) => {
490                let span = value.span();
491                let pipeline = match value {
492                    Value::List { vals, .. } => vals
493                        .into_iter()
494                        .flat_map(f)
495                        .into_pipeline_data(span, signals.clone()),
496                    Value::Range { val, .. } => val
497                        .into_range_iter(span, Signals::empty())
498                        .flat_map(f)
499                        .into_pipeline_data(span, signals.clone()),
500                    value => f(value)
501                        .into_iter()
502                        .into_pipeline_data(span, signals.clone()),
503                };
504                Ok(pipeline.set_metadata(metadata))
505            }
506            PipelineData::ListStream(stream, metadata) => Ok(PipelineData::list_stream(
507                stream.modify(|iter| iter.flat_map(f)),
508                metadata,
509            )),
510            PipelineData::ByteStream(stream, metadata) => {
511                // TODO: is this behavior desired / correct ?
512                let span = stream.span();
513                let iter = match String::from_utf8(stream.into_bytes()?) {
514                    Ok(mut str) => {
515                        str.truncate(str.trim_end_matches(LINE_ENDING_PATTERN).len());
516                        f(Value::string(str, span))
517                    }
518                    Err(err) => f(Value::binary(err.into_bytes(), span)),
519                };
520                Ok(iter.into_iter().into_pipeline_data_with_metadata(
521                    span,
522                    signals.clone(),
523                    metadata,
524                ))
525            }
526        }
527    }
528
529    pub fn filter<F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
530    where
531        Self: Sized,
532        F: FnMut(&Value) -> bool + 'static + Send,
533    {
534        match self {
535            PipelineData::Empty => Ok(PipelineData::empty()),
536            PipelineData::Value(value, metadata) => {
537                let span = value.span();
538                let pipeline = match value {
539                    Value::List { vals, .. } => vals
540                        .into_iter()
541                        .filter(f)
542                        .into_pipeline_data(span, signals.clone()),
543                    Value::Range { val, .. } => val
544                        .into_range_iter(span, Signals::empty())
545                        .filter(f)
546                        .into_pipeline_data(span, signals.clone()),
547                    value => {
548                        if f(&value) {
549                            value.into_pipeline_data()
550                        } else {
551                            Value::nothing(span).into_pipeline_data()
552                        }
553                    }
554                };
555                Ok(pipeline.set_metadata(metadata))
556            }
557            PipelineData::ListStream(stream, metadata) => Ok(PipelineData::list_stream(
558                stream.modify(|iter| iter.filter(f)),
559                metadata,
560            )),
561            PipelineData::ByteStream(stream, metadata) => {
562                // TODO: is this behavior desired / correct ?
563                let span = stream.span();
564                let value = match String::from_utf8(stream.into_bytes()?) {
565                    Ok(mut str) => {
566                        str.truncate(str.trim_end_matches(LINE_ENDING_PATTERN).len());
567                        Value::string(str, span)
568                    }
569                    Err(err) => Value::binary(err.into_bytes(), span),
570                };
571                let value = if f(&value) {
572                    value
573                } else {
574                    Value::nothing(span)
575                };
576                Ok(value.into_pipeline_data_with_metadata(metadata))
577            }
578        }
579    }
580
581    /// Try to convert Value from Value::Range to Value::List.
582    /// This is useful to expand Value::Range into array notation, specifically when
583    /// converting `to json` or `to nuon`.
584    /// `1..3 | to XX -> [1,2,3]`
585    pub fn try_expand_range(self) -> Result<PipelineData, ShellError> {
586        match self {
587            PipelineData::Value(v, metadata) => {
588                let span = v.span();
589                match v {
590                    Value::Range { val, .. } => {
591                        match *val {
592                            Range::IntRange(range) => {
593                                if range.is_unbounded() {
594                                    return Err(ShellError::GenericError {
595                                        error: "Cannot create range".into(),
596                                        msg: "Unbounded ranges are not allowed when converting to this format".into(),
597                                        span: Some(span),
598                                        help: Some("Consider using ranges with valid start and end point.".into()),
599                                        inner: vec![],
600                                    });
601                                }
602                            }
603                            Range::FloatRange(range) => {
604                                if range.is_unbounded() {
605                                    return Err(ShellError::GenericError {
606                                        error: "Cannot create range".into(),
607                                        msg: "Unbounded ranges are not allowed when converting to this format".into(),
608                                        span: Some(span),
609                                        help: Some("Consider using ranges with valid start and end point.".into()),
610                                        inner: vec![],
611                                    });
612                                }
613                            }
614                        }
615                        let range_values: Vec<Value> =
616                            val.into_range_iter(span, Signals::empty()).collect();
617                        Ok(PipelineData::value(Value::list(range_values, span), None))
618                    }
619                    x => Ok(PipelineData::value(x, metadata)),
620                }
621            }
622            _ => Ok(self),
623        }
624    }
625
626    /// Consume and print self data immediately, formatted using table command.
627    ///
628    /// This does not respect the display_output hook. If a value is being printed out by a command,
629    /// this function should be used. Otherwise, `nu_cli::util::print_pipeline` should be preferred.
630    ///
631    /// `no_newline` controls if we need to attach newline character to output.
632    /// `to_stderr` controls if data is output to stderr, when the value is false, the data is output to stdout.
633    pub fn print_table(
634        self,
635        engine_state: &EngineState,
636        stack: &mut Stack,
637        no_newline: bool,
638        to_stderr: bool,
639    ) -> Result<(), ShellError> {
640        match self {
641            // Print byte streams directly as long as they aren't binary.
642            PipelineData::ByteStream(stream, ..) if stream.type_() != ByteStreamType::Binary => {
643                stream.print(to_stderr)
644            }
645            _ => {
646                // If the table function is in the declarations, then we can use it
647                // to create the table value that will be printed in the terminal
648                if let Some(decl_id) = engine_state.table_decl_id {
649                    let command = engine_state.get_decl(decl_id);
650                    if command.block_id().is_some() {
651                        self.write_all_and_flush(engine_state, no_newline, to_stderr)
652                    } else {
653                        let call = Call::new(Span::new(0, 0));
654                        let table = command.run(engine_state, stack, &(&call).into(), self)?;
655                        table.write_all_and_flush(engine_state, no_newline, to_stderr)
656                    }
657                } else {
658                    self.write_all_and_flush(engine_state, no_newline, to_stderr)
659                }
660            }
661        }
662    }
663
664    /// Consume and print self data without any extra formatting.
665    ///
666    /// This does not use the `table` command to format data, and also prints binary values and
667    /// streams in their raw format without generating a hexdump first.
668    ///
669    /// `no_newline` controls if we need to attach newline character to output.
670    /// `to_stderr` controls if data is output to stderr, when the value is false, the data is output to stdout.
671    pub fn print_raw(
672        self,
673        engine_state: &EngineState,
674        no_newline: bool,
675        to_stderr: bool,
676    ) -> Result<(), ShellError> {
677        let span = self.span();
678        if let PipelineData::Value(Value::Binary { val: bytes, .. }, _) = self {
679            if to_stderr {
680                write_all_and_flush(
681                    bytes,
682                    &mut std::io::stderr().lock(),
683                    "stderr",
684                    span,
685                    engine_state.signals(),
686                )?;
687            } else {
688                write_all_and_flush(
689                    bytes,
690                    &mut std::io::stdout().lock(),
691                    "stdout",
692                    span,
693                    engine_state.signals(),
694                )?;
695            }
696            Ok(())
697        } else {
698            self.write_all_and_flush(engine_state, no_newline, to_stderr)
699        }
700    }
701
702    fn write_all_and_flush(
703        self,
704        engine_state: &EngineState,
705        no_newline: bool,
706        to_stderr: bool,
707    ) -> Result<(), ShellError> {
708        let span = self.span();
709        if let PipelineData::ByteStream(stream, ..) = self {
710            // Copy ByteStreams directly
711            stream.print(to_stderr)
712        } else {
713            let config = engine_state.get_config();
714            for item in self {
715                let mut out = if let Value::Error { error, .. } = item {
716                    return Err(*error);
717                } else {
718                    item.to_expanded_string("\n", config)
719                };
720
721                if !no_newline {
722                    out.push('\n');
723                }
724
725                if to_stderr {
726                    write_all_and_flush(
727                        out,
728                        &mut std::io::stderr().lock(),
729                        "stderr",
730                        span,
731                        engine_state.signals(),
732                    )?;
733                } else {
734                    write_all_and_flush(
735                        out,
736                        &mut std::io::stdout().lock(),
737                        "stdout",
738                        span,
739                        engine_state.signals(),
740                    )?;
741                }
742            }
743
744            Ok(())
745        }
746    }
747
748    pub fn unsupported_input_error(
749        self,
750        expected_type: impl Into<String>,
751        span: Span,
752    ) -> ShellError {
753        match self {
754            PipelineData::Empty => ShellError::PipelineEmpty { dst_span: span },
755            PipelineData::Value(value, ..) => ShellError::OnlySupportsThisInputType {
756                exp_input_type: expected_type.into(),
757                wrong_type: value.get_type().get_non_specified_string(),
758                dst_span: span,
759                src_span: value.span(),
760            },
761            PipelineData::ListStream(stream, ..) => ShellError::OnlySupportsThisInputType {
762                exp_input_type: expected_type.into(),
763                wrong_type: "list (stream)".into(),
764                dst_span: span,
765                src_span: stream.span(),
766            },
767            PipelineData::ByteStream(stream, ..) => ShellError::OnlySupportsThisInputType {
768                exp_input_type: expected_type.into(),
769                wrong_type: stream.type_().describe().into(),
770                dst_span: span,
771                src_span: stream.span(),
772            },
773        }
774    }
775}
776
777pub fn write_all_and_flush<T>(
778    data: T,
779    destination: &mut impl Write,
780    destination_name: &str,
781    span: Option<Span>,
782    signals: &Signals,
783) -> Result<(), ShellError>
784where
785    T: AsRef<[u8]>,
786{
787    let io_error_map = |err: std::io::Error, location: Location| {
788        let context = format!("Writing to {destination_name} failed");
789        match span {
790            None => IoError::new_internal(err, context, location),
791            Some(span) if span == Span::unknown() => IoError::new_internal(err, context, location),
792            Some(span) => IoError::new_with_additional_context(err, span, None, context),
793        }
794    };
795
796    let span = span.unwrap_or(Span::unknown());
797    const OUTPUT_CHUNK_SIZE: usize = 8192;
798    for chunk in data.as_ref().chunks(OUTPUT_CHUNK_SIZE) {
799        signals.check(&span)?;
800        destination
801            .write_all(chunk)
802            .map_err(|err| io_error_map(err, location!()))?;
803    }
804    destination
805        .flush()
806        .map_err(|err| io_error_map(err, location!()))?;
807    Ok(())
808}
809
810enum PipelineIteratorInner {
811    Empty,
812    Value(Value),
813    ListStream(crate::list_stream::IntoIter),
814    ByteStream(crate::byte_stream::Chunks),
815}
816
817pub struct PipelineIterator(PipelineIteratorInner);
818
819impl IntoIterator for PipelineData {
820    type Item = Value;
821
822    type IntoIter = PipelineIterator;
823
824    fn into_iter(self) -> Self::IntoIter {
825        PipelineIterator(match self {
826            PipelineData::Empty => PipelineIteratorInner::Empty,
827            PipelineData::Value(value, ..) => {
828                let span = value.span();
829                match value {
830                    Value::List { vals, .. } => PipelineIteratorInner::ListStream(
831                        ListStream::new(vals.into_iter(), span, Signals::empty()).into_iter(),
832                    ),
833                    Value::Range { val, .. } => PipelineIteratorInner::ListStream(
834                        ListStream::new(
835                            val.into_range_iter(span, Signals::empty()),
836                            span,
837                            Signals::empty(),
838                        )
839                        .into_iter(),
840                    ),
841                    x => PipelineIteratorInner::Value(x),
842                }
843            }
844            PipelineData::ListStream(stream, ..) => {
845                PipelineIteratorInner::ListStream(stream.into_iter())
846            }
847            PipelineData::ByteStream(stream, ..) => stream.chunks().map_or(
848                PipelineIteratorInner::Empty,
849                PipelineIteratorInner::ByteStream,
850            ),
851        })
852    }
853}
854
855impl Iterator for PipelineIterator {
856    type Item = Value;
857
858    fn next(&mut self) -> Option<Self::Item> {
859        match &mut self.0 {
860            PipelineIteratorInner::Empty => None,
861            PipelineIteratorInner::Value(Value::Nothing { .. }, ..) => None,
862            PipelineIteratorInner::Value(v, ..) => Some(std::mem::take(v)),
863            PipelineIteratorInner::ListStream(stream, ..) => stream.next(),
864            PipelineIteratorInner::ByteStream(stream) => stream.next().map(|x| match x {
865                Ok(x) => x,
866                Err(err) => Value::error(
867                    err,
868                    Span::unknown(), //FIXME: unclear where this span should come from
869                ),
870            }),
871        }
872    }
873}
874
875pub trait IntoPipelineData {
876    fn into_pipeline_data(self) -> PipelineData;
877
878    fn into_pipeline_data_with_metadata(
879        self,
880        metadata: impl Into<Option<PipelineMetadata>>,
881    ) -> PipelineData;
882}
883
884impl<V> IntoPipelineData for V
885where
886    V: Into<Value>,
887{
888    fn into_pipeline_data(self) -> PipelineData {
889        PipelineData::value(self.into(), None)
890    }
891
892    fn into_pipeline_data_with_metadata(
893        self,
894        metadata: impl Into<Option<PipelineMetadata>>,
895    ) -> PipelineData {
896        PipelineData::value(self.into(), metadata.into())
897    }
898}
899
900pub trait IntoInterruptiblePipelineData {
901    fn into_pipeline_data(self, span: Span, signals: Signals) -> PipelineData;
902    fn into_pipeline_data_with_metadata(
903        self,
904        span: Span,
905        signals: Signals,
906        metadata: impl Into<Option<PipelineMetadata>>,
907    ) -> PipelineData;
908}
909
910impl<I> IntoInterruptiblePipelineData for I
911where
912    I: IntoIterator + Send + 'static,
913    I::IntoIter: Send + 'static,
914    <I::IntoIter as Iterator>::Item: Into<Value>,
915{
916    fn into_pipeline_data(self, span: Span, signals: Signals) -> PipelineData {
917        ListStream::new(self.into_iter().map(Into::into), span, signals).into()
918    }
919
920    fn into_pipeline_data_with_metadata(
921        self,
922        span: Span,
923        signals: Signals,
924        metadata: impl Into<Option<PipelineMetadata>>,
925    ) -> PipelineData {
926        PipelineData::list_stream(
927            ListStream::new(self.into_iter().map(Into::into), span, signals),
928            metadata.into(),
929        )
930    }
931}
932
933fn value_to_bytes(value: Value) -> Result<Vec<u8>, ShellError> {
934    let bytes = match value {
935        Value::String { val, .. } => val.into_bytes(),
936        Value::Binary { val, .. } => val,
937        Value::List { vals, .. } => {
938            let val = vals
939                .into_iter()
940                .map(Value::coerce_into_string)
941                .collect::<Result<Vec<String>, ShellError>>()?
942                .join("\n")
943                + "\n";
944
945            val.into_bytes()
946        }
947        // Propagate errors by explicitly matching them before the final case.
948        Value::Error { error, .. } => return Err(*error),
949        value => value.coerce_into_string()?.into_bytes(),
950    };
951    Ok(bytes)
952}